You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/03/09 21:17:11 UTC

[iceberg] branch master updated: Spark: Allow Bin-Pack optimization to work across different partition specs (#4279)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 610714e  Spark: Allow Bin-Pack optimization to work across different partition specs (#4279)
610714e is described below

commit 610714e46741807356054a86efc653f5e3ddb167
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Wed Mar 9 15:16:46 2022 -0600

    Spark: Allow Bin-Pack optimization to work across different partition specs (#4279)
    
    Previously, Bin-Pack always used a NONE distribution when rewriting data files. This assumed
    that the incoming data files belonged to the same partitions that the new output files would
    be written to. When the partition spec has changed, the data must be re-sorted so that it
    lands in the correct new partitions.
---
 .../spark/actions/Spark3BinPackStrategy.java       | 10 +++++++-
 .../spark/actions/TestRewriteDataFilesAction.java  | 28 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
index 965a9be..cf95bf2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.BinPackStrategy;
@@ -67,12 +68,19 @@ public class Spark3BinPackStrategy extends BinPackStrategy {
           .option(SparkReadOptions.FILE_OPEN_COST, "0")
           .load(table.name());
 
+      // All files within a file group are written with the same spec, so check the first
+      boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+
+      // Invoke a shuffle if the partition spec of the incoming files does not match the table's spec
+      String distributionMode = requiresRepartition ? DistributionMode.RANGE.modeName() :
+          DistributionMode.NONE.modeName();
+
       // write the packed data into new files where each split becomes a new file
       scanDF.write()
           .format("iceberg")
           .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
           .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
-          .option(SparkWriteOptions.DISTRIBUTION_MODE, "none")
+          .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode)
           .mode("append")
           .save(table.name());
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 564c501..b652950 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -196,6 +196,34 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   }
 
   @Test
+  public void testBinPackAfterPartitionChange() {
+    Table table = createTable();
+
+    writeRecords(20, SCALE, 20);
+    shouldHaveFiles(table, 20);
+    table.updateSpec().addField(Expressions.ref("c1")).commit();
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(SortStrategy.MIN_INPUT_FILES, "1")
+            .option(SortStrategy.MIN_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) + 1000))
+            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) + 1001))
+            .execute();
+
+    Assert.assertEquals("Should have 1 fileGroup because all files were not correctly partitioned",
+        1, result.rewriteResults().size());
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);
+    shouldHaveFiles(table, 20);
+  }
+
+  @Test
   public void testBinPackWithDeletes() throws Exception {
     Table table = createTablePartitioned(4, 2);
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();