You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/22 22:57:10 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request #2358: Add predicate push down for Partitions table

szehon-ho opened a new pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358


   -- Use ManifestPredicate to filter out ManifestFiles before reading them.
   -- Add a new projection to project Metadata table partition structure to partition columns
   -- This can be re-used for any similar Metadata table, ie Files


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r638367070



##########
File path: core/src/test/java/org/apache/iceberg/TableTestBase.java
##########
@@ -98,6 +98,34 @@
       .withPartitionPath("data_bucket=3") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
+  static final DataFile FILE_PARTITION_0 = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-0.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("data_bucket=0")
+      .withPartition(TestHelpers.Row.of(0))

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-865400734


   @szehon-ho, I opened PR https://github.com/szehon-ho/iceberg/pull/1 with a few minor updates. That implements a couple fixes to the approach you use here and the tests are passing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639318608



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),
+        identitySpec,
+        caseSensitive);

Review comment:
       I see, yes I removed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-806072927


   Hi @aokolnychyi @rdblue no rush at all, but ping for attention when you have a free cycle for feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639319312



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),

Review comment:
       OK, I got it to work this way, (providing the ManifestGroup with the mapped partitionSpecsByIds, to match the partitionSpec I pass in which is extracted from the filter).
   
   I think to make this hybrid PartitionSpec would need to make public PartitionSpec constructor/copy methods that are currently package-protected (this way just uses the already public PartitionField Identity builder)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-865355043


   Hi @rdblue , if you have time to see if the last change is clearer (appending "partition." to the table's partition spec to match the filter).  Also @RussellSpitzer  if you had time to also take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639149470



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();

Review comment:
       Nit: I'd probably move this into the if clause since it is only used once.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-810485584


   Thanks for working on this, @szehon-ho! Looks like a good start. I think we can iterate on how this rewrites prediates from the table schema so that they can be applied to data files.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-866353447


   Thanks again @rdblue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639319312



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),

Review comment:
       OK, I got it to work this way, (providing the ManifestGroup with the mapped partitionSpecsByIds, to match the partitionSpec I pass in which is extracted from the filter).
   
   I think to make this hybrid PartitionSpec would need to make public PartitionSpec constructor/copy methods that are currently package-protected (this way just uses the identity)

##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -159,6 +162,105 @@ public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException {
     validateTaskScanResiduals(scan2, true);
   }
 
+  @Test
+  public void testPartitionsTableScan() throws IOException {
+
+    // Make 4 Manifests
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(1, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())))).asStruct();
+
+    // Sanity check a variety of partition predicates.
+    TableScan scanNoFilter = partitionsTable.newScan()
+        .select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<FileScanTask> tasksNoFilter = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanNoFilter);
+    Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
+    validateIncludes(tasksNoFilter, 0);
+    validateIncludes(tasksNoFilter, 1);
+    validateIncludes(tasksNoFilter, 2);
+    validateIncludes(tasksNoFilter, 3);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = partitionsTable.newScan()
+        .filter(andEquals);
+    CloseableIterable<FileScanTask> tasksAndEq = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanAndEq);
+    Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
+    validateIncludes(tasksAndEq, 0);
+
+    Expression ltAnd = Expressions.and(
+        Expressions.lessThan("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanLtAnd = partitionsTable.newScan()
+        .filter(ltAnd);
+    CloseableIterable<FileScanTask> tasksLtAnd = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanLtAnd);
+    Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
+    validateIncludes(tasksLtAnd, 0);
+    validateIncludes(tasksLtAnd, 1);
+
+    Expression or = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanOr = partitionsTable.newScan()
+        .filter(or);
+    CloseableIterable<FileScanTask> tasksOr = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanOr);
+    Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
+    validateIncludes(tasksOr, 0);
+    validateIncludes(tasksOr, 1);
+    validateIncludes(tasksOr, 2);
+    validateIncludes(tasksOr, 3);
+
+    Expression not = Expressions.not(
+        Expressions.lessThan("partition.data_bucket", 2));
+    TableScan scanNot = partitionsTable.newScan()
+        .filter(not);
+    CloseableIterable<FileScanTask> tasksNot = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanNot);
+    Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
+    validateIncludes(tasksNot, 2);
+    validateIncludes(tasksNot, 3);
+
+    Expression set = Expressions.in("partition.data_bucket", 2, 3);
+    TableScan scanSet = partitionsTable.newScan()
+        .filter(set);
+    CloseableIterable<FileScanTask> tasksSet = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanSet);
+    Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
+    validateIncludes(tasksSet, 2);
+    validateIncludes(tasksSet, 3);
+
+    Expression unary = Expressions.notNull("partition.data_bucket");
+    TableScan scanUnary = partitionsTable.newScan()
+        .filter(unary);
+    CloseableIterable<FileScanTask> tasksUnary = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanUnary);
+    Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
+    validateIncludes(tasksUnary, 0);
+    validateIncludes(tasksUnary, 1);
+    validateIncludes(tasksUnary, 2);
+    validateIncludes(tasksUnary, 3);

