Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/04 21:25:18 UTC

[iceberg] branch master updated: Spark 3.4: Handle skew in writes (#7520)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa5fca9b1 Spark 3.4: Handle skew in writes (#7520)
7fa5fca9b1 is described below

commit 7fa5fca9b1121950f03dce6d8b985b283dc256b6
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu May 4 14:25:12 2023 -0700

    Spark 3.4: Handle skew in writes (#7520)
---
 .../iceberg/spark/extensions/TestDelete.java       | 67 +++++++++++++++++++
 .../apache/iceberg/spark/extensions/TestMerge.java | 78 ++++++++++++++++++++++
 .../iceberg/spark/extensions/TestUpdate.java       | 69 +++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  5 ++
 .../apache/iceberg/spark/source/SparkWrite.java    |  5 ++
 5 files changed, 224 insertions(+)

diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 6fa8d7f965..336d40cca0 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
 
@@ -43,9 +45,11 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
@@ -68,6 +72,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTableWithFilters;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.catalyst.plans.logical.RowLevelWrite;
+import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
@@ -103,6 +108,68 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
+  }
+
   @Test
   public void testDeleteWithoutScanningTable() throws Exception {
     createAndInitPartitionedTable();
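
For reference, the constants used in the setup above map to raw configuration keys. Below is a minimal sketch of the same setup against a plain SparkSession, outside the test harness; `spark` and the table name `db.tbl` are placeholders, and the literal keys are my reading of TableProperties/SQLConf rather than something stated in this patch:

    // table properties: force one scan task per data file and a hash write
    // distribution (and therefore a shuffle) for DELETE
    spark.sql(
        "ALTER TABLE db.tbl SET TBLPROPERTIES ("
            + "'read.split.open-file-cost' = '2147483647', "
            + "'write.delete.distribution-mode' = 'hash')");

    // session settings mirroring withSQLConf: AQE on, skew handling for rebalance
    // shuffles on, and an advisory partition size small enough that every shuffle
    // block is considered skewed and gets split
    spark.conf().set("spark.sql.shuffle.partitions", "2");
    spark.conf().set("spark.sql.adaptive.enabled", "true");
    spark.conf().set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true");
    spark.conf().set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100");
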
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 4ec78ec385..18d42ca6ae 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.MERGE_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.spark.sql.functions.lit;
@@ -95,6 +97,82 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  @Test
+  public void testSkewMerge() {
+    createAndInitTable("id INT, salary INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            MERGE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    spark.range(0, 100).createOrReplaceTempView("source");
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 4 to have 4 reducers
+    // set the min coalesce partition size small enough to avoid coalescing
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "4",
+            SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan(
+                  "MERGE INTO %s t USING source "
+                      + "ON t.id = source.id "
+                      + "WHEN MATCHED THEN "
+                      + "  UPDATE SET salary = -1 ",
+                  commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW MERGE would perform a join on `id` and then cluster records by `dep`
+      // the first shuffle distributes records into 4 shuffle partitions so that rows can be merged
+      // after existing and new rows are merged, the data is clustered by `dep`
+      // each task with merged data contains records for the same table partition
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 1 task processing all 4 shuffle blocks
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR MERGE would perform a join on `id` and then cluster data based on the partition
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        400L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget()));
+  }
+
   @Test
   public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
     createAndInitTable(
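
A rough accounting of the numbers this merge test expects, assuming each append above lands as a single data file in the lone 'hr' partition:

    4 appends x 1 data file each            = 4 files; with an open file cost of
                                              Integer.MAX_VALUE, 4 separate scan/merge tasks
    4 tasks x 1 shuffle block each          = 4 shuffle blocks on the write-side shuffle,
                                              all hashed to one reducer (every row has dep = 'hr')
    advisory partition size = 100 bytes     => each block exceeds the advisory size, so AQE
                                              splits the reducer's work into 4 independent tasks
    4 write tasks, same table partition     = 4 added data files (copy-on-write) or
                                              4 added delete files (merge-on-read),
                                              which is what validateProperty asserts
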
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 776fbb9600..ccfd83c733 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -24,7 +24,9 @@ import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
 import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
 import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
@@ -44,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
@@ -64,6 +67,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
@@ -98,6 +102,71 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS deleted_employee");
   }
 
+  @Test
+  public void testSkewUpdate() {
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            UPDATE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW UPDATE requests the updated records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR UPDATE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        200L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
+  }
+
   @Test
   public void testExplain() {
     createAndInitTable("id INT, dep STRING");
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 74d46339ee..416eb6a9ee 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -135,6 +135,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;
+  }
+
   @Override
   public SortOrder[] requiredOrdering() {
     return requiredOrdering;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a080fcead1..9e3a15e738 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -137,6 +137,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;
+  }
+
   @Override
   public SortOrder[] requiredOrdering() {
     return requiredOrdering;
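
The two production changes above are the heart of the fix: SparkWrite and SparkPositionDeltaWrite keep requesting a hash distribution, but now report it as a preference rather than a hard requirement. With a non-strict distribution, Spark 3.4 plans a rebalance shuffle (the REBALANCE_PARTITIONS_BY_COL operation the tests assert on) instead of a strict repartition, and AQE is then free to split oversized shuffle partitions. Below is a minimal, self-contained sketch of the DSv2 contract involved; the class name and the clustering column are invented for illustration and this is not the Iceberg implementation:

    import org.apache.spark.sql.connector.distributions.Distribution;
    import org.apache.spark.sql.connector.distributions.Distributions;
    import org.apache.spark.sql.connector.expressions.Expression;
    import org.apache.spark.sql.connector.expressions.Expressions;
    import org.apache.spark.sql.connector.expressions.SortOrder;
    import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
    import org.apache.spark.sql.connector.write.Write;

    abstract class SkewTolerantWrite implements Write, RequiresDistributionAndOrdering {

      @Override
      public Distribution requiredDistribution() {
        // ask Spark to cluster incoming rows by the table's partition column
        Expression[] clustering = new Expression[] {Expressions.column("dep")};
        return Distributions.clustered(clustering);
      }

      @Override
      public boolean distributionStrictlyRequired() {
        // false: the distribution is advisory, so Spark may use a rebalance shuffle
        // and AQE may split (or coalesce) its partitions to handle skew
        return false;
      }

      @Override
      public SortOrder[] requiredOrdering() {
        // no sort order required in this sketch
        return new SortOrder[0];
      }
    }

Because the distribution stays advisory, well-balanced writes still benefit from clustering, while heavily skewed partitions no longer funnel all their rows into a single writer task.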