Posted to issues@iceberg.apache.org by "dramaticlly (via GitHub)" <gi...@apache.org> on 2023/03/23 23:54:19 UTC

[GitHub] [iceberg] dramaticlly opened a new pull request, #7190: Refactor PartitionsTable planning

dramaticlly opened a new pull request, #7190:
URL: https://github.com/apache/iceberg/pull/7190

   Closes #7189
   
   - Maintains the original parallelizable manifest read
   - Hardcodes the column selection to avoid reading the stats columns of data files in the partitions table
   
   CC @szehon-ho 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1160215997


##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(

Review Comment:
   Thank you for the suggestion. I added a new delegating method like the one we had before, but named it `validateSingletonPartition`, as it seems suitable for verifying data files with partition size = 1. I hope you don't hate it :)





[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147049823


##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(
+      CloseableIterable<DataFile> dataFiles, int position, int partitionValue) {

Review Comment:
   It would also be better to keep this interface as `CloseableIterable<ContentFile>`.





[GitHub] [iceberg] ajantha-bhat commented on pull request #7190: Refactor PartitionsTable planning

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#issuecomment-1482156246

   Just reviewed at a very high level. Thanks, @dramaticlly, for working on this, and @szehon-ho for the guidance.




[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1160443178


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,27 +94,63 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {

Review Comment:
   ok. SGTM.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1159072818


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);

Review Comment:
   Nit: there are a few more blank lines here than is usual in the Iceberg code base. I would just move this line up, as it's also initializing a variable, the same as 'normalizedPositionsBySpec'.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -156,43 +196,6 @@ private static PartitionData normalizePartition(
     return normalizedPartition;
   }
 
-  @VisibleForTesting
-  static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {

Review Comment:
   I thought about keeping this method as deprecated, but VisibleForTesting implies it's a private method that should not be used outside unit tests. So I'm OK with removing it.



##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(

Review Comment:
   Can we keep the other method that delegated to this one with 0 as the default? Having 0 as the default made the calls a bit easier to read. Just a minor style preference.
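
To illustrate the style preference above, here is a minimal, self-contained sketch of a two-argument overload that delegates to the three-argument one with position 0 as the default. It is not the test helper from this PR: the class name, the `includesPartition` name, and the use of `int[]` as a stand-in for a file's partition tuple are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: int[] stands in for the partition tuple of one data file.
class PartitionValidationSketch {

  // General form: check the partition field at an explicit position.
  static boolean includesPartition(List<int[]> partitions, int position, int partitionValue) {
    for (int[] partition : partitions) {
      if (partition[position] == partitionValue) {
        return true;
      }
    }
    return false;
  }

  // Convenience overload: delegates with position 0, so callers that only
  // care about the first partition field can omit the position argument.
  static boolean includesPartition(List<int[]> partitions, int partitionValue) {
    return includesPartition(partitions, 0, partitionValue);
  }

  public static void main(String[] args) {
    List<int[]> partitions = Arrays.asList(new int[] {1, 10}, new int[] {2, 20});
    System.out.println(includesPartition(partitions, 2));
    System.out.println(includesPartition(partitions, 1, 20));
  }
}
```

With the overload in place, call sites for single-field partitions read as `includesPartition(partitions, 2)` rather than repeating the literal `0` everywhere.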



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
+
       int[] normalizedPositions =
           normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
+              dataFile.specId(),
               specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    .select(BaseScan.SCAN_COLUMNS)); // don't select stats columns
+
+    return new ParallelIterable<>(tasks, scan.planExecutor());
+  }
+
   /**
-   * Builds an array of the field position of positions in the normalized partition type indexed by
-   * field position in the original partition type
+   * Builds an integer array to map the partition field from original to normalized where index of

Review Comment:
   I think this is a bit of a run-on and is missing some sentence breaks. What do you think about the following?
   ```
   Builds an integer array for a specific partition type to map its partitions to the final normalized type.
   
   The array represents fields in the original partition type, with the index being the field's index in the original partition type, and the value being the field's index in the normalized partition type.
   
   @table iceberg table
   @specId id of the specified partition type
   ...
   ```
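
The mapping described in the suggested Javadoc can be sketched in plain Java. This is only an illustration under stated assumptions, not the Iceberg implementation: partition types are reduced to arrays of field IDs, and the class and method names below are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: a partition type is modeled as an array of field IDs.
class NormalizedPositionsSketch {

  // positions[i] is the index, in the normalized type, of the field that
  // sits at index i in the original partition type.
  static int[] normalizedPositions(int[] originalFieldIds, int[] normalizedFieldIds) {
    int[] positions = new int[originalFieldIds.length];
    for (int i = 0; i < originalFieldIds.length; i++) {
      positions[i] = indexOf(normalizedFieldIds, originalFieldIds[i]);
    }
    return positions;
  }

  private static int indexOf(int[] fieldIds, int fieldId) {
    for (int i = 0; i < fieldIds.length; i++) {
      if (fieldIds[i] == fieldId) {
        return i;
      }
    }
    throw new IllegalArgumentException("Field not in normalized type: " + fieldId);
  }

  // Copies each original value into its slot in the normalized tuple;
  // fields absent from the original spec stay null.
  static Object[] normalize(Object[] original, int normalizedSize, int[] positions) {
    Object[] normalized = new Object[normalizedSize];
    for (int i = 0; i < original.length; i++) {
      normalized[positions[i]] = original[i];
    }
    return normalized;
  }

  public static void main(String[] args) {
    int[] positions = normalizedPositions(new int[] {1001, 1000}, new int[] {1000, 1001, 1002});
    System.out.println(Arrays.toString(positions));
    System.out.println(Arrays.toString(normalize(new Object[] {"a", 5}, 3, positions)));
  }
}
```

Because the array depends only on the partition spec, it can be computed once per spec ID and reused for every file written with that spec, which is what the cached map in the diff is for.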



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,30 +94,72 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
+

Review Comment:
   Same, I would keep the original structure and remove the newlines from the for block.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1160326861


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,27 +94,63 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {

Review Comment:
   Yeah, I don't see an easy way to do it in this PR, as there are going to be a few changes. Let's do it in the PR right after this one, then.





[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1160365118


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,27 +94,63 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {

Review Comment:
   Thank you @ajantha-bhat. As Szehon mentioned, I guess the majority of the code can be reused to read delete files, but it would be `ManifestReader.readDeleteManifest()` instead, and I assume the file count and file size will be aggregated differently. So I didn't move this to ContentFile and kept the scope of this PR to refactoring.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147019979


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting

Review Comment:
   Could we consolidate these two VisibleForTesting methods?  Would just exposing dataFiles() work for those tests?  It'd be nice to not have to expose the Partition inner class.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1160314442


##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,25 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
+  /** Used for asserting on data files where partition size = 1 */

Review Comment:
   Thanks !
   
   I think the name/comment is a bit misleading. "Partition size = 1" reads to me as the number of partitions, whereas I think we mean that the number of partition fields is 1. If we change the name, how about validateSingleFieldPartition()? Then maybe fix or remove the comment (remove it if it's redundant).





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147018350


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and

Review Comment:
   The comment is incomplete; let's just remove it, as I don't see much value in it.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting

Review Comment:
   Could we consolidate these two VisibleForTesting methods?  Would just exposing dataFiles() work for those tests?  I don't really like exposing the Partition inner class.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)

Review Comment:
   Nit: do we need the extra () here?



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution

Review Comment:
   Nit: can we add a blank line before this?
   
   Also, if we keep the comment, we can make it shorter, like: ```handle partition evolution```



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan

Review Comment:
   The comment seems wrong; it's only data files. I would suggest removing it, as it's pretty obvious what it's doing.



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table

Review Comment:
   Nit: I'd also vote to remove this comment; I'm not sure this line in particular needs an explanation more than the other ones.
   
   If needed, maybe we can put it after the select, like:
   
   ```.select(SCAN_COLUMNS); // don't select stats columns```



##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(
+      Iterable<PartitionsTable.Partition> parts, int position, int partitionValue) {

Review Comment:
   I'd actually prefer this to be `CloseableIterable<DataFile>`, as Partition seems to be a fairly internal class and shouldn't be used. I think it would be the same, unless I'm mistaken?



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -238,5 +241,9 @@ void update(DataFile file) {
       this.dataFileCount += 1;
       this.specId = file.specId();
     }
+
+    StructLike key() {

Review Comment:
   This can be removed if we decide to make the tests use `planDataFiles`.



[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147038788


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)
+        ? new ParallelIterable<>(tasks, scan.planExecutor())
+        : CloseableIterable.concat(tasks);

Review Comment:
   Looks like upstream always sets this in https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableScanContext.java#L333 so it's not null. Updated to use ParallelIterable.



[GitHub] [iceberg] szehon-ho commented on pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#issuecomment-1500554317

   Build failed, but I think it's due to the intermittent checkstyle issue referred to above.


[GitHub] [iceberg] szehon-ho merged pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho merged PR #7190:
URL: https://github.com/apache/iceberg/pull/7190


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147023739


##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,19 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
-  }
-
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+  protected void validatePartition(
+      Iterable<PartitionsTable.Partition> parts, int position, int partitionValue) {

Review Comment:
   I'd actually prefer this to be ```CloseableIterable<DataFile>```, as Partition seems to be a fairly internal class and shouldn't be used. I think it would be the same, unless I'm mistaken?



[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147048441


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -96,27 +94,63 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
   }
 
   private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
+
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {

Review Comment:
   I know we are just refactoring the current logic, so `DataFile` is fine.
   
   But to avoid back-and-forth changes (when we support delete file stats), maybe we should just call it `PlanFiles` and return `CloseableIterable<ContentFile>`.
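
To make the suggestion concrete, here is a small sketch of planning against a shared supertype, so that adding delete files later would not change the caller. All types below (`ContentStub`, `DataStub`, `DeleteStub`, `ContentPlanningSketch`) are hypothetical stand-ins written for this example; they are not the Iceberg `ContentFile`, `DataFile`, or `DeleteFile` interfaces.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a shared content-file abstraction.
interface ContentStub {
  String partitionKey();
}

// Hypothetical data-file stand-in.
final class DataStub implements ContentStub {
  private final String key;

  DataStub(String key) {
    this.key = key;
  }

  @Override
  public String partitionKey() {
    return key;
  }
}

// Hypothetical delete-file stand-in.
final class DeleteStub implements ContentStub {
  private final String key;

  DeleteStub(String key) {
    this.key = key;
  }

  @Override
  public String partitionKey() {
    return key;
  }
}

final class ContentPlanningSketch {
  private ContentPlanningSketch() {}

  // Counts files per partition key; accepts any mix of content-file subtypes.
  static Map<String, Long> filesByPartition(Iterable<? extends ContentStub> files) {
    Map<String, Long> counts = new TreeMap<>();
    for (ContentStub file : files) {
      counts.merge(file.partitionKey(), 1L, Long::sum);
    }
    return counts;
  }
}
```

The bounded wildcard is the design point here: a caller that passes only data files today keeps compiling if the planner later starts yielding delete files too.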



[GitHub] [iceberg] dramaticlly commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1160316371


##########
core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java:
##########
@@ -79,24 +80,25 @@ protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals
     }
   }
 
-  protected void validateIncludesPartitionScan(
-      CloseableIterable<FileScanTask> tasks, int partValue) {
-    validateIncludesPartitionScan(tasks, 0, partValue);
+  /** Used for asserting on data files where partition size = 1 */

Review Comment:
   `validateSingleFieldPartition` it is, and I removed the redundant comment.
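
For context, a rough sketch of what such a pair of helpers can look like: a general check at a given partition position, plus a single-field convenience that always looks at position 0. Only the name `validateSingleFieldPartition` comes from this thread. The signature, the `PartitionedFileStub` class, and the `includesPartition` helper are assumptions made for illustration, with a plain `int[]` standing in for the partition tuple.

```java
// Hypothetical stand-in for a data file: only its partition tuple is modeled.
final class PartitionedFileStub {
  final int[] partition;

  PartitionedFileStub(int... partition) {
    this.partition = partition;
  }
}

final class PartitionAssertionsSketch {
  private PartitionAssertionsSketch() {}

  // Returns true when some file holds partValue at the given partition position.
  static boolean includesPartition(
      Iterable<PartitionedFileStub> files, int position, int partValue) {
    for (PartitionedFileStub file : files) {
      if (position < file.partition.length && file.partition[position] == partValue) {
        return true;
      }
    }
    return false;
  }

  // Single-field convenience: the only partition field sits at position 0.
  static void validateSingleFieldPartition(Iterable<PartitionedFileStub> files, int partValue) {
    if (!includesPartition(files, 0, partValue)) {
      throw new AssertionError("No file found with partition value " + partValue);
    }
  }
}
```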



[GitHub] [iceberg] dramaticlly closed pull request #7190: Refactor PartitionsTable planning

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly closed pull request #7190: Refactor PartitionsTable planning
URL: https://github.com/apache/iceberg/pull/7190


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7190: Refactor PartitionsTable planning

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#discussion_r1147019781


##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }

Review Comment:
   Wondering if there is a clean way to do this with Java streams?
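
One possible shape, as a sketch only: group files by partition key with `Collectors.groupingBy` and summarize them per group. `StreamFileStub` and `PartitionStatsSketch` are hypothetical names, and a `String` key stands in for the normalized `PartitionData`. None of this is the code in the PR.

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for DataFile: only a partition key and a record count.
final class StreamFileStub {
  final String partitionKey;
  final long recordCount;

  StreamFileStub(String partitionKey, long recordCount) {
    this.partitionKey = partitionKey;
    this.recordCount = recordCount;
  }
}

final class PartitionStatsSketch {
  private PartitionStatsSketch() {}

  // Groups files by partition key; per group, getCount() is the number of files
  // and getSum() is the total record count.
  static Map<String, LongSummaryStatistics> summarize(List<StreamFileStub> files) {
    return files.stream()
        .collect(
            Collectors.groupingBy(
                (StreamFileStub file) -> file.partitionKey,
                Collectors.summarizingLong((StreamFileStub file) -> file.recordCount)));
  }
}
```

Whether this is cleaner is a judgment call: the loop in the diff updates entries of a `PartitionMap` in place, while the stream version builds a fresh summary object per group, and a `CloseableIterable` would still need to be closed around the stream.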



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting
+  static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
     Types.StructType normalizedPartitionType = Partitioning.partitionType(table);
     PartitionMap partitions = new PartitionMap();
 
     // cache a position map needed by each partition spec to normalize partitions to final schema
     Map<Integer, int[]> normalizedPositionsBySpec =
         Maps.newHashMapWithExpectedSize(table.specs().size());
+    // logic to handle the partition evolution
+    int[] normalizedPositions =
+        normalizedPositionsBySpec.computeIfAbsent(
+            table.spec().specId(),
+            specId -> normalizedPositions(table, specId, normalizedPartitionType));
+
+    // parallelize the manifest read and
+    CloseableIterable<DataFile> datafiles = planDataFiles(scan);
 
-    for (FileScanTask task : tasks) {
-      PartitionData original = (PartitionData) task.file().partition();
-      int[] normalizedPositions =
-          normalizedPositionsBySpec.computeIfAbsent(
-              task.spec().specId(),
-              specId -> normalizedPositions(table, specId, normalizedPartitionType));
+    for (DataFile dataFile : datafiles) {
+      PartitionData original = (PartitionData) dataFile.partition();
       PartitionData normalized =
           normalizePartition(original, normalizedPartitionType, normalizedPositions);
-      partitions.get(normalized).update(task.file());
+      partitions.get(normalized).update(dataFile);
     }
+
     return partitions.all();
   }
 
+  @VisibleForTesting
+  static CloseableIterable<DataFile> planDataFiles(StaticTableScan scan) {
+    Table table = scan.table();
+    Snapshot snapshot = scan.snapshot();
+
+    // read list of data and delete manifests from current snapshot obtained via scan
+    CloseableIterable<ManifestFile> dataManifests =
+        CloseableIterable.withNoopClose(snapshot.dataManifests(table.io()));
+
+    LoadingCache<Integer, ManifestEvaluator> evalCache =
+        Caffeine.newBuilder()
+            .build(
+                specId -> {
+                  PartitionSpec spec = table.specs().get(specId);
+                  PartitionSpec transformedSpec = transformSpec(scan.tableSchema(), spec);
+                  return ManifestEvaluator.forRowFilter(
+                      scan.filter(), transformedSpec, scan.isCaseSensitive());
+                });
+
+    CloseableIterable<ManifestFile> filteredManifests =
+        CloseableIterable.filter(
+            dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<DataFile>> tasks =
+        CloseableIterable.transform(
+            filteredManifests,
+            manifest ->
+                ManifestFiles.read(manifest, table.io(), table.specs())
+                    .caseSensitive(scan.isCaseSensitive())
+                    // hardcoded to avoid scan stats column on partition table
+                    .select(BaseScan.SCAN_COLUMNS));
+
+    return (scan.planExecutor() != null)
+        ? new ParallelIterable<>(tasks, scan.planExecutor())
+        : CloseableIterable.concat(tasks);

Review Comment:
   Can we avoid the branching behavior here? It looks like `scan.planExecutor()` will never be null so we can always return `ParallelIterable<>`?
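
If the executor is guaranteed to be non-null, the method can state that assumption once and keep a single code path. Below is a JDK-only sketch of that shape. `ParallelReadSketch` and `readAll` are hypothetical names, each `Callable` stands in for reading one manifest, and this does not reproduce `ParallelIterable`, whose internals are not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class ParallelReadSketch {
  private ParallelReadSketch() {}

  // Runs every reader on the executor and concatenates the results in submission order.
  static <T> List<T> readAll(List<Callable<List<T>>> readers, ExecutorService executor) {
    // Fail fast on a null executor instead of falling back to a sequential path.
    Objects.requireNonNull(executor, "executor must not be null");
    List<T> result = new ArrayList<>();
    try {
      // invokeAll blocks until all tasks finish and returns futures in input order.
      for (Future<List<T>> future : executor.invokeAll(readers)) {
        result.addAll(future.get());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while reading", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A reader failed", e.getCause());
    }
    return result;
  }
}
```

Unlike this sketch, which collects everything eagerly, an iterable-based approach can hand results to the consumer as they arrive; the sketch is only meant to show the single, non-branching path.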



##########
core/src/main/java/org/apache/iceberg/PartitionsTable.java:
##########
@@ -95,28 +93,70 @@ private static StaticDataTask.Row convertPartition(Partition partition) {
         partition.key, partition.specId, partition.dataRecordCount, partition.dataFileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, StaticTableScan scan) {
-    CloseableIterable<FileScanTask> tasks = planFiles(scan);
+  @VisibleForTesting

Review Comment:
   Do we need the annotation here? Ideally we can test the behavior through the existing tests without having to expose this



[GitHub] [iceberg] szehon-ho commented on pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#issuecomment-1482119401

   Approved for test run.  Also cc @RussellSpitzer @ajantha-bhat 


[GitHub] [iceberg] dramaticlly commented on pull request #7190: Refactor PartitionsTable planning

Posted by "dramaticlly (via GitHub)" <gi...@apache.org>.
dramaticlly commented on PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#issuecomment-1499760308

   Looks like the Java CI build-checks failed for a file which I did not touch:
   
   https://github.com/apache/iceberg/actions/runs/4633674115/jobs/8199130326?pr=7190
   ```
   * What went wrong:
   Execution failed for task ':iceberg-spark:iceberg-spark-3.3_2.12:checkstyleMain'.
   > A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
      > An unexpected error occurred configuring and executing Checkstyle.
         > java.lang.Error: Error was thrown while processing /home/runner/work/iceberg/iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
   ```


[GitHub] [iceberg] szehon-ho commented on pull request #7190: Refactor PartitionsTable planning

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7190:
URL: https://github.com/apache/iceberg/pull/7190#issuecomment-1500532619

   Merged, thanks again for the work, @dramaticlly!

