You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:22 UTC

[iceberg] 07/18: Core: Fix data loss in compact action (#2196)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 4b2f3904e14ac1236c62f37c692c1904b04bfb9a
Author: Stephen-Robin <77...@users.noreply.github.com>
AuthorDate: Thu Feb 4 09:19:14 2021 +0800

    Core: Fix data loss in compact action (#2196)
    
    Fixes #2195.
---
 .../actions/BaseRewriteDataFilesAction.java        | 11 ++++-
 .../actions/TestRewriteDataFilesAction.java        | 54 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index 7ce1357..2305388 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -226,7 +226,7 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
           return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
         })
         .flatMap(Streams::stream)
-        .filter(task -> task.files().size() > 1)
+        .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
         .collect(Collectors.toList());
 
     if (combinedScanTasks.isEmpty()) {
@@ -273,6 +273,15 @@ public abstract class BaseRewriteDataFilesAction<ThisT>
     }
   }
 
+  private boolean isPartialFileScan(CombinedScanTask task) { // true only for a lone task whose length != file size
+    if (task.files().size() == 1) {
+      FileScanTask fileScanTask = task.files().iterator().next(); // the single file scan task in this combined task
+      return fileScanTask.file().fileSizeInBytes() != fileScanTask.length(); // mismatch: presumably a split of the file
+    } else {
+      return false; // zero or several file scan tasks are never reported as a partial scan here
+    }
+  }
+
   protected abstract FileIO fileIO();
 
   protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask);
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
index 5c6ae59..5e72dd1 100644
--- a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
+++ b/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
@@ -20,8 +20,11 @@
 package org.apache.iceberg.actions;
 
 import java.io.File;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -37,6 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -311,6 +315,56 @@ public abstract class TestRewriteDataFilesAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesForLargeFile() throws AnalysisException { // rewrite must keep all rows and row values
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+    Assert.assertNull("Table must be empty", table.currentSnapshot());
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(); // filled with 2000 generated rows below
+
+    IntStream.range(0, 2000).forEach(i -> records1.add(new ThreeColumnRecord(i, "foo" + i, "bar" + i)));
+    Dataset<Row> df = spark.createDataFrame(records1, ThreeColumnRecord.class).repartition(1); // one Spark partition
+    writeDF(df);
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList( // second, much smaller write of two rows
+        new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"),
+        new ThreeColumnRecord(1, "DDDDDDDDDD", "DDDD")
+    );
+    writeRecords(records2);
+
+    table.refresh();
+
+    CloseableIterable<FileScanTask> tasks = table.newScan().planFiles(); // NOTE(review): never closed in this method
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    DataFile maxSizeFile = Collections.max(dataFiles, Comparator.comparingLong(DataFile::fileSizeInBytes)); // largest
+    Assert.assertEquals("Should have 3 files before rewrite", 3, dataFiles.size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("origin"); // pre-rewrite view queried below
+    long originalNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> originalRecords = sql("SELECT * from origin sort by c2"); // NOTE(review): is ORDER BY intended?
+
+    Actions actions = Actions.forTable(table);
+
+    long targetSizeInBytes = maxSizeFile.fileSizeInBytes() - 10; // 10 bytes below the largest file's size
+    RewriteDataFilesActionResult result = actions
+        .rewriteDataFiles()
+        .targetSizeInBytes(targetSizeInBytes)
+        .splitOpenFileCost(1)
+        .execute();
+
+    Assert.assertEquals("Action should delete 4 data files", 4, result.deletedDataFiles().size()); // NOTE(review): 4 vs 3
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+    spark.read().format("iceberg").load(tableLocation).createTempView("postRewrite"); // post-rewrite view
+    long postRewriteNumRecords = spark.read().format("iceberg").load(tableLocation).count();
+    List<Object[]> rewrittenRecords = sql("SELECT * from postRewrite sort by c2"); // same query as for "origin"
+
+    Assert.assertEquals(originalNumRecords, postRewriteNumRecords); // row count must be unchanged by the rewrite
+    assertEquals("Rows should be unchanged", originalRecords, rewrittenRecords);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);