Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/29 10:47:40 UTC

[arrow-datafusion] branch master updated: NdJson support (#404)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 321fda4  NdJson support (#404)
321fda4 is described below

commit 321fda40a47bcc494c5d2511b6e8b02c9ea975b4
Author: 思维 <he...@users.noreply.github.com>
AuthorDate: Sat May 29 18:47:30 2021 +0800

    NdJson support (#404)
---
 datafusion/src/datasource/csv.rs       |  10 +-
 datafusion/src/datasource/json.rs      | 190 +++++++++++++
 datafusion/src/datasource/mod.rs       |   9 +
 datafusion/src/physical_plan/csv.rs    |  76 +----
 datafusion/src/physical_plan/json.rs   | 487 +++++++++++++++++++++++++++++++++
 datafusion/src/physical_plan/mod.rs    |  10 +-
 datafusion/src/physical_plan/source.rs |  90 ++++++
 datafusion/tests/jsons/1.json          |   4 +
 datafusion/tests/jsons/2.json          |  12 +
 9 files changed, 803 insertions(+), 85 deletions(-)
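
The new `NdJsonFile` table provider works like the existing `CsvFile` provider: register it with an `ExecutionContext` and query it with SQL. Below is a minimal usage sketch adapted from the test added in datafusion/src/datasource/json.rs; the datafusion::datasource::json and datafusion::physical_plan::json paths are assumed from the `pub mod json` declarations introduced by this commit.

    use std::sync::Arc;
    use datafusion::datasource::json::NdJsonFile;
    use datafusion::error::Result;
    use datafusion::physical_plan::json::NdJsonReadOptions;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // No schema is set in the options, so it is inferred from the file.
        let table = NdJsonFile::try_new("tests/jsons/2.json", NdJsonReadOptions::default())?;
        ctx.register_table("ndjson", Arc::new(table))?;
        let df = ctx.sql("SELECT sum(a) FROM ndjson")?;
        let batches = df.collect().await?;
        println!("read {} batch(es)", batches.len());
        Ok(())
    }

A reader-backed source can be registered the same way via `NdJsonFile::try_new_from_reader`, but it can only be scanned once.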

diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 33cbeb1..10e6659 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -40,21 +40,13 @@ use std::string::String;
 use std::sync::{Arc, Mutex};
 
 use crate::datasource::datasource::Statistics;
-use crate::datasource::TableProvider;
+use crate::datasource::{Source, TableProvider};
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::Expr;
 use crate::physical_plan::csv::CsvExec;
 pub use crate::physical_plan::csv::CsvReadOptions;
 use crate::physical_plan::{common, ExecutionPlan};
 
-enum Source {
-    /// Path to a single CSV file or a directory containing one of more CSV files
-    Path(String),
-
-    /// Read CSV data from a reader
-    Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
-}
-
 /// Represents a CSV file with a provided schema
 pub struct CsvFile {
     source: Source,
diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs
new file mode 100644
index 0000000..f916f6c
--- /dev/null
+++ b/datafusion/src/datasource/json.rs
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Line-delimited JSON data source
+//!
+//! This data source allows line-delimited JSON strings or files to be used as input for queries.
+//!
+
+use std::{
+    any::Any,
+    io::{BufReader, Read, Seek},
+    sync::{Arc, Mutex},
+};
+
+use crate::{
+    datasource::{Source, TableProvider},
+    error::{DataFusionError, Result},
+    physical_plan::{
+        common,
+        json::{NdJsonExec, NdJsonReadOptions},
+        ExecutionPlan,
+    },
+};
+use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};
+
+use super::datasource::Statistics;
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+
+/// Represents a line-delimited JSON file with a provided schema
+pub struct NdJsonFile {
+    source: Source<Box<dyn SeekRead + Send + Sync + 'static>>,
+    schema: SchemaRef,
+    file_extension: String,
+    statistics: Statistics,
+}
+
+impl NdJsonFile {
+    /// Attempt to initialize an `NdJsonFile` from a path. The schema can be inferred automatically.
+    pub fn try_new(path: &str, options: NdJsonReadOptions) -> Result<Self> {
+        let schema = if let Some(schema) = options.schema {
+            schema
+        } else {
+            let filenames = common::build_file_list(path, options.file_extension)?;
+            if filenames.is_empty() {
+                return Err(DataFusionError::Plan(format!(
+                    "No files found at {path} with file extension {file_extension}",
+                    path = path,
+                    file_extension = options.file_extension
+                )));
+            }
+
+            NdJsonExec::try_infer_schema(
+                filenames,
+                Some(options.schema_infer_max_records),
+            )?
+            .into()
+        };
+
+        Ok(Self {
+            source: Source::Path(path.to_string()),
+            schema,
+            file_extension: options.file_extension.to_string(),
+            statistics: Statistics::default(),
+        })
+    }
+
+    /// Attempt to initialize an `NdJsonFile` from a reader that implements `Seek`. The schema can be inferred automatically.
+    pub fn try_new_from_reader<R: Read + Seek + Send + Sync + 'static>(
+        mut reader: R,
+        options: NdJsonReadOptions,
+    ) -> Result<Self> {
+        let schema = if let Some(schema) = options.schema {
+            schema
+        } else {
+            let mut bufr = BufReader::new(reader);
+            let schema = infer_json_schema_from_seekable(
+                &mut bufr,
+                Some(options.schema_infer_max_records),
+            )?
+            .into();
+            reader = bufr.into_inner();
+            schema
+        };
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            statistics: Statistics::default(),
+            file_extension: String::new(),
+        })
+    }
+}
+impl TableProvider for NdJsonFile {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        batch_size: usize,
+        _filters: &[crate::logical_plan::Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let opts = NdJsonReadOptions {
+            schema: Some(self.schema.clone()),
+            schema_infer_max_records: 0, // schema will always be provided, so it's unnecessary to infer schema
+            file_extension: self.file_extension.as_str(),
+        };
+        let batch_size = limit
+            .map(|l| std::cmp::min(l, batch_size))
+            .unwrap_or(batch_size);
+
+        let exec = match &self.source {
+            Source::Reader(maybe_reader) => {
+                if let Some(rdr) = maybe_reader.lock().unwrap().take() {
+                    NdJsonExec::try_new_from_reader(
+                        rdr,
+                        opts,
+                        projection.clone(),
+                        batch_size,
+                        limit,
+                    )?
+                } else {
+                    return Err(DataFusionError::Execution(
+                        "You can only read once if the data comes from a reader"
+                            .to_string(),
+                    ));
+                }
+            }
+            Source::Path(p) => {
+                NdJsonExec::try_new(&p, opts, projection.clone(), batch_size, limit)?
+            }
+        };
+        Ok(Arc::new(exec))
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.statistics.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::prelude::*;
+    const TEST_DATA_BASE: &str = "tests/jsons";
+
+    #[tokio::test]
+    async fn nd_json_file_from_path() -> Result<()> {
+        let mut ctx = ExecutionContext::new();
+        let path = format!("{}/2.json", TEST_DATA_BASE);
+        ctx.register_table(
+            "ndjson",
+            Arc::new(NdJsonFile::try_new(&path, Default::default())?),
+        )?;
+        let df = ctx.sql("select sum(a) from ndjson")?;
+        let batches = df.collect().await?;
+        assert_eq!(
+            batches[0]
+                .column(0)
+                .as_any()
+                .downcast_ref::<arrow::array::Int64Array>()
+                .unwrap()
+                .value(0),
+            100000000000011
+        );
+        Ok(())
+    }
+}
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index ac2f3d2..b46b9cc 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -20,9 +20,18 @@
 pub mod csv;
 pub mod datasource;
 pub mod empty;
+pub mod json;
 pub mod memory;
 pub mod parquet;
 
 pub use self::csv::{CsvFile, CsvReadOptions};
 pub use self::datasource::{TableProvider, TableType};
 pub use self::memory::MemTable;
+
+pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
+    /// Path to a single file or a directory containing one or more files
+    Path(String),
+
+    /// Read data from a reader
+    Reader(std::sync::Mutex<Option<R>>),
+}
diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs
index 96b24cc..9f88a53 100644
--- a/datafusion/src/physical_plan/csv.rs
+++ b/datafusion/src/physical_plan/csv.rs
@@ -18,7 +18,8 @@
 //! Execution plan for reading CSV files
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{common, DisplayFormatType, ExecutionPlan, Partitioning};
+use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{common, source::Source, Partitioning};
 use arrow::csv;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -32,7 +33,7 @@ use std::sync::Arc;
 use std::sync::Mutex;
 use std::task::{Context, Poll};
 
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::{DisplayFormatType, RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;
 
 /// CSV file read option
@@ -106,77 +107,6 @@ impl<'a> CsvReadOptions<'a> {
     }
 }
 
-///  Source represents where the data comes from.
-enum Source {
-    /// The data comes from partitioned files
-    PartitionedFiles {
-        /// Path to directory containing partitioned files with the same schema
-        path: String,
-        /// The individual files under path
-        filenames: Vec<String>,
-    },
-
-    /// The data comes from anything impl Read trait
-    Reader(Mutex<Option<Box<dyn Read + Send + Sync + 'static>>>),
-}
-
-impl std::fmt::Debug for Source {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Source::PartitionedFiles { path, filenames } => f
-                .debug_struct("PartitionedFiles")
-                .field("path", path)
-                .field("filenames", filenames)
-                .finish()?,
-            Source::Reader(_) => f.write_str("Reader")?,
-        };
-        Ok(())
-    }
-}
-
-impl std::fmt::Display for Source {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Source::PartitionedFiles { path, filenames } => {
-                write!(f, "Path({}: [{}])", path, filenames.join(","))
-            }
-            Source::Reader(_) => {
-                write!(f, "Reader(...)")
-            }
-        }
-    }
-}
-
-impl Clone for Source {
-    fn clone(&self) -> Self {
-        match self {
-            Source::PartitionedFiles { path, filenames } => Self::PartitionedFiles {
-                path: path.clone(),
-                filenames: filenames.clone(),
-            },
-            Source::Reader(_) => Self::Reader(Mutex::new(None)),
-        }
-    }
-}
-
-impl Source {
-    /// Path to directory containing partitioned files with the same schema
-    pub fn path(&self) -> &str {
-        match self {
-            Source::PartitionedFiles { path, .. } => path.as_str(),
-            Source::Reader(_) => "",
-        }
-    }
-
-    /// The individual files under path
-    pub fn filenames(&self) -> &[String] {
-        match self {
-            Source::PartitionedFiles { filenames, .. } => filenames,
-            Source::Reader(_) => &[],
-        }
-    }
-}
-
 /// Execution plan for scanning a CSV file
 #[derive(Debug, Clone)]
 pub struct CsvExec {
diff --git a/datafusion/src/physical_plan/json.rs b/datafusion/src/physical_plan/json.rs
new file mode 100644
index 0000000..ed9b0b0
--- /dev/null
+++ b/datafusion/src/physical_plan/json.rs
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited JSON files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, source::Source, ExecutionPlan, Partitioning, RecordBatchStream};
+use crate::error::{DataFusionError, Result};
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    json,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::{BufReader, Read},
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited JSON read options
+#[derive(Clone)]
+pub struct NdJsonReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to 1000.
+    pub schema_infer_max_records: usize,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".json".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for NdJsonReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            schema_infer_max_records: 1000,
+            file_extension: ".json",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning an NdJson data source
+#[derive(Debug)]
+pub struct NdJsonExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl NdJsonExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_new(
+        path: &str,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(NdJsonExec::try_infer_schema(
+                filenames.clone(),
+                Some(options.schema_infer_max_records),
+            )?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            file_extension,
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned JSON files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the JSON file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for a given JSON dataset
+    pub fn try_infer_schema(
+        mut filenames: Vec<String>,
+        max_records: Option<usize>,
+    ) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        let mut records_to_read = max_records.unwrap_or(usize::MAX);
+        while records_to_read > 0 && !filenames.is_empty() {
+            let file = File::open(filenames.pop().unwrap())?;
+            let mut reader = BufReader::new(file);
+            let iter = ValueIter::new(&mut reader, None);
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+                let should_take = records_to_read > 0;
+                records_to_read -= 1;
+                should_take
+            }))?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for NdJsonExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "NdJsonExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = json::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(NdJsonStream::new(
+                    builder.build(file)?,
+                    self.limit,
+                )))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when JSON comes from a reader"
+                            .to_string(),
+                    ))
+                } else if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(NdJsonStream::new(builder.build(rdr)?, self.limit)))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "Error reading JSON: Data can only be read a single time when the source is a reader"
+                            .to_string(),
+                    ))
+                }
+            }
+        }
+    }
+}
+
+struct NdJsonStream<R: Read> {
+    reader: json::Reader<R>,
+    remain: Option<usize>,
+}
+
+impl<R: Read> NdJsonStream<R> {
+    fn new(reader: json::Reader<R>, limit: Option<usize>) -> Self {
+        Self {
+            reader,
+            remain: limit,
+        }
+    }
+}
+
+impl<R: Read + Unpin> Stream for NdJsonStream<R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if let Some(remain) = self.remain.as_mut() {
+            if *remain < 1 {
+                return Poll::Ready(None);
+            }
+        }
+
+        Poll::Ready(match self.reader.next() {
+            Ok(Some(item)) => {
+                if let Some(remain) = self.remain.as_mut() {
+                    if *remain >= item.num_rows() {
+                        *remain -= item.num_rows();
+                        Some(Ok(item))
+                    } else {
+                        let len = *remain;
+                        *remain = 0;
+                        Some(Ok(RecordBatch::try_new(
+                            item.schema(),
+                            item.columns()
+                                .iter()
+                                .map(|column| column.slice(0, len))
+                                .collect(),
+                        )?))
+                    }
+                } else {
+                    Some(Ok(item))
+                }
+            }
+            Ok(None) => None,
+            Err(err) => Some(Err(err)),
+        })
+    }
+}
+
+impl<R: Read + Unpin> RecordBatchStream for NdJsonStream<R> {
+    fn schema(&self) -> SchemaRef {
+        self.reader.schema()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::StreamExt;
+
+    const TEST_DATA_BASE: &str = "tests/jsons";
+
+    #[tokio::test]
+    async fn nd_json_exec_file_without_projection() -> Result<()> {
+        use arrow::datatypes::DataType;
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let exec = NdJsonExec::try_new(&path, Default::default(), None, 1024, Some(3))?;
+        let inferred_schema = exec.schema();
+        assert_eq!(inferred_schema.fields().len(), 4);
+
+        // a,b,c,d should be inferred
+        inferred_schema.field_with_name("a").unwrap();
+        inferred_schema.field_with_name("b").unwrap();
+        inferred_schema.field_with_name("c").unwrap();
+        inferred_schema.field_with_name("d").unwrap();
+
+        assert_eq!(
+            inferred_schema.field_with_name("a").unwrap().data_type(),
+            &DataType::Int64
+        );
+        assert!(matches!(
+            inferred_schema.field_with_name("b").unwrap().data_type(),
+            DataType::List(_)
+        ));
+        assert_eq!(
+            inferred_schema.field_with_name("d").unwrap().data_type(),
+            &DataType::Utf8
+        );
+
+        let mut it = exec.execute(0).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 3);
+        let values = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow::array::Int64Array>()
+            .unwrap();
+        assert_eq!(values.value(0), 1);
+        assert_eq!(values.value(1), -10);
+        assert_eq!(values.value(2), 2);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn nd_json_exec_file_projection() -> Result<()> {
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let exec =
+            NdJsonExec::try_new(&path, Default::default(), Some(vec![0, 2]), 1024, None)?;
+        let inferred_schema = exec.schema();
+        assert_eq!(inferred_schema.fields().len(), 2);
+
+        inferred_schema.field_with_name("a").unwrap();
+        inferred_schema.field_with_name("b").unwrap_err();
+        inferred_schema.field_with_name("c").unwrap();
+        inferred_schema.field_with_name("d").unwrap_err();
+
+        let mut it = exec.execute(0).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 4);
+        let values = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow::array::Int64Array>()
+            .unwrap();
+        assert_eq!(values.value(0), 1);
+        assert_eq!(values.value(1), -10);
+        assert_eq!(values.value(2), 2);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn nd_json_exec_from_reader() -> Result<()> {
+        let content = r#"{"a":"aaa", "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+{"a":"bbb", "b":[2.0, 1.3, -6.1], "c":[true, true], "d":"4"}"#;
+        let cur = std::io::Cursor::new(content);
+        let mut bufrdr = std::io::BufReader::new(cur);
+        let schema =
+            arrow::json::reader::infer_json_schema_from_seekable(&mut bufrdr, None)?;
+        let exec = NdJsonExec::try_new_from_reader(
+            bufrdr,
+            NdJsonReadOptions {
+                schema: Some(Arc::new(schema)),
+                ..Default::default()
+            },
+            None,
+            1024,
+            Some(1),
+        )?;
+
+        let mut it = exec.execute(0).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let values = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<arrow::array::StringArray>()
+            .unwrap();
+        assert_eq!(values.value(0), "aaa");
+
+        Ok(())
+    }
+}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b1234a0..ae84b36 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,11 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
 use crate::execution::context::ExecutionContextState;
 use crate::logical_plan::LogicalPlan;
 use crate::{
@@ -30,9 +35,6 @@ use arrow::{array::ArrayRef, datatypes::Field};
 use async_trait::async_trait;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
-use std::fmt::{self, Debug, Display};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
 use self::{display::DisplayableExecutionPlan, merge::MergeExec};
@@ -594,6 +596,7 @@ pub mod group_scalar;
 pub mod hash_aggregate;
 pub mod hash_join;
 pub mod hash_utils;
+pub mod json;
 pub mod limit;
 pub mod math_expressions;
 pub mod memory;
@@ -605,6 +608,7 @@ pub mod projection;
 pub mod regex_expressions;
 pub mod repartition;
 pub mod sort;
+pub mod source;
 pub mod string_expressions;
 pub mod type_coercion;
 pub mod udaf;
diff --git a/datafusion/src/physical_plan/source.rs b/datafusion/src/physical_plan/source.rs
new file mode 100644
index 0000000..012405a
--- /dev/null
+++ b/datafusion/src/physical_plan/source.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains a `Source` enum that represents where the data comes from.
+
+use std::{io::Read, sync::Mutex};
+
+/// Source represents where the data comes from.
+pub(crate) enum Source<R = Box<dyn Read + Send + Sync>> {
+    /// The data comes from partitioned files
+    PartitionedFiles {
+        /// Path to directory containing partitioned files with the same schema
+        path: String,
+        /// The individual files under path
+        filenames: Vec<String>,
+    },
+
+    /// The data comes from anything that implements the `Read` trait
+    Reader(Mutex<Option<R>>),
+}
+
+impl<R> std::fmt::Debug for Source<R> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Source::PartitionedFiles { path, filenames } => f
+                .debug_struct("PartitionedFiles")
+                .field("path", path)
+                .field("filenames", filenames)
+                .finish()?,
+            Source::Reader(_) => f.write_str("Reader")?,
+        };
+        Ok(())
+    }
+}
+impl std::fmt::Display for Source {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Source::PartitionedFiles { path, filenames } => {
+                write!(f, "Path({}: [{}])", path, filenames.join(","))
+            }
+            Source::Reader(_) => {
+                write!(f, "Reader(...)")
+            }
+        }
+    }
+}
+
+impl<R> Clone for Source<R> {
+    fn clone(&self) -> Self {
+        match self {
+            Source::PartitionedFiles { path, filenames } => Self::PartitionedFiles {
+                path: path.clone(),
+                filenames: filenames.clone(),
+            },
+            Source::Reader(_) => Self::Reader(Mutex::new(None)),
+        }
+    }
+}
+
+impl<R> Source<R> {
+    /// Path to directory containing partitioned files with the same schema
+    pub fn path(&self) -> &str {
+        match self {
+            Source::PartitionedFiles { path, .. } => path.as_str(),
+            Source::Reader(_) => "",
+        }
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        match self {
+            Source::PartitionedFiles { filenames, .. } => filenames,
+            Source::Reader(_) => &[],
+        }
+    }
+}
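
Note on the Source::Reader variant above: the reader sits behind Mutex<Option<R>> so that a scan can take ownership of it exactly once; a second scan (or a clone, which resets the slot to None) has nothing left to read and must return an error, which is what the "read once" / "read a single time" messages in csv.rs and json.rs report. A standalone sketch of that take-once pattern (the `TakeOnce` type is illustrative only, not part of this commit):

    use std::io::Cursor;
    use std::sync::Mutex;

    /// Illustrative wrapper: hands the inner value to the first caller only.
    struct TakeOnce<R>(Mutex<Option<R>>);

    impl<R> TakeOnce<R> {
        fn take(&self) -> Option<R> {
            // After the first call the slot is empty, so later scans see None.
            self.0.lock().unwrap().take()
        }
    }

    fn main() {
        let source = TakeOnce(Mutex::new(Some(Cursor::new("{\"a\":1}"))));
        assert!(source.take().is_some()); // the first scan obtains the reader
        assert!(source.take().is_none()); // later scans must error instead
    }
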
diff --git a/datafusion/tests/jsons/1.json b/datafusion/tests/jsons/1.json
new file mode 100644
index 0000000..e6f360f
--- /dev/null
+++ b/datafusion/tests/jsons/1.json
@@ -0,0 +1,4 @@
+{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+{"a":-10, "b":[2.0, 1.3, -6.1], "c":[true, true], "d":"4"}
+{"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"}
+{}
\ No newline at end of file
diff --git a/datafusion/tests/jsons/2.json b/datafusion/tests/jsons/2.json
new file mode 100644
index 0000000..dafd2dd
--- /dev/null
+++ b/datafusion/tests/jsons/2.json
@@ -0,0 +1,12 @@
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":-10, "b":-3.5, "c":true, "d":"4"}
+{"a":2, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":7, "b":-3.5, "c":true, "d":"4"}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":5, "b":-3.5, "c":true, "d":"4"}
+{"a":1, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4"}
+{"a":1, "b":-3.5, "c":true, "d":"4"}
+{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
\ No newline at end of file