Review comment:
       Yes, done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r604340026



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -98,21 +105,46 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
+
     PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
+    }
+    return partitions.all();
+  }
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.metadata(table.spec(), caseSensitive).project(scan.filter()),

Review comment:
       Is the purpose of the "metadata" projection just to rewrite predicates? For example, from `partition.x > 3` to `x > 3`?
   
   I'm not sure that I would consider that a projection, but I need to think about it a bit more.
   
   We can accomplish the same thing by creating an identity partition spec from the partition table's field names to the actual partition names. Or we could possibly just rewrite the predicates and remove `partition.`. Those options seem a bit simpler conceptually than using projection.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-865400734


   @szehon-ho, I opened PR https://github.com/szehon-ho/iceberg/pull/1 with a few minor updates. That implements a couple fixes to the approach you use here and the tests are passing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r635674026



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -98,21 +105,46 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
+
     PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
+    }
+    return partitions.all();
+  }
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.metadata(table.spec(), caseSensitive).project(scan.filter()),

Review comment:
       @szehon-ho, what I'm suggesting is that we can use the existing projections to do this rewrite work instead of creating a new projection. Using an existing projection with a different partition spec would be easier because you'd get the benefit you're talking about without adding a new form of projection that doesn't quite meet the definition of a projection.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho edited a comment on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-804450547


   This is an Implementation for Predicate Push down for Partition Table.  It is based on a conversation first with @RussellSpitzer 
    and then with some community in https://github.com/apache/iceberg/issues/2326.
   
   It is not the ultimate solution to the problem , which may need the Spark view catalog as discussed in the issue. But it should be complementary as the same logic can be used in Files Metadata table. 
   
   Also, as we don't know when view-catalog comes, this at least makes it so that we won't have partition query with a filter taking longer than an actual query with that filter :)
   
   If this change is the right way, next step would be implementing the same partition-filter on files-table to reduce the metadata scanned. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-804450547


   This is an Implementation for Predicate Push down for Partition Table.  It is based on a conversation first with @russell-spitzer and then with some community in https://github.com/apache/iceberg/issues/2326.
   
   It is not the ultimate solution to the problem , which may need the Spark view catalog as discussed in the issue. But it should be complementary as the same logic can be used in Files Metadata table. 
   
   Also, as we don't know when view-catalog comes, this at least makes it so that we won't have partition query with a filter taking longer than an actual query with that filter :)
   
   If this change is the right way, next step would be implementing the same partition-filter on files-table to reduce the metadata scanned. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-812745514


   @rdblue sure no problem, thanks for spending the time to review (know you guys are busy), left one question as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r604336837



##########
File path: core/src/test/java/org/apache/iceberg/TableTestBase.java
##########
@@ -98,6 +98,34 @@
       .withPartitionPath("data_bucket=3") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
+  static final DataFile FILE_PARTITION_0 = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-0.parquet")
+      .withFileSizeInBytes(10)
+      .withPartitionPath("data_bucket=0")
+      .withPartition(TestHelpers.Row.of(0))

