Posted to issues@iceberg.apache.org by "danielcweeks (via GitHub)" <gi...@apache.org> on 2023/05/23 03:20:55 UTC

[GitHub] [iceberg] danielcweeks opened a new pull request, #7688: Add adaptive split planning

danielcweeks opened a new pull request, #7688:
URL: https://github.com/apache/iceberg/pull/7688

   This PR adds adaptive split planning to address small tables being collapsed into a single task by split combining. It does this by reducing the split size to reach a minimum parallelism, based on coarse-grained table-level stats.
   
   There are a number of cases this approach cannot account for, including the difference between the filtered file count and size and the full table size. It helps most for unpartitioned tables that either have multiple small files or can be split into smaller chunks to achieve higher parallelism.
   
   Alternatives/additions to this approach would be to modify the bin packing algorithm to distribute across the minimum number of bins based on a reduced split size and only combine once the minimum parallelism can be achieved.  This is a more complicated approach and still relies on knowing the right size to split on.  
   
   Additionally, we don't know at the time of calculating the split size whether we have offsets, so we cannot simply reduce the split size to zero (or near zero) because fixed split planning will then create too many splits. 
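
As a rough illustration of the description above, the core calculation can be sketched in plain Java. This is not the code in the PR: the class and method names are hypothetical, and the sketch ignores file formats, offsets, and filters. It only shows the split size being reduced when the table is too small to reach the minimum parallelism.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the idea in the description; not the code in this PR.
final class AdaptiveSplitSketch {
  private AdaptiveSplitSketch() {}

  /**
   * Returns a reduced split size when the table is too small to reach
   * minParallelism with the configured split size, or empty to keep it.
   */
  static OptionalLong reducedSplitSize(
      long totalSizeBytes, long splitSizeBytes, int minParallelism) {
    if (totalSizeBytes <= 0 || splitSizeBytes <= 0 || minParallelism <= 0) {
      // No usable stats or settings: keep the configured split size.
      return OptionalLong.empty();
    }

    if (totalSizeBytes >= splitSizeBytes * minParallelism) {
      // The table is large enough to reach the minimum parallelism as is.
      return OptionalLong.empty();
    }

    // Shrink the split size so the table divides into about minParallelism splits.
    return OptionalLong.of(Math.max(1L, totalSizeBytes / minParallelism));
  }
}
```

For example, a 64 MB table with a 128 MB split size and a minimum parallelism of 8 would get an 8 MB split size under this sketch, while a 2 GB table would keep the configured value.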


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #7688: Add adaptive split size

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#issuecomment-1568940063

   I think this is mentioned above, but it does feel like we are targeting this at the wrong place. If we have a min parallelism, I think the controls should probably be centered around task coalescing. Currently, for files with offsets, we always break them into the maximum number of offset tasks before recombining. The only real issue is for files without offsets, correct? That's the only reason we may want to control the split size, since they are cut up based on that property rather than actual offsets?
   
   I wonder if it might be clearer to have an "Offset" code path that just works during recombination and a special code path for non-offset file types?
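
To make the distinction in this comment concrete, here is a small plain-Java sketch of the two ways a file can be cut into tasks before recombination. It is a hypothetical model, not Iceberg's implementation: `splitByOffsets` and `splitBySize` are invented names, and each split is represented as a `long[] {start, length}` pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of split planning; these names are not Iceberg APIs.
final class SplitSketch {
  private SplitSketch() {}

  /** With offsets (for example row group starts), cut at every offset; split size is not used. */
  static List<long[]> splitByOffsets(long fileLength, long[] offsets) {
    List<long[]> splits = new ArrayList<>();
    for (int i = 0; i < offsets.length; i++) {
      long end = (i + 1 < offsets.length) ? offsets[i + 1] : fileLength;
      splits.add(new long[] {offsets[i], end - offsets[i]});
    }
    return splits;
  }

  /** Without offsets, cut into fixed-size chunks, so the task count depends on the split size. */
  static List<long[]> splitBySize(long fileLength, long splitSize) {
    if (splitSize <= 0) {
      throw new IllegalArgumentException("Split size must be positive");
    }
    List<long[]> splits = new ArrayList<>();
    for (long start = 0; start < fileLength; start += splitSize) {
      splits.add(new long[] {start, Math.min(splitSize, fileLength - start)});
    }
    return splits;
  }
}
```

In this model only `splitBySize` is sensitive to the split size, which is why a near-zero split size would be harmful for files without offsets but irrelevant for files that have them.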


[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1205995743


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =

Review Comment:
   I think it is probably a good idea to have a context option for this so we can easily pass Spark's parallelism or the Flink operator's parallelism.
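
One way to read this suggestion is the same precedence that `targetSplitSize()` already applies to the split size: a scan-level option wins over the table property. The sketch below is an assumption-laden illustration, not the PR's code. The property key string and the default of 8 are made up for the example, and plain maps stand in for `context.options()` and `table.properties()`.

```java
import java.util.Map;

// Hypothetical sketch: the key string and default value are illustrative assumptions.
final class MinParallelismSketch {
  static final String MIN_PARALLELISM = "read.split.min-parallelism"; // assumed key
  static final int MIN_PARALLELISM_DEFAULT = 8; // assumed default

  private MinParallelismSketch() {}

  /** Scan options (for example, set by the engine) take precedence over table properties. */
  static int minParallelism(Map<String, String> scanOptions, Map<String, String> tableProperties) {
    String value = scanOptions.getOrDefault(MIN_PARALLELISM, tableProperties.get(MIN_PARALLELISM));
    int parallelism = value == null ? MIN_PARALLELISM_DEFAULT : Integer.parseInt(value);
    if (parallelism <= 0) {
      throw new IllegalArgumentException("Minimum parallelism must be a positive value");
    }
    return parallelism;
  }
}
```

An engine integration could then place its own parallelism into the scan options before planning, without changing any table property.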



[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7688: Add adaptive split size

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1210706269


##########
core/src/test/java/org/apache/iceberg/TestSplitPlanning.java:
##########
@@ -218,6 +237,17 @@ public void testSplitPlanningWithOffsets() {
         "We should get one task per row group", 32, Iterables.size(scan.planTasks()));
   }
 
+  @Test
+  public void testAdaptiveSplitPlanningWithOffests() {
+    table.updateProperties().set(TableProperties.ADAPTIVE_SPLIT_PLANNING, "true").commit();

Review Comment:
   This pathway needs more tests, imho. We mostly deal with files with offsets, so we should make sure we aren't doing anything weird in those cases. We also have an adaptive function above which has a lot of early exits and behaviors, so we should make sure all of those exits are tested.



[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206097886


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {

Review Comment:
   That might work, and I think we can always enhance this once more stats are available. Filters, projection, partitions, etc. introduce a lot of complexity, but I think there's a lot of opportunity to improve there as well. For example, we also have column types and record counts, which could lead to even smaller split sizes when we know that little of the data will be read per task.
   
   However, it might also be more beneficial to look at enhancing bin packing because it has more insight into when/how these splits get combined.



[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1208658121


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -187,7 +193,10 @@ public long targetSplitSize() {
     long tableValue =
         PropertyUtil.propertyAsLong(
             table().properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
-    return PropertyUtil.propertyAsLong(context.options(), TableProperties.SPLIT_SIZE, tableValue);
+
+    long splitSize = adaptiveSplitSize(tableValue).orElse(tableValue);

Review Comment:
   I don't quite understand. I thought most tables didn't have a split size default explicitly set and would use the Iceberg default, `TableProperties.SPLIT_SIZE_DEFAULT`. Is that not the case?
   
   What I meant was that if the split size is explicitly set (not common, I think) then we should always use it because it is static.
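
The precedence described in this comment can be sketched as follows. This is an illustration rather than the PR's code: the map stands in for `table().properties()`, and the key string and 128 MB default are assumptions about what `TableProperties.SPLIT_SIZE` and `TableProperties.SPLIT_SIZE_DEFAULT` resolve to.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of "an explicitly set split size always wins".
final class SplitSizeResolutionSketch {
  static final String SPLIT_SIZE = "read.split.target-size"; // assumed key
  static final long SPLIT_SIZE_DEFAULT = 128L * 1024 * 1024; // assumed default (128 MB)

  private SplitSizeResolutionSketch() {}

  static long resolve(Map<String, String> tableProperties, OptionalLong adaptiveSize) {
    String explicit = tableProperties.get(SPLIT_SIZE);
    if (explicit != null) {
      // The table owner set a static value, so adaptive planning is skipped.
      return Long.parseLong(explicit);
    }
    // Nothing was set explicitly: use the adaptive size if there is one, else the default.
    return adaptiveSize.orElse(SPLIT_SIZE_DEFAULT);
  }
}
```

The difference from `adaptiveSplitSize(tableValue).orElse(tableValue)` is that the adaptive value can only replace the built-in default, never a value the user configured.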



[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206085976


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
+      // If it is possible that splits could normally be calculated to meet the
+      // minimum parallelism, do not adjust the split size
+      return Optional.empty();
+    }
+
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table
+                .properties()
+                .getOrDefault(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+    if (!fileFormat.isSplittable()) {
+      return Optional.of(totalSize / totalFiles);

Review Comment:
   IIUC, after the check above, we have a case where individual files are not enough to get the parallelism we want. So this is trying to prevent any combining, right?
   
   Why not do that directly with a "do not combine" flag?
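
A "do not combine" flag could look roughly like the sketch below. It is a deliberately simplified stand-in: the class, method, and flag are hypothetical, and the greedy loop is not Iceberg's actual bin-packing algorithm. It only contrasts an explicit flag with steering the combiner indirectly through an average-file-size split size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; a simple greedy combiner, not Iceberg's bin packing.
final class CombineSketch {
  private CombineSketch() {}

  /** Groups file sizes into tasks of up to targetSize, or one task per file when combine is false. */
  static List<List<Long>> plan(List<Long> fileSizes, long targetSize, boolean combine) {
    List<List<Long>> tasks = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0;

    for (long size : fileSizes) {
      if (!combine) {
        // Explicit flag: every file becomes its own task.
        tasks.add(List.of(size));
        continue;
      }
      if (!current.isEmpty() && currentSize + size > targetSize) {
        // The current task is full, so start a new one.
        tasks.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(size);
      currentSize += size;
    }

    if (!current.isEmpty()) {
      tasks.add(current);
    }
    return tasks;
  }
}
```

With four 10-byte files and a target of 128, combining yields one task and the flag yields four, with no need to guess a split size that happens to prevent combining.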



[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206096199


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(

Review Comment:
   We can move that in, it was just already being defined outside of this method.



[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206094835


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
+      // If it is possible that splits could normally be calculated to meet the
+      // minimum parallelism, do not adjust the split size
+      return Optional.empty();
+    }
+
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table
+                .properties()
+                .getOrDefault(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+    if (!fileFormat.isSplittable()) {
+      return Optional.of(totalSize / totalFiles);
+    }
+
+    long rowGroupSize;
+
+    switch (fileFormat) {
+      case PARQUET:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+        break;
+      case ORC:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.ORC_BLOCK_SIZE_BYTES,
+                TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
+        break;
+      case AVRO:
+        rowGroupSize = 16 * 1024 * 1024;
+        break;
+      default:
+        rowGroupSize = tableSplitSize;
+    }
+
+    if (totalFiles <= 1) {
+      // For a table with a single small file, default to the smallest of
+      // the configured table split size or the format block size

Review Comment:
   The issue is that if a file is large and has multiple row groups, we only get a single task. In most cases these will either be the same value, or the data will be small and it won't matter. The only outlier is when there are actually multiple row groups.



[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206086794


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
+      // If it is possible that splits could normally be calculated to meet the
+      // minimum parallelism, do not adjust the split size
+      return Optional.empty();
+    }
+
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table
+                .properties()
+                .getOrDefault(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+    if (!fileFormat.isSplittable()) {
+      return Optional.of(totalSize / totalFiles);
+    }
+
+    long rowGroupSize;
+
+    switch (fileFormat) {
+      case PARQUET:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+        break;
+      case ORC:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.ORC_BLOCK_SIZE_BYTES,
+                TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
+        break;
+      case AVRO:
+        rowGroupSize = 16 * 1024 * 1024;
+        break;
+      default:
+        rowGroupSize = tableSplitSize;
+    }
+
+    if (totalFiles <= 1) {
+      // For a table with a single small file, default to the smallest of
+      // the configured table split size or the format block size
+      return Optional.of(Math.min(rowGroupSize, tableSplitSize));
+    }
+
+    long minSplitSize = totalSize / minParallelism;
+    // target split size chosen to provide the most parallelism
+    long targetSplitSize = Math.min(minSplitSize, rowGroupSize);
+
+    return Optional.of(targetSplitSize);

Review Comment:
   Do you also plan to increase the target size if there are too many splits? Or is that not needed?



[GitHub] [iceberg] aokolnychyi commented on pull request #7688: Add adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#issuecomment-1564789279

   I'd like to take a look on Monday too.


[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1205995047


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(

Review Comment:
   If this can access table properties, why read `tableSplitSize` outside and pass it in?



[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206086496


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
+      // If it is possible that splits could normally be calculated to meet the
+      // minimum parallelism, do not adjust the split size
+      return Optional.empty();
+    }
+
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table
+                .properties()
+                .getOrDefault(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+    if (!fileFormat.isSplittable()) {
+      return Optional.of(totalSize / totalFiles);
+    }
+
+    long rowGroupSize;
+
+    switch (fileFormat) {
+      case PARQUET:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+        break;
+      case ORC:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.ORC_BLOCK_SIZE_BYTES,
+                TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
+        break;
+      case AVRO:
+        rowGroupSize = 16 * 1024 * 1024;
+        break;
+      default:
+        rowGroupSize = tableSplitSize;
+    }
+
+    if (totalFiles <= 1) {
+      // For a table with a single small file, default to the smallest of
+      // the configured table split size or the format block size

Review Comment:
   I think this would be simpler using just the default split size (not the configured one, which I think should override).



[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206086281


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -187,7 +193,10 @@ public long targetSplitSize() {
     long tableValue =
         PropertyUtil.propertyAsLong(
             table().properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
-    return PropertyUtil.propertyAsLong(context.options(), TableProperties.SPLIT_SIZE, tableValue);
+
+    long splitSize = adaptiveSplitSize(tableValue).orElse(tableValue);

Review Comment:
   No, because the table split size is typically going to be left at the default.  Small tables won't align well with that.  Even if it is set to a smaller value, it may collapse too much.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7688: Add adaptive split size

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1210668930


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -187,7 +193,10 @@ public long targetSplitSize() {
     long tableValue =
         PropertyUtil.propertyAsLong(
             table().properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
-    return PropertyUtil.propertyAsLong(context.options(), TableProperties.SPLIT_SIZE, tableValue);
+
+    long splitSize = adaptiveSplitSize(tableValue).orElse(tableValue);

Review Comment:
   I agree with @rdblue, I thought by default we don't have this table property included in the metadata. The fields should be empty. If the property is explicitly set we should use the table value.
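
One way to read this suggestion, as a minimal sketch rather than the PR's code: treat an explicitly set table property as authoritative, and consult the adaptive size only when the property is absent. The class and method names are hypothetical. The key `read.split.target-size` and the 128 MB default are assumed to match `TableProperties.SPLIT_SIZE` and `SPLIT_SIZE_DEFAULT`. Scan-level options from `context.options()` are left out for brevity.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class SplitSizePrecedenceSketch {
  // Assumed to mirror TableProperties.SPLIT_SIZE and SPLIT_SIZE_DEFAULT.
  static final String SPLIT_SIZE = "read.split.target-size";
  static final long SPLIT_SIZE_DEFAULT = 128L * 1024 * 1024;

  // An explicitly set table property wins. Otherwise fall back to the adaptive
  // size if one was computed, and finally to the built-in default.
  static long resolveSplitSize(Map<String, String> tableProps, Optional<Long> adaptiveSize) {
    String explicit = tableProps.get(SPLIT_SIZE);
    if (explicit != null) {
      return Long.parseLong(explicit);
    }
    return adaptiveSize.orElse(SPLIT_SIZE_DEFAULT);
  }

  public static void main(String[] args) {
    Optional<Long> adaptive = Optional.of(16L * 1024 * 1024);
    Map<String, String> unset = Collections.emptyMap();
    Map<String, String> explicit = Collections.singletonMap(SPLIT_SIZE, "67108864");

    System.out.println(resolveSplitSize(unset, adaptive));         // property absent: adaptive size
    System.out.println(resolveSplitSize(explicit, adaptive));      // property set: table value
    System.out.println(resolveSplitSize(unset, Optional.empty())); // neither: default
  }
}
```

With this ordering, `adaptiveSplitSize` would not need the table value passed in just to decide precedence, which is the concern raised in this thread.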





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1207327657


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);

Review Comment:
   I think we might be able to combine these two approaches in a reasonable way that's more generalizable.
   
   > I don't think looking at the total snapshot size or even partition stats will be that representative. In my view, knowing the amount of data we scan in a particular query and the number of slots in the cluster is critical. That's why I thought we would implement this feature at a higher level.
   
   I agree with this.  However, the most common places where this is a problem are really simple cases of unpartitioned tables with very little data.  This approach will effectively only take effect if the table size is less than `minParallelism * splitSize`.  So pretty much anything over a couple GB wouldn't be affected.
   
   > Whenever we scan huge tables, we see a huge difference between 128MB and let's say 512MB or 1GB split size.
   
   We've seen this in a lot of cases, and you may even want to adjust to higher split sizes if you're projecting fewer or smaller columns, because the calculated split size is based on the whole row group size, but processing a few int columns can be much faster than processing string columns.
   
   > Relying on a table property for parallelism seems like shifting the complexity of tuning the split size. It varies from query to query and from cluster to cluster.
   
   I agree here as well, but I was hoping for a solution that wouldn't be Spark-specific.  I'm wondering if we can put most of the logic for adjusting the split size here and then pass the relevant information (scan size, parallelism, etc.) through the scan context.  That way we can leverage those properties in other engines.
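
To make the threshold concrete, here is a minimal sketch of the guard in the PR's `adaptiveSplitSize`, written over plain numbers. The class and method names are hypothetical, and the 128 MB split size and minimum parallelism of 8 are assumed values for illustration, not the PR's defaults.

```java
final class AdaptiveThresholdSketch {
  // Mirrors the guard in the diff: adaptive sizing is skipped when the table has
  // more files than minParallelism and at least minParallelism full splits of data.
  static boolean adaptiveSizingApplies(
      long totalFiles, long totalSize, long splitSize, int minParallelism) {
    boolean alreadyParallel =
        totalFiles > minParallelism && totalSize >= splitSize * minParallelism;
    return !alreadyParallel;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long splitSize = 128 * mb; // assumed split size
    int minParallelism = 8;    // hypothetical value, so the size threshold is 1 GB

    // 4 files, 200 MB total: below the threshold, so adaptive sizing applies
    System.out.println(adaptiveSizingApplies(4, 200 * mb, splitSize, minParallelism));
    // 100 files, 4 GB total: normal planning can already reach 8 splits
    System.out.println(adaptiveSizingApplies(100, 4096 * mb, splitSize, minParallelism));
    // 2 files, 4 GB total: large, but too few files to skip the adjustment
    System.out.println(adaptiveSizingApplies(2, 4096 * mb, splitSize, minParallelism));
  }
}
```

Because the guard also requires more files than `minParallelism`, a large table with only a couple of files still goes through adaptive sizing in this sketch.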





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7688: Add adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1207278955


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);

Review Comment:
   I don't think looking at the total snapshot size or even partition stats will be that representative. In my view, knowing the amount of data we scan in a particular query and the number of slots in the cluster is critical. That's why I thought we would implement this feature at a higher level. One way to do that is #7714 where we know the amount of data to scan as we first plan files and then split/combine them into task groups.
   
   I also think it is critical to be bi-directional. We should cover both small and large tables. A task in Spark is worth 1+ sec to create. Whenever we scan huge tables, we see a huge difference between 128MB and let's say 512MB or 1GB split size. However, the challenge right now is the same table can be accessed in a variety of ways and setting the split size in each query is not realistic.
   
   Relying on a table property for parallelism seems like shifting the complexity of tuning the split size. It varies from query to query and from cluster to cluster.
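
To illustrate the bi-directional idea, here is a rough sketch that derives a split size from the bytes a query will actually scan and the number of available task slots, then clamps it to a range. This is not the implementation in #7714. The class name, method name, clamp bounds, and slot counts are all hypothetical.

```java
final class ScanAwareSplitSizeSketch {
  // Aim for roughly one split per slot, but never go below minSplitBytes
  // (tiny tasks are dominated by scheduling overhead) or above maxSplitBytes.
  static long splitSize(long scanBytes, int slots, long minSplitBytes, long maxSplitBytes) {
    if (slots <= 0 || minSplitBytes <= 0 || maxSplitBytes < minSplitBytes) {
      throw new IllegalArgumentException("Invalid slots or split size bounds");
    }
    long perSlot = Math.max(scanBytes, 0L) / slots;
    return Math.max(minSplitBytes, Math.min(maxSplitBytes, perSlot));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long min = 16 * mb;  // hypothetical lower bound
    long max = 512 * mb; // hypothetical upper bound

    // Small scan: the per-slot share (4 MB) is raised to the lower bound
    System.out.println(splitSize(64 * mb, 16, min, max) / mb);
    // Medium scan: 4 GB across 16 slots gives 256 MB splits
    System.out.println(splitSize(4096 * mb, 16, min, max) / mb);
    // Huge scan: 1 TB across 100 slots is capped at the upper bound
    System.out.println(splitSize(1024 * 1024 * mb, 100, min, max) / mb);
  }
}
```

The same function shrinks splits for small scans and grows them for large ones, so it depends on the query and the cluster rather than on a per-table parallelism property.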





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206088112


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
+      // If it is possible that splits could normally be calculated to meet the
+      // minimum parallelism, do not adjust the split size
+      return Optional.empty();
+    }
+
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table
+                .properties()
+                .getOrDefault(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+    if (!fileFormat.isSplittable()) {
+      return Optional.of(totalSize / totalFiles);

Review Comment:
   I don't think we want to do that because there's a big difference between lots of small files and just a few small files.  We still want to combine and turning that off could have bad consequences when there are lots of small files.





[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206084603


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())

Review Comment:
   I don't know that we want to use the `toSnapshotId`. I'd need to think through how adaptive split planning would work for incremental reads, and in the meantime it's probably easier to just return `Optional.empty` instead.
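
A minimal sketch of that fallback, with plain `Long` ids standing in for the scan context (the class, method, and parameter names are hypothetical): when a `toSnapshotId` is present the scan is incremental, so no snapshot is chosen for stats and the caller would skip adaptive sizing.

```java
import java.util.Optional;

final class StatsSnapshotSketch {
  // Returns the snapshot id whose summary should drive adaptive sizing,
  // or empty when adaptive sizing should be skipped.
  static Optional<Long> snapshotIdForStats(
      Long snapshotId, Long toSnapshotId, Long currentSnapshotId) {
    if (toSnapshotId != null) {
      // Incremental read: table-level totals do not describe the scanned range.
      return Optional.empty();
    }
    if (snapshotId != null) {
      return Optional.of(snapshotId);
    }
    // May still be empty for a table with no snapshots.
    return Optional.ofNullable(currentSnapshotId);
  }

  public static void main(String[] args) {
    System.out.println(snapshotIdForStats(null, 7L, 9L));     // incremental: empty
    System.out.println(snapshotIdForStats(5L, null, 9L));     // time travel: snapshot 5
    System.out.println(snapshotIdForStats(null, null, 9L));   // plain scan: current snapshot
    System.out.println(snapshotIdForStats(null, null, null)); // empty table: empty
  }
}
```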





[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1205992355


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -187,7 +193,10 @@ public long targetSplitSize() {
     long tableValue =
         PropertyUtil.propertyAsLong(
             table().properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
-    return PropertyUtil.propertyAsLong(context.options(), TableProperties.SPLIT_SIZE, tableValue);
+
+    long splitSize = adaptiveSplitSize(tableValue).orElse(tableValue);

Review Comment:
   Shouldn't the table split size override the adaptive size? If so, we would not want to always pass in a table value.





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206097886


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {

Review Comment:
   That might work and I think we can always enhance once more stats are available.  There's a lot of complexity that gets introduced in terms of filters, projection, partitions, etc.  But I think there's a lot of opportunity to improve there as well.  For example, we also have column types and record counts, which could lead to even smaller split sizes knowing that little of the data will be read per task.
   
   However, it might also be more beneficial to look at enhancing bin packing because it has more insight into when/how these splits get combined.





[GitHub] [iceberg] rdblue commented on a diff in pull request #7688: Add adaptive split size

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206085137


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {

Review Comment:
   Do you think we would want to use partition stats instead of total size / total files in the future?





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206095696


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);
+
+    if (totalFiles <= 0 || totalSize <= 0) {
+      return Optional.empty();
+    }
+
+    if (totalFiles > minParallelism && totalSize >= tableSplitSize * minParallelism) {
+      // If it is possible that splits could normally be calculated to meet the
+      // minimum parallelism, do not adjust the split size
+      return Optional.empty();
+    }
+
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table
+                .properties()
+                .getOrDefault(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+    if (!fileFormat.isSplittable()) {
+      return Optional.of(totalSize / totalFiles);
+    }
+
+    long rowGroupSize;
+
+    switch (fileFormat) {
+      case PARQUET:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+                TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+        break;
+      case ORC:
+        rowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                TableProperties.ORC_BLOCK_SIZE_BYTES,
+                TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
+        break;
+      case AVRO:
+        rowGroupSize = 16 * 1024 * 1024;
+        break;
+      default:
+        rowGroupSize = tableSplitSize;
+    }
+
+    if (totalFiles <= 1) {
+      // For a table with a single small file, default to the smallest of
+      // the configured table split size or the format block size
+      return Optional.of(Math.min(rowGroupSize, tableSplitSize));
+    }
+
+    long minSplitSize = totalSize / minParallelism;
+    // target split size chosen to provide the most parallelism
+    long targetSplitSize = Math.min(minSplitSize, rowGroupSize);
+
+    return Optional.of(targetSplitSize);

Review Comment:
   I don't think that's generally going to be the case.  Combining still happens and I'm not clear there's a situation where too many splits will be produced.
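
For reference, the final computation in the diff can be worked through with plain numbers. This is a sketch of the arithmetic only: the single-file and non-splittable branches are omitted, the class name is hypothetical, and the 128 MB row group size and minimum parallelism of 8 are assumed values.

```java
final class TargetSplitSizeSketch {
  // Same arithmetic as the last lines of adaptiveSplitSize in the diff:
  // divide the table size across the minimum parallelism, then cap the
  // result at the format's row group (or block) size.
  static long targetSplitSize(long totalSize, int minParallelism, long rowGroupSize) {
    long minSplitSize = totalSize / minParallelism;
    return Math.min(minSplitSize, rowGroupSize);
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long rowGroupSize = 128 * mb; // assumed row group size
    int minParallelism = 8;       // hypothetical value

    // 256 MB table: 256 / 8 = 32 MB, below the row group size
    System.out.println(targetSplitSize(256 * mb, minParallelism, rowGroupSize) / mb);
    // 4 GB table: 4096 / 8 = 512 MB, capped at the 128 MB row group size
    System.out.println(targetSplitSize(4096 * mb, minParallelism, rowGroupSize) / mb);
  }
}
```

The result is a target, not a task count: split combining still packs small splits together up to this size.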





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1206087095


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())

Review Comment:
   Yeah, I wasn't sure about incremental reads, but this would at least get closer to the coarse-grained table stats (e.g. older snapshots will typically have less data).





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7688: Add adaptive split size

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1207278955


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);

Review Comment:
   I don't think looking at the total snapshot size or even partition stats will be that representative. In my view, knowing the amount of data we scan in a particular query and the number of slots in the cluster is critical. That's why I thought we would implement this feature at a higher level. One way to do that is #7714, where we know the amount of data to scan because we first plan files and then split/combine them into task groups.
   
   I also think it is critical to be bi-directional. We should cover both small and large tables. A task in Spark is worth 1+ sec to create. Whenever we scan huge tables, we see a huge difference between 128MB and let's say 512MB or 1GB split size.
   
   Relying on a table property for parallelism seems like shifting the complexity of tuning the split size. It varies from query to query and from cluster to cluster.
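
A rough sketch of the bi-directional idea described above, for illustration only. It is not code from this PR or from #7714. The class name, method name, and both bounds are hypothetical, and it assumes the planner already knows the number of bytes the query will scan and the number of available slots.

```java
// Hypothetical sketch: derive a split size from the scan size and the cluster parallelism.
final class ScanSizeSplitSketch {
  // Assumed bounds for illustration; these are not Iceberg defaults.
  static final long MIN_SPLIT_SIZE = 16L * 1024 * 1024; // 16 MiB
  static final long MAX_SPLIT_SIZE = 1024L * 1024 * 1024; // 1 GiB

  private ScanSizeSplitSketch() {}

  /** Returns roughly scanSizeBytes / parallelism, clamped to [MIN_SPLIT_SIZE, MAX_SPLIT_SIZE]. */
  static long splitSize(long scanSizeBytes, int parallelism) {
    if (scanSizeBytes < 0 || parallelism <= 0) {
      throw new IllegalArgumentException("Scan size must be >= 0 and parallelism must be > 0");
    }

    // Ceiling division, so that `parallelism` splits of this size cover the whole scan.
    long ideal = (scanSizeBytes + parallelism - 1) / parallelism;

    // Small scans are clamped up to the floor, huge scans are clamped down to the ceiling.
    return Math.max(MIN_SPLIT_SIZE, Math.min(MAX_SPLIT_SIZE, ideal));
  }

  public static void main(String[] args) {
    long gib = 1024L * 1024 * 1024;
    System.out.println(splitSize(gib, 64)); // small scan: smaller splits
    System.out.println(splitSize(10 * 1024 * gib, 200)); // huge scan: larger splits
  }
}
```

Clamping in both directions means small scans get smaller splits (more parallelism) while huge scans get larger splits (fewer tasks to create), without a per-table property.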





[GitHub] [iceberg] danielcweeks commented on a diff in pull request #7688: Add adaptive split size

Posted by "danielcweeks (via GitHub)" <gi...@apache.org>.
danielcweeks commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1207327657


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);

Review Comment:
   I think we might be able to combine these two approaches in a reasonable way that's more generalizable.
   
   > I don't think looking at the total snapshot size or even partition stats will be that representative. In my view, knowing the amount of data we scan in a particular query and the number of slots in the cluster is critical. That's why I thought we would implement this feature at a higher level.
   
   I agree with this. However, the most common places where this is a problem are really simple cases of unpartitioned tables with very little data. This approach will only take effect if the table size is less than `minParallelism * splitSize`, effectively. So pretty much anything over a couple GB wouldn't be affected.
   
   > Whenever we scan huge tables, we see a huge difference between 128MB and let's say 512MB or 1GB split size.
   
   We've seen this in a lot of cases. You may even want to adjust to larger split sizes if you're projecting fewer or smaller columns, because the calculated splits are based on the whole row group size, but processing a few int columns can be much faster than processing string columns.
   
   > Relying on a table property for parallelism seems like shifting the complexity of tuning the split size. It varies from query to query and from cluster to cluster.
   
   I agree here as well, but I was hoping for a solution that wouldn't be Spark-specific. I'm wondering if we can put most of the logic for adjusting the split size here and then pass the relevant information (scan size, parallelism, etc.) through the scan context. That way we can leverage those properties in other engines.
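
To make the threshold concrete, here is a minimal sketch, not the PR's implementation. It assumes the split size is only reduced when the table is smaller than `minParallelism * splitSize`, and that a floor keeps the split size from approaching zero. The class name, the floor value, and the example numbers (128 MiB split size, minimum parallelism of 16) are hypothetical.

```java
import java.util.Optional;

// Hypothetical sketch of the "only small tables are affected" threshold.
final class AdaptiveSplitThresholdSketch {
  // Assumed floor so the split size is never reduced to (near) zero; not an Iceberg default.
  static final long MIN_SPLIT_SIZE = 1024L * 1024; // 1 MiB

  private AdaptiveSplitThresholdSketch() {}

  /** Returns a reduced split size for small tables, or empty when the table split size is kept. */
  static Optional<Long> adaptiveSplitSize(long totalSize, long tableSplitSize, int minParallelism) {
    if (totalSize <= 0 || tableSplitSize <= 0 || minParallelism <= 0) {
      return Optional.empty();
    }

    // Tables at or above the threshold already yield at least minParallelism splits.
    if (totalSize >= tableSplitSize * minParallelism) {
      return Optional.empty();
    }

    // Aim for minParallelism splits, but never go below the floor or above the table setting.
    long reduced = Math.max(MIN_SPLIT_SIZE, totalSize / minParallelism);
    return Optional.of(Math.min(reduced, tableSplitSize));
  }

  public static void main(String[] args) {
    long mib = 1024L * 1024;
    // With a 128 MiB split size and a minimum parallelism of 16, the threshold is 2 GiB.
    System.out.println(adaptiveSplitSize(256 * mib, 128 * mib, 16)); // below the threshold
    System.out.println(adaptiveSplitSize(4096 * mib, 128 * mib, 16)); // above the threshold
  }
}
```

With these example numbers, a 256 MiB table gets a reduced split size, while a 4 GiB table keeps the configured one.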





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7688: Add adaptive split size

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1210705275


##########
core/src/test/java/org/apache/iceberg/TestSplitPlanning.java:
##########
@@ -218,6 +237,17 @@ public void testSplitPlanningWithOffsets() {
         "We should get one task per row group", 32, Iterables.size(scan.planTasks()));
   }
 
+  @Test
+  public void testAdaptiveSplitPlanningWithOffests() {
+    table.updateProperties().set(TableProperties.ADAPTIVE_SPLIT_PLANNING, "true").commit();
+

Review Comment:
   I think we should have some tests here that show we are doing the correct thing even when not enough splits are available. For example, if we have one file with a single offset.
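
The edge case can be illustrated with a simplified model. This is not Iceberg's split iterator or its test harness. It assumes that when a file records split offsets, planning emits exactly one split per offset, so the target split size cannot raise the split count. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of offset-based splitting: one split per recorded offset.
final class OffsetSplitSketch {
  private OffsetSplitSketch() {}

  /** Returns {start, length} pairs, one per offset; a target split size plays no role here. */
  static List<long[]> splitByOffsets(long fileLength, List<Long> offsets) {
    List<long[]> splits = new ArrayList<>();
    for (int i = 0; i < offsets.size(); i++) {
      long start = offsets.get(i);
      // Each split ends where the next offset begins, or at the end of the file.
      long end = i + 1 < offsets.size() ? offsets.get(i + 1) : fileLength;
      splits.add(new long[] {start, end - start});
    }
    return splits;
  }

  public static void main(String[] args) {
    // One file with a single offset yields one split, whatever parallelism was requested.
    System.out.println(splitByOffsets(1000L, List.of(4L)).size());
  }
}
```

Under this model, a test for one file with a single offset would expect a single task even when the requested minimum parallelism is higher.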


