Posted to commits@iceberg.apache.org by ja...@apache.org on 2022/01/25 08:04:08 UTC

[iceberg] branch master updated: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE (#3970)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a21abf5  Spark 3.2: Revise distribution and ordering for merge-on-read DELETE (#3970)
a21abf5 is described below

commit a21abf513c2541575d63da977350469cd0782e7c
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jan 25 00:03:36 2022 -0800

    Spark 3.2: Revise distribution and ordering for merge-on-read DELETE (#3970)
---
 .../org/apache/iceberg/spark/SparkWriteConf.java   |  22 +--
 .../TestSparkDistributionAndOrderingUtil.java      | 164 +++++++++++++++++++++
 2 files changed, 167 insertions(+), 19 deletions(-)
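
For context on the main change below: positionDeleteDistributionMode() in SparkWriteConf no longer falls back to the general write distribution mode or special-cases unpartitioned tables; it reads the write option or the delete-specific table property and simply defaults to hash. A minimal usage sketch, not part of this commit, showing how that default could be overridden with the same TableProperties constants the new tests use (assumes an already-loaded Iceberg Table handle):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    public class DeleteDistributionModeExample {
      // Switch merge-on-read DELETE from the new hash default to a range-based
      // shuffle by setting the delete-specific distribution mode on the table.
      public static void useRangeDistributionForDeletes(Table table) {
        table.updateProperties()
            .set(TableProperties.DELETE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
            .commit();
      }
    }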

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 3b96ddb..27c2f4b 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -234,25 +234,9 @@ public class SparkWriteConf {
     String deleteModeName = confParser.stringConf()
         .option(SparkWriteOptions.DISTRIBUTION_MODE)
         .tableProperty(TableProperties.DELETE_DISTRIBUTION_MODE)
-        .parseOptional();
-
-    if (deleteModeName != null) {
-      DistributionMode deleteMode = DistributionMode.fromName(deleteModeName);
-      if (deleteMode == HASH && table.spec().isUnpartitioned()) {
-        return NONE;
-      } else {
-        return deleteMode;
-      }
-    } else {
-      // use hash distribution if write distribution is range or hash and table is partitioned
-      // avoid range-based shuffles unless the user asks explicitly
-      DistributionMode writeMode = distributionMode();
-      if (writeMode != NONE && table.spec().isPartitioned()) {
-        return HASH;
-      } else {
-        return writeMode;
-      }
-    }
+        .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)
+        .parse();
+    return DistributionMode.fromName(deleteModeName);
   }
 
   public boolean useTableDistributionAndOrdering() {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 42202d0..37cbc55 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -51,12 +51,27 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
   private static final Distribution FILE_CLUSTERED_DISTRIBUTION = Distributions.clustered(new Expression[]{
       Expressions.column(MetadataColumns.FILE_PATH.name())
   });
+  private static final Distribution SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION = Distributions.clustered(new Expression[]{
+      Expressions.column(MetadataColumns.SPEC_ID.name()),
+      Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME)
+  });
 
   private static final SortOrder[] EMPTY_ORDERING = new SortOrder[]{};
   private static final SortOrder[] FILE_POSITION_ORDERING = new SortOrder[]{
       Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
       Expressions.sort(Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING)
   };
+  private static final SortOrder[] SPEC_ID_PARTITION_FILE_ORDERING = new SortOrder[]{
+      Expressions.sort(Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+      Expressions.sort(Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+      Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING)
+  };
+  private static final SortOrder[] SPEC_ID_PARTITION_FILE_POSITION_ORDERING = new SortOrder[]{
+      Expressions.sort(Expressions.column(MetadataColumns.SPEC_ID.name()), SortDirection.ASCENDING),
+      Expressions.sort(Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME), SortDirection.ASCENDING),
+      Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
+      Expressions.sort(Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING)
+  };
 
   @After
   public void dropTable() {
@@ -1300,6 +1315,132 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // delete mode is NOT SET -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is HASH -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file +
+  //                         LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+
+  @Test
+  public void testDefaultPositionDeltaDeleteUnpartitionedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testNonePositionDeltaDeleteUnpartitionedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties()
+        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
+        .commit();
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testHashPositionDeltaDeleteUnpartitionedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties()
+        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
+        .commit();
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testRangePositionDeltaDeleteUnpartitionedTable() {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties()
+        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
+        .commit();
+
+    Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testDefaultPositionDeltaDeletePartitionedTable() {
+    sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+        "USING iceberg " +
+        "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testNonePositionDeltaDeletePartitionedTable() {
+    sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+        "USING iceberg " +
+        "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties()
+        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
+        .commit();
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testHashPositionDeltaDeletePartitionedTable() {
+    sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+        "USING iceberg " +
+        "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties()
+        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
+        .commit();
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, SPEC_ID_PARTITION_CLUSTERED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
+  @Test
+  public void testRangePositionDeltaDeletePartitionedTable() {
+    sql("CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " +
+        "USING iceberg " +
+        "PARTITIONED BY (date, bucket(8, data))", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties()
+        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
+        .commit();
+
+    Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
+
+    checkPositionDeltaDistributionAndOrdering(
+        table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
+  }
+
   private void checkWriteDistributionAndOrdering(Table table, Distribution expectedDistribution,
                                                  SortOrder[] expectedOrdering) {
     SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
@@ -1337,4 +1478,27 @@ public class TestSparkDistributionAndOrderingUtil extends SparkTestBaseWithCatal
         throw new IllegalArgumentException("Unexpected command: " + command);
     }
   }
+
+  private void checkPositionDeltaDistributionAndOrdering(Table table, Command command,
+                                                         Distribution expectedDistribution,
+                                                         SortOrder[] expectedOrdering) {
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+    DistributionMode mode = positionDeltaDistributionMode(command, writeConf);
+
+    Distribution distribution = SparkDistributionAndOrderingUtil.buildPositionDeltaDistribution(table, command, mode);
+    Assert.assertEquals("Distribution must match", expectedDistribution, distribution);
+
+    SortOrder[] ordering = SparkDistributionAndOrderingUtil.buildPositionDeltaOrdering(table, command, distribution);
+    Assert.assertArrayEquals("Ordering must match", expectedOrdering, ordering);
+  }
+
+  private DistributionMode positionDeltaDistributionMode(Command command, SparkWriteConf writeConf) {
+    switch (command) {
+      case DELETE:
+        return writeConf.positionDeleteDistributionMode();
+      default:
+        throw new IllegalArgumentException("Unexpected command: " + command);
+    }
+  }
 }
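
The comment block added to the test file above maps each delete distribution mode to the distribution and local ordering that merge-on-read DELETE now requests: hash clustering by _spec_id, _partition by default, an unspecified distribution for none, and a range distribution by _spec_id, _partition, _file for range, always locally ordered by _spec_id, _partition, _file, _pos. A minimal sketch, not part of this commit, of resolving that mode directly through SparkWriteConf the way the new tests do (assumes a live SparkSession and a loaded Table, as provided by the SparkTestBaseWithCatalog fixture):

    import org.apache.iceberg.DistributionMode;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.spark.SparkWriteConf;
    import org.apache.spark.sql.SparkSession;

    class PositionDeleteModeCheck {
      // With neither the "distribution-mode" write option nor the
      // "write.delete.distribution-mode" table property set, the revised
      // SparkWriteConf resolves the delete distribution mode to HASH.
      static DistributionMode resolvedDeleteMode(SparkSession spark, Table table) {
        SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
        return writeConf.positionDeleteDistributionMode();
      }
    }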