Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/04 00:52:26 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #7520: Spark 3.4: Handle skew in writes

aokolnychyi opened a new pull request, #7520:
URL: https://github.com/apache/iceberg/pull/7520

   This PR enables AQE to handle skew in writes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1184436358


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {

Review Comment:
   Tests for CoW row-level operations already cover `SparkWrite`, which is used in normal writes.





[GitHub] [iceberg] aokolnychyi commented on pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#issuecomment-1533924785

   cc @singhpk234 @amogh-jahagirdar @jackye1995 @flyrain @RussellSpitzer @szehon-ho 




[GitHub] [iceberg] aokolnychyi commented on pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#issuecomment-1535434492

   Thanks for reviewing, @singhpk234 @RussellSpitzer!




[GitHub] [iceberg] aokolnychyi commented on pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#issuecomment-1535020515

   @RussellSpitzer, correct. This API does not exist in 3.3.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1184436358


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {

Review Comment:
   Tests for CoW row-level operations already cover `SparkWrite`, which is used in normal writes. There is not much logic on the Iceberg side; the rest is covered by Spark tests.





[GitHub] [iceberg] aokolnychyi merged pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #7520:
URL: https://github.com/apache/iceberg/pull/7520




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185310489


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -137,6 +137,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   Let's stick to that then, I agree.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185192213


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);
+    } else {
+      // MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_DELETE_FILES_PROP));

Review Comment:
   [minor] We can use `PropertyUtil` here.



##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each

Review Comment:
   [doubt] Should we also add a UT where coalesce is happening?
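
The split and coalesce cases discussed here can be pictured with a toy model. The sketch below is not Spark's implementation (AQE applies additional thresholds and merging rules); it only assumes that each reducer's shuffle blocks are packed greedily into tasks of at most the advisory size. The class name, method name, and block sizes are hypothetical.

```java
final class RebalanceToyModel {
  // Greedily packs consecutive shuffle blocks into tasks of at most targetSize bytes.
  // A block larger than targetSize still ends up in a task of its own.
  static int countTasks(long[] blockSizes, long targetSize) {
    int tasks = 0;
    long current = 0;
    for (long size : blockSizes) {
      if (current > 0 && current + size > targetSize) {
        // the running task is full, so close it and start a new one
        tasks++;
        current = 0;
      }
      current += size;
    }
    return current > 0 ? tasks + 1 : tasks;
  }

  public static void main(String[] args) {
    long advisorySize = 100L;

    // split: one reducer holding 2 blocks that are each bigger than the advisory size
    System.out.println(countTasks(new long[] {500L, 500L}, advisorySize));

    // coalesce: one reducer holding 4 blocks that together fit into the advisory size
    System.out.println(countTasks(new long[] {10L, 10L, 10L, 10L}, advisorySize));
  }
}
```

Under this model, the CoW case in the test corresponds to two reducers with two oversized blocks each, and the MoR case to a single reducer with four oversized blocks; a coalesce test would need blocks small enough to share a task.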



##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);

Review Comment:
   [minor] Can this be moved to a private function, e.g. `assertAddedFiles`, to use in both MoR and CoW?
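
A minimal sketch of what such a shared helper might look like, assuming the snapshot summary is a plain `Map<String, String>`. The helper signature is hypothetical, and the literal keys are assumed to be the values behind `SnapshotSummary.ADDED_FILES_PROP` and `SnapshotSummary.ADDED_DELETE_FILES_PROP`; the helper that was actually added to the PR may differ.

```java
import java.util.Map;

final class AddedFilesAssertSketch {
  // Hypothetical helper: reads one integer counter from a snapshot summary
  // and fails if it does not match the expected number of files.
  static void assertAddedFiles(Map<String, String> summary, String prop, int expected) {
    String value = summary.get(prop);
    if (value == null) {
      throw new AssertionError("Summary has no property " + prop);
    }
    int actual = Integer.parseInt(value);
    if (actual != expected) {
      throw new AssertionError("Must produce " + expected + " files but got " + actual);
    }
  }

  public static void main(String[] args) {
    // CoW branch: count added data files (key is an assumption)
    assertAddedFiles(Map.of("added-data-files", "4"), "added-data-files", 4);
    // MoR branch: count added delete files (key is an assumption)
    assertAddedFiles(Map.of("added-delete-files", "4"), "added-delete-files", 4);
    System.out.println("ok");
  }
}
```

With a helper of this shape, the two branches of the test would differ only in the property name they pass.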



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   Should we also check that `ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED` is true before disabling this requirement? Otherwise, it will be a no-op for `OptimizeSkewInRebalancePartitions`.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185279632


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   Is there ever a good reason to return `true` from this method? We don't require distributions to be strict and it is up to Spark to either handle the skew or not.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185338996


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   I feel it is better to always return false and leave it up to Spark. It seems the safest way as Spark may add new configs or logic on when to do that in the future.
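
The shape of this decision can be sketched without Spark on the classpath. The interface below is a hypothetical stand-in for the Spark 3.4 write interface that declares `distributionStrictlyRequired()`; the assumption is that the method defaults to `true` and that a write opts out by overriding it, as the diff above does. All type names are illustrative only.

```java
final class DistributionSketch {
  // Hypothetical stand-in for the Spark interface; only the method under discussion is modeled.
  interface RequiresDistribution {
    // Assumed default: the requested distribution is strict unless a write opts out.
    default boolean distributionStrictlyRequired() {
      return true;
    }
  }

  // Hypothetical write that keeps the assumed default.
  static final class StrictWrite implements RequiresDistribution {}

  // Hypothetical write that always opts out and leaves the skew decision to the engine.
  static final class RelaxedWrite implements RequiresDistribution {
    @Override
    public boolean distributionStrictlyRequired() {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(new StrictWrite().distributionStrictlyRequired());
    System.out.println(new RelaxedWrite().distributionStrictlyRequired());
  }
}
```

Returning a constant keeps the write independent of any particular Spark config, which is the trade-off argued for in this comment.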





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185282407


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);

Review Comment:
   Let me add something.





[GitHub] [iceberg] RussellSpitzer commented on pull request #7520: Spark 3.4: Handle skew in writes

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#issuecomment-1535005605

   This is just for 3.4 because of the new rebalance code for writes, right?




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185288269


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -137,6 +137,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   I was going to say I don't really mind our Compaction solution atm. I think disabling AQE is our best bet there.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185227335


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -137,6 +137,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   I may actually need to move it to `SparkWriteBuilder` as `SparkWrite` is used for compaction. We explicitly disable table distribution/ordering and AQE in shuffling rewriters but not in bin-pack when the output spec mismatches.
   
   Thoughts, @RussellSpitzer?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185338996


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   I feel it is better to always return false and leave it up to Spark. It seems the safest way as Spark may add new configs or logic on when to do that in the future.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185337025


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;

Review Comment:
   Agreed. I was mostly coming from the point that we are overriding this and setting it to false in the hope that Spark **will** optimize the skew, whereas if the above conf is disabled, Spark will never do so. I am fine with keeping it as is.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185281848


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);
+    } else {
+      // MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_DELETE_FILES_PROP));

Review Comment:
   I did not use `PropertyUtil` because it requires a default value and the call would not fit on one line. I can switch, though.
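
A minimal sketch of the trade-off mentioned above, for illustration only. The `propertyAsInt` helper here is a hypothetical local stand-in for a default-valued lookup, not the actual `PropertyUtil` code, and the summary keys are assumed string values used in place of the `SnapshotSummary` constants.

```java
import java.util.Map;

class SummaryProps {
  // Hypothetical stand-in for a default-valued lookup: callers must always supply a default.
  public static int propertyAsInt(Map<String, String> props, String key, int defaultValue) {
    String value = props.get(key);
    return value != null ? Integer.parseInt(value) : defaultValue;
  }

  public static void main(String[] args) {
    // Assumed summary contents; real keys come from SnapshotSummary constants.
    Map<String, String> summary = Map.of("added-delete-files", "4");

    // Direct parse: short, but throws NumberFormatException if the key is absent.
    int direct = Integer.parseInt(summary.get("added-delete-files"));

    // Default-valued lookup: longer call, but a missing key yields the default.
    int viaDefault = propertyAsInt(summary, "added-delete-files", 0);
    int missing = propertyAsInt(summary, "added-data-files", 0);

    System.out.println(direct + " " + viaDefault + " " + missing); // prints: 4 4 0
  }
}
```

In a test, the direct parse fails loudly when the property is missing, while the default-valued form would fall through to the assertion with the default.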





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185276409


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each

Review Comment:
   I was planning to do so in a separate PR. This change focuses on skew.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7520: Spark 3.4: Handle skew in writes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185353693


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);

Review Comment:
   There was an existing `validateProperty` method, which I forgot about. I switched to that.


