You are viewing a plain text version of this content; the canonical link to the original message is available in the mailing-list archive.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/01/25 08:04:08 UTC
[iceberg] branch master updated: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE (#3970)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a21abf5 Spark 3.2: Revise distribution and ordering for merge-on-read DELETE (#3970)
a21abf5 is described below
commit a21abf513c2541575d63da977350469cd0782e7c
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jan 25 00:03:36 2022 -0800
Spark 3.2: Revise distribution and ordering for merge-on-read DELETE (#3970)
---
.../org/apache/iceberg/spark/SparkWriteConf.java | 22 +--
.../TestSparkDistributionAndOrderingUtil.java | 164 +++++++++++++++++++++
2 files changed, 167 insertions(+), 19 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 3b96ddb..27c2f4b 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -234,25 +234,9 @@ public class SparkWriteConf {
String deleteModeName = confParser.stringConf()
.option(SparkWriteOptions.DISTRIBUTION_MODE)
.tableProperty(TableProperties.DELETE_DISTRIBUTION_MODE)
- .parseOptional();
-
- if (deleteModeName != null) {
- DistributionMode deleteMode = DistributionMode.fromName(deleteModeName);
- if (deleteMode == HASH && table.spec().isUnpartitioned()) {
- return NONE;
- } else {
- return deleteMode;
- }
- } else {
- // use hash distribution if write distribution is range or hash and table is partitioned
- // avoid range-based shuffles unless the user asks explicitly
- DistributionMode writeMode = distributionMode();
- if (writeMode != NONE && table.spec().isPartitioned()) {
- return HASH;
- } else {
- return writeMode;
- }
- }
+ .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)
+ .parse();
+ return DistributionMode.fromName(deleteModeName);
}
public boolean useTableDistributionAndOrdering() {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 42202d0..37cbc55 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -51,12 +51,27 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
private static final Distribution FILE_CLUSTERED_DISTRIBUTION = Distributions.clustered(new Expression[]{
Expressions.column(MetadataColumns.FILE_PATH.name())
});
+ private static final Distribution SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION = Distributions.clustered(new Expression[]{
+ Expressions.column(MetadataColumns.SPEC_ID.name()),
+ Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME)
+ });
private static final SortOrder[] EMPTY_ORDERING = new SortOrder[]{};
private static final SortOrder[] FILE_POSITION_ORDERING = new SortOrder[]{
Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
Expressions.sort(Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING)
};
+ private static final SortOrder[] SPEC_ID_PARTITION_FILE_ORDERING = new SortOrder[]{
+ Expressions.sort(Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING)
+ };
+ private static final SortOrder[] SPEC_ID_PARTITION_FILE_POSITION_ORDERING = new SortOrder[]{
+ Expressions.sort(Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+ Expressions.sort(Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING)
+ };
@After
public void dropTable() {
@@ -1300,6 +1315,132 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
}
+ // ===================================================================================
+ // Distribution and ordering for merge-on-read DELETE operations with position deletes
+ // ===================================================================================
+ //
+ // delete mode is NOT SET -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+ // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+ // delete mode is HASH -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+ // delete mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file +
+ // LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+
+ @Test
+ public void testDefaultPositionDeltaDeleteUnpartitionedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testNonePositionDeltaDeleteUnpartitionedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties()
+ .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
+ .commit();
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testHashPositionDeltaDeleteUnpartitionedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties()
+ .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
+ .commit();
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testRangePositionDeltaDeleteUnpartitionedTable() {
+ sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties()
+ .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
+ .commit();
+
+ Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testDefaultPositionDeltaDeletePartitionedTable() {
+ sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+ "USING iceberg " +
+ "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testNonePositionDeltaDeletePartitionedTable() {
+ sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+ "USING iceberg " +
+ "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties()
+ .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
+ .commit();
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testHashPositionDeltaDeletePartitionedTable() {
+ sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+ "USING iceberg " +
+ "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties()
+ .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
+ .commit();
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
+ @Test
+ public void testRangePositionDeltaDeletePartitionedTable() {
+ sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+ "USING iceberg " +
+ "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateProperties()
+ .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
+ .commit();
+
+ Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
+
+ checkPositionDeltaDistributionAndOrdering(
+ table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+ }
+
private void checkWriteDistributionAndOrdering(Table table, Distribution expectedDistribution,
SortOrder[] expectedOrdering) {
SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
@@ -1337,4 +1478,27 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
throw new IllegalArgumentException("Unexpected command: " + command);
}
}
+
+ private void checkPositionDeltaDistributionAndOrdering(Table table, Command command,
+ Distribution expectedDistribution,
+ SortOrder[] expectedOrdering) {
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+ DistributionMode mode = positionDeltaDistributionMode(command, writeConf);
+
+ Distribution distribution = SparkDistributionAndOrderingUtil.buildPositionDeltaDistribution(table, command, mode);
+ Assert.assertEquals("Distribution must match", expectedDistribution, distribution);
+
+ SortOrder[] ordering = SparkDistributionAndOrderingUtil.buildPositionDeltaOrdering(table, command, distribution);
+ Assert.assertArrayEquals("Ordering must match", expectedOrdering, ordering);
+ }
+
+ private DistributionMode positionDeltaDistributionMode(Command command, SparkWriteConf writeConf) {
+ switch (command) {
+ case DELETE:
+ return writeConf.positionDeleteDistributionMode();
+ default:
+ throw new IllegalArgumentException("Unexpected command: " + command);
+ }
+ }
}