Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/29 10:47:40 UTC
[arrow-datafusion] branch master updated: NdJson support (#404)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 321fda4 NdJson support (#404)
321fda4 is described below
commit 321fda40a47bcc494c5d2511b6e8b02c9ea975b4
Author: 思维 <he...@users.noreply.github.com>
AuthorDate: Sat May 29 18:47:30 2021 +0800
NdJson support (#404)
---
datafusion/src/datasource/csv.rs | 10 +-
datafusion/src/datasource/json.rs | 190 +++++++++++++
datafusion/src/datasource/mod.rs | 9 +
datafusion/src/physical_plan/csv.rs | 76 +----
datafusion/src/physical_plan/json.rs | 487 +++++++++++++++++++++++++++++++++
datafusion/src/physical_plan/mod.rs | 10 +-
datafusion/src/physical_plan/source.rs | 90 ++++++
datafusion/tests/jsons/1.json | 4 +
datafusion/tests/jsons/2.json | 12 +
9 files changed, 803 insertions(+), 85 deletions(-)
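For orientation before the diff: end-to-end use of the new data source mirrors the existing CSV API. Register an NdJsonFile against an ExecutionContext, then query it with SQL. A minimal sketch distilled from the tests added in this commit (the tokio runtime and println are assumptions of the example, not part of the patch):

    use std::sync::Arc;
    use datafusion::datasource::json::NdJsonFile;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // No schema in the options, so it is inferred from the file.
        let table = NdJsonFile::try_new("tests/jsons/2.json", Default::default())?;
        ctx.register_table("ndjson", Arc::new(table))?;
        let batches = ctx.sql("SELECT sum(a) FROM ndjson")?.collect().await?;
        println!("{:?}", batches);
        Ok(())
    }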
diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 33cbeb1..10e6659 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -40,21 +40,13 @@ use std::string::String;
use std::sync::{Arc, Mutex};
use crate::datasource::datasource::Statistics;
-use crate::datasource::TableProvider;
+use crate::datasource::{Source, TableProvider};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::Expr;
use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};
-enum Source {
- /// Path to a single CSV file or a directory containing one of more CSV files
- Path(String),
-
- /// Read CSV data from a reader
- Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
-}
-
/// Represents a CSV file with a provided schema
pub struct CsvFile {
source: Source,
diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs
new file mode 100644
index 0000000..f916f6c
--- /dev/null
+++ b/datafusion/src/datasource/json.rs
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited JSON data source
+//!
+//! This data source allows line-delimited JSON strings or files to be used as query input.
+//!
+
+use std::{
+ any::Any,
+ io::{BufReader, Read, Seek},
+ sync::{Arc, Mutex},
+};
+
+use crate::{
+ datasource::{Source, TableProvider},
+ error::{DataFusionError, Result},
+ physical_plan::{
+ common,
+ json::{NdJsonExec, NdJsonReadOptions},
+ ExecutionPlan,
+ },
+};
+use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a line-delimited JSON file with a provided schema
+pub struct NdJsonFile {
+ source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+ schema: SchemaRef,
+ file_extension: String,
+ statistics: Statistics,
+}
+
+impl NdJsonFile {
+ /// Attempt to initialize an `NdJsonFile` from a path. The schema can be inferred automatically.
+ pub fn try_new(path: &str, options: NdJsonReadOptions) -> Result<Self> {
+ let schema = if let Some(schema) = options.schema {
+ schema
+ } else {
+ let filenames = common::build_file_list(path, options.file_extension)?;
+ if filenames.is_empty() {
+ return Err(DataFusionError::Plan(format!(
+ "No files found at {path} with file extension {file_extension}",
+ path = path,
+ file_extension = options.file_extension
+ )));
+ }
+
+ NdJsonExec::try_infer_schema(
+ filenames,
+ Some(options.schema_infer_max_records),
+ )?
+ .into()
+ };
+
+ Ok(Self {
+ source: Source::Path(path.to_string()),
+ schema,
+ file_extension: options.file_extension.to_string(),
+ statistics: Statistics::default(),
+ })
+ }
+
+ /// Attempt to initialize an `NdJsonFile` from a reader that implements `Seek`. The schema can be inferred automatically.
+ pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+ mut reader: R,
+ options: NdJsonReadOptions,
+ ) -> Result<Self> {
+ let schema = if let Some(schema) = options.schema {
+ schema
+ } else {
+ let mut bufr = BufReader::new(reader);
+ let schema = infer_json_schema_from_seekable(
+ &mut bufr,
+ Some(options.schema_infer_max_records),
+ )?
+ .into();
+ reader = bufr.into_inner();
+ schema
+ };
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ statistics: Statistics::default(),
+ file_extension: String::new(),
+ })
+ }
+}
+impl TableProvider for NdJsonFile {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn scan(
+ &self,
+ projection: &Option<Vec<usize>>,
+ batch_size: usize,
+ _filters: &[crate::logical_plan::Expr],
+ limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let opts = NdJsonReadOptions {
+ schema: Some(self.schema.clone()),
+ schema_infer_max_records: 0, // the schema is always provided above, so inference never runs
+ file_extension: self.file_extension.as_str(),
+ };
+ let batch_size = limit
+ .map(|l| std::cmp::min(l, batch_size))
+ .unwrap_or(batch_size);
+
+ let exec = match &self.source {
+ Source::Reader(maybe_reader) => {
+ if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+ NdJsonExec::try_new_from_reader(
+ rdr,
+ opts,
+ projection.clone(),
+ batch_size,
+ limit,
+ )?
+ } else {
+ return Err(DataFusionError::Execution(
+ "You can only read once if the data comes from a reader"
+ .to_string(),
+ ));
+ }
+ }
+ Source::Path(p) => {
+ NdJsonExec::try_new(&p, opts, projection.clone(), batch_size, limit)?
+ }
+ };
+ Ok(Arc::new(exec))
+ }
+
+ fn statistics(&self) -> Statistics {
+ self.statistics.clone()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::prelude::*;
+ const TEST_DATA_BASE: &str = "tests/jsons";
+
+ #[tokio::test]
+ async fn ndjson_file_from_path() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ let path = format!("{}/2.json", TEST_DATA_BASE);
+ ctx.register_table(
+ "ndjson",
+ Arc::new(NdJsonFile::try_new(&path, Default::default())?),
+ )?;
+ let df = ctx.sql("select sum(a) from ndjson")?;
+ let batches = df.collect().await?;
+ assert_eq!(
+ batches[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::Int64Array>()
+ .unwrap()
+ .value(0),
+ 100000000000011
+ );
+ Ok(())
+ }
+}
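The file above also adds a reader-based constructor. Note from scan() that a reader source is consumed on first use; a second scan returns an execution error. A sketch of try_new_from_reader over an in-memory cursor (the helper function is hypothetical; schema inference works here because Cursor implements Seek):

    use std::io::Cursor;
    use std::sync::Arc;
    use datafusion::datasource::json::NdJsonFile;
    use datafusion::datasource::TableProvider;
    use datafusion::error::Result;
    use datafusion::physical_plan::json::NdJsonReadOptions;

    fn reader_table() -> Result<Arc<dyn TableProvider>> {
        let data = "{\"a\":1, \"d\":\"x\"}\n{\"a\":2, \"d\":\"y\"}";
        // Omitting the schema forces inference, which rewinds the reader;
        // passing NdJsonReadOptions { schema: Some(..), .. } skips it.
        let table = NdJsonFile::try_new_from_reader(
            Cursor::new(data),
            NdJsonReadOptions::default(),
        )?;
        Ok(Arc::new(table))
    }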
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index ac2f3d2..b46b9cc 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -20,9 +20,18 @@
pub mod csv;
pub mod datasource;
pub mod empty;
+pub mod json;
pub mod memory;
pub mod parquet;
pub use self::csv::{CsvFile, CsvReadOptions};
pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
+
+pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
+ /// Path to a single file or a directory containing one or more files
+ Path(String),
+
+ /// Read data from a reader
+ Reader(std::sync::Mutex<Option<R>>),
+}
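The reader variant's payload is generic so each consumer can pin down exactly the reader capabilities it needs, while the boxed Read default keeps the existing CSV call sites unchanged. For illustration, a hypothetical crate-internal consumer that needs rewinding (Source is pub(crate), so this only compiles inside the crate) would instantiate it the way datasource/json.rs does:

    use std::io::{Read, Seek};

    trait SeekRead: Read + Seek {}
    impl<T: Seek + Read> SeekRead for T {}

    // Hypothetical holder mirroring NdJsonFile's field: schema inference
    // must rewind the stream, so R is narrowed to seekable readers.
    struct SeekableSource {
        source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
    }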
diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs
index 96b24cc..9f88a53 100644
--- a/datafusion/src/physical_plan/csv.rs
+++ b/datafusion/src/physical_plan/csv.rs
@@ -18,7 +18,8 @@
//! Execution plan for reading CSV files
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
+use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{common, source::Source, Partitioning};
use arrow::csv;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
@@ -32,7 +33,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::task::{Context, Poll};
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::{DisplayFormatType, RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;
/// CSV file read option
@@ -106,77 +107,6 @@ impl<'a> CsvReadOptions<'a> {
}
}
-/// Source represents where the data comes from.
-enum Source {
- /// The data comes from partitioned files
- PartitionedFiles {
- /// Path to directory containing partitioned files with the same schema
- path: String,
- /// The individual files under path
- filenames: Vec<String>,
- },
-
- /// The data comes from anything impl Read trait
- Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
-}
-
-impl std::fmt::Debug for Source {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Source::PartitionedFiles { path, filenames } => f
- .debug_struct("PartitionedFiles")
- .field("path", path)
- .field("filenames", filenames)
- .finish()?,
- Source::Reader(_) => f.write_str("Reader")?,
- };
- Ok(())
- }
-}
-
-impl std::fmt::Display for Source {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Source::PartitionedFiles { path, filenames } => {
- write!(f, "Path({}: [{}])", path, filenames.join(","))
- }
- Source::Reader(_) => {
- write!(f, "Reader(...)")
- }
- }
- }
-}
-
-impl Clone for Source {
- fn clone(&self) -> Self {
- match self {
- Source::PartitionedFiles { path, filenames } => Self::PartitionedFiles {
- path: path.clone(),
- filenames: filenames.clone(),
- },
- Source::Reader(_) => Self::Reader(Mutex::new(None)),
- }
- }
-}
-
-impl Source {
- /// Path to directory containing partitioned files with the same schema
- pub fn path(&self) -> &str {
- match self {
- Source::PartitionedFiles { path, .. } => path.as_str(),
- Source::Reader(_) => "",
- }
- }
-
- /// The individual files under path
- pub fn filenames(&self) -> &[String] {
- match self {
- Source::PartitionedFiles { filenames, .. } => filenames,
- Source::Reader(_) => &[],
- }
- }
-}
-
/// Execution plan for scanning a CSV file
#[derive(Debug, Clone)]
pub struct CsvExec {
diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs
new file mode 100644
index 0000000..ed9b0b0
--- /dev/null
+++ b/datafusion/src/physical_plan/json.rs
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited JSON files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::error::{DataFusionError, Result};
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::{
+ datatypes::{Schema, SchemaRef},
+ error::Result as ArrowResult,
+ json,
+ record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+ io::{BufReader, Read},
+ pin::Pin,
+ sync::{Arc, Mutex},
+ task::{Context, Poll},
+};
+
+/// Line-delimited JSON read options
+#[derive(Clone)]
+pub struct NdJsonReadOptions<'a> {
+ /// The data source schema.
+ pub schema: Option<SchemaRef>,
+
+ /// Max number of rows to read from JSON files for schema inference if needed. Defaults to 1000.
+ pub schema_infer_max_records: usize,
+
+ /// File extension; only files with this extension are selected for data input.
+ /// Defaults to ".json".
+ pub file_extension: &'a str,
+}
+
+impl<'a> Default for NdJsonReadOptions<'a> {
+ fn default() -> Self {
+ Self {
+ schema: None,
+ schema_infer_max_records: 1000,
+ file_extension: ".json",
+ }
+ }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning an NdJson data source
+#[derive(Debug)]
+pub struct NdJsonExec {
+ source: Source<Box<dyn SeekRead + Send + Sync>>,
+ schema: SchemaRef,
+ projection: Option<Vec<usize>>,
+ projected_schema: SchemaRef,
+ file_extension: String,
+ batch_size: usize,
+ limit: Option<usize>,
+}
+
+impl NdJsonExec {
+ /// Create a new execution plan for reading from a path
+ pub fn try_new(
+ path: &str,
+ options: NdJsonReadOptions,
+ projection: Option<Vec<usize>>,
+ batch_size: usize,
+ limit: Option<usize>,
+ ) -> Result<Self> {
+ let file_extension = options.file_extension.to_string();
+
+ let filenames = common::build_file_list(path, &file_extension)?;
+
+ if filenames.is_empty() {
+ return Err(DataFusionError::Execution(format!(
+ "No files found at {path} with file extension {file_extension}",
+ path = path,
+ file_extension = file_extension.as_str()
+ )));
+ }
+
+ let schema = match options.schema {
+ Some(s) => s,
+ None => Arc::new(NdJsonExec::try_infer_schema(
+ filenames.clone(),
+ Some(options.schema_infer_max_records),
+ )?),
+ };
+
+ let projected_schema = match &projection {
+ None => schema.clone(),
+ Some(p) => Arc::new(Schema::new(
+ p.iter().map(|i| schema.field(*i).clone()).collect(),
+ )),
+ };
+
+ Ok(Self {
+ source: Source::PartitionedFiles {
+ path: path.to_string(),
+ filenames,
+ },
+ schema,
+ file_extension,
+ projection,
+ projected_schema,
+ batch_size,
+ limit,
+ })
+ }
+ /// Create a new execution plan for reading from a reader
+ pub fn try_new_from_reader(
+ reader: impl Read + Seek + Send + Sync + 'static,
+ options: NdJsonReadOptions,
+ projection: Option<Vec<usize>>,
+ batch_size: usize,
+ limit: Option<usize>,
+ ) -> Result<Self> {
+ let schema = match options.schema {
+ Some(s) => s,
+ None => {
+ return Err(DataFusionError::Execution(
+ "The schema must be provided in options when reading from a reader"
+ .to_string(),
+ ));
+ }
+ };
+
+ let projected_schema = match &projection {
+ None => schema.clone(),
+ Some(p) => Arc::new(Schema::new(
+ p.iter().map(|i| schema.field(*i).clone()).collect(),
+ )),
+ };
+
+ Ok(Self {
+ source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+ schema,
+ file_extension: String::new(),
+ projection,
+ projected_schema,
+ batch_size,
+ limit,
+ })
+ }
+
+ /// Path to the directory containing partitioned JSON files with the same schema
+ pub fn path(&self) -> &str {
+ self.source.path()
+ }
+
+ /// The individual files under path
+ pub fn filenames(&self) -> &[String] {
+ self.source.filenames()
+ }
+
+ /// File extension
+ pub fn file_extension(&self) -> &str {
+ &self.file_extension
+ }
+
+ /// Get the schema of the JSON file
+ pub fn file_schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// Optional projection for which columns to load
+ pub fn projection(&self) -> Option<&Vec<usize>> {
+ self.projection.as_ref()
+ }
+
+ /// Batch size
+ pub fn batch_size(&self) -> usize {
+ self.batch_size
+ }
+
+ /// Limit
+ pub fn limit(&self) -> Option<usize> {
+ self.limit
+ }
+
+ /// Infer the schema for a given JSON dataset
+ pub fn try_infer_schema(
+ mut filenames: Vec<String>,
+ max_records: Option<usize>,
+ ) -> Result<Schema> {
+ let mut schemas = Vec::new();
+ let mut records_to_read = max_records.unwrap_or(usize::MAX);
+ while records_to_read > 0 && !filenames.is_empty() {
+ let file = File::open(filenames.pop().unwrap())?;
+ let mut reader = BufReader::new(file);
+ let iter = ValueIter::new(&mut reader, None);
+ let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+ let should_take = records_to_read > 0;
+ // Guard the decrement: decrementing unconditionally would underflow
+ // `records_to_read` (panicking in debug builds) when `take_while`
+ // probes the closure one last time after the budget reaches zero.
+ if should_take {
+ records_to_read -= 1;
+ }
+ should_take
+ }))?;
+ schemas.push(schema);
+ }
+
+ Ok(Schema::try_merge(schemas)?)
+ }
+}
+
+#[async_trait]
+impl ExecutionPlan for NdJsonExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.projected_schema.clone()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(match &self.source {
+ Source::PartitionedFiles { filenames, .. } => filenames.len(),
+ Source::Reader(_) => 1,
+ })
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ Vec::new()
+ }
+
+ fn with_new_children(
+ &self,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if !children.is_empty() {
+ Err(DataFusionError::Internal(format!(
+ "Children cannot be replaced in {:?}",
+ self
+ )))
+ } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+ Ok(Arc::new(Self {
+ source: Source::PartitionedFiles {
+ filenames: filenames.clone(),
+ path: path.clone(),
+ },
+ schema: self.schema.clone(),
+ projection: self.projection.clone(),
+ projected_schema: self.projected_schema.clone(),
+ batch_size: self.batch_size,
+ limit: self.limit,
+ file_extension: self.file_extension.clone(),
+ }))
+ } else {
+ Err(DataFusionError::Internal(
+ "NdJsonExec with reader source cannot be used with `with_new_children`"
+ .to_string(),
+ ))
+ }
+ }
+
+ async fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<super::SendableRecordBatchStream> {
+ let mut builder = json::ReaderBuilder::new()
+ .with_schema(self.schema.clone())
+ .with_batch_size(self.batch_size);
+ if let Some(proj) = &self.projection {
+ builder = builder.with_projection(
+ proj.iter()
+ .map(|col_idx| self.schema.field(*col_idx).name())
+ .cloned()
+ .collect(),
+ );
+ }
+ match &self.source {
+ Source::PartitionedFiles { filenames, .. } => {
+ let file = File::open(&filenames[partition])?;
+
+ Ok(Box::pin(NdJsonStream::new(
+ builder.build(file)?,
+ self.limit,
+ )))
+ }
+ Source::Reader(rdr) => {
+ if partition != 0 {
+ Err(DataFusionError::Internal(
+ "Only partition 0 is valid when CSV comes from a reader"
+ .to_string(),
+ ))
+ } else if let Some(rdr) = rdr.lock().unwrap().take() {
+ Ok(Box::pin(NdJsonStream::new(builder.build(rdr)?, self.limit)))
+ } else {
+ Err(DataFusionError::Execution(
+ "Error reading CSV: Data can only be read a single time when the source is a reader"
+ .to_string(),
+ ))
+ }
+ }
+ }
+ }
+}
+
+struct NdJsonStream<R: Read> {
+ reader: json::Reader<R>,
+ remain: Option<usize>,
+}
+
+impl<R: Read> NdJsonStream<R> {
+ fn new(reader: json::Reader<R>, limit: Option<usize>) -> Self {
+ Self {
+ reader,
+ remain: limit,
+ }
+ }
+}
+
+impl<R: Read + Unpin> Stream for NdJsonStream<R> {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if let Some(remain) = self.remain.as_mut() {
+ if *remain < 1 {
+ return Poll::Ready(None);
+ }
+ }
+
+ Poll::Ready(match self.reader.next() {
+ Ok(Some(item)) => {
+ if let Some(remain) = self.remain.as_mut() {
+ if *remain >= item.num_rows() {
+ *remain -= item.num_rows();
+ Some(Ok(item))
+ } else {
+ let len = *remain;
+ *remain = 0;
+ Some(Ok(RecordBatch::try_new(
+ item.schema(),
+ item.columns()
+ .iter()
+ .map(|column| column.slice(0, len))
+ .collect(),
+ )?))
+ }
+ } else {
+ Some(Ok(item))
+ }
+ }
+ Ok(None) => None,
+ Err(err) => Some(Err(err)),
+ })
+ }
+}
+
+impl<R: Read + Unpin> RecordBatchStream for NdJsonStream<R> {
+ fn schema(&self) -> SchemaRef {
+ self.reader.schema()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use futures::StreamExt;
+
+ const TEST_DATA_BASE: &str = "tests/jsons";
+
+ #[tokio::test]
+ async fn nd_json_exec_file_without_projection() -> Result<()> {
+ use arrow::datatypes::DataType;
+ let path = format!("{}/1.json", TEST_DATA_BASE);
+ let exec = NdJsonExec::try_new(&path, Default::default(), None, 1024, Some(3))?;
+ let inferred_schema = exec.schema();
+ assert_eq!(inferred_schema.fields().len(), 4);
+
+ // a,b,c,d should be inferred
+ inferred_schema.field_with_name("a").unwrap();
+ inferred_schema.field_with_name("b").unwrap();
+ inferred_schema.field_with_name("c").unwrap();
+ inferred_schema.field_with_name("d").unwrap();
+
+ assert_eq!(
+ inferred_schema.field_with_name("a").unwrap().data_type(),
+ &DataType::Int64
+ );
+ assert!(matches!(
+ inferred_schema.field_with_name("b").unwrap().data_type(),
+ DataType::List(_)
+ ));
+ assert_eq!(
+ inferred_schema.field_with_name("d").unwrap().data_type(),
+ &DataType::Utf8
+ );
+
+ let mut it = exec.execute(0).await?;
+ let batch = it.next().await.unwrap()?;
+
+ assert_eq!(batch.num_rows(), 3);
+ let values = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::Int64Array>()
+ .unwrap();
+ assert_eq!(values.value(0), 1);
+ assert_eq!(values.value(1), -10);
+ assert_eq!(values.value(2), 2);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn nd_json_exec_file_projection() -> Result<()> {
+ let path = format!("{}/1.json", TEST_DATA_BASE);
+ let exec =
+ NdJsonExec::try_new(&path, Default::default(), Some(vec![0, 2]), 1024, None)?;
+ let inferred_schema = exec.schema();
+ assert_eq!(inferred_schema.fields().len(), 2);
+
+ inferred_schema.field_with_name("a").unwrap();
+ inferred_schema.field_with_name("b").unwrap_err();
+ inferred_schema.field_with_name("c").unwrap();
+ inferred_schema.field_with_name("d").unwrap_err();
+
+ let mut it = exec.execute(0).await?;
+ let batch = it.next().await.unwrap()?;
+
+ assert_eq!(batch.num_rows(), 4);
+ let values = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::Int64Array>()
+ .unwrap();
+ assert_eq!(values.value(0), 1);
+ assert_eq!(values.value(1), -10);
+ assert_eq!(values.value(2), 2);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn nd_json_exec_from_reader() -> Result<()> {
+ let content = r#"{"a":"aaa", "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+{"a":"bbb", "b":[2.0, 1.3, -6.1], "c":[true, true], "d":"4"}"#;
+ let cur = std::io::Cursor::new(content);
+ let mut bufrdr = std::io::BufReader::new(cur);
+ let schema =
+ arrow::json::reader::infer_json_schema_from_seekable(&mut bufrdr, None)?;
+ let exec = NdJsonExec::try_new_from_reader(
+ bufrdr,
+ NdJsonReadOptions {
+ schema: Some(Arc::new(schema)),
+ ..Default::default()
+ },
+ None,
+ 1024,
+ Some(1),
+ )?;
+
+ let mut it = exec.execute(0).await?;
+ let batch = it.next().await.unwrap()?;
+
+ assert_eq!(batch.num_rows(), 1);
+
+ let values = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::StringArray>()
+ .unwrap();
+ assert_eq!(values.value(0), "aaa");
+
+ Ok(())
+ }
+}
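At the physical-plan layer, NdJsonExec can also be driven directly, as the tests above do. A sketch combining projection and streaming execution (indices and path follow the projection test; the ExecutionPlan trait must be in scope for execute):

    use datafusion::error::Result;
    use datafusion::physical_plan::json::NdJsonExec;
    use datafusion::physical_plan::ExecutionPlan;
    use futures::StreamExt;

    async fn scan_projected() -> Result<()> {
        // Project columns 0 and 2 ("a" and "c"), 1024 rows per batch, no limit.
        let exec = NdJsonExec::try_new(
            "tests/jsons/1.json",
            Default::default(),
            Some(vec![0, 2]),
            1024,
            None,
        )?;
        let mut stream = exec.execute(0).await?;
        while let Some(batch) = stream.next().await {
            println!("{} rows", batch?.num_rows());
        }
        Ok(())
    }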
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b1234a0..ae84b36 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,11 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::LogicalPlan;
use crate::{
@@ -30,9 +35,6 @@ use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
pub use display::DisplayFormatType;
use futures::stream::Stream;
-use std::fmt::{self, Debug, Display};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
use std::{any::Any, pin::Pin};
use self::{display::DisplayableExecutionPlan, merge::MergeExec};
@@ -594,6 +596,7 @@ pub mod group_scalar;
pub mod hash_aggregate;
pub mod hash_join;
pub mod hash_utils;
+pub mod json;
pub mod limit;
pub mod math_expressions;
pub mod memory;
@@ -605,6 +608,7 @@ pub mod projection;
pub mod regex_expressions;
pub mod repartition;
pub mod sort;
+pub mod source;
pub mod string_expressions;
pub mod type_coercion;
pub mod udaf;
diff --git a/datafusion/src/physical_plan/source.rs b/datafusion/src/physical_plan/source.rs
new file mode 100644
index 0000000..012405a
--- /dev/null
+++ b/datafusion/src/physical_plan/source.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains a `Source` enum that represents where the data comes from.
+
+use std::{io::Read, sync::Mutex};
+
+/// Source represents where the data comes from.
+pub(crate) enum Source<R = Box<dyn Read + Send + Sync>> {
+ /// The data comes from partitioned files
+ PartitionedFiles {
+ /// Path to directory containing partitioned files with the same schema
+ path: String,
+ /// The individual files under path
+ filenames: Vec<String>,
+ },
+
+ /// The data comes from anything that implements the `Read` trait
+ Reader(Mutex<Option<R>>),
+}
+
+impl<R> std::fmt::Debug for Source<R> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Source::PartitionedFiles { path, filenames } => f
+ .debug_struct("PartitionedFiles")
+ .field("path", path)
+ .field("filenames", filenames)
+ .finish()?,
+ Source::Reader(_) => f.write_str("Reader")?,
+ };
+ Ok(())
+ }
+}
+impl std::fmt::Display for Source {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Source::PartitionedFiles { path, filenames } => {
+ write!(f, "Path({}: [{}])", path, filenames.join(","))
+ }
+ Source::Reader(_) => {
+ write!(f, "Reader(...)")
+ }
+ }
+ }
+}
+
+impl<R> Clone for Source<R> {
+ fn clone(&self) -> Self {
+ match self {
+ Source::PartitionedFiles { path, filenames } => Self::PartitionedFiles {
+ path: path.clone(),
+ filenames: filenames.clone(),
+ },
+ Source::Reader(_) => Self::Reader(Mutex::new(None)),
+ }
+ }
+}
+
+impl<R> Source<R> {
+ /// Path to directory containing partitioned files with the same schema
+ pub fn path(&self) -> &str {
+ match self {
+ Source::PartitionedFiles { path, .. } => path.as_str(),
+ Source::Reader(_) => "",
+ }
+ }
+
+ /// The individual files under path
+ pub fn filenames(&self) -> &[String] {
+ match self {
+ Source::PartitionedFiles { filenames, .. } => filenames,
+ Source::Reader(_) => &[],
+ }
+ }
+}
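One deliberate wrinkle above: cloning a Source::Reader yields Reader(Mutex::new(None)) rather than a second handle, because an arbitrary reader cannot be duplicated. Cloned plans backed by a reader therefore hit the "read once" error paths seen earlier. A sketch (crate-internal, since Source is pub(crate)):

    use std::io::Read;
    use std::sync::Mutex;

    let src: Source<Box<dyn Read + Send + Sync>> =
        Source::Reader(Mutex::new(Some(Box::new(std::io::empty()))));
    let copy = src.clone();
    if let Source::Reader(m) = &copy {
        // The clone starts empty; only the original still owns the reader.
        assert!(m.lock().unwrap().is_none());
    }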
diff --git a/datafusion/tests/jsons/1.json b/datafusion/tests/jsons/1.json
new file mode 100644
index 0000000..e6f360f
--- /dev/null
+++ b/datafusion/tests/jsons/1.json
@@ -0,0 +1,4 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+{"a":-10, "b":[2.0, 1.3, -6.1], "c":[true, true], "d":"4"}
+{"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"}
+{}
\ No newline at end of file
diff --git a/datafusion/tests/jsons/2.json b/datafusion/tests/jsons/2.json
new file mode 100644
index 0000000..dafd2dd
--- /dev/null
+++ b/datafusion/tests/jsons/2.json
@@ -0,0 +1,12 @@
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":-10, "b":-3.5, "c":true, "d":"4"}
+{"a":2, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":7, "b":-3.5, "c":true, "d":"4"}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":5, "b":-3.5, "c":true, "d":"4"}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":1, "b":-3.5, "c":true, "d":"4"}
+{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
\ No newline at end of file
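As a sanity check on the value asserted in the table-provider test over this file: the eleven small "a" values sum to 1 - 10 + 2 + 1 + 7 + 1 + 1 + 5 + 1 + 1 + 1 = 11, and adding the final 100000000000000 gives the expected 100000000000011.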