Review comment:
       You don't need both `withPartitionPath` and `withPartition`. The first one just parses the partition path to create the partition tuple. It's better to use `withPartition` instead (the other files in this class are quite a bit older).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho edited a comment on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho edited a comment on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-804450547


   This is an Implementation for Predicate Push down for Partition Table.  It is based on a conversation first with @RussellSpitzer 
    and then in https://github.com/apache/iceberg/issues/2326.
   
   It is not the ultimate solution to the problem , which may need the Spark view catalog as discussed in the issue. But it should be complementary as the same logic can be used in Files Metadata table. 
   
   Also, as we don't know when view-catalog comes, this at least makes it so that we won't have partition query with a filter taking longer than an actual query with that filter :)
   
   If this change is the right way, next step would be implementing the same partition-filter on files-table to reduce the metadata scanned. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639160362



##########
File path: core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
##########
@@ -159,6 +162,105 @@ public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException {
     validateTaskScanResiduals(scan2, true);
   }
 
+  @Test
+  public void testPartitionsTableScan() throws IOException {
+
+    // Make 4 Manifests
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(1, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())))).asStruct();
+
+    // Sanity check a variety of partition predicates.
+    TableScan scanNoFilter = partitionsTable.newScan()
+        .select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<FileScanTask> tasksNoFilter = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanNoFilter);
+    Assert.assertEquals(4, Iterators.size(tasksNoFilter.iterator()));
+    validateIncludes(tasksNoFilter, 0);
+    validateIncludes(tasksNoFilter, 1);
+    validateIncludes(tasksNoFilter, 2);
+    validateIncludes(tasksNoFilter, 3);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = partitionsTable.newScan()
+        .filter(andEquals);
+    CloseableIterable<FileScanTask> tasksAndEq = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanAndEq);
+    Assert.assertEquals(1, Iterators.size(tasksAndEq.iterator()));
+    validateIncludes(tasksAndEq, 0);
+
+    Expression ltAnd = Expressions.and(
+        Expressions.lessThan("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanLtAnd = partitionsTable.newScan()
+        .filter(ltAnd);
+    CloseableIterable<FileScanTask> tasksLtAnd = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanLtAnd);
+    Assert.assertEquals(2, Iterators.size(tasksLtAnd.iterator()));
+    validateIncludes(tasksLtAnd, 0);
+    validateIncludes(tasksLtAnd, 1);
+
+    Expression or = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanOr = partitionsTable.newScan()
+        .filter(or);
+    CloseableIterable<FileScanTask> tasksOr = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanOr);
+    Assert.assertEquals(4, Iterators.size(tasksOr.iterator()));
+    validateIncludes(tasksOr, 0);
+    validateIncludes(tasksOr, 1);
+    validateIncludes(tasksOr, 2);
+    validateIncludes(tasksOr, 3);
+
+    Expression not = Expressions.not(
+        Expressions.lessThan("partition.data_bucket", 2));
+    TableScan scanNot = partitionsTable.newScan()
+        .filter(not);
+    CloseableIterable<FileScanTask> tasksNot = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanNot);
+    Assert.assertEquals(2, Iterators.size(tasksNot.iterator()));
+    validateIncludes(tasksNot, 2);
+    validateIncludes(tasksNot, 3);
+
+    Expression set = Expressions.in("partition.data_bucket", 2, 3);
+    TableScan scanSet = partitionsTable.newScan()
+        .filter(set);
+    CloseableIterable<FileScanTask> tasksSet = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanSet);
+    Assert.assertEquals(2, Iterators.size(tasksSet.iterator()));
+    validateIncludes(tasksSet, 2);
+    validateIncludes(tasksSet, 3);
+
+    Expression unary = Expressions.notNull("partition.data_bucket");
+    TableScan scanUnary = partitionsTable.newScan()
+        .filter(unary);
+    CloseableIterable<FileScanTask> tasksUnary = ((PartitionsTable) partitionsTable)
+        .planTasks((StaticTableScan) scanUnary);
+    Assert.assertEquals(4, Iterators.size(tasksUnary.iterator()));
+    validateIncludes(tasksUnary, 0);
+    validateIncludes(tasksUnary, 1);
+    validateIncludes(tasksUnary, 2);
+    validateIncludes(tasksUnary, 3);

