Posted to issues@iceberg.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/04/05 23:20:35 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1159072818


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);

Review Comment:
   Nit: there are a few more blank lines here than is usual in the Iceberg code base. I would just move this line up, since it also initializes a local variable, like 'normalizedPositionsBySpec'.
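   A minimal sketch of the suggested layout, using hypothetical stand-in types: `FileStub`, `countByPartition`, and the `planner` supplier (which plays the role of `planDataFiles(scan)`) are illustrations, not Iceberg APIs. Every local, including the planned file iterable, is initialized in one group, the per-spec position cache is filled lazily with `computeIfAbsent` as in the diff, and the loop body has no extra blank lines.

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.function.Supplier;

   public class PartitionsLayoutSketch {

     // Hypothetical stand-in for DataFile: a spec id plus raw partition values.
     public record FileStub(int specId, Object[] partition) {}

     // Counts files per normalized partition tuple, laid out as the review suggests.
     public static Map<List<Object>, Integer> countByPartition(
         Supplier<Iterable<FileStub>> planner,
         int normalizedSize,
         Function<Integer, int[]> positionsForSpec) {
       // All local setup grouped together, including planning the files.
       Map<List<Object>, Integer> counts = new HashMap<>();
       Map<Integer, int[]> positionsBySpec = new HashMap<>();
       Iterable<FileStub> files = planner.get();

       for (FileStub file : files) {
         // Computed once per spec id, then reused from the cache.
         int[] positions = positionsBySpec.computeIfAbsent(file.specId(), positionsForSpec);
         Object[] normalized = new Object[normalizedSize];
         for (int i = 0; i < positions.length; i++) {
           normalized[positions[i]] = file.partition()[i];
         }
         counts.merge(Arrays.asList(normalized), 1, Integer::sum);
       }
       return counts;
     }
   }
   ```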



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -156,43 +196,6 @@ private static PartitionData normalizePartition(
     return normalizedPartition;
   }
 
-  @VisibleForTesting
-  static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {

Review Comment:
   I thought about keeping this method as deprecated, but @VisibleForTesting implies it is effectively a private method that should not be used outside unit tests, so I'm OK with removing it.



##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(

Review Comment:
   Can we keep the other method, which delegated to this one with 0 as the default position? Having 0 as the default made the calls a bit easier to read. Just a minor style preference.
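   A sketch of the delegating overload being asked for. It assumes a simplified parameter type (a `List<int[]>` of partition tuples) in place of the real test-base argument, and `PartitionValidationSketch` is a hypothetical name. The two-argument form fixes the position at 0 and forwards to the general three-argument form.

   ```java
   import java.util.List;

   public class PartitionValidationSketch {

     // Convenience overload: most callers check the first partition field, so 0 is the default.
     public static void validatePartition(List<int[]> partitions, int partValue) {
       validatePartition(partitions, 0, partValue);
     }

     // General form: fails unless some partition tuple has partValue at the given position.
     public static void validatePartition(List<int[]> partitions, int position, int partValue) {
       boolean found =
           partitions.stream().anyMatch(p -> position < p.length && p[position] == partValue);
       if (!found) {
         throw new AssertionError(
             "Expected a partition with value " + partValue + " at position " + position);
       }
     }
   }
   ```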



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
+
       int[] normalizedPositions =
           normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
+              dataFile.specId(),
               specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    .select(BaseScan.SCAN_COLUMNS)); // don't select stats columns
+
+    return new ParallelIterable<>(tasks, scan.planExecutor());
+  }
+
   /**
-   * Builds an array of the field position of positions in the normalized partition type indexed by
-   * field position in the original partition type
+   * Builds an integer array to map the partition field from original to normalized where index of

Review Comment:
   I think this is a bit run-on and missing some sentence breaks. What do you think about the following?
   ```
   Builds an integer array for a specific partition type to map its partitions to the final normalized type.
   
   The array represents fields in the original partition type, with the index being the field's index in the original partition type, and the value being the field's index in the normalized partition type.
   
   @param table Iceberg table
   @param specId id of the specified partition type
   ...
   ```
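   To make the suggested wording concrete, here is a sketch of such an array, assuming fields are matched by partition field ID. `NormalizedPositionsSketch` and its methods are hypothetical helpers that work on plain lists of IDs rather than Iceberg's `Types.StructType`. Index `i` of the result is a field's position in the original partition type, and the value is that field's position in the normalized type; `normalize` then copies each value into its normalized slot, leaving fields the spec lacks as null.

   ```java
   import java.util.List;

   public class NormalizedPositionsSketch {

     // index = field's position in the original (per-spec) partition type,
     // value = that field's position in the normalized partition type.
     // Fields are matched by partition field ID (an assumption of this sketch).
     public static int[] normalizedPositions(
         List<Integer> originalFieldIds, List<Integer> normalizedFieldIds) {
       int[] positions = new int[originalFieldIds.size()];
       for (int i = 0; i < originalFieldIds.size(); i++) {
         int pos = normalizedFieldIds.indexOf(originalFieldIds.get(i));
         if (pos < 0) {
           throw new IllegalArgumentException(
               "Field id not in normalized type: " + originalFieldIds.get(i));
         }
         positions[i] = pos;
       }
       return positions;
     }

     // Copies each original value into its normalized slot; absent fields stay null.
     public static Object[] normalize(Object[] original, int[] positions, int normalizedSize) {
       Object[] normalized = new Object[normalizedSize];
       for (int i = 0; i < positions.length; i++) {
         normalized[positions[i]] = original[i];
       }
       return normalized;
     }
   }
   ```

   For example, with normalized field IDs `[1000, 1001, 1002]` and a spec whose fields are `[1002, 1000]`, the array is `[2, 0]`, and the tuple `("b", 5)` normalizes to `(5, null, "b")`.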



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
+

Review Comment:
   Same here: I would keep the original structure and remove the newlines from the for block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org