Posted to issues@iceberg.apache.org by "marvinlanhenke (via GitHub)" <gi...@apache.org> on 2024/04/29 13:42:47 UTC

[PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

marvinlanhenke opened a new pull request, #360:
URL: https://github.com/apache/iceberg-rust/pull/360

   ### Which issue does this PR close?
   Closes #359
   
   ### Rationale for this change
   The `partition_filter` (inclusive projection) is not only required by the `ManifestEvaluator` but also by the (incoming) `ExpressionEvaluator`. In order to avoid duplicate code as well as unnecessary computation of the partition filters, we should extract the computation of the partition filters and cache the results per partition_spec_id.
   
   With this refactor we (hopefully) should be able to integrate the `ExpressionEvaluator` and the `InclusiveMetricsEvaluator` more easily.
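As a rough illustration of "cache the results per partition_spec_id", the sketch below memoizes one filter per spec id and still lets the computation fail. It is not the code from this PR: `PartitionFilter`, the `get_or_try_compute` method, and the `project` closure are hypothetical stand-ins (the real filter would be a `BoundPredicate` produced by the inclusive projection).

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical stand-in for a projected partition filter.
#[derive(Debug, Clone, PartialEq)]
struct PartitionFilter(String);

/// Thin wrapper that stores one filter per partition spec id.
#[derive(Debug, Default)]
struct PartitionFilterCache {
    filters: HashMap<i32, PartitionFilter>,
}

impl PartitionFilterCache {
    fn new() -> Self {
        Self::default()
    }

    /// Returns the cached filter for `spec_id`. On a cache miss, `project`
    /// (an assumed, fallible projection step) runs once and its result is
    /// stored; on error nothing is inserted.
    fn get_or_try_compute<F, E>(&mut self, spec_id: i32, project: F) -> Result<&PartitionFilter, E>
    where
        F: FnOnce(i32) -> Result<PartitionFilter, E>,
    {
        let filter: &PartitionFilter = match self.filters.entry(spec_id) {
            Entry::Occupied(slot) => slot.into_mut(),
            Entry::Vacant(slot) => slot.insert(project(spec_id)?),
        };
        Ok(filter)
    }
}
```

Matching on `Entry` keeps the lookup to a single hash probe while still allowing `?` inside the miss branch, which `or_insert_with` cannot do.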
   
   ### What changes are included in this PR?
   - refactor: Extract and decouple computation of partition filters from construction logic of ManifestEvaluator
   - refactor: testsuite ManifestEvaluator
   - refactor: add FileScanStreamContext + helper fn to construct partition_spec & schema
   - refactor: add thin wrapper for PartitionFilterCache & ManifestEvaluatorCache for better encapsulation
   
   ### Are these changes tested?
   Yes. Unit tests are included.
   If the PR is okay, I will add some basic tests for the new structs (FileScanStreamContext, etc.) as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584111575


##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -16,74 +16,49 @@
 // under the License.
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::{Error, ErrorKind, Result};
 use fnv::FnvHashSet;
-use std::sync::Arc;
 
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
 pub(crate) struct ManifestEvaluator {
-    partition_schema: SchemaRef,
+    partition_schema_id: i32,
     partition_filter: BoundPredicate,
     case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
     pub(crate) fn new(
-        partition_spec: PartitionSpecRef,
-        table_schema: SchemaRef,
-        filter: BoundPredicate,
+        partition_schema_id: i32,

Review Comment:
   I like the idea of removing the runtime check.





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   > Could we also do this up-front in `new` or just once
   
   I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` needs to be instantiated outside the stream. So I guess we can't "cache" the spec and the partition_schema?
   
   Fixed the return type `PartitionSpecRef` though
   





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   > Could we also do this up-front in `new` or just once
   
   I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` needs to be instantiated outside the stream. So I guess we can't "cache" the spec, unless we don't actually need the entry.partition_spec_id.





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#issuecomment-2083513030

   @Fokko thanks for clearing this up and thanks for the review. This is the [line](https://github.com/apache/iceberg-python/blob/main/pyiceberg%2Ftable%2F__init__.py#L1647) that led me to believe we also use the inclusive projection in the expression evaluator.
   
   However, if I understand correctly we still needed to extract the partition filters since they will be needed for positional deletes. 




Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "sdd (via GitHub)" <gi...@apache.org>.
sdd commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583577520


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   Seems a bit unexpected to return an `&PartitionSpecRef`. Usually you'd return either a `PartitionSpecRef` or a `&PartitionSpec`. Could we also do this up-front in `new` or just once, storing the `PartitionSpecRef` and `SchemaRef` in the context so that this method just clones those `Arc`s from the context?
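The two conventional return shapes mentioned above can be sketched as follows. This is only an illustration under assumed types: `PartitionSpec`, `Metadata`, `spec_ref`, and `spec` are simplified, hypothetical stand-ins and not the definitions in `crates/iceberg`.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, simplified partition spec.
#[derive(Debug)]
struct PartitionSpec {
    spec_id: i32,
}

/// Reference-counted handle, mirroring the `...Ref` naming used in the diff.
type PartitionSpecRef = Arc<PartitionSpec>;

/// Hypothetical owner of all partition specs, keyed by spec id.
struct Metadata {
    specs: HashMap<i32, PartitionSpecRef>,
}

impl Metadata {
    /// Owned handle: clones the `Arc` (a reference-count bump, no deep copy),
    /// so the caller can keep it beyond the borrow of `self`.
    fn spec_ref(&self, spec_id: i32) -> Option<PartitionSpecRef> {
        self.specs.get(&spec_id).cloned()
    }

    /// Borrowed view: no reference-count traffic, but tied to `&self`.
    fn spec(&self, spec_id: i32) -> Option<&PartitionSpec> {
        // `&**s` goes from `&Arc<PartitionSpec>` to `&PartitionSpec`.
        self.specs.get(&spec_id).map(|s| &**s)
    }
}
```

A `&PartitionSpecRef` combines the drawbacks of both: the caller is still bound to the borrow and must clone to get an owned handle.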





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583606448


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   Sounds reasonable to me. I'll take a look tomorrow. Thanks for the suggestion





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584120346


##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator cache for an existing
-                // PartitionEvaluator that matches this manifest's partition spec ID.
-                // Use one from the cache if there is one. If not, create one, put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {

Review Comment:
   should be done





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#issuecomment-2084698709

   @sdd thanks for the review and @Fokko thanks for merging this.
   
   There is still an unresolved issue though about "not caching" the partition_schemas. 
   I'll create an issue for that so we can discuss the approach first, since I definitely could use some help here in terms of idiomatic Rust.




Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583607459


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {

Review Comment:
   Also sounds good to me. I'll take a look tomorrow. Thanks 





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   > Could we also do this up-front in `new` or just once
   
   I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` needs to be instantiated outside the stream. So I guess we can't "cache" the spec, unless we don't actually need the entry.partition_spec_id.
   
   As of right now, we only use it for the runtime check in the manifest evaluator that entry.partition_spec_id and partition_schema.schema_id() match.





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584112041


##########
crates/iceberg/src/scan.rs:
##########
@@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> {
     }
 
     /// Build the table scan.
-    pub fn build(self) -> crate::Result<TableScan> {
+    pub fn build(self) -> Result<TableScan> {

Review Comment:
   @liurenjie1024 what do you think?





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko merged PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360




Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "sdd (via GitHub)" <gi...@apache.org>.
sdd commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583574672


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {

Review Comment:
   Could we build the `Option<BoundPredicate>` inside `new`, and have new return a `Result<Self>`, and then have `bound_filter` return an `Option<&BoundPredicate>` instead so that we're effectively caching the bind operation?
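A minimal sketch of that suggestion, binding once in a fallible constructor and handing out a borrowed result afterwards. The `Predicate`, `BoundPredicate`, and `ScanContext` types and the `bind` signature here are hypothetical simplifications (the real `bind` also takes the schema), so treat this as an outline of the shape rather than the PR's code.

```rust
/// Hypothetical unbound predicate (just a string for illustration).
#[derive(Debug, Clone, PartialEq)]
struct Predicate(String);

/// Hypothetical bound predicate.
#[derive(Debug, PartialEq)]
struct BoundPredicate(String);

impl Predicate {
    /// Assumed stand-in for binding: fails on an empty predicate and
    /// lowercases the text when matching is case-insensitive.
    fn bind(&self, case_sensitive: bool) -> Result<BoundPredicate, String> {
        if self.0.is_empty() {
            return Err("empty predicate".to_string());
        }
        let text = if case_sensitive {
            self.0.clone()
        } else {
            self.0.to_lowercase()
        };
        Ok(BoundPredicate(text))
    }
}

/// Context that performs the bind exactly once, at construction time.
struct ScanContext {
    bound_filter: Option<BoundPredicate>,
}

impl ScanContext {
    /// Fallible constructor: a bind error surfaces here, not later in the stream.
    fn new(filter: Option<Predicate>, case_sensitive: bool) -> Result<Self, String> {
        // `Option<Result<T, E>>` -> `Result<Option<T>, E>` via `transpose`.
        let bound_filter = filter.map(|f| f.bind(case_sensitive)).transpose()?;
        Ok(Self { bound_filter })
    }

    /// Cheap accessor returning a borrow of the cached bind result.
    fn bound_filter(&self) -> Option<&BoundPredicate> {
        self.bound_filter.as_ref()
    }
}
```

With this shape, repeated calls to `bound_filter` never re-run the bind, and callers no longer need `?` at each use.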





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   > Could we also do this up-front in `new` or just once
   
   I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` needs to be instantiated outside the stream. So I guess we can't "cache" the spec and the partition_schema this way.
   
   However, it makes sense to just create a "new" partition_schema if we have a "new" spec_id. So I'll see where we can move the logic.





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "liurenjie1024 (via GitHub)" <gi...@apache.org>.
liurenjie1024 commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1587013567


##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator cache for an existing
-                // PartitionEvaluator that matches this manifest's partition spec ID.
-                // Use one from the cache if there is one. If not, create one, put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {
+                    let partition_spec_id = entry.partition_spec_id;
+
+                    let (partition_spec, partition_schema) =
+                        context.create_partition_spec_and_schema(partition_spec_id)?;
+
+                    let partition_filter = partition_filter_cache.get(
+                        partition_spec_id,
+                        partition_spec.clone(),
+                        partition_schema.clone(),
+                        filter,
+                        context.case_sensitive,
+                    )?;
+
+                    let manifest_evaluator = manifest_evaluator_cache.get(
+                        partition_spec_id,
+                        partition_schema.schema_id(),
+                        partition_filter.clone(),
+                        context.case_sensitive,
+                    );
 
-                    // reject any manifest files whose partition values don't match the filter.
                     if !manifest_evaluator.eval(entry)? {
                         continue;
                     }
+
+                    // TODO: Create ExpressionEvaluator
                 }
 
-                let manifest = entry.load_manifest(&file_io).await?;
+                let manifest = entry.load_manifest(&context.file_io).await?;

Review Comment:
   Yes, we need to resolve #124 first.





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "liurenjie1024 (via GitHub)" <gi...@apache.org>.
liurenjie1024 commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1587013047


##########
crates/iceberg/src/scan.rs:
##########
@@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> {
     }
 
     /// Build the table scan.
-    pub fn build(self) -> crate::Result<TableScan> {
+    pub fn build(self) -> Result<TableScan> {

Review Comment:
   Sorry for the late reply. I agree that we should have a convention, and I prefer to remove the `crate::` prefix. I'm not sure if a linter could enforce this.





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584098222


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {

Review Comment:
   done





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1584100390


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {
+    schema: SchemaRef,
+    snapshot: SnapshotRef,
+    table_metadata: TableMetadataRef,
+    file_io: FileIO,
+    filter: Option<Arc<Predicate>>,
+    case_sensitive: bool,
+}
+
+impl FileScanStreamContext {
+    /// Creates a new [`FileScanStreamContext`].
+    fn new(
+        schema: SchemaRef,
+        snapshot: SnapshotRef,
+        table_metadata: TableMetadataRef,
+        file_io: FileIO,
+        filter: Option<Arc<Predicate>>,
+        case_sensitive: bool,
+    ) -> Self {
+        Self {
+            schema,
+            snapshot,
+            table_metadata,
+            file_io,
+            filter,
+            case_sensitive,
+        }
+    }
+
+    /// Creates a [`BoundPredicate`] from row filter [`Predicate`].
+    fn bound_filter(&self) -> Result<Option<BoundPredicate>> {
+        match self.filter {
+            Some(ref filter) => Ok(Some(filter.bind(self.schema.clone(), self.case_sensitive)?)),
+            None => Ok(None),
+        }
+    }
+
+    /// Creates a reference-counted [`PartitionSpec`] and a
+    /// corresponding schema based on the specified partition spec id.
+    fn create_partition_spec_and_schema(
+        &self,
+        spec_id: i32,
+    ) -> Result<(&PartitionSpecRef, SchemaRef)> {

Review Comment:
   > Could we also do this up-front in `new` or just once
   
   I'm not sure we can do that, since we need the partition_spec_id from each `entry` inside the for-loop. However, the `FileScanStreamContext` needs to be instantiated outside the stream. So I guess we can't "cache" the spec and the partition_schema?
   





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#issuecomment-2084680618

   > Since I also wanted to tackle the impl of the ExpressionEvaluator I'd be grateful if you could tell me where I went wrong? My understanding is that the partition_expr is passed into the ExpressionEvaluator and that the expr is taken from the DefaultDict of partition filters which contains the inclusive projections?
   
   You're right, we need both of them 👍 
   
   > However, if I understand correctly we still needed to extract the partition filters since they will be needed for positional deletes.
   
   Yes, we need it there as well :) Let's move this forward. Thanks @marvinlanhenke for working on this, and @sdd for the prompt review 🙌 




Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "marvinlanhenke (via GitHub)" <gi...@apache.org>.
marvinlanhenke commented on PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#issuecomment-2082804625

   @sdd @Fokko @liurenjie1024 PTAL




Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "sdd (via GitHub)" <gi...@apache.org>.
sdd commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583579994


##########
crates/iceberg/src/scan.rs:
##########
@@ -314,6 +312,140 @@ impl TableScan {
     }
 }
 
+#[derive(Debug)]
+/// Holds the context necessary for file scanning operations
+/// in a streaming environment.
+struct FileScanStreamContext {

Review Comment:
   This is much cleaner 😎 





Re: [PR] Refactor: Extract `partition_filters` from `ManifestEvaluator` [iceberg-rust]

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #360:
URL: https://github.com/apache/iceberg-rust/pull/360#discussion_r1583486316


##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -16,74 +16,49 @@
 // under the License.
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::{Error, ErrorKind, Result};
 use fnv::FnvHashSet;
-use std::sync::Arc;
 
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
 pub(crate) struct ManifestEvaluator {
-    partition_schema: SchemaRef,
+    partition_schema_id: i32,
     partition_filter: BoundPredicate,
     case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
     pub(crate) fn new(
-        partition_spec: PartitionSpecRef,
-        table_schema: SchemaRef,
-        filter: BoundPredicate,
+        partition_schema_id: i32,

Review Comment:
   To align the naming:
   ```suggestion
           partition_spec_id: i32,
   ```



##########
crates/iceberg/src/expr/visitors/manifest_evaluator.rs:
##########
@@ -16,74 +16,49 @@
 // under the License.
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
-use crate::expr::visitors::inclusive_projection::InclusiveProjection;
-use crate::expr::{Bind, BoundPredicate, BoundReference};
-use crate::spec::{Datum, FieldSummary, ManifestFile, PartitionSpecRef, Schema, SchemaRef};
-use crate::{Error, ErrorKind};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, FieldSummary, ManifestFile};
+use crate::{Error, ErrorKind, Result};
 use fnv::FnvHashSet;
-use std::sync::Arc;
 
-/// Evaluates [`ManifestFile`]s to see if their partition summary matches a provided
-/// [`BoundPredicate`]. Used by [`TableScan`] to filter down the list of [`ManifestFile`]s
+#[derive(Debug)]
+/// Evaluates a [`ManifestFile`] to see if the partition summaries
+/// match a provided [`BoundPredicate`].
+///
+/// Used by [`TableScan`] to prune the list of [`ManifestFile`]s
 /// in which data might be found that matches the TableScan's filter.
 pub(crate) struct ManifestEvaluator {
-    partition_schema: SchemaRef,
+    partition_schema_id: i32,
     partition_filter: BoundPredicate,
     case_sensitive: bool,
 }
 
 impl ManifestEvaluator {
     pub(crate) fn new(
-        partition_spec: PartitionSpecRef,
-        table_schema: SchemaRef,
-        filter: BoundPredicate,
+        partition_schema_id: i32,

Review Comment:
   If it is only for checking the spec-id, I would leave it out and make sure that we have proper tests to avoid having to do these runtime checks :)



##########
crates/iceberg/src/scan.rs:
##########
@@ -99,7 +107,7 @@ impl<'a> TableScanBuilder<'a> {
     }
 
     /// Build the table scan.
-    pub fn build(self) -> crate::Result<TableScan> {
+    pub fn build(self) -> Result<TableScan> {

Review Comment:
   Nit: should we have a convention on whether to keep or remove the `crate::` prefix? Preferably enforced by a checker as well.



##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator cache for an existing
-                // PartitionEvaluator that matches this manifest's partition spec ID.
-                // Use one from the cache if there is one. If not, create one, put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {
+                    let partition_spec_id = entry.partition_spec_id;
+
+                    let (partition_spec, partition_schema) =
+                        context.create_partition_spec_and_schema(partition_spec_id)?;
+
+                    let partition_filter = partition_filter_cache.get(
+                        partition_spec_id,
+                        partition_spec.clone(),
+                        partition_schema.clone(),
+                        filter,
+                        context.case_sensitive,
+                    )?;
+
+                    let manifest_evaluator = manifest_evaluator_cache.get(
+                        partition_spec_id,
+                        partition_schema.schema_id(),
+                        partition_filter.clone(),
+                        context.case_sensitive,
+                    );
 
-                    // reject any manifest files whose partition values don't match the filter.
                     if !manifest_evaluator.eval(entry)? {
                         continue;
                     }
+
+                    // TODO: Create ExpressionEvaluator
                 }
 
-                let manifest = entry.load_manifest(&file_io).await?;
+                let manifest = entry.load_manifest(&context.file_io).await?;

Review Comment:
   For a future PR, you want to do this in parallel 👍 With FastAppend getting more traction, it is likely that there are many manifests.
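
A rough sketch of what loading manifests in parallel could look like is given below. It is only an illustration under stated assumptions: `ManifestEntry`, `Manifest`, `load_manifest` and `load_all` are hypothetical stand-ins, and plain scoped threads from the standard library replace the async stream machinery that `plan_files` actually uses, purely to keep the example self-contained.

```rust
use std::thread;

/// Hypothetical stand-in for a manifest list entry.
struct ManifestEntry {
    path: String,
}

/// Hypothetical stand-in for a loaded manifest.
#[derive(Debug, PartialEq)]
struct Manifest {
    path: String,
    data_files: usize,
}

/// Placeholder for the I/O-bound load step; real code would read the manifest file.
fn load_manifest(entry: &ManifestEntry) -> Result<Manifest, String> {
    if entry.path.is_empty() {
        return Err("empty manifest path".to_string());
    }
    Ok(Manifest {
        path: entry.path.clone(),
        // Dummy value so the example has something to return.
        data_files: entry.path.len(),
    })
}

/// Loads all manifests concurrently, one scoped thread per entry,
/// and returns the results in the original entry order.
/// The first error encountered (in entry order) is returned.
fn load_all(entries: &[ManifestEntry]) -> Result<Vec<Manifest>, String> {
    thread::scope(|scope| {
        let handles: Vec<_> = entries
            .iter()
            .map(|entry| scope.spawn(move || load_manifest(entry)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("loader thread panicked"))
            .collect()
    })
}

fn main() {
    let entries = vec![
        ManifestEntry { path: "m0.avro".to_string() },
        ManifestEntry { path: "m1.avro".to_string() },
    ];
    let manifests = load_all(&entries).expect("all manifests should load");
    assert_eq!(manifests.len(), 2);
    assert_eq!(manifests[0].path, "m0.avro");
}
```

One thread per entry is unbounded, so with many manifests a real implementation would presumably cap the number of concurrent loads.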



##########
crates/iceberg/src/scan.rs:
##########
@@ -169,55 +177,66 @@ pub struct TableScan {
     filter: Option<Arc<Predicate>>,
 }
 
-/// A stream of [`FileScanTask`].
-pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
-
 impl TableScan {
-    /// Returns a stream of file scan tasks.
-
-    pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
-        // Cache `ManifestEvaluatorFactory`s created as part of this scan
-        let mut manifest_evaluator_cache: HashMap<i32, ManifestEvaluator> = HashMap::new();
-
-        // these variables needed to ensure that we don't need to pass a
-        // reference to self into `try_stream`, as it expects references
-        // passed in to outlive 'static
-        let schema = self.schema.clone();
-        let snapshot = self.snapshot.clone();
-        let table_metadata = self.table_metadata.clone();
-        let file_io = self.file_io.clone();
-        let case_sensitive = self.case_sensitive;
-        let filter = self.filter.clone();
+    /// Returns a stream of [`FileScanTask`]s.
+    pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
+        let context = FileScanStreamContext::new(
+            self.schema.clone(),
+            self.snapshot.clone(),
+            self.table_metadata.clone(),
+            self.file_io.clone(),
+            self.filter.clone(),
+            self.case_sensitive,
+        );
+
+        let bound_filter = context.bound_filter()?;
+
+        let mut partition_filter_cache = PartitionFilterCache::new();
+        let mut manifest_evaluator_cache = ManifestEvaluatorCache::new();
 
         Ok(try_stream! {
-            let manifest_list = snapshot
-            .clone()
-            .load_manifest_list(&file_io, &table_metadata)
-            .await?;
+            let manifest_list = context
+                .snapshot
+                .load_manifest_list(&context.file_io, &context.table_metadata)
+                .await?;
 
-            // Generate data file stream
             for entry in manifest_list.entries() {
-                // If this scan has a filter, check the partition evaluator cache for an existing
-                // PartitionEvaluator that matches this manifest's partition spec ID.
-                // Use one from the cache if there is one. If not, create one, put it in
-                // the cache, and take a reference to it.
-                #[allow(clippy::map_entry)]
-                if let Some(filter) = filter.as_ref() {
-                    if !manifest_evaluator_cache.contains_key(&entry.partition_spec_id) {
-                        manifest_evaluator_cache.insert(entry.partition_spec_id, Self::create_manifest_evaluator(entry.partition_spec_id, schema.clone(), table_metadata.clone(), case_sensitive, filter)?);
-                    }
-                    let manifest_evaluator = &manifest_evaluator_cache[&entry.partition_spec_id];
+                if let Some(filter) = &bound_filter {

Review Comment:
   Can we add a check here to make sure the `517: content` equals `data`:
   
   ![image](https://github.com/apache/iceberg-rust/assets/1134248/d687b434-3d91-485d-84a3-d3dea50e9784)
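
The requested check could be sketched along these lines. The types here are hypothetical stand-ins rather than the crate's own: `ManifestContent` mirrors the manifest list `content` field (field id 517), which the Iceberg spec encodes as 0 for data and 1 for deletes, and `plan` is an illustrative helper.

```rust
/// Hypothetical mirror of the manifest list `content` field (field id 517).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ManifestContent {
    /// Encoded as 0 in the manifest list.
    Data,
    /// Encoded as 1 in the manifest list.
    Deletes,
}

/// Hypothetical stand-in for a manifest list entry.
struct ManifestEntry {
    path: &'static str,
    content: ManifestContent,
}

/// Returns the paths of the manifests that would be planned,
/// skipping every entry whose content is not `Data`.
fn plan(entries: &[ManifestEntry]) -> Vec<&'static str> {
    let mut planned = Vec::new();
    for entry in entries {
        // Only data manifests are handled here; delete manifests are skipped.
        if entry.content != ManifestContent::Data {
            continue;
        }
        planned.push(entry.path);
    }
    planned
}

fn main() {
    let entries = [
        ManifestEntry { path: "data-0.avro", content: ManifestContent::Data },
        ManifestEntry { path: "deletes-0.avro", content: ManifestContent::Deletes },
    ];
    assert_eq!(plan(&entries), vec!["data-0.avro"]);
}
```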
   


