Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/06/24 00:25:13 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

aokolnychyi opened a new pull request, #7897:
URL: https://github.com/apache/iceberg/pull/7897

   This PR adds a new compaction option called `shuffle-partitions-per-file` for shuffle-based file rewriters.
   
   By default, our shuffling file rewriters assume each shuffle partition would become a separate output file. Attempting to generate large output files of 512 MB and more may strain the memory resources of the cluster as such rewrites would require lots of Spark memory. This parameter can be used to further divide up the data which will end up in a single file. For example, if the target file size is 2 GB, but the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will use a custom coalesce operation to stitch these sorted partitions back together into a single sorted file.
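The sizing arithmetic behind the example above can be sketched in plain Java. This is only an illustration, not the Iceberg implementation: the class and method names are hypothetical, and it assumes a rewrite group produces roughly ceil(input size / target file size) output files, each fed by `shuffle-partitions-per-file` shuffle partitions.

```java
// Illustrative sketch only; names are hypothetical and this is not Iceberg code.
final class ShufflePartitionSketch {
  private static final long MB = 1024L * 1024L;

  // Assumption: a rewrite group yields roughly ceil(inputBytes / targetFileBytes) files.
  static long outputFiles(long inputBytes, long targetFileBytes) {
    return Math.max(1L, (inputBytes + targetFileBytes - 1) / targetFileBytes);
  }

  // Each output file is fed by partitionsPerFile shuffle partitions.
  static long shufflePartitions(long inputBytes, long targetFileBytes, int partitionsPerFile) {
    return outputFiles(inputBytes, targetFileBytes) * partitionsPerFile;
  }

  // Approximate number of bytes handled by a single shuffle partition.
  static long bytesPerShufflePartition(
      long inputBytes, long targetFileBytes, int partitionsPerFile) {
    return inputBytes / shufflePartitions(inputBytes, targetFileBytes, partitionsPerFile);
  }

  public static void main(String[] args) {
    long input = 8192 * MB; // hypothetical 8 GB rewrite group
    long target = 2048 * MB; // 2 GB target file size, as in the example above

    System.out.println(shufflePartitions(input, target, 4)); // 16
    System.out.println(bytesPerShufflePartition(input, target, 4) / MB); // 512
  }
}
```

With the option set to 4, each of the four 2 GB output files is assembled from four shuffle partitions of about 512 MB, which matches the limit the example cluster can handle.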


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1243003789


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   To be honest, I have never seen issues with this approach in any of our prod jobs in the last few years. Skipping this split when the size of the job is less than 128 MB could be a valid step, but it would require quite a few changes to pass more info around. I'd probably skip it for now until we experience any issues.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242727767


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test

Review Comment:
   This is nice, but can we also add a test that asserts the sort order is preserved within a partition? (e.g., a small partition, and just assert that the file is in order)



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such

Review Comment:
   and more => or higher



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   Not to block this change, but did we consider having a shuffle-threshold? I.e., if we have some partition with 2 GB but others that are way less than 512 MB, there is no need to shuffle the ones that are less?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1240903803


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**

Review Comment:
   I tested the current implementation on a table with 1 TB of data and a cluster with 16 GB executors, 7 cores each. The target file size was 1 GB (zstd Parquet data). Sort-based optimizations without this option spilled and failed; I lost all executors one by one. With 8 shuffle partitions per file, the operation succeeded without any failures and produced properly sized files.
   





[GitHub] [iceberg] aokolnychyi commented on pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#issuecomment-1607769773

   cc @szehon-ho @flyrain @RussellSpitzer @singhpk234 @amogh-jahagirdar @rdblue 




[GitHub] [iceberg] aokolnychyi commented on pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#issuecomment-1608032052

   @singhpk234, I was originally planning to update the doc in a separate PR.




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242908781


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   I see, but would there be issues in contending for pods? Also, wouldn't it make more sense to have 128 MB as a conf (shuffle-threshold)? Otherwise it's always a bit dynamic depending on the max partition size. Not sure if there are other issues with this approach.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242611891


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
+    createTable();
+    insertData(10 /* file count */);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files("

Review Comment:
   It is a bit tricky in this case as the resulting plan would be `CallExec`; we don't have an easy way to inspect the plan triggered by the procedure. I did check manually, though.





[GitHub] [iceberg] aokolnychyi commented on pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#issuecomment-1608544912

   I just realized we don't provide a comprehensive list of supported options in the docs. I have been meaning to improve our docs for a while, so I'll add this config then.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242887570


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   It should still be fine to apply this optimization as there is no extra cost. I achieved the best results with 128 MB shuffle blocks, so it should be fairly safe to assume the operation would complete fine.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242881883


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   I was just wondering about the use case where we set shuffle-partitions-per-file to 4 because we want 2 GB files but can only shuffle 512 MB. However, consider an Iceberg partition (rewrite group) that has only 512 MB of files during this rewrite. Will we still shuffle to four partitions in this case and coalesce at the end, unnecessarily? I may be missing something.





[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242484098


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test
+  public void testRewriteDataFilesWithZOrderAndMultipleShufflePartitionsPerFile() {
+    createTable();
+    insertData(10 /* file count */);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files("

Review Comment:
   Should we also assert that `OrderAwareCoalesceExec` is inserted by inspecting the plan?





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242727767


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test

Review Comment:
   This is nice, but did we also add a test that asserts the sort order is preserved within a partition? (e.g., a small partition, and just assert that the file is in order)





[GitHub] [iceberg] aokolnychyi merged pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #7897:
URL: https://github.com/apache/iceberg/pull/7897




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242867703


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   You mean like switching to a local sort if the size of the data to compact is small?





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1244107255


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   Sure, we can do it later then if there's a need





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1240903896


##########
spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/OrderAwareCoalesceExec.scala:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.PartitionCoalescer
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+
+case class OrderAwareCoalesceExec(

Review Comment:
   Inspired by `CoalesceExec` in Spark.
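The reason an order-aware coalesce can stitch shuffle partitions back together without re-sorting can be shown with a small, self-contained Java sketch. It assumes the shuffle produced range partitions that are each sorted and whose key ranges do not overlap, so concatenating adjacent partitions in their original order keeps the rows sorted. The class and method names are hypothetical; the real operator works on Spark RDD partitions rather than Java lists.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the OrderAwareCoalesceExec implementation.
final class OrderAwareCoalesceSketch {

  // Concatenate every groupSize adjacent partitions, keeping their original order.
  static <T> List<List<T>> coalesce(List<List<T>> partitions, int groupSize) {
    if (groupSize < 1) {
      throw new IllegalArgumentException("groupSize must be positive");
    }
    List<List<T>> result = new ArrayList<>();
    for (int start = 0; start < partitions.size(); start += groupSize) {
      int end = Math.min(start + groupSize, partitions.size());
      List<T> merged = new ArrayList<>();
      for (List<T> partition : partitions.subList(start, end)) {
        merged.addAll(partition);
      }
      result.add(merged);
    }
    return result;
  }

  // Returns true if the rows are in non-decreasing order.
  static <T extends Comparable<T>> boolean isSorted(List<T> rows) {
    for (int i = 1; i < rows.size(); i++) {
      if (rows.get(i - 1).compareTo(rows.get(i)) > 0) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Four sorted range partitions with non-overlapping key ranges.
    List<List<Integer>> partitions =
        List.of(List.of(1, 3), List.of(4, 7), List.of(8, 9), List.of(12, 15));

    // Two shuffle partitions per output file.
    List<List<Integer>> files = coalesce(partitions, 2);
    System.out.println(files); // [[1, 3, 4, 7], [8, 9, 12, 15]]
    System.out.println(files.stream().allMatch(OrderAwareCoalesceSketch::isSorted)); // true
  }
}
```

A coalesce that groups arbitrary, non-adjacent partitions would not give this guarantee, which is presumably why a dedicated operator is needed here.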





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242867386


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
     assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
   }
 
+  @Test

Review Comment:
   There is a check below for the order of records. I just added a similar one for the regular sort, so we verify the order of records is correct both in regular sorts and in z-ordering.



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such

Review Comment:
   Fixed.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7897: Spark 3.4: Multiple shuffle partitions per file in compaction

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7897:
URL: https://github.com/apache/iceberg/pull/7897#discussion_r1242881883


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java:
##########
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
 
+  /**
+   * The number of shuffle partitions to use for each output file. By default, this file rewriter
+   * assumes each shuffle partition would become a separate output file. Attempting to generate
+   * large output files of 512 MB and more may strain the memory resources of the cluster as such
+   * rewrites would require lots of Spark memory. This parameter can be used to further divide up
+   * the data which will end up in a single file. For example, if the target file size is 2 GB, but
+   * the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will
+   * use a custom coalesce operation to stitch these sorted partitions back together into a single
+   * sorted file.
+   *
+   * <p>Note using this parameter requires enabling Iceberg Spark session extensions.
+   */
+  public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";

Review Comment:
   I was just wondering about the use case where we set shuffle-partitions-per-file to 4 because we want 2 GB files but can only shuffle 512 MB. However, consider an Iceberg partition (rewrite group) that has only 512 MB of files. Will we still shuffle to four partitions in this case and coalesce at the end, unnecessarily? I may be missing something.


