You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/16 12:36:39 UTC
[arrow-datafusion] branch master updated: Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f7201cf09 Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175)
f7201cf09 is described below
commit f7201cf09c3dd238805f37448189fe5ceae5f376
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Nov 16 07:36:34 2022 -0500
Minor: consolidate parquet `custom_reader` integration test into parquet_exec (#4175)
* Minor: consolidate another parquet integration test into parquet_exec
* Remove unnecessary level of indent, old workaround
---
datafusion/core/tests/custom_parquet_reader.rs | 264 -------------------------
datafusion/core/tests/parquet/custom_reader.rs | 250 +++++++++++++++++++++++
datafusion/core/tests/parquet/mod.rs | 1 +
3 files changed, 251 insertions(+), 264 deletions(-)
diff --git a/datafusion/core/tests/custom_parquet_reader.rs b/datafusion/core/tests/custom_parquet_reader.rs
deleted file mode 100644
index b48061331..000000000
--- a/datafusion/core/tests/custom_parquet_reader.rs
+++ /dev/null
@@ -1,264 +0,0 @@
-// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
-#![allow(where_clauses_object_safety)]
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(test)]
-mod tests {
- use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
- use arrow::datatypes::{Field, Schema};
- use arrow::record_batch::RecordBatch;
- use bytes::Bytes;
- use datafusion::assert_batches_sorted_eq;
- use datafusion::config::ConfigOptions;
- use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
- use datafusion::datasource::listing::PartitionedFile;
- use datafusion::datasource::object_store::ObjectStoreUrl;
- use datafusion::physical_plan::file_format::{
- FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
- ParquetFileReaderFactory,
- };
- use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
- use datafusion::physical_plan::{collect, Statistics};
- use datafusion::prelude::SessionContext;
- use datafusion_common::DataFusionError;
- use futures::future::BoxFuture;
- use futures::{FutureExt, TryFutureExt};
- use object_store::memory::InMemory;
- use object_store::path::Path;
- use object_store::{ObjectMeta, ObjectStore};
- use parquet::arrow::async_reader::AsyncFileReader;
- use parquet::arrow::ArrowWriter;
- use parquet::errors::ParquetError;
- use parquet::file::metadata::ParquetMetaData;
- use std::io::Cursor;
- use std::ops::Range;
- use std::sync::Arc;
- use std::time::SystemTime;
-
- const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
-
- #[tokio::test]
- async fn route_data_access_ops_to_parquet_file_reader_factory() {
- let c1: ArrayRef =
- Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
- let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
- let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
-
- let batch = create_batch(vec![
- ("c1", c1.clone()),
- ("c2", c2.clone()),
- ("c3", c3.clone()),
- ]);
-
- let file_schema = batch.schema().clone();
- let (in_memory_object_store, parquet_files_meta) =
- store_parquet_in_memory(vec![batch]).await;
- let file_groups = parquet_files_meta
- .into_iter()
- .map(|meta| PartitionedFile {
- object_meta: meta,
- partition_values: vec![],
- range: None,
- extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
- })
- .collect();
-
- // prepare the scan
- let parquet_exec = ParquetExec::new(
- FileScanConfig {
- // just any url that doesn't point to in memory object store
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![file_groups],
- file_schema,
- statistics: Statistics::default(),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- config_options: ConfigOptions::new().into_shareable(),
- output_ordering: None,
- },
- None,
- None,
- )
- .with_parquet_file_reader_factory(Arc::new(
- InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
- ));
-
- let session_ctx = SessionContext::new();
-
- let task_ctx = session_ctx.task_ctx();
- let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
-
- let expected = vec![
- "+-----+----+----+",
- "| c1 | c2 | c3 |",
- "+-----+----+----+",
- "| Foo | 1 | 10 |",
- "| | 2 | 20 |",
- "| bar | | |",
- "+-----+----+----+",
- ];
-
- assert_batches_sorted_eq!(expected, &read);
- }
-
- #[derive(Debug)]
- struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
-
- impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
- fn create_reader(
- &self,
- partition_index: usize,
- file_meta: FileMeta,
- metadata_size_hint: Option<usize>,
- metrics: &ExecutionPlanMetricsSet,
- ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
- let metadata = file_meta
- .extensions
- .as_ref()
- .expect("has user defined metadata");
- let metadata = metadata
- .downcast_ref::<String>()
- .expect("has string metadata");
-
- assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
-
- let parquet_file_metrics = ParquetFileMetrics::new(
- partition_index,
- file_meta.location().as_ref(),
- metrics,
- );
-
- Ok(Box::new(ParquetFileReader {
- store: Arc::clone(&self.0),
- meta: file_meta.object_meta,
- metrics: parquet_file_metrics,
- metadata_size_hint,
- }))
- }
- }
-
- fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
- columns.into_iter().fold(
- RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
- |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
- )
- }
-
- fn add_to_batch(
- batch: &RecordBatch,
- field_name: &str,
- array: ArrayRef,
- ) -> RecordBatch {
- let mut fields = batch.schema().fields().clone();
- fields.push(Field::new(field_name, array.data_type().clone(), true));
- let schema = Arc::new(Schema::new(fields));
-
- let mut columns = batch.columns().to_vec();
- columns.push(array);
- RecordBatch::try_new(schema, columns).expect("error; creating record batch")
- }
-
- async fn store_parquet_in_memory(
- batches: Vec<RecordBatch>,
- ) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
- let in_memory = InMemory::new();
-
- let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
- .into_iter()
- .enumerate()
- .map(|(offset, batch)| {
- let mut buf = Vec::<u8>::with_capacity(32 * 1024);
- let mut output = Cursor::new(&mut buf);
-
- let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
- .expect("creating writer");
-
- writer.write(&batch).expect("Writing batch");
- writer.close().unwrap();
-
- let meta = ObjectMeta {
- location: Path::parse(format!("file-{offset}.parquet"))
- .expect("creating path"),
- last_modified: chrono::DateTime::from(SystemTime::now()),
- size: buf.len(),
- };
-
- (meta, Bytes::from(buf))
- })
- .collect();
-
- let mut objects = Vec::with_capacity(parquet_batches.len());
- for (meta, bytes) in parquet_batches {
- in_memory
- .put(&meta.location, bytes)
- .await
- .expect("put parquet file into in memory object store");
- objects.push(meta);
- }
-
- (Arc::new(in_memory), objects)
- }
-
- /// Implements [`AsyncFileReader`] for a parquet file in object storage
- struct ParquetFileReader {
- store: Arc<dyn ObjectStore>,
- meta: ObjectMeta,
- metrics: ParquetFileMetrics,
- metadata_size_hint: Option<usize>,
- }
-
- impl AsyncFileReader for ParquetFileReader {
- fn get_bytes(
- &mut self,
- range: Range<usize>,
- ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
- self.metrics.bytes_scanned.add(range.end - range.start);
-
- self.store
- .get_range(&self.meta.location, range)
- .map_err(|e| {
- ParquetError::General(format!(
- "AsyncChunkReader::get_bytes error: {}",
- e
- ))
- })
- .boxed()
- }
-
- fn get_metadata(
- &mut self,
- ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
- Box::pin(async move {
- let metadata = fetch_parquet_metadata(
- self.store.as_ref(),
- &self.meta,
- self.metadata_size_hint,
- )
- .await
- .map_err(|e| {
- ParquetError::General(format!(
- "AsyncChunkReader::get_metadata error: {}",
- e
- ))
- })?;
- Ok(Arc::new(metadata))
- })
- }
- }
-}
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
new file mode 100644
index 000000000..8123badb1
--- /dev/null
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray};
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use bytes::Bytes;
+use datafusion::assert_batches_sorted_eq;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::parquet::fetch_parquet_metadata;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{
+ FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
+};
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::{collect, Statistics};
+use datafusion::prelude::SessionContext;
+use datafusion_common::DataFusionError;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::ArrowWriter;
+use parquet::errors::ParquetError;
+use parquet::file::metadata::ParquetMetaData;
+use std::io::Cursor;
+use std::ops::Range;
+use std::sync::Arc;
+use std::time::SystemTime;
+
+const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";
+
+#[tokio::test]
+async fn route_data_access_ops_to_parquet_file_reader_factory() {
+ let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+ let batch = create_batch(vec![
+ ("c1", c1.clone()),
+ ("c2", c2.clone()),
+ ("c3", c3.clone()),
+ ]);
+
+ let file_schema = batch.schema().clone();
+ let (in_memory_object_store, parquet_files_meta) =
+ store_parquet_in_memory(vec![batch]).await;
+ let file_groups = parquet_files_meta
+ .into_iter()
+ .map(|meta| PartitionedFile {
+ object_meta: meta,
+ partition_values: vec![],
+ range: None,
+ extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
+ })
+ .collect();
+
+ // prepare the scan
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ // just any url that doesn't point to in memory object store
+ object_store_url: ObjectStoreUrl::local_filesystem(),
+ file_groups: vec![file_groups],
+ file_schema,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ config_options: ConfigOptions::new().into_shareable(),
+ output_ordering: None,
+ },
+ None,
+ None,
+ )
+ .with_parquet_file_reader_factory(Arc::new(InMemoryParquetFileReaderFactory(
+ Arc::clone(&in_memory_object_store),
+ )));
+
+ let session_ctx = SessionContext::new();
+
+ let task_ctx = session_ctx.task_ctx();
+ let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+ let expected = vec![
+ "+-----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+-----+----+----+",
+ "| Foo | 1 | 10 |",
+ "| | 2 | 20 |",
+ "| bar | | |",
+ "+-----+----+----+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &read);
+}
+
+#[derive(Debug)]
+struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
+
+impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
+ fn create_reader(
+ &self,
+ partition_index: usize,
+ file_meta: FileMeta,
+ metadata_size_hint: Option<usize>,
+ metrics: &ExecutionPlanMetricsSet,
+ ) -> Result<Box<dyn AsyncFileReader + Send>, DataFusionError> {
+ let metadata = file_meta
+ .extensions
+ .as_ref()
+ .expect("has user defined metadata");
+ let metadata = metadata
+ .downcast_ref::<String>()
+ .expect("has string metadata");
+
+ assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);
+
+ let parquet_file_metrics = ParquetFileMetrics::new(
+ partition_index,
+ file_meta.location().as_ref(),
+ metrics,
+ );
+
+ Ok(Box::new(ParquetFileReader {
+ store: Arc::clone(&self.0),
+ meta: file_meta.object_meta,
+ metrics: parquet_file_metrics,
+ metadata_size_hint,
+ }))
+ }
+}
+
+fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+ columns.into_iter().fold(
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+ |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+ )
+}
+
+fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch {
+ let mut fields = batch.schema().fields().clone();
+ fields.push(Field::new(field_name, array.data_type().clone(), true));
+ let schema = Arc::new(Schema::new(fields));
+
+ let mut columns = batch.columns().to_vec();
+ columns.push(array);
+ RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+}
+
+async fn store_parquet_in_memory(
+ batches: Vec<RecordBatch>,
+) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
+ let in_memory = InMemory::new();
+
+ let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
+ .into_iter()
+ .enumerate()
+ .map(|(offset, batch)| {
+ let mut buf = Vec::<u8>::with_capacity(32 * 1024);
+ let mut output = Cursor::new(&mut buf);
+
+ let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
+ .expect("creating writer");
+
+ writer.write(&batch).expect("Writing batch");
+ writer.close().unwrap();
+
+ let meta = ObjectMeta {
+ location: Path::parse(format!("file-{offset}.parquet"))
+ .expect("creating path"),
+ last_modified: chrono::DateTime::from(SystemTime::now()),
+ size: buf.len(),
+ };
+
+ (meta, Bytes::from(buf))
+ })
+ .collect();
+
+ let mut objects = Vec::with_capacity(parquet_batches.len());
+ for (meta, bytes) in parquet_batches {
+ in_memory
+ .put(&meta.location, bytes)
+ .await
+ .expect("put parquet file into in memory object store");
+ objects.push(meta);
+ }
+
+ (Arc::new(in_memory), objects)
+}
+
+/// Implements [`AsyncFileReader`] for a parquet file in object storage
+struct ParquetFileReader {
+ store: Arc<dyn ObjectStore>,
+ meta: ObjectMeta,
+ metrics: ParquetFileMetrics,
+ metadata_size_hint: Option<usize>,
+}
+
+impl AsyncFileReader for ParquetFileReader {
+ fn get_bytes(
+ &mut self,
+ range: Range<usize>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+ self.metrics.bytes_scanned.add(range.end - range.start);
+
+ self.store
+ .get_range(&self.meta.location, range)
+ .map_err(|e| {
+ ParquetError::General(format!("AsyncChunkReader::get_bytes error: {}", e))
+ })
+ .boxed()
+ }
+
+ fn get_metadata(
+ &mut self,
+ ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ Box::pin(async move {
+ let metadata = fetch_parquet_metadata(
+ self.store.as_ref(),
+ &self.meta,
+ self.metadata_size_hint,
+ )
+ .await
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "AsyncChunkReader::get_metadata error: {}",
+ e
+ ))
+ })?;
+ Ok(Arc::new(metadata))
+ })
+ }
+}
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 00ca670e3..ab410bd76 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -16,6 +16,7 @@
// under the License.
//! Parquet integration tests
+mod custom_reader;
mod filter_pushdown;
mod page_pruning;
mod row_group_pruning;