Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/04 21:25:18 UTC
[iceberg] branch master updated: Spark 3.4: Handle skew in writes (#7520)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7fa5fca9b1 Spark 3.4: Handle skew in writes (#7520)
7fa5fca9b1 is described below
commit 7fa5fca9b1121950f03dce6d8b985b283dc256b6
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu May 4 14:25:12 2023 -0700
Spark 3.4: Handle skew in writes (#7520)
---
.../iceberg/spark/extensions/TestDelete.java | 67 +++++++++++++++++++
.../apache/iceberg/spark/extensions/TestMerge.java | 78 ++++++++++++++++++++++
.../iceberg/spark/extensions/TestUpdate.java | 69 +++++++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 5 ++
.../apache/iceberg/spark/source/SparkWrite.java | 5 ++
5 files changed, 224 insertions(+)
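The core of the change is the pair of distributionStrictlyRequired() overrides at the end of the diff. Returning false tells Spark's DSv2 planner that the write's requested hash distribution is a preference rather than a hard requirement, so Spark plans a rebalance (REBALANCE_PARTITIONS_BY_COL in the physical plan) instead of a strict repartition, and AQE is then free to split skewed shuffle partitions or coalesce small ones. A minimal sketch of the connector-side pattern against Spark 3.4's RequiresDistributionAndOrdering interface (the class and field names below are illustrative, not Iceberg's):

    import org.apache.spark.sql.connector.distributions.Distribution;
    import org.apache.spark.sql.connector.expressions.SortOrder;
    import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
    import org.apache.spark.sql.connector.write.Write;

    // illustrative DSv2 write: asks for a distribution but lets Spark relax it
    abstract class SkewTolerantWrite implements Write, RequiresDistributionAndOrdering {
      private final Distribution requiredDistribution;
      private final SortOrder[] requiredOrdering;

      SkewTolerantWrite(Distribution distribution, SortOrder[] ordering) {
        this.requiredDistribution = distribution;
        this.requiredOrdering = ordering;
      }

      @Override
      public Distribution requiredDistribution() {
        return requiredDistribution;
      }

      @Override
      public boolean distributionStrictlyRequired() {
        // a hint, not a contract: Spark may plan a rebalance and let AQE adjust it
        return false;
      }

      @Override
      public SortOrder[] requiredOrdering() {
        return requiredOrdering;
      }
    }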
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 6fa8d7f965..336d40cca0 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,10 +19,12 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
@@ -43,9 +45,11 @@ import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
@@ -68,6 +72,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTableWithFilters;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.RowLevelWrite;
+import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
@@ -103,6 +108,68 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
sql("DROP TABLE IF EXISTS parquet_table");
}
+ @Test
+ public void testSkewDelete() throws Exception {
+ createAndInitPartitionedTable();
+
+ Employee[] employees = new Employee[100];
+ for (int index = 0; index < 100; index++) {
+ employees[index] = new Employee(index, "hr");
+ }
+ append(tableName, employees);
+ append(tableName, employees);
+ append(tableName, employees);
+ append(tableName, employees);
+
+ // set the open file cost large enough to produce a separate scan task per file
+ // use hash distribution to trigger a shuffle
+ Map<String, String> tableProps =
+ ImmutableMap.of(
+ SPLIT_OPEN_FILE_COST,
+ String.valueOf(Integer.MAX_VALUE),
+ DELETE_DISTRIBUTION_MODE,
+ DistributionMode.HASH.modeName());
+ sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+ createBranchIfNeeded();
+
+ // enable AQE and set the advisory partition size small enough to trigger a split
+ // set the number of shuffle partitions to 2 to only have 2 reducers
+ withSQLConf(
+ ImmutableMap.of(
+ SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+ SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+ () -> {
+ SparkPlan plan =
+ executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+ Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+ });
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+ if (mode(table) == COPY_ON_WRITE) {
+ // CoW DELETE requests the remaining records to be clustered by `_file`
+ // each task contains only 1 file and therefore writes only 1 shuffle block
+ // that means 4 shuffle blocks are distributed among 2 reducers
+ // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+ // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+ validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+ } else {
+ // MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
+ // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+ // that means there are 4 shuffle blocks, all assigned to the same reducer
+ // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+ // otherwise, there would be 1 task processing 4 shuffle blocks
+ validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+ }
+
+ Assert.assertEquals(
+ "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
+ }
+
@Test
public void testDeleteWithoutScanningTable() throws Exception {
createAndInitPartitionedTable();
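Outside the test harness, the same behavior can be exercised with ordinary session configuration: enable AQE and skew-aware rebalancing, then give the write a hash distribution so it shuffles. A rough sketch, assuming an existing Iceberg table db.tbl (the table name and advisory size here are illustrative):

    import org.apache.spark.sql.SparkSession;

    public class SkewedDeleteDemo {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("skewed-delete").getOrCreate();

        // the same AQE knobs the test toggles, at realistic values
        spark.conf().set("spark.sql.adaptive.enabled", "true");
        spark.conf().set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true");
        spark.conf().set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m");

        // hash-distribute the delete so it shuffles and AQE can rebalance the write
        spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('write.delete.distribution-mode' = 'hash')");
        spark.sql("DELETE FROM db.tbl WHERE mod(id, 2) = 0");

        spark.stop();
      }
    }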
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 4ec78ec385..18d42ca6ae 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,10 +19,12 @@
package org.apache.iceberg.spark.extensions;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.spark.sql.functions.lit;
@@ -95,6 +97,82 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
sql("DROP TABLE IF EXISTS source");
}
+ @Test
+ public void testSkewMerge() {
+ createAndInitTable("id INT, salary INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+ String[] records = new String[100];
+ for (int index = 0; index < 100; index++) {
+ records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", index);
+ }
+ append(tableName, records);
+ append(tableName, records);
+ append(tableName, records);
+ append(tableName, records);
+
+ // set the open file cost large enough to produce a separate scan task per file
+ // use hash distribution to trigger a shuffle
+ Map<String, String> tableProps =
+ ImmutableMap.of(
+ SPLIT_OPEN_FILE_COST,
+ String.valueOf(Integer.MAX_VALUE),
+ MERGE_DISTRIBUTION_MODE,
+ DistributionMode.HASH.modeName());
+ sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+ createBranchIfNeeded();
+
+ spark.range(0, 100).createOrReplaceTempView("source");
+
+ // enable AQE and set the advisory partition size small enough to trigger a split
+ // set the number of shuffle partitions to 4 so rows are merged in 4 tasks
+ // set the min coalesce partition size small enough to avoid coalescing
+ withSQLConf(
+ ImmutableMap.of(
+ SQLConf.SHUFFLE_PARTITIONS().key(), "4",
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+ SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+ () -> {
+ SparkPlan plan =
+ executeAndKeepPlan(
+ "MERGE INTO %s t USING source "
+ + "ON t.id = source.id "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET salary = -1 ",
+ commitTarget());
+ Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+ });
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+ if (mode(table) == COPY_ON_WRITE) {
+ // CoW MERGE would perform a join on `id` and then cluster records by `dep`
+ // the first shuffle distributes records into 4 shuffle partitions so that rows can be merged
+ // after existing and new rows are merged, the data is clustered by `dep`
+ // each task with merged data contains records for the same table partition
+ // that means there are 4 shuffle blocks, all assigned to the same reducer
+ // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+ // otherwise, there would be 1 task processing all 4 shuffle blocks
+ validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+ } else {
+ // MoR MERGE would perform a join on `id` and then cluster data based on the partition
+ // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+ // that means there are 4 shuffle blocks, all assigned to the same reducer
+ // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+ // otherwise, there would be 1 task processing 4 shuffle blocks
+ validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+ }
+
+ Assert.assertEquals(
+ "Row count must match",
+ 400L,
+ scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget()));
+ }
+
@Test
public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
createAndInitTable(
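For context, the REBALANCE_PARTITIONS_BY_COL marker these tests assert on is the same shuffle that Spark exposes to end users through its REBALANCE hint (available since Spark 3.2), which offers a quick way to eyeball the operator in a plan. A one-line sketch, assuming an active SparkSession named spark and an illustrative table db.tbl:

    // EXPLAIN shows a rebalance shuffle comparable to what the relaxed write distribution produces
    spark.sql("EXPLAIN SELECT /*+ REBALANCE(dep) */ * FROM db.tbl").show(false);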
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 776fbb9600..ccfd83c733 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -24,7 +24,9 @@ import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
@@ -44,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
@@ -64,6 +67,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
@@ -98,6 +102,71 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
sql("DROP TABLE IF EXISTS deleted_employee");
}
+ @Test
+ public void testSkewUpdate() {
+ createAndInitTable("id INT, dep STRING");
+ sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+ String[] records = new String[100];
+ for (int index = 0; index < 100; index++) {
+ records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
+ }
+ append(tableName, records);
+ append(tableName, records);
+ append(tableName, records);
+ append(tableName, records);
+
+ // set the open file cost large enough to produce a separate scan task per file
+ // use hash distribution to trigger a shuffle
+ Map<String, String> tableProps =
+ ImmutableMap.of(
+ SPLIT_OPEN_FILE_COST,
+ String.valueOf(Integer.MAX_VALUE),
+ UPDATE_DISTRIBUTION_MODE,
+ DistributionMode.HASH.modeName());
+ sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+ createBranchIfNeeded();
+
+ // enable AQE and set the advisory partition size small enough to trigger a split
+ // set the number of shuffle partitions to 2 to only have 2 reducers
+ withSQLConf(
+ ImmutableMap.of(
+ SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+ SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+ () -> {
+ SparkPlan plan =
+ executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
+ Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+ });
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+ if (mode(table) == COPY_ON_WRITE) {
+ // CoW UPDATE requests the updated records to be clustered by `_file`
+ // each task contains only 1 file and therefore writes only 1 shuffle block
+ // that means 4 shuffle blocks are distributed among 2 reducers
+ // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+ // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+ validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+ } else {
+ // MoR UPDATE requests the deleted records to be clustered by `_spec_id` and `_partition`
+ // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+ // that means there are 4 shuffle blocks, all assigned to the same reducer
+ // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+ // otherwise, there would be 1 task processing 4 shuffle blocks
+ validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+ }
+
+ Assert.assertEquals(
+ "Row count must match",
+ 200L,
+ scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
+ }
+
@Test
public void testExplain() {
createAndInitTable("id INT, dep STRING");
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 74d46339ee..416eb6a9ee 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -135,6 +135,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
return requiredDistribution;
}
+ @Override
+ public boolean distributionStrictlyRequired() {
+ return false;
+ }
+
@Override
public SortOrder[] requiredOrdering() {
return requiredOrdering;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a080fcead1..9e3a15e738 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -137,6 +137,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
return requiredDistribution;
}
+ @Override
+ public boolean distributionStrictlyRequired() {
+ return false;
+ }
+
@Override
public SortOrder[] requiredOrdering() {
return requiredOrdering;
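Both production changes are the same one-line override: any Iceberg batch write (SparkWrite) or MoR position-delta write (SparkPositionDeltaWrite) now advertises its distribution as non-strict. To verify the effect on a real workload, the executed plan can be captured with a QueryExecutionListener, similar in spirit to the executeAndKeepPlan helper the tests use. A sketch, assuming an active SparkSession named spark and an illustrative Iceberg table db.tbl:

    import org.apache.spark.sql.execution.QueryExecution;
    import org.apache.spark.sql.util.QueryExecutionListener;

    // report whether the write was planned with a rebalance that AQE can split
    spark.listenerManager().register(new QueryExecutionListener() {
      @Override
      public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        boolean rebalanced = qe.executedPlan().toString().contains("REBALANCE_PARTITIONS_BY_COL");
        System.out.println(funcName + " rebalanced=" + rebalanced);
      }

      @Override
      public void onFailure(String funcName, QueryExecution qe, Exception error) {}
    });

    spark.sql("UPDATE db.tbl SET id = -1 WHERE mod(id, 2) = 0");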