Review comment:
       I haven't looked very closely, but this appears to be a really long test. My guess is that you're mixing test cases that should be in separate methods. Can you clean it up so that you're testing cases individually?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639147530



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {

Review comment:
       This should be called `planFiles` because it isn't planning tasks using the manifest group; it calls `planFiles` in the end.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639158941



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),

Review comment:
       It would be more clear if you produced an expression from this that was the extracted partition filter, then passed that into the `ManifestGroup`.
   
   I'm guessing that you're doing it this way because you need to rewrite the column names and this accomplishes that by binding to IDs rather than rewriting the names. I think to produce the right expression, you'd need to construct your partition spec so that the source field names are `PARTITION_FIELD_PREFIX + pf.name()` and the partition name is `pf.name()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639319312



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),

Review comment:
       OK, I got it to work this way, (providing the ManifestGroup with the mapped partitionSpecsByIds, to match the partitionSpec I pass in which is extracted from the filter).
   
   I think to make this hybrid PartitionSpec would need to make public PartitionSpec constructor/copy methods that are currently package-protected (this way just uses the already public PartitionSpec Identity builder)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639318512



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),
+        identitySpec,
+        caseSensitive);
+    ManifestGroup manifestGroup = new ManifestGroup(
+        table().io(), table().snapshot(snapshotId).dataManifests(), table().snapshot(snapshotId).deleteManifests())
+        .caseSensitive(caseSensitive)
+        .filterManifests(manifestFile -> eval.eval(manifestFile))
+        .select(DataTableScan.SCAN_COLUMNS)
+        .specsById(table().specs())
+        .ignoreDeleted();
+
+    if (ignoreResiduals) {
+      manifestGroup = manifestGroup.ignoreResiduals();
     }
 
-    return partitions.all();
+    if (PLAN_SCANS_WITH_WORKER_POOL && scan.snapshot().dataManifests().size() > 1) {
+      manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
+    }
+
+    return manifestGroup.planFiles();
   }
 
+

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();

Review comment:
       Done

##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),
+        identitySpec,
+        caseSensitive);
+    ManifestGroup manifestGroup = new ManifestGroup(
+        table().io(), table().snapshot(snapshotId).dataManifests(), table().snapshot(snapshotId).deleteManifests())
+        .caseSensitive(caseSensitive)
+        .filterManifests(manifestFile -> eval.eval(manifestFile))
+        .select(DataTableScan.SCAN_COLUMNS)

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-865355043






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639155128



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),
+        identitySpec,
+        caseSensitive);
+    ManifestGroup manifestGroup = new ManifestGroup(
+        table().io(), table().snapshot(snapshotId).dataManifests(), table().snapshot(snapshotId).deleteManifests())
+        .caseSensitive(caseSensitive)
+        .filterManifests(manifestFile -> eval.eval(manifestFile))
+        .select(DataTableScan.SCAN_COLUMNS)

Review comment:
       I think before this was selecting *. Probably no need to change that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639147092



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),
+        identitySpec,
+        caseSensitive);
+    ManifestGroup manifestGroup = new ManifestGroup(
+        table().io(), table().snapshot(snapshotId).dataManifests(), table().snapshot(snapshotId).deleteManifests())
+        .caseSensitive(caseSensitive)
+        .filterManifests(manifestFile -> eval.eval(manifestFile))
+        .select(DataTableScan.SCAN_COLUMNS)
+        .specsById(table().specs())
+        .ignoreDeleted();
+
+    if (ignoreResiduals) {
+      manifestGroup = manifestGroup.ignoreResiduals();
     }
 
-    return partitions.all();
+    if (PLAN_SCANS_WITH_WORKER_POOL && scan.snapshot().dataManifests().size() > 1) {
+      manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
+    }
+
+    return manifestGroup.planFiles();
   }
 
+

Review comment:
       Nit: unnecessary newline addition.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r606473839



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -98,21 +105,46 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
+
     PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
