Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/05 15:28:39 UTC

[iceberg] branch master updated: Spark 3.x: Support rewrite data files with starting sequence number (#4701) Backports (#3480)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bf582ebf6 Spark 3.x: Support rewrite data files with starting sequence number (#4701) Backports (#3480)
bf582ebf6 is described below

commit bf582ebf68ea05be26f38d786584d474aebe048b
Author: Rajarshi Sarkar <sr...@amazon.com>
AuthorDate: Thu May 5 20:58:32 2022 +0530

    Spark 3.x: Support rewrite data files with starting sequence number (#4701) Backports (#3480)
    
    Backport of #3480
---
 .../actions/BaseRewriteDataFilesSparkAction.java   |  8 ++-
 .../spark/actions/TestRewriteDataFilesAction.java  | 63 ++++++++++++++++++++++
 .../actions/BaseRewriteDataFilesSparkAction.java   |  8 ++-
 .../spark/actions/TestRewriteDataFilesAction.java  | 63 ++++++++++++++++++++++
 4 files changed, 140 insertions(+), 2 deletions(-)
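
For reference, a minimal usage sketch (illustrative only, not part of this commit) of how the backported option can be enabled through the Spark actions API; it assumes an already-loaded org.apache.iceberg.Table instance named `table` and an active Spark session:

    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Bin-pack small files and commit the rewritten files using the sequence
    // number of the snapshot the rewrite started from, so delete files written
    // while the rewrite runs still apply to the rewritten data.
    RewriteDataFiles.Result result = SparkActions.get()
        .rewriteDataFiles(table)
        .binPack()
        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
        .execute();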

diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 4f0c27fdc..cd12131bd 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
       TARGET_FILE_SIZE_BYTES,
+      USE_STARTING_SEQUENCE_NUMBER,
       REWRITE_JOB_ORDER
   );
 
@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
+  private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
@@ -248,7 +250,7 @@ abstract class BaseRewriteDataFilesSparkAction
 
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
   }
 
   private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -396,6 +398,10 @@ abstract class BaseRewriteDataFilesSparkAction
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
+        USE_STARTING_SEQUENCE_NUMBER,
+        USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+
     rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
         REWRITE_JOB_ORDER,
         REWRITE_JOB_ORDER_DEFAULT));
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2e62fb0b2..3f465fe72 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
   }
 
+  @Test
+  public void testBinPackWithStartingSequenceNumber() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertTrue("Table sequence number should be incremented",
+        oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      if (row.getInt(0) == 1) {
+        Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
+      }
+    }
+  }
+
+  @Test
+  public void testBinPackWithStartingSequenceNumberV1Compatibility() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+    Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertEquals("Table sequence number should still be 0",
+        oldSequenceNumber, table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      Assert.assertEquals("Expect sequence number 0 for all entries",
+          oldSequenceNumber, row.getLong(2));
+    }
+  }
+
   @Test
   public void testRewriteLargeTableHasResiduals() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 1cbfa4cec..5c3c349cd 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
       TARGET_FILE_SIZE_BYTES,
+      USE_STARTING_SEQUENCE_NUMBER,
       REWRITE_JOB_ORDER
   );
 
@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
+  private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
@@ -248,7 +250,7 @@ abstract class BaseRewriteDataFilesSparkAction
 
   @VisibleForTesting
   RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
-    return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+    return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
   }
 
   private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -395,6 +397,10 @@ abstract class BaseRewriteDataFilesSparkAction
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
+        USE_STARTING_SEQUENCE_NUMBER,
+        USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+
     rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
         REWRITE_JOB_ORDER,
         REWRITE_JOB_ORDER_DEFAULT));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2e62fb0b2..3f465fe72 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
   }
 
+  @Test
+  public void testBinPackWithStartingSequenceNumber() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertTrue("Table sequence number should be incremented",
+        oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      if (row.getInt(0) == 1) {
+        Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
+      }
+    }
+  }
+
+  @Test
+  public void testBinPackWithStartingSequenceNumberV1Compatibility() {
+    Table table = createTablePartitioned(4, 2);
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    table.refresh();
+    long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+    Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);
+
+    Result result = basicRewrite(table)
+        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+        .execute();
+    Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+    Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+    shouldHaveFiles(table, 4);
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+
+    table.refresh();
+    Assert.assertEquals("Table sequence number should still be 0",
+        oldSequenceNumber, table.currentSnapshot().sequenceNumber());
+
+    Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+    for (Row row : rows.collectAsList()) {
+      Assert.assertEquals("Expect sequence number 0 for all entries",
+          oldSequenceNumber, row.getLong(2));
+    }
+  }
+
   @Test
   public void testRewriteLargeTableHasResiduals() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();