Posted to commits@arrow.apache.org by wa...@apache.org on 2023/04/15 14:14:20 UTC

[arrow-datafusion] branch main updated: chore: make JsonOpener and CsvOpener public (#6004)

This is an automated email from the ASF dual-hosted git repository.

wayne pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 56bceb805e chore: make JsonOpener and CsvOpener public (#6004)
56bceb805e is described below

commit 56bceb805e4b7b3a6cd1db774e0583792fd47fd4
Author: Weny Xu <we...@gmail.com>
AuthorDate: Sat Apr 15 23:14:14 2023 +0900

    chore: make JsonOpener and CsvOpener public (#6004)
    
    * chore: make JsonOpener and CsvOpener pub
    
    * feat: add examples for JsonOpener and CsvOpener
    
    * Update datafusion-examples/examples/json_opener.rs
    
    Co-authored-by: Ruihang Xia <wa...@gmail.com>
    
    ---------
    
    Co-authored-by: Ruihang Xia <wa...@gmail.com>
---
 datafusion-examples/Cargo.toml                     |  1 +
 datafusion-examples/examples/csv_opener.rs         | 92 +++++++++++++++++++++
 datafusion-examples/examples/json_opener.rs        | 94 ++++++++++++++++++++++
 .../core/src/physical_plan/file_format/csv.rs      | 40 ++++++++-
 .../core/src/physical_plan/file_format/json.rs     | 20 ++++-
 .../core/src/physical_plan/file_format/mod.rs      |  4 +-
 6 files changed, 246 insertions(+), 5 deletions(-)
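
For readers who want to use the newly exported types outside the bundled examples, here is a condensed sketch of driving the public JsonOpener through a FileStream, adapted from the json_opener.rs example added below (the in-memory store, "demo.json" path, and two-column schema are illustrative, not part of the commit):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::{
    datasource::{
        file_format::file_type::FileCompressionType, listing::PartitionedFile,
        object_store::ObjectStoreUrl,
    },
    error::Result,
    physical_plan::{
        file_format::{FileScanConfig, FileStream, JsonOpener},
        metrics::ExecutionPlanMetricsSet,
    },
};
use futures::StreamExt;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> Result<()> {
    // Stage one NDJSON object in an in-memory object store.
    let store = object_store::memory::InMemory::new();
    let path = object_store::path::Path::from("demo.json");
    store
        .put(&path, bytes::Bytes::from(r#"{"num":5,"str":"test"}"#))
        .await
        .unwrap();

    let schema = Arc::new(Schema::new(vec![
        Field::new("num", DataType::Int64, false),
        Field::new("str", DataType::Utf8, false),
    ]));

    // The now-public constructor: batch size, projected schema, compression, store.
    let opener = JsonOpener::new(
        8192,
        schema.clone(),
        FileCompressionType::UNCOMPRESSED,
        Arc::new(store),
    );

    // FileStream walks the file groups and feeds each file to the opener.
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::local_filesystem(),
        file_schema: schema,
        file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
        statistics: Default::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: None,
        infinite_source: false,
    };
    let batches = FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
        .unwrap()
        .map(|b| b.unwrap())
        .collect::<Vec<_>>()
        .await;
    println!("read {} batch(es)", batches.len());
    Ok(())
}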

diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index f53316d00b..b7bf1161f8 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -39,6 +39,7 @@ arrow = { workspace = true }
 arrow-flight = { workspace = true }
 arrow-schema = { workspace = true }
 async-trait = "0.1.41"
+bytes = "1.4"
 dashmap = "5.4"
 datafusion = { path = "../datafusion/core" }
 datafusion-common = { path = "../datafusion/common" }
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
new file mode 100644
index 0000000000..0143b0297e
--- /dev/null
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{sync::Arc, vec};
+
+use datafusion::{
+    assert_batches_eq,
+    datasource::{
+        file_format::file_type::FileCompressionType, listing::PartitionedFile,
+        object_store::ObjectStoreUrl,
+    },
+    error::Result,
+    physical_plan::{
+        file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
+        metrics::ExecutionPlanMetricsSet,
+    },
+    test_util::aggr_test_schema,
+};
+use futures::StreamExt;
+use object_store::local::LocalFileSystem;
+
+/// This example demonstrates scanning against an Arrow data source (CSV) and
+/// fetching the results
+#[tokio::main]
+async fn main() -> Result<()> {
+    let object_store = Arc::new(LocalFileSystem::new());
+    let schema = aggr_test_schema();
+
+    let config = CsvConfig::new(
+        8192,
+        schema.clone(),
+        Some(vec![12, 0]),
+        true,
+        b',',
+        object_store,
+    );
+
+    let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);
+
+    let testdata = datafusion::test_util::arrow_test_data();
+    let path = format!("{testdata}/csv/aggregate_test_100.csv");
+
+    let path = std::path::Path::new(&path).canonicalize()?;
+
+    let scan_config = FileScanConfig {
+        object_store_url: ObjectStoreUrl::local_filesystem(),
+        file_schema: schema.clone(),
+        file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
+        statistics: Default::default(),
+        projection: Some(vec![12, 0]),
+        limit: Some(5),
+        table_partition_cols: vec![],
+        output_ordering: None,
+        infinite_source: false,
+    };
+
+    let result =
+        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
+            .unwrap()
+            .map(|b| b.unwrap())
+            .collect::<Vec<_>>()
+            .await;
+    assert_batches_eq!(
+        &[
+            "+--------------------------------+----+",
+            "| c13                            | c1 |",
+            "+--------------------------------+----+",
+            "| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c  |",
+            "| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d  |",
+            "| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b  |",
+            "| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a  |",
+            "| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b  |",
+            "+--------------------------------+----+",
+        ],
+        &result
+    );
+    Ok(())
+}
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
new file mode 100644
index 0000000000..843bed4f61
--- /dev/null
+++ b/datafusion-examples/examples/json_opener.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{sync::Arc, vec};
+
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::{
+    assert_batches_eq,
+    datasource::{
+        file_format::file_type::FileCompressionType, listing::PartitionedFile,
+        object_store::ObjectStoreUrl,
+    },
+    error::Result,
+    physical_plan::{
+        file_format::{FileScanConfig, FileStream, JsonOpener},
+        metrics::ExecutionPlanMetricsSet,
+    },
+};
+use futures::StreamExt;
+use object_store::ObjectStore;
+
+/// This example demonstrates scanning against an Arrow data source (JSON) and
+/// fetching the results
+#[tokio::main]
+async fn main() -> Result<()> {
+    let object_store = object_store::memory::InMemory::new();
+    let path = object_store::path::Path::from("demo.json");
+    let data = bytes::Bytes::from(
+        r#"{"num":5,"str":"test"}
+        {"num":2,"str":"hello"}
+        {"num":4,"str":"foo"}"#,
+    );
+    object_store.put(&path, data).await.unwrap();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("num", DataType::Int64, false),
+        Field::new("str", DataType::Utf8, false),
+    ]));
+
+    let projected = Arc::new(schema.clone().project(&[1, 0])?);
+
+    let opener = JsonOpener::new(
+        8192,
+        projected,
+        FileCompressionType::UNCOMPRESSED,
+        Arc::new(object_store),
+    );
+
+    let scan_config = FileScanConfig {
+        object_store_url: ObjectStoreUrl::local_filesystem(),
+        file_schema: schema.clone(),
+        file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
+        statistics: Default::default(),
+        projection: Some(vec![1, 0]),
+        limit: Some(5),
+        table_partition_cols: vec![],
+        output_ordering: None,
+        infinite_source: false,
+    };
+
+    let result =
+        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
+            .unwrap()
+            .map(|b| b.unwrap())
+            .collect::<Vec<_>>()
+            .await;
+    assert_batches_eq!(
+        &[
+            "+-------+-----+",
+            "| str   | num |",
+            "+-------+-----+",
+            "| test  | 5   |",
+            "| hello | 2   |",
+            "| foo   | 4   |",
+            "+-------+-----+",
+        ],
+        &result
+    );
+    Ok(())
+}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9826d32d0f..8c8d89e38f 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -188,8 +188,9 @@ impl ExecutionPlan for CsvExec {
     }
 }
 
+/// A Config for [`CsvOpener`]
 #[derive(Debug, Clone)]
-struct CsvConfig {
+pub struct CsvConfig {
     batch_size: usize,
     file_schema: SchemaRef,
     file_projection: Option<Vec<usize>>,
@@ -198,6 +199,27 @@ struct CsvConfig {
     object_store: Arc<dyn ObjectStore>,
 }
 
+impl CsvConfig {
+    /// Returns a [`CsvConfig`]
+    pub fn new(
+        batch_size: usize,
+        file_schema: SchemaRef,
+        file_projection: Option<Vec<usize>>,
+        has_header: bool,
+        delimiter: u8,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Self {
+        Self {
+            batch_size,
+            file_schema,
+            file_projection,
+            has_header,
+            delimiter,
+            object_store,
+        }
+    }
+}
+
 impl CsvConfig {
     fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
         let datetime_format = None;
@@ -228,11 +250,25 @@ impl CsvConfig {
     }
 }
 
-struct CsvOpener {
+/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`]
+pub struct CsvOpener {
     config: Arc<CsvConfig>,
     file_compression_type: FileCompressionType,
 }
 
+impl CsvOpener {
+    /// Returns a [`CsvOpener`]
+    pub fn new(
+        config: Arc<CsvConfig>,
+        file_compression_type: FileCompressionType,
+    ) -> Self {
+        Self {
+            config,
+            file_compression_type,
+        }
+    }
+}
+
 impl FileOpener for CsvOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let config = self.config.clone();
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index fb34175b85..95d9785666 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -162,13 +162,31 @@ impl ExecutionPlan for NdJsonExec {
     }
 }
 
-struct JsonOpener {
+/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
+pub struct JsonOpener {
     batch_size: usize,
     projected_schema: SchemaRef,
     file_compression_type: FileCompressionType,
     object_store: Arc<dyn ObjectStore>,
 }
 
+impl JsonOpener {
+    /// Returns a [`JsonOpener`]
+    pub fn new(
+        batch_size: usize,
+        projected_schema: SchemaRef,
+        file_compression_type: FileCompressionType,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Self {
+        Self {
+            batch_size,
+            projected_schema,
+            file_compression_type,
+            object_store,
+        }
+    }
+}
+
 impl FileOpener for JsonOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let store = self.object_store.clone();
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index 141a737314..a50f3bf025 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -26,7 +26,7 @@ mod json;
 mod parquet;
 
 pub(crate) use self::csv::plan_to_csv;
-pub use self::csv::CsvExec;
+pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
 pub(crate) use self::parquet::plan_to_parquet;
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 use arrow::{
@@ -39,7 +39,7 @@ pub use avro::AvroExec;
 use datafusion_physical_expr::PhysicalSortExpr;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
 pub(crate) use json::plan_to_json;
-pub use json::NdJsonExec;
+pub use json::{JsonOpener, NdJsonExec};
 
 use crate::datasource::{
     listing::{FileRange, PartitionedFile},
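
The CsvOpener path is symmetric; below is a minimal construction sketch based on the csv_opener.rs example above (schema helper, projection, and delimiter are taken from that example; the FileScanConfig/FileStream plumbing is the same as in the JSON sketch earlier in this mail):

use std::sync::Arc;

use datafusion::{
    datasource::file_format::file_type::FileCompressionType,
    physical_plan::file_format::{CsvConfig, CsvOpener},
    test_util::aggr_test_schema,
};
use object_store::local::LocalFileSystem;

fn build_csv_opener() -> CsvOpener {
    // Arguments in the order taken by the new public constructor:
    // batch size, file schema, projection, header flag, delimiter, object store.
    let config = CsvConfig::new(
        8192,
        aggr_test_schema(),
        Some(vec![12, 0]),
        true,
        b',',
        Arc::new(LocalFileSystem::new()),
    );
    CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED)
}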