+    }
+    return partitions.all();
+  }
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.metadata(table.spec(), caseSensitive).project(scan.filter()),

Review comment:
       Hi yes, it's basically this but it's a little more.  I should have explained a bit more my thinking in the PR.
   
   Beyond transforming predicate "partition.x" => "x", there's also logic to handle predicates that don't have to do with the partitions, for example "record_count" > 0.  It seems the other Projections (InclusiveProjection/StrictProjection) were handling this logic (returning false for non-partition predicates, and all the and/or/not, etc),so I thought to re-use it.  It seems we have to 'project' those out, as  ManifestEvaluator Constructor tries bind predicates to the table's partitionSpec, and non-partition predicates will fail to be bound.
   
   I can probably use a cloned partition spec as you said, but the problem would still remain, and I would have to write my own ExpressionVisitor that does the same logic?  Or let me know if I misunderstand.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#discussion_r639155438



##########
File path: core/src/main/java/org/apache/iceberg/PartitionsTable.java
##########
@@ -80,21 +88,51 @@ private DataTask task(TableScan scan) {
     return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount);
   }
 
-  private static Iterable<Partition> partitions(Table table, Long snapshotId) {
-    PartitionMap partitions = new PartitionMap(table.spec().partitionType());
-    TableScan scan = table.newScan();
+  private Iterable<Partition> partitions(StaticTableScan scan) {
+    CloseableIterable<FileScanTask> tasks = planTasks(scan);
 
-    if (snapshotId != null) {
-      scan = scan.useSnapshot(snapshotId);
+    PartitionMap partitions = new PartitionMap(table().spec().partitionType());
+    for (FileScanTask task : tasks) {
+      partitions.get(task.file().partition()).update(task.file());
     }
+    return partitions.all();
+  }
 
-    for (FileScanTask task : scan.planFiles()) {
-      partitions.get(task.file().partition()).update(task.file());
+  @VisibleForTesting
+  CloseableIterable<FileScanTask> planTasks(StaticTableScan scan) {
+    boolean caseSensitive = scan.isCaseSensitive();
+    boolean ignoreResiduals = scan.shouldIgnoreResiduals();
+    long snapshotId = scan.snapshot().snapshotId();
+
+    PartitionSpec tableSpec = table().spec();
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(schema());
+    tableSpec.fields().stream().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name()));
+    PartitionSpec identitySpec = identitySpecBuilder.build();
+
+    ManifestEvaluator eval = ManifestEvaluator.forPartitionFilter(
+        Projections.inclusive(identitySpec, caseSensitive).project(scan.filter()),
+        identitySpec,
+        caseSensitive);

Review comment:
       I don't quite understand why this uses its own `ManifestEvaluator` rather than passing the partition expression into `ManifestGroup.filterPartitions`. If you used that method then you would get both data file filtering based on the partition filter and the manifest filtering.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-865507113


   @rdblue thanks a lot for spending time on this and the guidance, it is definitely a bit hard to wrap my head around the interaction between the pruning, Projection, and Partition transform.  
   
   I went over your suggested changes and it looks good.  So to recap for my benefit, the key is having the PartitionSpec used in the Partition Filter actually be a transform that removes 'partition.' , allowing us to use it in Pruning with the underlying table's prefix-less PartitionSpec (instead of adding the 'partition' prefix to that as I was trying to do).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-866282812


   > So to recap for my benefit, the key is having the PartitionSpec used in the Partition Filter actually be a transform that removes 'partition.' , allowing us to use it in Pruning with the underlying table's prefix-less PartitionSpec (instead of adding the 'partition' prefix to that as I was trying to do).
   
   Yeah, instead of your original idea to create a projection that rewrites how we want, we're creating a partition spec so that the existing projections will make the changes: rewriting the field names and removing non-partition fields. Then we can pass the predicate directly to `ManifestGroup`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2358: Core: Add predicate push down for Partitions table

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2358:
URL: https://github.com/apache/iceberg/pull/2358#issuecomment-866303572


   Thanks, @szehon-ho! I merged this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org