Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/05/05 15:28:39 UTC
[iceberg] branch master updated: Spark 3.x: Support rewrite data files with starting sequence number (#4701) Backports (#3480)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bf582ebf6 Spark 3.x: Support rewrite data files with starting sequence number (#4701) Backports (#3480)
bf582ebf6 is described below
commit bf582ebf68ea05be26f38d786584d474aebe048b
Author: Rajarshi Sarkar <sr...@amazon.com>
AuthorDate: Thu May 5 20:58:32 2022 +0530
Spark 3.x: Support rewrite data files with starting sequence number (#4701) Backports (#3480)
Backport of #3480
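For context, a minimal usage sketch of the option this change wires up (hedged: the
SparkActions entry point and Result type below are assumptions based on the Spark 3.x
API of this release line; the option constant itself is the one exercised by the tests
in this diff):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Bin-pack the table's data files, committing the rewritten files with the
    // sequence number of the snapshot the rewrite started from, so delete files
    // committed while the rewrite was running still apply to the new files.
    Table table = ...;  // a loaded Iceberg table
    RewriteDataFiles.Result result = SparkActions.get()
        .rewriteDataFiles(table)
        .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
        .execute();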
---
.../actions/BaseRewriteDataFilesSparkAction.java | 8 ++-
.../spark/actions/TestRewriteDataFilesAction.java | 63 ++++++++++++++++++++++
.../actions/BaseRewriteDataFilesSparkAction.java | 8 ++-
.../spark/actions/TestRewriteDataFilesAction.java | 63 ++++++++++++++++++++++
4 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 4f0c27fdc..cd12131bd 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
+ USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER
);
@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
+ private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;
@@ -248,7 +250,7 @@ abstract class BaseRewriteDataFilesSparkAction
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
- return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+ return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
}
private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -396,6 +398,10 @@ abstract class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_ENABLED_DEFAULT);
+ useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
+ USE_STARTING_SEQUENCE_NUMBER,
+ USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+
rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));
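A hedged sketch of what the new flag changes on the commit path: the Spark action above
only forwards useStartingSequenceNumber into RewriteDataFilesCommitManager, and the
commit logic itself lives in iceberg-core (added by #3480, not part of this diff),
roughly along these lines (method names are assumptions based on that era's core API):

    // Inside the commit manager, approximately:
    RewriteFiles rewrite = table.newRewrite().validateFromSnapshot(startingSnapshotId);
    if (useStartingSequenceNumber) {
      // stamp the new data files with the starting snapshot's sequence number
      long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber();
      rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles, sequenceNumber);
    } else {
      rewrite.rewriteFiles(rewrittenDataFiles, addedDataFiles);
    }
    rewrite.commit();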
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2e62fb0b2..3f465fe72 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
(long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
}
+ @Test
+ public void testBinPackWithStartingSequenceNumber() {
+ Table table = createTablePartitioned(4, 2);
+ shouldHaveFiles(table, 8);
+ List<Object[]> expectedRecords = currentData();
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+ table.refresh();
+ long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+ Result result = basicRewrite(table)
+ .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+ .execute();
+ Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+ Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+ shouldHaveFiles(table, 4);
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+
+ table.refresh();
+ Assert.assertTrue("Table sequence number should be incremented",
+ oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+
+ Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+ for (Row row : rows.collectAsList()) {
+ if (row.getInt(0) == 1) {
+ Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
+ }
+ }
+ }
+
+ @Test
+ public void testBinPackWithStartingSequenceNumberV1Compatibility() {
+ Table table = createTablePartitioned(4, 2);
+ shouldHaveFiles(table, 8);
+ List<Object[]> expectedRecords = currentData();
+ table.refresh();
+ long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+ Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);
+
+ Result result = basicRewrite(table)
+ .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+ .execute();
+ Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+ Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+ shouldHaveFiles(table, 4);
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+
+ table.refresh();
+ Assert.assertEquals("Table sequence number should still be 0",
+ oldSequenceNumber, table.currentSnapshot().sequenceNumber());
+
+ Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+ for (Row row : rows.collectAsList()) {
+ Assert.assertEquals("Expect sequence number 0 for all entries",
+ oldSequenceNumber, row.getLong(2));
+ }
+ }
+
@Test
public void testRewriteLargeTableHasResiduals() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
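A note on the metadata-table checks in the tests above (hedged; the column layout is
that of the entries metadata table in this release line): entries rows are laid out as
status, snapshot_id, sequence_number, data_file, where status 0 = EXISTING, 1 = ADDED,
2 = DELETED. That is why the tests use row.getInt(0) == 1 to pick out entries added by
the rewrite and row.getLong(2) to read their sequence number. The same check can be
written by projecting the columns by name:

    // Project the entries metadata table by column name instead of position.
    Dataset<Row> entries = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES)
        .select("status", "snapshot_id", "sequence_number");
    entries.show();  // status 1 marks the entries added by the rewrite commit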
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 1cbfa4cec..5c3c349cd 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -82,6 +82,7 @@ abstract class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
TARGET_FILE_SIZE_BYTES,
+ USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER
);
@@ -91,6 +92,7 @@ abstract class BaseRewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
+ private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;
@@ -248,7 +250,7 @@ abstract class BaseRewriteDataFilesSparkAction
@VisibleForTesting
RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
- return new RewriteDataFilesCommitManager(table, startingSnapshotId);
+ return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
}
private Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream,
@@ -395,6 +397,10 @@ abstract class BaseRewriteDataFilesSparkAction
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_ENABLED_DEFAULT);
+ useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(options(),
+ USE_STARTING_SEQUENCE_NUMBER,
+ USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
+
rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 2e62fb0b2..3f465fe72 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
@@ -74,6 +75,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
@@ -277,6 +279,67 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
(long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
}
+ @Test
+ public void testBinPackWithStartingSequenceNumber() {
+ Table table = createTablePartitioned(4, 2);
+ shouldHaveFiles(table, 8);
+ List<Object[]> expectedRecords = currentData();
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+ table.refresh();
+ long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+
+ Result result = basicRewrite(table)
+ .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+ .execute();
+ Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+ Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+ shouldHaveFiles(table, 4);
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+
+ table.refresh();
+ Assert.assertTrue("Table sequence number should be incremented",
+ oldSequenceNumber < table.currentSnapshot().sequenceNumber());
+
+ Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+ for (Row row : rows.collectAsList()) {
+ if (row.getInt(0) == 1) {
+ Assert.assertEquals("Expect old sequence number for added entries", oldSequenceNumber, row.getLong(2));
+ }
+ }
+ }
+
+ @Test
+ public void testBinPackWithStartingSequenceNumberV1Compatibility() {
+ Table table = createTablePartitioned(4, 2);
+ shouldHaveFiles(table, 8);
+ List<Object[]> expectedRecords = currentData();
+ table.refresh();
+ long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+ Assert.assertEquals("Table sequence number should be 0", 0, oldSequenceNumber);
+
+ Result result = basicRewrite(table)
+ .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "true")
+ .execute();
+ Assert.assertEquals("Action should rewrite 8 data files", 8, result.rewrittenDataFilesCount());
+ Assert.assertEquals("Action should add 4 data file", 4, result.addedDataFilesCount());
+
+ shouldHaveFiles(table, 4);
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+
+ table.refresh();
+ Assert.assertEquals("Table sequence number should still be 0",
+ oldSequenceNumber, table.currentSnapshot().sequenceNumber());
+
+ Dataset<Row> rows = SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.ENTRIES);
+ for (Row row : rows.collectAsList()) {
+ Assert.assertEquals("Expect sequence number 0 for all entries",
+ oldSequenceNumber, row.getLong(2));
+ }
+ }
+
@Test
public void testRewriteLargeTableHasResiduals() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();