Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:22 UTC
[iceberg] 07/18: Core: Fix data loss in compact action (#2196)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 4b2f3904e14ac1236c62f37c692c1904b04bfb9a
Author: Stephen-Robin <77...@users.noreply.github.com>
AuthorDate: Thu Feb 4 09:19:14 2021 +0800
Core: Fix data loss in compact action (#2196)
Fixes #2195.
---
.../actions/BaseRewriteDataFilesAction.java | 11 ++++-
.../actions/TestRewriteDataFilesAction.java | 54 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 7ce1357..2305388 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -226,7 +226,7 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
})
.flatMap(Streams::stream)
- .filter(task -> task.files().size() > 1)
+ .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
.collect(Collectors.toList());
if (combinedScanTasks.isEmpty()) {
@@ -273,6 +273,15 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
}
}
+ private boolean isPartialFileScan(CombinedScanTask task) {
+ if (task.files().size() == 1) {
+ FileScanTask fileScanTask = task.files().iterator().next();
+ return fileScanTask.file().fileSizeInBytes() != fileScanTask.length();
+ } else {
+ return false;
+ }
+ }
+
protected abstract FileIO fileIO();
protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask);
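The one-line filter change above is the core of the fix. When targetSizeInBytes is smaller than an existing data file, TableScanUtil.planTasks splits that file into several FileScanTask ranges, and bin-packing can leave one of those ranges alone in its CombinedScanTask. The old condition, task.files().size() > 1, dropped such single-split tasks, yet the commit at the end of the action still deletes the underlying file in full because other kept tasks reference it, so the rows in the dropped range were lost. The new isPartialFileScan check keeps any lone split whose scan length is shorter than its file's size. A minimal sketch of the difference, in plain Java with hypothetical file sizes and no Iceberg dependency (Split and the task lists below only stand in for FileScanTask and planTasks output):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class PartialScanDemo {
      // (file, start, length, fileSize) standing in for one FileScanTask split.
      record Split(String file, long start, long length, long fileSize) {}

      // Mirrors the new isPartialFileScan: a lone split that covers less than
      // its whole file still has to be rewritten.
      static boolean isPartialFileScan(List<Split> task) {
        if (task.size() != 1) {
          return false;
        }
        Split only = task.get(0);
        return only.length() != only.fileSize();
      }

      static void run(String label, boolean fixed, List<List<Split>> tasks) {
        List<Split> kept = tasks.stream()
            .filter(t -> t.size() > 1 || (fixed && isPartialFileScan(t)))
            .flatMap(List::stream)
            .collect(Collectors.toList());
        // The rewrite commit deletes every referenced file in full ...
        Map<String, Long> deleted = kept.stream()
            .collect(Collectors.toMap(Split::file, Split::fileSize, (a, b) -> a));
        long deletedBytes = deleted.values().stream().mapToLong(Long::longValue).sum();
        // ... but only the kept ranges are rewritten into new files.
        long rewrittenBytes = kept.stream().mapToLong(Split::length).sum();
        System.out.printf("%s: deletes %d bytes, rewrites %d bytes, loses %d%n",
            label, deletedBytes, rewrittenBytes, deletedBytes - rewrittenBytes);
      }

      public static void main(String[] args) {
        // planTasks split one 100-byte file into 40/40/20-byte ranges and
        // bin-packed the last range with two 10-byte whole files.
        Split b1 = new Split("big", 0, 40, 100);
        Split b2 = new Split("big", 40, 40, 100);
        Split b3 = new Split("big", 80, 20, 100);
        Split s1 = new Split("s1", 0, 10, 10);
        Split s2 = new Split("s2", 0, 10, 10);
        List<List<Split>> tasks =
            List.of(List.of(b1), List.of(b2), List.of(b3, s1, s2));

        run("old filter  ", false, tasks);  // deletes 120, rewrites 40, loses 80
        run("fixed filter", true, tasks);   // deletes 120, rewrites 120, loses 0
      }
    }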
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
index 5c6ae59..5e72dd1 100644
--- a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
+++ b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
@@ -20,8 +20,11 @@
package org.apache.iceberg.actions;
import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
@@ -37,6 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -311,6 +315,56 @@ public abstract class TestRewriteDataFilesAction extends SparkTestBase {
Assert.assertEquals("Rows must match", records, actualRecords);
}
+ @Test
+ public void testRewriteDataFilesForLargeFile() throws AnalysisException {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+ Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+ List<ThreeColumnRecord> records1 = Lists.newArrayList();
+
+ IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i)));
+ Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(1);
+ writeDF(df);
+
+ List<ThreeColumnRecord> records2 = Lists.newArrayList(
+ new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+ new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+ );
+ writeRecords(records2);
+
+ table.refresh();
+
+ CloseableIterable<FileScanTask> tasks = table.newScan().planFiles();
+ List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes));
+ Assert.assertEquals("Should have 3 files before rewrite", 3, dataFiles.size());
+
+ spark.read().format("iceberg").load(tableLocation).createTempView("origin");
+ long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+ List<Object[]> originalRecords = sql("SELECT * from origin sort by c2");
+
+ Actions actions = Actions.forTable(table);
+
+ long targetSizeInBytes = maxSizeFile.fileSizeInBytes() - 10;
+ RewriteDataFilesActionResult result = actions
+ .rewriteDataFiles()
+ .targetSizeInBytes(targetSizeInBytes)
+ .splitOpenFileCost(1)
+ .execute();
+
+ Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+ spark.read().format("iceberg").load(tableLocation).createTempView("postRewrite");
+ long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+ List<Object[]> rewrittenRecords = sql("SELECT * from postRewrite sort by c2");
+
+ Assert.assertEquals(originalNumRecords, postRewriteNumRecords);
+ assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords);
+ }
+
private void writeRecords(List<ThreeColumnRecord> records) {
Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
writeDF(df);
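The test reproduces the loss deliberately: targetSizeInBytes is set ten bytes below the largest file so planTasks has to split it, and splitOpenFileCost(1) makes the two small files cheap enough to be bin-packed into one of the resulting tasks, leaving the other splits of the large file alone in theirs. For reference, a hedged sketch of the same call chain outside the test harness, using the 0.11.x Actions API shown in this diff; the table argument is assumed to be an already-loaded Iceberg table, and the 64 MB target is a hypothetical value:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.Actions;
    import org.apache.iceberg.actions.RewriteDataFilesActionResult;

    public class CompactTable {
      // Compact the given table (assumed already loaded) with the same
      // rewrite action the test exercises.
      static RewriteDataFilesActionResult compact(Table table) {
        return Actions.forTable(table)
            .rewriteDataFiles()
            .targetSizeInBytes(64L * 1024 * 1024)  // hypothetical 64 MB target
            .splitOpenFileCost(1)                  // charge 1 byte per file open
            .execute();
      }
    }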