You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/28 07:14:07 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request, #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

szehon-ho opened a new pull request, #4847:
URL: https://github.com/apache/iceberg/pull/4847

   This is a pre-requisite for expire-snapshot performance improvement in : https://github.com/apache/iceberg/pull/4736
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r906393640


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;

Review Comment:
   Yea that's it actually, its used in two places.  Cleaner just to put 18 in both?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r912213086


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +222,188 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH;
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (anyMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      /**
+       * Comparison of snapshot reference and literal, using long comparator.
+       *
+       * @param ref           bound reference, comparison attempted only if reference is for reference_snapshot_id
+       * @param lit           literal value to compare with snapshot id.
+       * @param desiredResult function to apply to long comparator result, returns true if result is as expected.
+       * @return false if comparator does not achieve desired result, true otherwise
+       */
+      private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit,
+                                             Function<Integer, Boolean> desiredResult) {
+        if (isSnapshotRef(ref)) {
+          Literal<Long> longLit = lit.to(Types.LongType.get());

Review Comment:
   @szehon-ho, once the expression is bound, you are guaranteed that the `lit` corresponding to the ref will be a `LongType` because binding coerces the literal types to match the corresponding term. This shouldn't be needed, unless you're doing it to get a `Long` comparator because you have a `long snapshotId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r912264795


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   OK works for me , I will make a follow up pr then!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883944798


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   Yeah, I think to perform the filtering we just need to go through all the snapshots and evaluate the expression on each one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883945923


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   Essentially a custom expression visitor applied on `Snapshot` object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Spark 3.2: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r887379793


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   Yea this makes sense to me, trying an implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r906831887


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);

Review Comment:
   We talked about these a bit offline, I think the gt/lt's here probably won't every be useful but I don't see a problem in including them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907920744


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -112,21 +123,26 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
       Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
+      SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),

Review Comment:
   I might keep it as argument, the class is more re-usable if it takes the schema as an argument, was thinking we could add this for other metadata tables at some point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907852616


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -112,21 +123,26 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
       Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
+      SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),

Review Comment:
   nit: if you want to keep it on one line, you could just use `MANIFEST_FILE_SCHEMA` in the constructor of the evaluator class (you have access to it there)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883783558


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   > A data scan, for example, can't scan more than one snapshot.
   
   It is true but `ALL_*` tables, on the other hand, don't support `useSnapshot`, `asOfTime` and incremental features.
   
   > What about exposing a reference_snapshot_id column
   
   I'd be open for that too. It will be the snapshot ID of the manifest list we read these manifests from, right? Adding a new column is fairly easy. How can we do the filtering? We need to filter `Snapshot`s having a generic filter `Expression` that can reference any columns. Is there an easy way to perform the filtering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r905410109


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!notIn) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    /**
+     * Comparison of snapshot reference and literal
+     * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
+     * @param lit literal value to compare with snapshot id.
+     * @param compareResult function to apply to comparison result, which returns true if rows might match
+     * @return boolean true if rows might match, false if cannot match
+     */
+    private <T> Boolean compare(BoundReference<T> ref,
+                                Literal<T> lit,
+                                Function<Integer, Boolean> compareResult) {

Review Comment:
   Renamed, hope its a little better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907918873


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   Yea I think its the same in this case, but thought its safer to use the Comparator lookup for Long type to get the official Iceberg way to compare (if it ever differs from Java's comparator)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907921417


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;

Review Comment:
   Side comment / non-blocking: I'm a _big_ fan of using named constants for schema element IDs.
   
   Should we update to use named position constants throughout, particularly for schemas like the metadata tables that are evolving (note that `content` is listed first but has ID 14). But is the cherry-picking that would be required by a large diff like this considered an unnecessary change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907938573


##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -991,18 +1005,34 @@ public void testAllManifestsTable() {
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
-    List<ManifestFile> manifests = Lists.newArrayList();
-
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(table.io()), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
+        StreamSupport.stream(table.snapshots().spliterator(), false)

Review Comment:
   Nit: Can you add an inline comment for what the `false` stands for? If its something like parallel or something then don’t worry about it, but usually it helps me reading the code to have the parameter name in-lined like `, /* caseSensitive */ false);`.
   
   But again, thinking on it (and the usage of spliterator), if it’s parallel or something then feel free to ignore it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r906392672


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -112,21 +123,29 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
       Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
+      SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),
+          isCaseSensitive());
+      Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(),
+          snapshot -> snapshotEvaluator.eval(snapshot));
+
+      return CloseableIterable.withNoopClose(Iterables.transform(filteredSnapshots, snap -> {
         if (snap.manifestListLocation() != null) {
           DataFile manifestListAsDataFile = DataFiles.builder(PartitionSpec.unpartitioned())
               .withInputFile(io.newInputFile(snap.manifestListLocation()))
               .withRecordCount(1)
               .withFormat(FileFormat.AVRO)
               .build();
-          return new ManifestListReadTask(io, schema(), specs, new BaseFileScanTask(
-              manifestListAsDataFile, null,
-              schemaString, specString, residuals));
+          return new ManifestListReadTask(

Review Comment:
   We generally don't do 1 arg a line, minor formatting nit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r908116610


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;

Review Comment:
   Yea I am not sure if there is consensus, Russell made a discussion in https://github.com/apache/iceberg/pull/4847#discussion_r906391975 but didn't finished (and for the other constants, they are never referenced outside).  But If so, we should do it in another pr as its unrelated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r908119579


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +222,188 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH;
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (anyMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      /**
+       * Comparison of snapshot reference and literal, using long comparator.
+       *
+       * @param ref           bound reference, comparison attempted only if reference is for reference_snapshot_id
+       * @param lit           literal value to compare with snapshot id.
+       * @param desiredResult function to apply to long comparator result, returns true if result is as expected.
+       * @return false if comparator does not achieve desired result, true otherwise
+       */
+      private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit,
+                                             Function<Integer, Boolean> desiredResult) {
+        if (isSnapshotRef(ref)) {
+          Literal<Long> longLit = lit.to(Types.LongType.get());

Review Comment:
   Yea it seems it's checked when trying to make the predicate:
   ```
   java.lang.NullPointerException: Cannot create expression literal from null
   	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
   	at org.apache.iceberg.expressions.Literals.from(Literals.java:62)
   	at org.apache.iceberg.expressions.UnboundPredicate.<init>(UnboundPredicate.java:40)
   	at org.apache.iceberg.expressions.Expressions.equal(Expressions.java:175)
   ```
   
   So i think it's not necessary, as it's pretty internal.  Also, none of the other Evaluators check this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r908116610


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;

Review Comment:
   Yea I am not sure if there is consensus, Russell made a discussion in https://github.com/apache/iceberg/pull/4847#discussion_r906391975 but didn't finished (not sure if there is a big value for the other constants if they are never referenced outside?).  But If so, we should do it in another pr as its unrelated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1139111519

   Let me take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1140039446

   > adding this column this would enforce that we return duplicates
   
   Not necessarily, we could return any one or maybe the latest one. There aren't that many snapshots, so we can figure out an ordering and return the latest.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r910469443


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +226,190 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,

Review Comment:
   Yea this is copy + add from ManifestsTable class (should have mentioned), so if we fix it can be in another pr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho merged pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho merged PR #4847:
URL: https://github.com/apache/iceberg/pull/4847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1167975002

   Let me take another look as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907919684


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -112,21 +123,26 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
       Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
+      SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),
+          isCaseSensitive());
+      Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(),
+          snapshot -> snapshotEvaluator.eval(snapshot));

Review Comment:
   Ah great.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907936140


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +222,188 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH;
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (anyMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      /**
+       * Comparison of snapshot reference and literal, using long comparator.
+       *
+       * @param ref           bound reference, comparison attempted only if reference is for reference_snapshot_id
+       * @param lit           literal value to compare with snapshot id.
+       * @param desiredResult function to apply to long comparator result, returns true if result is as expected.
+       * @return false if comparator does not achieve desired result, true otherwise
+       */
+      private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit,
+                                             Function<Integer, Boolean> desiredResult) {
+        if (isSnapshotRef(ref)) {
+          Literal<Long> longLit = lit.to(Types.LongType.get());

Review Comment:
   I imagine it’s unlikely to happen as the query should throw before this, but any concern with potential `null` being passed in for`lit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1163525761

   I'll have time to take a look today. Sorry for the delay!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883783558


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   > A data scan, for example, can't scan more than one snapshot.
   
   It is true but `ALL_*` tables, on the other hand, don't support `useSnapshot`, `asOfTime` and incremental features.
   
   > What about exposing a reference_snapshot_id column
   
   I'd be open for that too. It will be the snapshot ID of the manifest list we read these manifests from, right? Adding a new column is easy. How can we do the filtering? We need to filter `Snapshot`s having a generic filter `Expression` that can reference any columns. Is there an easy way to perform the filtering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883155660


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   I am thinking about the best way to support this. I agree about having a Spark read option but I wonder whether we should have scan options or some sort of `useSnapshots(ids)` method in `TableScan`. We can only support that method in `ALL_*` metadata tables and throw an exception otherwise.
   
   Thoughts, @rdblue @szehon-ho @RussellSpitzer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883783558


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   > A data scan, for example, can't scan more than one snapshot.
   
   It is true but `ALL_*` tables, on the other hand, don't support `useSnapshot`, `asOfTime` and incremental features.
   
   > What about exposing a reference_snapshot_id column
   
   I'd be open for that too. It will be the snapshot ID of the manifest list we read these manifests from, right? Adding a new column is fairly easy. How can we do the filtering? We need to filter `Snapshot`s and we have a generic filter `Expression` that can reference any columns. Is there an easy way to perform the filtering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883255609


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   @rdblue yea , I considered that.  My thought at the time is that same manifest may have different reference_snapshot_id .  Currently I guess we don't do de-duplication in all-manifest, but this would enforce that we return one manifest per reference_snapshot_id (ie, for an append-only table, the first manifest would appear as many times as snapshots.).  Also seems like this would require a bit complex code like adding a ProjectionEvaluator for snapshot filters.  Not totally against it, and maybe this is a useful feature even outside, but seems a bit harder to do in my view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883980353


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   @aokolnychyi, that, or we could just pattern match and reject anything but IN predicates. As long as we're the ones controlling what filters we use, we have some flexibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907918873


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   Yea I think its the same in this case, but thought its safer to use the Comparator lookup for Long type to get the official Iceberg way to compare (if it ever differs from Java's comparator).  I guess this is for all the comparisons in this class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r912210579


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   Sorry I'm late to this review, but the `literalSet` will use the correct comparator for a given type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r904520180


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {

Review Comment:
   nit: should it be `referenceSnapshotId` to match the name of the field?



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!notIn) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    /**
+     * Comparison of snapshot reference and literal
+     * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id

Review Comment:
   nit: I'd add an empty line before params



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());

Review Comment:
   nit: Can we move this line into the if block not to instantiate it all the time?



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());

Review Comment:
   nit: Same here



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());

Review Comment:
   `Comparator<Object>` looked suspicious at first but I think it should work.



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!notIn) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    /**
+     * Comparison of snapshot reference and literal
+     * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
+     * @param lit literal value to compare with snapshot id.
+     * @param compareResult function to apply to comparison result, which returns true if rows might match
+     * @return boolean true if rows might match, false if cannot match
+     */
+    private <T> Boolean compare(BoundReference<T> ref,
+                                Literal<T> lit,
+                                Function<Integer, Boolean> compareResult) {

Review Comment:
   nit: is there a better name than compareResult for this function?



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   nit: If you want to, you can use `noneMatch` to avoid the negation in `if`:
   
   ```
   boolean noneMatch = literalSet.stream().noneMatch(lit -> comparator.compare(snapshotId, lit) == 0);
   if (noneMatch) {
     return ROWS_CANNOT_MATCH;
   }
   ```



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!notIn) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    /**
+     * Comparison of snapshot reference and literal
+     * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
+     * @param lit literal value to compare with snapshot id.
+     * @param compareResult function to apply to comparison result, which returns true if rows might match
+     * @return boolean true if rows might match, false if cannot match
+     */
+    private <T> Boolean compare(BoundReference<T> ref,
+                                Literal<T> lit,
+                                Function<Integer, Boolean> compareResult) {
+
+      if (snapshotIdRef(ref)) {
+        Literal<Long> longLit = lit.to(Types.LongType.get());
+        int cmp = longLit.comparator().compare(snapshotId, longLit.value());
+        if (!compareResult.apply(cmp)) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    private <T> boolean snapshotIdRef(BoundReference<T> ref) {

Review Comment:
   nit: will it be a bit more readable if we added `is` to the method name? I know we don't always do that but I feel like it would make sense. What do you think?



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {

Review Comment:
   What about following what we did in `Evaluator`? The main idea is to have an evaluator that you instantiate once. That evaluator would have `eval(snapshot)` method and an inner visitor class. That way, we can do the binding only once instead of per snapshot and reuse the same evaluator for all snapshots.
   
   Let me know what you think about the `Evaluator` class.



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   nit: Same here, you can use `anyMatch` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r904520180


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {

Review Comment:
   nit: Should it be `referenceSnapshotId` to match the name of the field?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883255609


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   @rdblue yea , I considered that.  My thought at the time is that same manifest may have reference in many snapshots, thus many references reference_snapshot_id, so it becomes messy.  Currently I guess we don't do de-duplication in all-manifest table, but adding this column this would enforce that we return duplicates, one time per snapshot that references it.  Also seems like this would require complex code like adding a ProjectionEvaluator for snapshot filtering, just for this one case.  Not totally against it, and maybe this is a useful column even for outside users, just seems a bit harder to do in my view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r906393640


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;

Review Comment:
   Yea that's it actually, its used in two places.  Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907858177


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +226,190 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,

Review Comment:
   Well, it is probably because we did not have those counts populated initially so there may be old manifests without that info.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907858452


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +226,190 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,

Review Comment:
   That means I have to switch to optional delete counts as well. I'll follow up with a PR for that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r912210891


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   Yeah, we should eventually update this. The set's `contains` method will be correct. It's probably not a big deal right now since there are so few snapshots.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r908108774


##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -991,18 +1005,34 @@ public void testAllManifestsTable() {
     Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
-    List<ManifestFile> manifests = Lists.newArrayList();
-
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
+    DataFile dataFile = Iterables.getFirst(table.currentSnapshot().addedFiles(table.io()), null);
+    PartitionSpec dataFileSpec = table.specs().get(dataFile.specId());
+    StructLike dataFilePartition = dataFile.partition();
+
+    PositionDelete<InternalRow> delete = PositionDelete.create();
+    delete.set(dataFile.path(), 0L, null);
+
+    DeleteFile deleteFile = writePositionDeletes(table, dataFileSpec, dataFilePartition, ImmutableList.of(delete));
+
+    table.newRowDelta()
+        .addDeletes(deleteFile)
+        .commit();
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    Stream<Pair<Long, ManifestFile>> snapshotIdToManifests =
+        StreamSupport.stream(table.snapshots().spliterator(), false)

Review Comment:
   Yea I usually do, but this one is just parallel and pretty common, so maybe skip this time?  Would be great if StreamSupport has a default value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho closed pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho closed pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table
URL: https://github.com/apache/iceberg/pull/4847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r905410243


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {

Review Comment:
   Makes sense, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r904527353


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!notIn) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    /**
+     * Comparison of snapshot reference and literal
+     * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
+     * @param lit literal value to compare with snapshot id.
+     * @param compareResult function to apply to comparison result, which returns true if rows might match
+     * @return boolean true if rows might match, false if cannot match
+     */
+    private <T> Boolean compare(BoundReference<T> ref,
+                                Literal<T> lit,
+                                Function<Integer, Boolean> compareResult) {
+
+      if (snapshotIdRef(ref)) {
+        Literal<Long> longLit = lit.to(Types.LongType.get());
+        int cmp = longLit.comparator().compare(snapshotId, longLit.value());
+        if (!compareResult.apply(cmp)) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    private <T> boolean snapshotIdRef(BoundReference<T> ref) {

Review Comment:
   nit: Will it be a bit more readable if we added `is` to the method name? I know we don't always do that but I feel like it would make sense. What do you think?



##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +228,178 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referencedSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referencedSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator extends BoundExpressionVisitor<Boolean> {
+
+    private Expression boundExpr;
+    private Long snapshotId;
+
+    private static final boolean ROWS_MIGHT_MATCH = true;
+    private static final boolean ROWS_CANNOT_MATCH = false;
+
+    private SnapshotEvaluator(Expression expr, Snapshot snap, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+      this.snapshotId = snap.snapshotId();
+    }
+
+    private boolean eval() {
+      return ExpressionVisitors.visitEvaluator(boundExpr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean isNaN(BoundReference<T> ref) {
+      if (snapshotIdRef(ref)) {
+        return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+      } else {
+        return ROWS_MIGHT_MATCH;
+      }
+    }
+
+    @Override
+    public <T> Boolean notNaN(BoundReference<T> ref) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult < 0);
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult <= 0);
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult > 0);
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult >= 0);
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult == 0);
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      return compare(ref, lit, compareResult -> compareResult != 0);
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean in = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!in) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+      Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+      if (snapshotIdRef(ref)) {
+        boolean notIn = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+        if (!notIn) {
+          return ROWS_CANNOT_MATCH;
+        }
+      }
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    /**
+     * Comparison of snapshot reference and literal
+     * @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
+     * @param lit literal value to compare with snapshot id.
+     * @param compareResult function to apply to comparison result, which returns true if rows might match
+     * @return boolean true if rows might match, false if cannot match
+     */
+    private <T> Boolean compare(BoundReference<T> ref,
+                                Literal<T> lit,
+                                Function<Integer, Boolean> compareResult) {

Review Comment:
   nit: Is there a better name than compareResult for this function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907755654


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +226,190 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,

Review Comment:
   I think 0 may be the wrong thing here, shouldn't it be "null" if we don't have the info?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907753569


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -138,12 +154,18 @@ static class ManifestListReadTask implements DataTask {
     private final Schema schema;
     private final Map<Integer, PartitionSpec> specs;
     private final FileScanTask manifestListTask;
+    private final long referenceSnapshotId;
 
-    ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
+    ManifestListReadTask(FileIO io,

Review Comment:
   formatting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r906395431


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;

Review Comment:
   Shouldn't this always be rows don't match if "isSnapshotRef"? Like in isNan?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r906391975


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -43,6 +52,7 @@
  * This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
+  private static final int REF_SNAPSHOT_ID = 18;

Review Comment:
   Are we keeping this a constant because we refer to it again in the isSnapshotRef function? I just wonder because we don't do this for any of our other fields in the schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907850887


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -112,21 +123,26 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
       Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
+      SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),
+          isCaseSensitive());
+      Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(),
+          snapshot -> snapshotEvaluator.eval(snapshot));

Review Comment:
   nit: this can fit one one line
   
   ```
   Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(), snapshotEvaluator::eval);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1168325895

   Restarting as Flink test is frozen.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883255609


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   @rdblue yea , I considered that.  My thought at the time is that same manifest may have reference in many snapshots, thus many references reference_snapshot_id, so it becomes messy.  Currently I guess we don't do de-duplication in all-manifest table, but adding this column this would enforce that we return duplicates, one time per snapshot that references it.  Also seems like this would require complex code like adding a ProjectionEvaluator for snapshot filtering.  Not totally against it, and maybe this is a useful feature even outside, but seems a bit harder to do in my view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883255609


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   @rdblue yea , I considered that.  My thought at the time is that same manifest may have reference in many snapshots, thus many references reference_snapshot_id, so it becomes messy.  Currently I guess we don't do de-duplication in all-manifest table, but adding this column this would enforce that we return duplicates, one time per snapshot that references it.  Also seems like this would require complex code like adding a ProjectionEvaluator for snapshot filtering.  Not totally against it, and maybe this is a useful column even for outside users, but seems a bit harder to do in my view.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907866423


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +226,190 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,

Review Comment:
   I'd still use 0 for data files count when reading a delete manifest as null means unknown but we know there is no data file in a delete manifest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907752784


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (anyMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      /**
+       * Comparison of snapshot reference and literal, using long comparator.
+       *
+       * @param ref           bound reference, comparison attempted only if reference is for reference_snapshot_id
+       * @param lit           literal value to compare with snapshot id.
+       * @param desiredResult function to apply to long comparator result, returns true if result is as expected.
+       * @return false if comparator does not achieve desired result, true otherwise
+       */
+      private <T> Boolean compareSnapshotRef(BoundReference<T> ref,

Review Comment:
   This (an others have little formatting issues)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907752243


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
+          if (noneMatch) {
+            return ROWS_CANNOT_MATCH;
+          }
+        }
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   Another call of "contains?"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907751846


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +229,186 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
+        manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
+        ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
+        referenceSnapshotId
+    );
+  }
+
+  private static class SnapshotEvaluator {
+
+    private final Expression boundExpr;
+
+    private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
+      this.boundExpr = Binder.bind(structType, expr, caseSensitive);
+    }
+
+    private boolean eval(Snapshot snapshot) {
+      return new SnapshotEvalVisitor().eval(snapshot);
+    }
+
+    private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {
+
+      private long snapshotId;
+      private static final boolean ROWS_MIGHT_MATCH = true;
+      private static final boolean ROWS_CANNOT_MATCH = false;
+
+      private boolean eval(Snapshot snapshot) {
+        this.snapshotId = snapshot.snapshotId();
+        return ExpressionVisitors.visitEvaluator(boundExpr, this);
+      }
+
+      @Override
+      public Boolean alwaysTrue() {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public Boolean alwaysFalse() {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      @Override
+      public Boolean not(Boolean result) {
+        return !result;
+      }
+
+      @Override
+      public Boolean and(Boolean leftResult, Boolean rightResult) {
+        return leftResult && rightResult;
+      }
+
+      @Override
+      public Boolean or(Boolean leftResult, Boolean rightResult) {
+        return leftResult || rightResult;
+      }
+
+      @Override
+      public <T> Boolean isNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean notNull(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean isNaN(BoundReference<T> ref) {
+        if (isSnapshotRef(ref)) {
+          return ROWS_CANNOT_MATCH; // reference_snapshot_id column is never nan
+        } else {
+          return ROWS_MIGHT_MATCH;
+        }
+      }
+
+      @Override
+      public <T> Boolean notNaN(BoundReference<T> ref) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      @Override
+      public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
+      }
+
+      @Override
+      public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
+      }
+
+      @Override
+      public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
+      }
+
+      @Override
+      public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
+      }
+
+      @Override
+      public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
+      }
+
+      @Override
+      public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+        return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
+      }
+
+      @Override
+      public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
+        if (isSnapshotRef(ref)) {
+          Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
+          boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);

Review Comment:
   Can we do this with 
   
   ```literalSet.contains(snapshotId)?```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r907848096


##########
core/src/main/java/org/apache/iceberg/AllManifestsTable.java:
##########
@@ -204,4 +226,190 @@ public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
   }
+
+  static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
+    return StaticDataTask.Row.of(
+        manifest.content().id(),
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,

Review Comment:
   I think the problem is in the original schema in `ManifestsTable`, where the counts are required. I also did not notice that data file counts are optional in `AllManifestsTable` when I recently added counts for delete files. I think we should continue to use 0 for compatibility but I wonder why we have such a mismatch in nullability between these tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1145497448

   @aokolnychyi change is ready if you guys have time for another pass, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1150507969

   FYI @rdblue if you have time to take a look, as well, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r882993369


##########
core/src/main/java/org/apache/iceberg/util/PropertyUtil.java:
##########
@@ -74,6 +76,19 @@ public static String propertyAsString(Map<String, String> properties,
     return defaultValue;
   }
 
+  public static List<Long> propertyAsLongList(Map<String, String> properties,
+                                              String property) {
+    String value = properties.get(property);
+    if (value == null) {
+      return null;
+    }
+    return Splitter.on(",")

Review Comment:
   Nit: Might be good to declare the splitter one time as a private static field, as opposed to constructing it every time (unless there's some serialization-related reason it has to be reinstantiated).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883142482


##########
core/src/main/java/org/apache/iceberg/util/PropertyUtil.java:
##########
@@ -74,6 +76,19 @@ public static String propertyAsString(Map<String, String> properties,
     return defaultValue;
   }
 
+  public static List<Long> propertyAsLongList(Map<String, String> properties,
+                                              String property) {
+    String value = properties.get(property);
+    if (value == null) {
+      return null;
+    }
+    return Splitter.on(",")

Review Comment:
   +1 to Kyle's suggestion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4847: Spark 3.2: Add snapshot-id filter property to AllManifest table

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#discussion_r883208374


##########
core/src/main/java/org/apache/iceberg/ScanProperties.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+public class ScanProperties {

Review Comment:
   This isn't terrible, but I would much rather expose this in some other way. I don't think adding it to `TableScan` makes sense because it is very specific to this feature and would cause confusion in other cases. A data scan, for example, can't scan more than one snapshot.
   
   What about exposing a `reference_snapshot_id` column (or with an _ for a metadata column)? If we did that, we could accept a filter from Spark, like `reference_snapshot_id IN (1, 2, 3, 4)` and special case the pushdown. We'd need to ensure that we give a consistent value for that column, which could be the latest snapshot that has the manifest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4847: Core: Add reference_snapshot_id column and filter to AllManifest table

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4847:
URL: https://github.com/apache/iceberg/pull/4847#issuecomment-1164820988

   Thanks for review, addressed the round of comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org