Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 10:07:10 UTC

[GitHub] [arrow-datafusion] thinkharderdev opened a new pull request, #2170: Handle merged schemas in parquet pruning

thinkharderdev opened a new pull request, #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170

   # Which issue does this PR close?
   
   
   Closes #2161 
   
   # Rationale for this change
   
   Adding schema merging to `ParquetFormat` broke pruning because the existing implementation assumes that each file in the `ListingTable` has the merged schema. In the best case this just prevents row groups from being pruned, but in certain cases (such as #2161) it can cause runtime errors and possibly incorrect query results.
   
   # What changes are included in this PR?
   
   When gathering column statistics during pruning, we need to find row group columns by name instead of by their index in the merged schema.
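   
   A minimal sketch of why the lookup has to be by name. The types here (`RowGroup`, `ColumnChunkStats`) and both helper functions are hypothetical stand-ins for illustration, not the real `parquet` metadata API:
   
   ```rust
   // Hypothetical stand-in for the per-column statistics of one row group.
   #[derive(Debug, Clone, PartialEq)]
   struct ColumnChunkStats {
       name: String,
       min: Option<i64>,
       max: Option<i64>,
   }
   
   // Hypothetical stand-in for one row group's metadata; columns are stored
   // in the order of the *file* schema, not the merged table schema.
   struct RowGroup {
       columns: Vec<ColumnChunkStats>,
   }
   
   /// Index-based lookup: only correct when the file's column order and
   /// column set are identical to the merged schema.
   fn stats_by_index(rg: &RowGroup, merged_index: usize) -> Option<&ColumnChunkStats> {
       rg.columns.get(merged_index)
   }
   
   /// Name-based lookup: returns `None` when the file simply does not
   /// contain the column, regardless of column order.
   fn stats_by_name<'a>(rg: &'a RowGroup, name: &str) -> Option<&'a ColumnChunkStats> {
       rg.columns.iter().find(|c| c.name == name)
   }
   ```
   
   For a file that stores only `c3, c2` while the merged schema is `c1, c3, c2`, the index of `c3` in the merged schema (1) points at `c2` in the file, whereas the name-based lookup finds the right column or reports it as absent.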
   
   Gathering statistics in `ListingTable::list_files_for_scan` has a similar problem. The fix in that case is a bit more involved, so I will handle it in a follow-up PR.
   
   # Are there any user-facing changes?
   
   No
   
   No breaking changes to public APIs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] thinkharderdev commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r844308448


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        let filter = col("c2").eq(lit(0_i64));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",

Review Comment:
   Yeah, this looked wrong to me as well. What I think is happening is that, because the min/max statistics aren't set, the pruning predicates aren't applied. In a "real" query where this predicate was pushed down from a filter stage, the output would still get piped into a `FilterExec`. I think we would have to special-case the scenario where we fill in a null column to conform to a merged schema, which may be worth doing. I can double-check, though, and make sure there's not a bug here.
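   
   To illustrate the behaviour I am describing, here is a rough sketch of a conservative pruning decision. `MinMax` and `may_contain_eq` are hypothetical names for illustration only, not DataFusion's pruning API:
   
   ```rust
   /// Hypothetical min/max statistics for one column in one row group.
   #[derive(Debug, Clone, Copy)]
   struct MinMax {
       min: i64,
       max: i64,
   }
   
   /// Decide whether a row group may contain rows matching `col = value`.
   /// With no statistics available the row group cannot be ruled out,
   /// so the conservative answer is to keep it.
   fn may_contain_eq(stats: Option<MinMax>, value: i64) -> bool {
       match stats {
           Some(s) => s.min <= value && value <= s.max,
           None => true,
       }
   }
   ```
   
   A row group whose file has no `c2` column has no min/max for it, so under this assumption it is kept by the scan even though none of its rows can match `c2 = 0`.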





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r844288768


##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -56,7 +56,11 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 
     /// Infer the statistics for the provided object. The cost and accuracy of the
     /// estimated statistics might vary greatly between file formats.
-    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
+    async fn infer_stats(

Review Comment:
   ```suggestion
       /// 
       /// `table_schema` is the (combined) schema of the overall table
       /// and may be a superset of the schema contained in this file.
       /// 
       /// TODO: should the file source return statistics for only columns referred to in the table schema?
       async fn infer_stats(
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)

Review Comment:
   ```suggestion
           // batch1: c1(string), c3(int8)
   ```



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -369,10 +393,102 @@ mod tests {
 
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        TimestampNanosecondArray,
+        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        StringArray, TimestampNanosecondArray,
     };
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::ScalarValue;
     use futures::StreamExt;
+    use parquet::arrow::ArrowWriter;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::NamedTempFile;
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    async fn create_table(
+        batches: Vec<RecordBatch>,
+    ) -> Result<(Vec<NamedTempFile>, Schema)> {
+        let merged_schema =
+            Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))?;
+
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        Ok((files, merged_schema))
+    }
+
+    #[tokio::test]
+    async fn read_merged_batches() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        let batch2 = create_batch(vec![("c2", c2)]);
+
+        let (files, schema) = create_table(vec![batch1, batch2]).await?;
+        let table_schema = Arc::new(schema);
+
+        let reader = local_object_reader(files[0].path().to_string_lossy().to_string());
+
+        let stats = fetch_statistics(reader, table_schema.clone())?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));

Review Comment:
   this is cool to fill in the null stats for the missing column 👍 
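   
   For future readers, a small sketch of the idea as I understand it. The function `align_null_counts` is a hypothetical illustration, not the code in this PR: a column that is in the table schema but absent from the file is treated as all null, so its null count equals the file's row count.
   
   ```rust
   /// Align per-file null counts to the merged table schema.
   ///
   /// `table_columns` lists the merged schema's column names in order;
   /// `file_null_counts` holds (column name, null count) for the columns
   /// actually present in the file. Columns missing from the file are
   /// reported as entirely null, i.e. `num_rows` nulls.
   fn align_null_counts(
       table_columns: &[&str],
       file_null_counts: &[(&str, usize)],
       num_rows: usize,
   ) -> Vec<Option<usize>> {
       table_columns
           .iter()
           .map(|table_col| {
               let found = file_null_counts
                   .iter()
                   .find(|(file_col, _)| file_col == table_col)
                   .map(|(_, nulls)| *nulls);
               Some(found.unwrap_or(num_rows))
           })
           .collect()
   }
   ```
   
   This mirrors the assertions in the test above: the first file has three rows and only `c1` (one null), so `c2` gets a null count of 3.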



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,70 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        // let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c2", c2)]);
+
+        let filter = col("c2").eq(lit(0_i64));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",

Review Comment:
   same thing here -- I wouldn't expect `null` values in `c2` to be returned...



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -46,7 +47,7 @@ use crate::error::Result;
 use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::file_format::ParquetExec;
+use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)

Review Comment:
   ```suggestion
           // batch2: c3(int8), c2(int64)
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        let filter = col("c2").eq(lit(0_i64));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",

Review Comment:
   🤔  if the filter is `c2 = 0` then none of these rows should pass.... so something doesn't look quite right



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -369,10 +393,102 @@ mod tests {
 
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        TimestampNanosecondArray,
+        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        StringArray, TimestampNanosecondArray,
     };
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::ScalarValue;
     use futures::StreamExt;
+    use parquet::arrow::ArrowWriter;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::NamedTempFile;
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {

Review Comment:
   This looks very similar / the same as `RecordBatch::try_from_iter`: https://docs.rs/arrow/11.1.0/arrow/record_batch/struct.RecordBatch.html#method.try_from_iter



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -379,8 +379,10 @@ macro_rules! get_min_max_values {
         let null_scalar: ScalarValue = data_type.try_into().ok()?;
 
         $self.row_group_metadata
-            .column(column_index)
-            .statistics()
+            .columns()
+            .iter()
+            .find(|c| c.column_descr().name() == &$column.name)

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,70 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        // let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)

Review Comment:
   ```suggestion
           // batch2: c2(int64)
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,70 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        // let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)

Review Comment:
   ```suggestion
           // batch1: c1(string)
   ```





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r844309727


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        let filter = col("c2").eq(lit(0_i64));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",

Review Comment:
   Makes sense -- a  comment in the test to explain why it is ok would be helpful for future readers





[GitHub] [arrow-datafusion] alamb commented on pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#issuecomment-1090649982

   FYI @Cheappie
   




[GitHub] [arrow-datafusion] alamb commented on pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#issuecomment-1090668778

   Thanks again @thinkharderdev




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r844964402


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -1005,6 +1005,11 @@ mod tests {
             .await
             .unwrap();
 
+        // This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`

Review Comment:
   👍 





[GitHub] [arrow-datafusion] Cheappie commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
Cheappie commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r845538137


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,73 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

Review Comment:
   I might be missing something, but why are the values in c2 not materialized if we weren't able to _prune_ them? I wonder how a filter like `c2 eq 1_i64` can be satisfied against a null array?
   
   ```
               "+-----+----+",
               "| c1  | c2 |",
               "+-----+----+",
               "| Foo |    |",
               "|     |    |",
               "| bar |    |",
               "+-----+----+",
   ```





[GitHub] [arrow-datafusion] alamb merged pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r845539449


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,73 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

Review Comment:
   I think the key point is that filtering is *also* applied after the initial parquet scan / pruning -- the pruning is just a first pass to try and reduce additional work.
   
   So subsequent `Filter` operations will actually handle filtering out the rows with `c2 = null`
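   
   A rough sketch of that second, row-level pass. `eq_predicate` and `filter_rows` are hypothetical helpers for illustration, not the actual `FilterExec` implementation. The assumption is SQL-style semantics, where comparing NULL with a value yields NULL rather than true:
   
   ```rust
   /// SQL-style `cell = value`: a NULL cell yields NULL (`None`), not `false`.
   fn eq_predicate(cell: Option<i64>, value: i64) -> Option<bool> {
       cell.map(|v| v == value)
   }
   
   /// Return the indices of rows for which the predicate is definitely true.
   /// Rows where it evaluates to NULL are dropped, just like `false` rows.
   fn filter_rows(c2: &[Option<i64>], value: i64) -> Vec<usize> {
       c2.iter()
           .enumerate()
           .filter(|(_, cell)| eq_predicate(**cell, value) == Some(true))
           .map(|(i, _)| i)
           .collect()
   }
   ```
   
   Under these assumptions, a batch whose `c2` was filled in as all null passes the pruning step but contributes no rows once the filter runs.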


