You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 19:05:10 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2170: Handle merged schemas in parquet pruning

alamb commented on code in PR #2170:
URL: https://github.com/apache/arrow-datafusion/pull/2170#discussion_r844288768


##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -56,7 +56,11 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 
     /// Infer the statistics for the provided object. The cost and accuracy of the
     /// estimated statistics might vary greatly between file formats.
-    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
+    async fn infer_stats(

Review Comment:
   ```suggestion
       /// 
       /// `table_schema` is the (combined) schema of the overall table
       /// and may be a superset of the schema contained in this file.
       /// 
       /// TODO: should the file source return statistics for only columns referred to in the table schema?
       async fn infer_stats(
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)

Review Comment:
   ```suggestion
           // batch1: c1(string), c3(int8)
   ```



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -369,10 +393,102 @@ mod tests {
 
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        TimestampNanosecondArray,
+        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        StringArray, TimestampNanosecondArray,
     };
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::ScalarValue;
     use futures::StreamExt;
+    use parquet::arrow::ArrowWriter;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::NamedTempFile;
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    async fn create_table(
+        batches: Vec<RecordBatch>,
+    ) -> Result<(Vec<NamedTempFile>, Schema)> {
+        let merged_schema =
+            Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))?;
+
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        Ok((files, merged_schema))
+    }
+
+    #[tokio::test]
+    async fn read_merged_batches() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        let batch2 = create_batch(vec![("c2", c2)]);
+
+        let (files, schema) = create_table(vec![batch1, batch2]).await?;
+        let table_schema = Arc::new(schema);
+
+        let reader = local_object_reader(files[0].path().to_string_lossy().to_string());
+
+        let stats = fetch_statistics(reader, table_schema.clone())?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));

Review Comment:
   this is cool to fill in the null stats for the missing column 👍 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,70 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        // let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c2", c2)]);
+
+        let filter = col("c2").eq(lit(0_i64));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",

Review Comment:
   same thing here -- I wouldn't expect `null` values in `c2` to be returned...



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -46,7 +47,7 @@ use crate::error::Result;
 use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::file_format::ParquetExec;
+use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)

Review Comment:
   ```suggestion
           // batch2: c3(int8), c2(int64)
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -878,6 +880,39 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_intersection_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        let filter = col("c2").eq(lit(0_i64));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",

Review Comment:
   🤔  if the filter is `c2 = 0` then none of these rows should pass.... so something doesn't look quite right



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -369,10 +393,102 @@ mod tests {
 
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{
-        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
-        TimestampNanosecondArray,
+        ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        StringArray, TimestampNanosecondArray,
     };
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::ScalarValue;
     use futures::StreamExt;
+    use parquet::arrow::ArrowWriter;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::NamedTempFile;
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {

Review Comment:
   This looks very similar / the same as `RecordBatch::try_from_iter`: https://docs.rs/arrow/11.1.0/arrow/record_batch/struct.RecordBatch.html#method.try_from_iter



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -379,8 +379,10 @@ macro_rules! get_min_max_values {
         let null_scalar: ScalarValue = data_type.try_into().ok()?;
 
         $self.row_group_metadata
-            .column(column_index)
-            .statistics()
+            .columns()
+            .iter()
+            .find(|c| c.column_descr().name() == &$column.name)

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,70 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        // let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)

Review Comment:
   ```suggestion
           // batch2: c2(int64)
   ```



##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -919,6 +955,70 @@ mod tests {
         assert_batches_sorted_eq!(expected, &read);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        let filter = col("c3").eq(lit(0_i8));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
+            .await
+            .unwrap();
+
+        // Predicate should prune all row groups
+        assert_eq!(read.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_disjoint_schema_filter() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        // let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)

Review Comment:
   ```suggestion
           // batch1: c1(string)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org