You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by wa...@apache.org on 2023/04/15 14:14:20 UTC
[arrow-datafusion] branch main updated: chore: make JsonOpener and CsvOpener public (#6004)
This is an automated email from the ASF dual-hosted git repository.
wayne pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 56bceb805e chore: make JsonOpener and CsvOpener public (#6004)
56bceb805e is described below
commit 56bceb805e4b7b3a6cd1db774e0583792fd47fd4
Author: Weny Xu <we...@gmail.com>
AuthorDate: Sat Apr 15 23:14:14 2023 +0900
chore: make JsonOpener and CsvOpener public (#6004)
* chore: make JsonOpener and CsvOpener pub
* feat: add examples for JsonOpener and CsvOpener
* Update datafusion-examples/examples/json_opener.rs
Co-authored-by: Ruihang Xia <wa...@gmail.com>
---------
Co-authored-by: Ruihang Xia <wa...@gmail.com>
---
datafusion-examples/Cargo.toml | 1 +
datafusion-examples/examples/csv_opener.rs | 92 +++++++++++++++++++++
datafusion-examples/examples/json_opener.rs | 94 ++++++++++++++++++++++
.../core/src/physical_plan/file_format/csv.rs | 40 ++++++++-
.../core/src/physical_plan/file_format/json.rs | 20 ++++-
.../core/src/physical_plan/file_format/mod.rs | 4 +-
6 files changed, 246 insertions(+), 5 deletions(-)
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index f53316d00b..b7bf1161f8 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -39,6 +39,7 @@ arrow = { workspace = true }
arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
async-trait = "0.1.41"
+bytes = "1.4"
dashmap = "5.4"
datafusion = { path = "../datafusion/core" }
datafusion-common = { path = "../datafusion/common" }
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
new file mode 100644
index 0000000000..0143b0297e
--- /dev/null
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{sync::Arc, vec};
+
+use datafusion::{
+ assert_batches_eq,
+ datasource::{
+ file_format::file_type::FileCompressionType, listing::PartitionedFile,
+ object_store::ObjectStoreUrl,
+ },
+ error::Result,
+ physical_plan::{
+ file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
+ metrics::ExecutionPlanMetricsSet,
+ },
+ test_util::aggr_test_schema,
+};
+use futures::StreamExt;
+use object_store::local::LocalFileSystem;
+
+/// This example demonstrates scanning a local CSV file with a [`CsvOpener`]
+/// driven by a [`FileStream`], and checking the fetched record batches
+#[tokio::main]
+async fn main() -> Result<()> {
+    let object_store = Arc::new(LocalFileSystem::new());
+    let schema = aggr_test_schema();
+
+    // Indices into `schema` of the columns to read (c13, then c1). The same
+    // projection is handed to both the opener config and the scan config, so
+    // keep it in one place.
+    let projection = vec![12, 0];
+
+    let config = CsvConfig::new(
+        8192,
+        schema.clone(),
+        Some(projection.clone()),
+        true,
+        b',',
+        object_store,
+    );
+
+    let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);
+
+    let testdata = datafusion::test_util::arrow_test_data();
+    let path = format!("{testdata}/csv/aggregate_test_100.csv");
+
+    let path = std::path::Path::new(&path).canonicalize()?;
+    // Record the real file size rather than a placeholder value.
+    let file_size = std::fs::metadata(&path)?.len();
+
+    let scan_config = FileScanConfig {
+        object_store_url: ObjectStoreUrl::local_filesystem(),
+        file_schema: schema,
+        file_groups: vec![vec![PartitionedFile::new(
+            path.display().to_string(),
+            file_size,
+        )]],
+        statistics: Default::default(),
+        projection: Some(projection),
+        limit: Some(5),
+        table_partition_cols: vec![],
+        output_ordering: None,
+        infinite_source: false,
+    };
+
+    // Propagate stream-construction and per-batch errors instead of panicking.
+    let metrics = ExecutionPlanMetricsSet::new();
+    let result = FileStream::new(&scan_config, 0, opener, &metrics)?
+        .collect::<Vec<_>>()
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>>>()?;
+    assert_batches_eq!(
+        &[
+            "+--------------------------------+----+",
+            "| c13                            | c1 |",
+            "+--------------------------------+----+",
+            "| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c  |",
+            "| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d  |",
+            "| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b  |",
+            "| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a  |",
+            "| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b  |",
+            "+--------------------------------+----+",
+        ],
+        &result
+    );
+    Ok(())
+}
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
new file mode 100644
index 0000000000..843bed4f61
--- /dev/null
+++ b/datafusion-examples/examples/json_opener.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{sync::Arc, vec};
+
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::{
+ assert_batches_eq,
+ datasource::{
+ file_format::file_type::FileCompressionType, listing::PartitionedFile,
+ object_store::ObjectStoreUrl,
+ },
+ error::Result,
+ physical_plan::{
+ file_format::{FileScanConfig, FileStream, JsonOpener},
+ metrics::ExecutionPlanMetricsSet,
+ },
+};
+use futures::StreamExt;
+use object_store::ObjectStore;
+
+/// This example demonstrates scanning newline-delimited JSON held in an
+/// in-memory object store with a [`JsonOpener`] driven by a [`FileStream`],
+/// and checking the fetched record batches
+#[tokio::main]
+async fn main() -> Result<()> {
+    let object_store = object_store::memory::InMemory::new();
+    let path = object_store::path::Path::from("demo.json");
+
+    // One JSON object per line. Joining separate literals keeps source-code
+    // indentation out of the stored data.
+    let data = bytes::Bytes::from(
+        [
+            r#"{"num":5,"str":"test"}"#,
+            r#"{"num":2,"str":"hello"}"#,
+            r#"{"num":4,"str":"foo"}"#,
+        ]
+        .join("\n"),
+    );
+    // Record the real object size rather than a placeholder value.
+    let file_size = data.len() as u64;
+    object_store.put(&path, data).await?;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("num", DataType::Int64, false),
+        Field::new("str", DataType::Utf8, false),
+    ]));
+
+    // Read "str" then "num". The opener receives the already-projected schema,
+    // while the scan config receives the same projection as indices.
+    let projection = vec![1, 0];
+    let projected = Arc::new(schema.project(&projection)?);
+
+    let opener = JsonOpener::new(
+        8192,
+        projected,
+        FileCompressionType::UNCOMPRESSED,
+        Arc::new(object_store),
+    );
+
+    let scan_config = FileScanConfig {
+        // NOTE(review): the data lives in the in-memory store handed to the
+        // opener above, not the local filesystem; confirm this URL is unused.
+        object_store_url: ObjectStoreUrl::local_filesystem(),
+        file_schema: schema,
+        file_groups: vec![vec![PartitionedFile::new(path.to_string(), file_size)]],
+        statistics: Default::default(),
+        projection: Some(projection),
+        limit: Some(5),
+        table_partition_cols: vec![],
+        output_ordering: None,
+        infinite_source: false,
+    };
+
+    // Propagate stream-construction and per-batch errors instead of panicking.
+    let metrics = ExecutionPlanMetricsSet::new();
+    let result = FileStream::new(&scan_config, 0, opener, &metrics)?
+        .collect::<Vec<_>>()
+        .await
+        .into_iter()
+        .collect::<Result<Vec<_>>>()?;
+    assert_batches_eq!(
+        &[
+            "+-------+-----+",
+            "| str   | num |",
+            "+-------+-----+",
+            "| test  | 5   |",
+            "| hello | 2   |",
+            "| foo   | 4   |",
+            "+-------+-----+",
+        ],
+        &result
+    );
+    Ok(())
+}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9826d32d0f..8c8d89e38f 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -188,8 +188,9 @@ impl ExecutionPlan for CsvExec {
}
}
+/// A Config for [`CsvOpener`]
#[derive(Debug, Clone)]
-struct CsvConfig {
+pub struct CsvConfig {
batch_size: usize,
file_schema: SchemaRef,
file_projection: Option<Vec<usize>>,
@@ -198,6 +199,27 @@ struct CsvConfig {
object_store: Arc<dyn ObjectStore>,
}
+impl CsvConfig {
+    /// Creates a [`CsvConfig`] from its individual settings.
+    ///
+    /// Every argument is stored unchanged in the field of the same name; no
+    /// validation is performed here.
+    pub fn new(
+        batch_size: usize,
+        file_schema: SchemaRef,
+        file_projection: Option<Vec<usize>>,
+        has_header: bool,
+        delimiter: u8,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Self {
+        Self {
+            object_store,
+            delimiter,
+            has_header,
+            file_projection,
+            file_schema,
+            batch_size,
+        }
+    }
+}
+
impl CsvConfig {
fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
let datetime_format = None;
@@ -228,11 +250,25 @@ impl CsvConfig {
}
}
-struct CsvOpener {
+/// A [`FileOpener`] for CSV files.
+///
+/// [`FileOpener::open`] returns a [`FileOpenFuture`] for a single file. The
+/// [`CsvConfig`] is held behind an [`Arc`] so one config can be shared.
+pub struct CsvOpener {
config: Arc<CsvConfig>,
file_compression_type: FileCompressionType,
}
+impl CsvOpener {
+    /// Creates a [`CsvOpener`] holding the given shared `config` and
+    /// `file_compression_type`, both stored exactly as passed in.
+    pub fn new(
+        config: Arc<CsvConfig>,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        Self {
+            file_compression_type,
+            config,
+        }
+    }
+}
+
impl FileOpener for CsvOpener {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index fb34175b85..95d9785666 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -162,13 +162,31 @@ impl ExecutionPlan for NdJsonExec {
}
}
-struct JsonOpener {
+/// A [`FileOpener`] for JSON files.
+///
+/// [`FileOpener::open`] returns a [`FileOpenFuture`] for a single file. The
+/// opener carries its own [`ObjectStore`] handle in `object_store`.
+pub struct JsonOpener {
batch_size: usize,
projected_schema: SchemaRef,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
}
+impl JsonOpener {
+    /// Creates a [`JsonOpener`] from its parts.
+    ///
+    /// `projected_schema` is stored as given (callers pass an already-projected
+    /// schema), as are the batch size, compression type and object store.
+    pub fn new(
+        batch_size: usize,
+        projected_schema: SchemaRef,
+        file_compression_type: FileCompressionType,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Self {
+        Self {
+            object_store,
+            file_compression_type,
+            projected_schema,
+            batch_size,
+        }
+    }
+}
+
impl FileOpener for JsonOpener {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let store = self.object_store.clone();
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 141a737314..a50f3bf025 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -26,7 +26,7 @@ mod json;
mod parquet;
pub(crate) use self::csv::plan_to_csv;
-pub use self::csv::CsvExec;
+pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
use arrow::{
@@ -39,7 +39,7 @@ pub use avro::AvroExec;
use datafusion_physical_expr::PhysicalSortExpr;
pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
pub(crate) use json::plan_to_json;
-pub use json::NdJsonExec;
+pub use json::{JsonOpener, NdJsonExec};
use crate::datasource::{
listing::{FileRange, PartitionedFile},