Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/06/27 02:20:18 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #7714: Spark 3.4: Adaptive split size

aokolnychyi opened a new pull request, #7714:
URL: https://github.com/apache/iceberg/pull/7714

   This PR is an alternative to #7688 and what was initially envisioned by #7465.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1207280101


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -58,6 +58,8 @@
     implements SupportsReportPartitioning {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkPartitioningAwareScan.class);
+  private static final long MIN_SPLIT_SIZE = 16 * 1024 * 1024; // 16 MB
+  private static final long MAX_SPLIT_SIZE = 1024 * 1024 * 1024; // 1 GB

Review Comment:
   Keep in mind this is WIP; our max can be aligned with the target file size.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1275553402


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   There may be issues if the parallelism is set via a config, but I doubt people actually set that. We need to know the number of slots in the cluster, and this seems to be the closest. What do you think?
   





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1278031491


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -80,10 +80,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     this.branch = readConf.branch();
   }
 
+  protected JavaSparkContext sparkContext() {

Review Comment:
   I moved the method to `SparkScan`, and neither `sparkContext` nor `readConf` is exposed now.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1220364643


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -246,6 +246,34 @@ public static <T extends ScanTask> List<T> mergeTasks(List<T> tasks) {
     return mergedTasks;
   }
 
+  public static long computeSplitSize(
+      long scanSize, int parallelism, long minSplitSize, long maxSplitSize) {
+
+    Preconditions.checkArgument(
+        minSplitSize < maxSplitSize,
+        "Min split size (%s) must be < max split size (%s)",
+        minSplitSize,
+        maxSplitSize);
+
+    // aim for a split per slot by default
+    int splitCount = parallelism;

Review Comment:
   It is probably better to target 2x splits per slot by default.
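
To make the sizing idea in this comment concrete, here is a minimal sketch. The diff is truncated after `int splitCount = parallelism;`, so this is not the PR's implementation. It assumes the split size is the scan size divided by the desired split count, clamped to the min/max bounds. The class `SplitSizeSketch`, the method `clampedSplitSize`, and the `splitsPerSlot` parameter are hypothetical names introduced only for illustration.

```java
/** Illustrative sketch only; not the code from the PR. */
final class SplitSizeSketch {
  private SplitSizeSketch() {}

  /**
   * Hypothetical helper: divides the scan size by the desired number of splits
   * and clamps the result to the range [minSplitSize, maxSplitSize].
   */
  static long clampedSplitSize(
      long scanSize, int parallelism, int splitsPerSlot, long minSplitSize, long maxSplitSize) {
    if (parallelism <= 0 || splitsPerSlot <= 0) {
      throw new IllegalArgumentException("Parallelism and splits per slot must be positive");
    }
    if (minSplitSize >= maxSplitSize) {
      throw new IllegalArgumentException(
          String.format(
              "Min split size (%d) must be < max split size (%d)", minSplitSize, maxSplitSize));
    }

    // splitsPerSlot = 2 corresponds to the "2x splits per slot" suggestion
    long splitCount = (long) parallelism * splitsPerSlot;
    long splitSize = scanSize / splitCount;

    // never go below the minimum or above the maximum
    return Math.max(minSplitSize, Math.min(maxSplitSize, splitSize));
  }
}
```

Under these assumptions, a 1 GB scan on 4 slots with 2 splits per slot yields 128 MB splits, while very small or very large scans fall back to the 16 MB and 1 GB bounds respectively.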





[GitHub] [iceberg] rdblue commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1275506879


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java:
##########
@@ -80,10 +80,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     this.branch = readConf.branch();
   }
 
+  protected JavaSparkContext sparkContext() {

Review Comment:
   Can we get the parallelism a different way? What about exposing just the parallelism and not actually the conf or context?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1283804917


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   @rdblue, I meant the core count would adjust once the cluster scales up. The initial job may not benefit from this. I wasn't sure whether that is a big deal given that acquiring new executors is generally slow. 
   
   I feel we should use the current core count if dynamic allocation is disabled (which we can check). When dynamic allocation is enabled, we can rely on the number of shuffle partitions or check the dynamic allocation config (e.g. we know the core count per executor and the max number of executors). It seems the dynamic allocation config would give us a more precise estimate.
   
   Thoughts, @rdblue @ConeyLiu?





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1284032956


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   > I feel we should use the current core count if dynamic allocation is disabled (which we can check). 
   
   I agree with this. This should be easy to check and get the parallelism.
   
   > When dynamic allocation is enabled, we can rely on the number of shuffle partitions or check the dynamic allocation config (e.g. we know the core count per each executor and the max number of executors). It seems the dynamic allocation config would give us a more precise estimate.
   
   In my opinion, it would be better to calculate the parallelism from the max number of executors, because the number of shuffle partitions seems more like a parameter for the parallelism of the shuffle or reduce stage.
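
The estimate discussed in this thread could be sketched roughly as follows. This is an illustration and not code from the PR: the class `ParallelismEstimateSketch` and its method are hypothetical, and a plain `Map` stands in for the Spark conf so the example is self-contained. The keys `spark.dynamicAllocation.enabled`, `spark.dynamicAllocation.maxExecutors`, and `spark.executor.cores` are standard Spark settings. Falling back to the current core count when either value is unset is an assumption of this sketch.

```java
import java.util.Map;

/** Illustrative sketch only; a Map stands in for the Spark conf. */
final class ParallelismEstimateSketch {
  private ParallelismEstimateSketch() {}

  /** Hypothetical helper that estimates the number of slots available to a scan. */
  static int estimateParallelism(Map<String, String> sparkConf, int currentCoreCount) {
    boolean dynamicAllocation =
        Boolean.parseBoolean(sparkConf.getOrDefault("spark.dynamicAllocation.enabled", "false"));

    // static cluster: the current core count is the number of slots
    if (!dynamicAllocation) {
      return Math.max(currentCoreCount, 1);
    }

    String maxExecutors = sparkConf.get("spark.dynamicAllocation.maxExecutors");
    String executorCores = sparkConf.get("spark.executor.cores");

    // assumption: without both settings there is no usable upper bound
    if (maxExecutors == null || executorCores == null) {
      return Math.max(currentCoreCount, 1);
    }

    // dynamic allocation: max executors * cores per executor
    long slots = Long.parseLong(maxExecutors.trim()) * Long.parseLong(executorCores.trim());
    return (int) Math.max(1L, Math.min(Integer.MAX_VALUE, slots));
  }
}
```

With dynamic allocation enabled, `maxExecutors = 10`, and 4 cores per executor, the sketch returns 40 even if only 8 cores are currently registered, which is the behavior the comment argues for.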
   
   





[GitHub] [iceberg] aokolnychyi closed pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi closed pull request #7714: Spark 3.4: Adaptive split size
URL: https://github.com/apache/iceberg/pull/7714




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1220193914


##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -207,6 +207,15 @@ private TableProperties() {}
   public static final String SPLIT_SIZE = "read.split.target-size";
   public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = false;

Review Comment:
   I think it would be best to disable this initially because it is such a sensitive place to change. If it performs well, we may enable it by default in the future.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1275552332


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   The default parallelism is populated via `TaskScheduler` from `SchedulerBackend`:
   
   ```
     override def defaultParallelism(): Int = {
       conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
     }
   ```
   
   The core count is being updated each time an executor is added/dropped so dynamic allocation should work.
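
For readers less familiar with the Scala snippet above, the same lookup can be sketched in Java. This is only an illustration: the class `DefaultParallelismSketch` is hypothetical and a plain `Map` stands in for the Spark conf. It shows why an explicitly set `spark.default.parallelism` hides changes in the core count.

```java
import java.util.Map;

/** Illustrative sketch only; mirrors the fallback logic of the Scala snippet. */
final class DefaultParallelismSketch {
  private DefaultParallelismSketch() {}

  /** Hypothetical helper: an explicit config value wins, otherwise max(cores, 2). */
  static int defaultParallelism(Map<String, String> conf, int totalCoreCount) {
    String explicit = conf.get("spark.default.parallelism");
    if (explicit != null) {
      // an explicit value is returned no matter how many cores are registered
      return Integer.parseInt(explicit.trim());
    }
    // otherwise the value follows the current core count, with a floor of 2
    return Math.max(totalCoreCount, 2);
  }
}
```

In this sketch, only the unset case tracks executors being added or dropped; with the property set, the returned value stays fixed.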





[GitHub] [iceberg] rdblue commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1275507298


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {

Review Comment:
   Don't we need this in other places as well? Or does everything go through `SparkPartitioningAwareScan` now?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1207284964


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +234,13 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {

Review Comment:
   We would need a property to enable/disable this functionality.





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1278210586


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   @aokolnychyi We will use `spark.dynamicAllocation.initialExecutors` * `spark.executor.cores` as the parallelism if dynamic resource allocation is enabled for a newly submitted application. The initial number of executors may be small (such as 2) when the application starts up. Would this be a problem?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1207281264


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +234,13 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+    int parallelism = sparkContext().defaultParallelism();

Review Comment:
   This gives the number of cores in the cluster or `spark.default.parallelism` if set explicitly.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1220194694


##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -207,6 +207,15 @@ private TableProperties() {}
   public static final String SPLIT_SIZE = "read.split.target-size";
   public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = false;
+
+  public static final String ADAPTIVE_SPLIT_MIN_SIZE_BYTES = "read.split.min-adaptive-size-bytes";

Review Comment:
   Another way to name these properties:
   
   ```
   read.split.adaptive-size.enabled
   read.split.adaptive-size.min-value-bytes
   read.split.adaptive-size.max-value-bytes
   ```
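
As a rough illustration of how the proposed names might be consumed, the sketch below reads them from a plain property map. It is not code from the PR: the class `AdaptiveSplitProps` is hypothetical. The `false` default comes from the diff above, while reusing the WIP constants of 16 MB and 1 GB as defaults for the min and max values is an assumption of this sketch.

```java
import java.util.Map;

/** Illustrative sketch only; property names follow the alternative proposed above. */
final class AdaptiveSplitProps {
  static final String ENABLED = "read.split.adaptive-size.enabled";
  static final String MIN_BYTES = "read.split.adaptive-size.min-value-bytes";
  static final String MAX_BYTES = "read.split.adaptive-size.max-value-bytes";

  // assumed defaults, borrowed from the WIP constants (16 MB and 1 GB)
  static final long MIN_BYTES_DEFAULT = 16L * 1024 * 1024;
  static final long MAX_BYTES_DEFAULT = 1024L * 1024 * 1024;

  private AdaptiveSplitProps() {}

  /** Disabled unless the property is explicitly set to true. */
  static boolean enabled(Map<String, String> props) {
    return Boolean.parseBoolean(props.getOrDefault(ENABLED, "false"));
  }

  static long minBytes(Map<String, String> props) {
    return longProp(props, MIN_BYTES, MIN_BYTES_DEFAULT);
  }

  static long maxBytes(Map<String, String> props) {
    return longProp(props, MAX_BYTES, MAX_BYTES_DEFAULT);
  }

  // returns the parsed value if present, otherwise the supplied default
  private static long longProp(Map<String, String> props, String key, long defaultValue) {
    String value = props.get(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```

Grouping all three keys under the `read.split.adaptive-size.` prefix keeps the feature flag and its bounds together when properties are listed alphabetically.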





[GitHub] [iceberg] aokolnychyi merged pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #7714:
URL: https://github.com/apache/iceberg/pull/7714




[GitHub] [iceberg] rdblue commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1278598322


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   > The core count is being updated each time an executor is added/dropped so dynamic allocation should work.
   
   I don't think it would because the job may be planned before the initial stage is submitted and the cluster scales up. I think shuffle parallelism is the most reliable way to know how big to go.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1278032227


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   I took another look and believe the current logic would perform better than the number of shuffle partitions.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1278034800


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {

Review Comment:
   The only place we miss is `SparkChangelogScan` as it plans tasks directly. We can update it later to plan files first or it will automatically inherit this functionality once we support adaptive split size in core.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1207284298


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +234,13 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();

Review Comment:
   This would even handle runtime filtering, so that the number of splits after runtime filtering may be different.
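
   To illustrate the point about runtime filtering: if the scan size is summed over whatever tasks remain, a split size derived from it changes once tasks are dropped. The following is only a minimal sketch. It assumes the split size is the scan size divided by the parallelism, clamped to the 16 MB and 1 GB bounds from the WIP diff. The class name, method name, and the exact formula are hypothetical and not taken from the PR.

```java
import java.util.List;

final class RuntimeFilterSplitSketch {
  static final long MIN_SPLIT_SIZE = 16L * 1024 * 1024; // 16 MB, as in the WIP diff
  static final long MAX_SPLIT_SIZE = 1024L * 1024 * 1024; // 1 GB, as in the WIP diff

  // Assumed formula: spread the remaining bytes evenly across the slots, then clamp.
  static long splitSize(List<Long> taskSizes, int parallelism) {
    long scanSize = taskSizes.stream().mapToLong(Long::longValue).sum();
    long sizePerSlot = scanSize / Math.max(parallelism, 1);
    return Math.min(Math.max(sizePerSlot, MIN_SPLIT_SIZE), MAX_SPLIT_SIZE);
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // Four planned tasks, 12 GB in total.
    List<Long> planned = List.of(4096 * mb, 4096 * mb, 2048 * mb, 2048 * mb);
    // Pretend runtime filtering dropped the first two tasks, leaving 4 GB.
    List<Long> afterFilter = planned.subList(2, 4);

    System.out.println(splitSize(planned, 64) / mb); // 192
    System.out.println(splitSize(afterFilter, 64) / mb); // 64
  }
}
```

   With 64 slots, the sketch picks 192 MB splits before filtering and 64 MB splits after, so the number of splits still matches the available slots.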





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1220193914


##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -207,6 +207,15 @@ private TableProperties() {}
   public static final String SPLIT_SIZE = "read.split.target-size";
   public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = false;

Review Comment:
   I think it would be best to disable this initially because it is such a sensitive place to change. If it performs well, we may enable it by default in the future.





[GitHub] [iceberg] aokolnychyi commented on pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#issuecomment-1603503322

   I gave it a bit of testing on a cluster. In some cases, I saw significant degradation when the split size was adjusted to a higher value. The shuffle write time increased dramatically when I was processing entire records. I think it is related to the fact that Spark needs to sort the records by reducer ID during the map phase of a shuffle if the hash shuffle manager is not used (> 200 reducers). There were cases where it helped, but it seems too risky to do by default.
   
   I will rework this approach to only pick a smaller split size to utilize all cluster slots.
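
   The reworked idea (only ever shrink the split size, so that all cluster slots get work) could look roughly like the sketch below. It is an illustration under assumptions, not the code from the PR: the class and method names are hypothetical, the 16 MB floor is borrowed from the WIP diff, and `configuredSplitSize` stands for whatever `read.split.target-size` resolves to.

```java
final class SmallerSplitSketch {
  static final long MIN_SPLIT_SIZE = 16L * 1024 * 1024; // assumed floor (16 MB)

  // Returns a split size that is never larger than the configured one.
  static long targetSplitSize(long scanSize, int parallelism, long configuredSplitSize) {
    // Bytes each slot would get if the scan were spread evenly.
    long sizePerSlot = scanSize / Math.max(parallelism, 1);
    // Avoid tiny splits, but never exceed the configured split size.
    long floored = Math.max(sizePerSlot, MIN_SPLIT_SIZE);
    return Math.min(floored, configuredSplitSize);
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long configured = 128 * mb; // SPLIT_SIZE_DEFAULT
    int slots = 32; // hypothetical cluster parallelism

    System.out.println(targetSplitSize(1024 * mb, slots, configured) / mb); // 32
    System.out.println(targetSplitSize(100 * 1024 * mb, slots, configured) / mb); // 128
    System.out.println(targetSplitSize(64 * mb, slots, configured) / mb); // 16
  }
}
```

   A 1 GB scan on 32 slots gets 32 MB splits instead of 128 MB ones, while a 100 GB scan keeps the configured 128 MB, so the adjustment never moves to a higher value.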




[GitHub] [iceberg] aokolnychyi commented on pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#issuecomment-1607846886

   @rdblue, I've updated the approach after testing it on the cluster. Could you take another look?






[GitHub] [iceberg] rdblue commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1220273424


##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -207,6 +207,15 @@ private TableProperties() {}
   public static final String SPLIT_SIZE = "read.split.target-size";
   public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = false;
+
+  public static final String ADAPTIVE_SPLIT_MIN_SIZE_BYTES = "read.split.min-adaptive-size-bytes";

Review Comment:
   I think I prefer the alternative names.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1220656743


##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -207,6 +207,15 @@ private TableProperties() {}
   public static final String SPLIT_SIZE = "read.split.target-size";
   public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = false;
+
+  public static final String ADAPTIVE_SPLIT_MIN_SIZE_BYTES = "read.split.min-adaptive-size-bytes";

Review Comment:
   Renamed.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1207284964


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +234,13 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {

Review Comment:
   We would need a property to enable/disable this functionality.





[GitHub] [iceberg] rdblue commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1275506085


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {
+    if (readConf().adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext().defaultParallelism();

Review Comment:
   Why use the default parallelism instead of the shuffle partitions setting? Is this set correctly by default when using dynamic allocation? I've always used the shuffle partitions because that's more likely to be tuned correctly for a job.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#discussion_r1275553654


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java:
##########
@@ -232,6 +232,16 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
     return taskGroups;
   }
 
+  private long targetSplitSize() {

Review Comment:
   I'll check but I think so.





[GitHub] [iceberg] puchengy commented on pull request #7714: Spark 3.4: Adaptive split size

Posted by "puchengy (via GitHub)" <gi...@apache.org>.
puchengy commented on PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#issuecomment-1656420773

   @aokolnychyi, is there a plan to port this to Spark 3.2? Thanks.




[GitHub] [iceberg] aokolnychyi commented on pull request #7714: Spark 3.4: Adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7714:
URL: https://github.com/apache/iceberg/pull/7714#issuecomment-1664743180

   @puchengy, we can. We have to discuss the best way to determine the parallelism, though.

