Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/16 12:36:39 UTC

[arrow-datafusion] branch master updated: Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f7201cf09 Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175)
f7201cf09 is described below

commit f7201cf09c3dd238805f37448189fe5ceae5f376
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Nov 16 07:36:34 2022 -0500

    Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175)
    
    * Minor: consolidate another parquet integration test into parquet_exec
    
    * Remove an unnecessary level of indentation and an old workaround
---
 datafusion/core/tests/custom_parquet_reader.rs | 264 -------------------------
 datafusion/core/tests/parquet/custom_reader.rs | 250 +++++++++++++++++++++++
 datafusion/core/tests/parquet/mod.rs           |   1 +
 3 files changed, 251 insertions(+), 264 deletions(-)
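
For context on the consolidation: Cargo compiles each file directly under `tests/` into its own integration-test binary, while files in a subdirectory such as `tests/parquet/` are compiled only when a top-level test file pulls them in with a `mod` declaration. Moving `custom_parquet_reader.rs` into the existing `parquet` module therefore removes one test binary and its separate link step. A minimal sketch of the resulting layout, assuming a `parquet_exec.rs` root file (suggested by the commit title but not shown in this diff):

    // datafusion/core/tests/parquet_exec.rs -- assumed test-binary root
    mod parquet;

    // datafusion/core/tests/parquet/mod.rs -- as updated in the diff below
    mod custom_reader;     // module added by this commit
    mod filter_pushdown;
    mod page_pruning;
    mod row_group_pruning;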

diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
deleted file mode 100644
index b48061331..000000000
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ /dev/null
@@ -1,264 +0,0 @@
-// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
-#![allow(where_clauses_object_safety)]
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(test)]
-mod tests {
-    use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
-    use arrow::datatypes::{Field, Schema};
-    use arrow::record_batch::RecordBatch;
-    use bytes::Bytes;
-    use datafusion::assert_batches_sorted_eq;
-    use datafusion::config::ConfigOptions;
-    use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
-    use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::object_store::ObjectStoreUrl;
-    use datafusion::physical_plan::file_format::{
-        FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
-        ParquetFileReaderFactory,
-    };
-    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-    use datafusion::physical_plan::{collect, Statistics};
-    use datafusion::prelude::SessionContext;
-    use datafusion_common::DataFusionError;
-    use futures::future::BoxFuture;
-    use futures::{FutureExt, TryFutureExt};
-    use object_store::memory::InMemory;
-    use object_store::path::Path;
-    use object_store::{ObjectMeta, ObjectStore};
-    use parquet::arrow::async_reader::AsyncFileReader;
-    use parquet::arrow::ArrowWriter;
-    use parquet::errors::ParquetError;
-    use parquet::file::metadata::ParquetMetaData;
-    use std::io::Cursor;
-    use std::ops::Range;
-    use std::sync::Arc;
-    use std::time::SystemTime;
-
-    const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
-
-    #[tokio::test]
-    async fn route_data_access_ops_to_parquet_file_reader_factory() {
-        let c1: ArrayRef =
-            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
-        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
-        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
-
-        let batch = create_batch(vec![
-            ("c1", c1.clone()),
-            ("c2", c2.clone()),
-            ("c3", c3.clone()),
-        ]);
-
-        let file_schema = batch.schema().clone();
-        let (in_memory_object_store, parquet_files_meta) =
-            store_parquet_in_memory(vec![batch]).await;
-        let file_groups = parquet_files_meta
-            .into_iter()
-            .map(|meta| PartitionedFile {
-                object_meta: meta,
-                partition_values: vec![],
-                range: None,
-                extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            })
-            .collect();
-
-        // prepare the scan
-        let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                // just any url that doesn't point to in memory object store
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![file_groups],
-                file_schema,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                config_options: ConfigOptions::new().into_shareable(),
-                output_ordering: None,
-            },
-            None,
-            None,
-        )
-        .with_parquet_file_reader_factory(Arc::new(
-            InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
-        ));
-
-        let session_ctx = SessionContext::new();
-
-        let task_ctx = session_ctx.task_ctx();
-        let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
-
-        let expected = vec![
-            "+-----+----+----+",
-            "| c1  | c2 | c3 |",
-            "+-----+----+----+",
-            "| Foo | 1  | 10 |",
-            "|     | 2  | 20 |",
-            "| bar |    |    |",
-            "+-----+----+----+",
-        ];
-
-        assert_batches_sorted_eq!(expected, &read);
-    }
-
-    #[derive(Debug)]
-    struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
-
-    impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
-        fn create_reader(
-            &self,
-            partition_index: usize,
-            file_meta: FileMeta,
-            metadata_size_hint: Option<usize>,
-            metrics: &ExecutionPlanMetricsSet,
-        ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
-            let metadata = file_meta
-                .extensions
-                .as_ref()
-                .expect("has user defined metadata");
-            let metadata = metadata
-                .downcast_ref::<String>()
-                .expect("has string metadata");
-
-            assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
-
-            let parquet_file_metrics = ParquetFileMetrics::new(
-                partition_index,
-                file_meta.location().as_ref(),
-                metrics,
-            );
-
-            Ok(Box::new(ParquetFileReader {
-                store: Arc::clone(&self.0),
-                meta: file_meta.object_meta,
-                metrics: parquet_file_metrics,
-                metadata_size_hint,
-            }))
-        }
-    }
-
-    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
-        columns.into_iter().fold(
-            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
-            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
-        )
-    }
-
-    fn add_to_batch(
-        batch: &RecordBatch,
-        field_name: &str,
-        array: ArrayRef,
-    ) -> RecordBatch {
-        let mut fields = batch.schema().fields().clone();
-        fields.push(Field::new(field_name, array.data_type().clone(), true));
-        let schema = Arc::new(Schema::new(fields));
-
-        let mut columns = batch.columns().to_vec();
-        columns.push(array);
-        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
-    }
-
-    async fn store_parquet_in_memory(
-        batches: Vec<RecordBatch>,
-    ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
-        let in_memory = InMemory::new();
-
-        let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
-            .into_iter()
-            .enumerate()
-            .map(|(offset, batch)| {
-                let mut buf = Vec::<u8>::with_capacity(32 * 1024);
-                let mut output = Cursor::new(&mut buf);
-
-                let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
-                    .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-
-                let meta = ObjectMeta {
-                    location: Path::parse(format!("file-{offset}.parquet"))
-                        .expect("creating path"),
-                    last_modified: chrono::DateTime::from(SystemTime::now()),
-                    size: buf.len(),
-                };
-
-                (meta, Bytes::from(buf))
-            })
-            .collect();
-
-        let mut objects = Vec::with_capacity(parquet_batches.len());
-        for (meta, bytes) in parquet_batches {
-            in_memory
-                .put(&meta.location, bytes)
-                .await
-                .expect("put parquet file into in memory object store");
-            objects.push(meta);
-        }
-
-        (Arc::new(in_memory), objects)
-    }
-
-    /// Implements [`AsyncFileReader`] for a parquet file in object storage
-    struct ParquetFileReader {
-        store: Arc<dyn ObjectStore>,
-        meta: ObjectMeta,
-        metrics: ParquetFileMetrics,
-        metadata_size_hint: Option<usize>,
-    }
-
-    impl AsyncFileReader for ParquetFileReader {
-        fn get_bytes(
-            &mut self,
-            range: Range<usize>,
-        ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-            self.metrics.bytes_scanned.add(range.end - range.start);
-
-            self.store
-                .get_range(&self.meta.location, range)
-                .map_err(|e| {
-                    ParquetError::General(format!(
-                        "AsyncChunkReader::get_bytes error: {}",
-                        e
-                    ))
-                })
-                .boxed()
-        }
-
-        fn get_metadata(
-            &mut self,
-        ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-            Box::pin(async move {
-                let metadata = fetch_parquet_metadata(
-                    self.store.as_ref(),
-                    &self.meta,
-                    self.metadata_size_hint,
-                )
-                .await
-                .map_err(|e| {
-                    ParquetError::General(format!(
-                        "AsyncChunkReader::get_metadata error: {}",
-                        e
-                    ))
-                })?;
-                Ok(Arc::new(metadata))
-            })
-        }
-    }
-}
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
new file mode 100644
index 000000000..8123badb1
--- /dev/null
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use bytes::Bytes;
+use datafusion::assert_batches_sorted_eq;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{
+    FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
+};
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::{collect, Statistics};
+use datafusion::prelude::SessionContext;
+use datafusion_common::DataFusionError;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::ArrowWriter;
+use parquet::errors::ParquetError;
+use parquet::file::metadata::ParquetMetaData;
+use std::io::Cursor;
+use std::ops::Range;
+use std::sync::Arc;
+use std::time::SystemTime;
+
+const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
+
+#[tokio::test]
+async fn route_data_access_ops_to_parquet_file_reader_factory() {
+    let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+    let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+    let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+    let batch = create_batch(vec![
+        ("c1", c1.clone()),
+        ("c2", c2.clone()),
+        ("c3", c3.clone()),
+    ]);
+
+    let file_schema = batch.schema().clone();
+    let (in_memory_object_store, parquet_files_meta) =
+        store_parquet_in_memory(vec![batch]).await;
+    let file_groups = parquet_files_meta
+        .into_iter()
+        .map(|meta| PartitionedFile {
+            object_meta: meta,
+            partition_values: vec![],
+            range: None,
+            extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+        })
+        .collect();
+
+    // prepare the scan
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            // just any URL that doesn't point to the in-memory object store
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![file_groups],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            config_options: ConfigOptions::new().into_shareable(),
+            output_ordering: None,
+        },
+        None,
+        None,
+    )
+    .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
+        Arc::clone(&in_memory_object_store),
+    )));
+
+    let session_ctx = SessionContext::new();
+
+    let task_ctx = session_ctx.task_ctx();
+    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+    let expected = vec![
+        "+-----+----+----+",
+        "| c1  | c2 | c3 |",
+        "+-----+----+----+",
+        "| Foo | 1  | 10 |",
+        "|     | 2  | 20 |",
+        "| bar |    |    |",
+        "+-----+----+----+",
+    ];
+
+    assert_batches_sorted_eq!(expected, &read);
+}
+
+#[derive(Debug)]
+struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
+
+impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
+        let metadata = file_meta
+            .extensions
+            .as_ref()
+            .expect("has user defined metadata");
+        let metadata = metadata
+            .downcast_ref::<String>()
+            .expect("has string metadata");
+
+        assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
+
+        let parquet_file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+
+        Ok(Box::new(ParquetFileReader {
+            store: Arc::clone(&self.0),
+            meta: file_meta.object_meta,
+            metrics: parquet_file_metrics,
+            metadata_size_hint,
+        }))
+    }
+}
+
+fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+    columns.into_iter().fold(
+        RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+        |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+    )
+}
+
+fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch {
+    let mut fields = batch.schema().fields().clone();
+    fields.push(Field::new(field_name, array.data_type().clone(), true));
+    let schema = Arc::new(Schema::new(fields));
+
+    let mut columns = batch.columns().to_vec();
+    columns.push(array);
+    RecordBatch::try_new(schema, columns).expect("error creating record batch")
+}
+
+async fn store_parquet_in_memory(
+    batches: Vec<RecordBatch>,
+) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+    let in_memory = InMemory::new();
+
+    let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+        .into_iter()
+        .enumerate()
+        .map(|(offset, batch)| {
+            let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+            let mut output = Cursor::new(&mut buf);
+
+            let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+                .expect("creating writer");
+
+            writer.write(&batch).expect("Writing batch");
+            writer.close().unwrap();
+
+            let meta = ObjectMeta {
+                location: Path::parse(format!("file-{offset}.parquet"))
+                    .expect("creating path"),
+                last_modified: chrono::DateTime::from(SystemTime::now()),
+                size: buf.len(),
+            };
+
+            (meta, Bytes::from(buf))
+        })
+        .collect();
+
+    let mut objects = Vec::with_capacity(parquet_batches.len());
+    for (meta, bytes) in parquet_batches {
+        in_memory
+            .put(&meta.location, bytes)
+            .await
+            .expect("put parquet file into in memory object store");
+        objects.push(meta);
+    }
+
+    (Arc::new(in_memory), objects)
+}
+
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+struct ParquetFileReader {
+    store: Arc<dyn ObjectStore>,
+    meta: ObjectMeta,
+    metrics: ParquetFileMetrics,
+    metadata_size_hint: Option<usize>,
+}
+
+impl AsyncFileReader for ParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.metrics.bytes_scanned.add(range.end - range.start);
+
+        self.store
+            .get_range(&self.meta.location, range)
+            .map_err(|e| {
+                ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e))
+            })
+            .boxed()
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
+            let metadata = fetch_parquet_metadata(
+                self.store.as_ref(),
+                &self.meta,
+                self.metadata_size_hint,
+            )
+            .await
+            .map_err(|e| {
+                ParquetError::General(format!(
+                    "AsyncChunkReader::get_metadata error: {}",
+                    e
+                ))
+            })?;
+            Ok(Arc::new(metadata))
+        })
+    }
+}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 00ca670e3..ab410bd76 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Parquet integration tests
+mod custom_reader;
 mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
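
The mechanism this test exercises is general: arbitrary per-file context attached at planning time through `PartitionedFile::extensions` (an `Option<Arc<dyn Any + Send + Sync>>`) is handed back to the custom `ParquetFileReaderFactory` inside `FileMeta`, where it can be downcast and used to drive I/O decisions. A condensed, self-contained sketch of that round trip; `ShardInfo` is a hypothetical type used here only for illustration (the test above simply uses a `String`):

    use std::any::Any;
    use std::sync::Arc;

    // Hypothetical per-file context; any `Any + Send + Sync` value works.
    struct ShardInfo {
        shard: u32,
    }

    // Planning side: attach the context to the file
    // (mirrors `extensions: Some(Arc::new(...))` in the test).
    fn attach() -> Option<Arc<dyn Any + Send + Sync>> {
        Some(Arc::new(ShardInfo { shard: 7 }))
    }

    // Reader-factory side: recover the context, as `create_reader`
    // does with `downcast_ref::<String>` in the test.
    fn recover(extensions: Option<&Arc<dyn Any + Send + Sync>>) -> Option<u32> {
        extensions
            .and_then(|ext| ext.downcast_ref::<ShardInfo>())
            .map(|info| info.shard)
    }

    fn main() {
        let ext = attach();
        assert_eq!(recover(ext.as_ref()), Some(7));
    }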