Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/26 20:00:27 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7688: Add adaptive split size

aokolnychyi commented on code in PR #7688:
URL: https://github.com/apache/iceberg/pull/7688#discussion_r1207278955


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -256,4 +265,95 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche
   public ThisT metricsReporter(MetricsReporter reporter) {
     return newRefinedScan(table(), schema(), context().reportWith(reporter));
   }
+
+  private Optional<Long> adaptiveSplitSize(long tableSplitSize) {
+    if (!PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.ADAPTIVE_SPLIT_PLANNING,
+        TableProperties.ADAPTIVE_SPLIT_PLANNING_DEFAULT)) {
+      return Optional.empty();
+    }
+
+    int minParallelism =
+        PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.SPLIT_MIN_PARALLELISM,
+            TableProperties.SPLIT_MIN_PARALLELISM_DEFAULT);
+
+    Preconditions.checkArgument(minParallelism > 0, "Minimum parallelism must be a positive value");
+
+    Snapshot snapshot =
+        Stream.of(context.snapshotId(), context.toSnapshotId())
+            .filter(Objects::nonNull)
+            .map(table::snapshot)
+            .findFirst()
+            .orElseGet(table::currentSnapshot);
+
+    if (snapshot == null || snapshot.summary() == null) {
+      return Optional.empty();
+    }
+
+    Map<String, String> summary = snapshot.summary();
+    long totalFiles =
+        PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, 0);
+    long totalSize = PropertyUtil.propertyAsLong(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 0);

Review Comment:
   I don't think looking at the total snapshot size, or even partition stats, will be that representative. In my view, knowing how much data a particular query scans and how many slots the cluster has is critical. That's why I thought we would implement this feature at a higher level. One way to do that is #7714, where we know the amount of data to scan because we first plan files and then split/combine them into task groups.
   
   I also think it is critical for this to be bi-directional: we should cover both small and large tables. A Spark task costs on the order of a second to create, so whenever we scan huge tables, we see a big difference between a 128 MB split size and, say, 512 MB or 1 GB.
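   
   As a rough illustration of that idea, here is a minimal sketch of a higher-level heuristic that derives the split size from the bytes a query actually plans to scan and from the number of cluster slots, clamping in both directions. The class, the method, and the 16 MB / 1 GB bounds are hypothetical placeholders, not part of Iceberg or of #7714.
   
   ```java
   // Hypothetical sketch: once files for a query have been planned, pick a split size
   // from the total bytes to scan and the number of slots in the cluster, instead of
   // from snapshot-level statistics. Names and constants are illustrative only.
   public class AdaptiveSplitSizeSketch {
   
     // Bi-directional clamp: the floor avoids excessively tiny splits on small tables,
     // the ceiling avoids an excessive number of tasks on huge scans.
     private static final long MIN_SPLIT_SIZE = 16L * 1024 * 1024;   // 16 MB
     private static final long MAX_SPLIT_SIZE = 1024L * 1024 * 1024; // 1 GB
   
     /**
      * Picks a split size so the planned scan produces roughly one task group per slot,
      * given the total bytes the query will actually read.
      */
     static long adaptiveSplitSize(long totalScanBytes, int clusterSlots, long defaultSplitSize) {
       if (totalScanBytes <= 0 || clusterSlots <= 0) {
         return defaultSplitSize;
       }
       long targetSplitSize = totalScanBytes / clusterSlots;
       return Math.max(MIN_SPLIT_SIZE, Math.min(MAX_SPLIT_SIZE, targetSplitSize));
     }
   
     public static void main(String[] args) {
       long defaultSplit = 128L * 1024 * 1024; // 128 MB
   
       // 1 TB scan on a 512-slot cluster: 128 MB splits would create ~8192 tasks,
       // while the adaptive size (2 GB clamped to 1 GB) creates ~1024.
       long oneTb = 1024L * 1024 * 1024 * 1024;
       System.out.println(adaptiveSplitSize(oneTb, 512, defaultSplit));
   
       // 64 MB table on the same cluster: the 16 MB floor yields ~4 tasks
       // instead of 512 micro-splits.
       System.out.println(adaptiveSplitSize(64L * 1024 * 1024, 512, defaultSplit));
     }
   }
   ```
   
   The point of the sketch is only that both inputs (planned scan size and cluster parallelism) are known at task-group planning time, which is not the case when the heuristic is driven by snapshot summary totals as in the diff above.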



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
