Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/24 06:45:08 UTC

[GitHub] [arrow-datafusion] heymind opened a new pull request #404: NdJson support

heymind opened a new pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404


   # Which issue does this PR close?
   
   Closes #103.
   
   # What changes are included in this PR?
   
   This PR adds two public structs, `NdJsonFile` and `NdJsonExec`.
   
   `NdJsonFile` can be used as a data source (i.e. a table provider) to load NDJSON data from files or from a reader,
   and `NdJsonExec` represents an execution plan for scanning an NdJson data source.
   
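   A minimal usage sketch (based on the constructors and test code in this diff; the file path is a placeholder):
   
   ```rust
   use datafusion::error::Result;
   use datafusion::physical_plan::{json::NdJsonExec, ExecutionPlan};
   use futures::StreamExt;
   
   #[tokio::main]
   async fn main() -> Result<()> {
       // Build a plan over an NDJSON file; the schema is inferred when
       // `NdJsonReadOptions::schema` is left as `None`.
       let exec = NdJsonExec::try_new(
           "tests/jsons/1.json", // placeholder path
           Default::default(),   // NdJsonReadOptions
           None,                 // projection: read all columns
           1024,                 // batch size
           None,                 // no row limit
       )?;
   
       // Execute partition 0 and drain the resulting record batch stream.
       let mut stream = exec.execute(0).await?;
       while let Some(batch) = stream.next().await {
           println!("read {} rows", batch?.num_rows());
       }
       Ok(())
   }
   ```
   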
   # Are there any user-facing changes?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] heymind commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
heymind commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r641913531



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -509,6 +514,76 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+///  Source represents where the data comes from.
+enum Source<R = Box<dyn Read + Send + Sync>> {

Review comment:
       OK, a new module named `source.rs` would be clearer.
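
       For reference, a rough sketch of what would move there, reconstructed from the call sites in this diff (`Source::PartitionedFiles { path, filenames }` and `Source::Reader(Mutex::new(Some(...)))`) rather than from the actual definition:
   
   ```rust
   use std::io::Read;
   use std::sync::Mutex;
   
   /// Source represents where the data comes from.
   enum Source<R = Box<dyn Read + Send + Sync>> {
       /// A root path plus the partitioned files discovered under it.
       PartitionedFiles {
           path: String,
           filenames: Vec<String>,
       },
       /// A single reader; `Mutex<Option<_>>` lets execution take (consume)
       /// it exactly once.
       Reader(Mutex<Option<R>>),
   }
   ```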







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r640891566



##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited JSON files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, ExecutionPlan, Partitioning, RecordBatchStream, Source};
+use crate::error::{DataFusionError, Result};
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    json,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::{BufReader, Read},
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited JSON read options
+#[derive(Clone)]
+pub struct NdJsonReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to 1000.
+    pub schema_infer_max_records: usize,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".json".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for NdJsonReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            schema_infer_max_records: 1000,
+            file_extension: ".json",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning NdJson data source
+#[derive(Debug)]
+pub struct NdJsonExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl NdJsonExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_new(
+        path: &str,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(NdJsonExec::try_infer_schema(
+                filenames.clone(),
+                Some(options.schema_infer_max_records),
+            )?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            file_extension,
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned NdJson files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the NdJson file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given NdJson dataset
+    pub fn try_infer_schema(
+        mut filenames: Vec<String>,
+        max_records: Option<usize>,
+    ) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        let mut records_to_read = max_records.unwrap_or(usize::MAX);
+        while records_to_read > 0 && !filenames.is_empty() {
+            let file = File::open(filenames.pop().unwrap())?;
+            let mut reader = BufReader::new(file);
+            let iter = ValueIter::new(&mut reader, None);
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+                let shoud_take = records_to_read > 0;

Review comment:
       ```suggestion
                let should_take = records_to_read > 0;
       ```







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r640106466



##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
   [... diff context identical to the hunk quoted above ...]
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+                let shoud_take = records_to_read > 0;
+                records_to_read -= 1;
+                shoud_take
+            }))?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for NdJsonExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "NdJsonExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = json::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(NdJsonStream::new(
+                    builder.build(file)?,
+                    self.limit,
+                )))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when NdJson comes from a reader"
+                            .to_string(),
+                    ))
+                } else if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(NdJsonStream::new(builder.build(rdr)?, self.limit)))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "Error reading NdJson: Data can only be read a single time when the source is a reader"
+                            .to_string(),
+                    ))
+                }
+            }
+        }
+    }
+}
+
+struct NdJsonStream<R: Read> {
+    reader: json::Reader<R>,
+    remind: Option<usize>,

Review comment:
       I wonder if a more specific name would be `remain`?
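       For example (a sketch of the rename):
       ```suggestion
           remain: Option<usize>,
       ```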

##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
   [... diff context identical to the hunk quoted above ...]
+            let iter = ValueIter::new(&mut reader, None);
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {

Review comment:
       FYI @houqp  πŸ‘ 

##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
   [... diff context identical to the hunks quoted above ...]
+struct NdJsonStream<R: Read> {
+    reader: json::Reader<R>,
+    remind: Option<usize>,
+}
+
+impl<R: Read> NdJsonStream<R> {
+    fn new(reader: json::Reader<R>, limit: Option<usize>) -> Self {
+        Self {
+            reader,
+            remind: limit,
+        }
+    }
+}
+
+impl<R: Read + Unpin> Stream for NdJsonStream<R> {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if let Some(remind) = self.remind.as_mut() {
+            if *remind < 1 {
+                return Poll::Ready(None);
+            }
+        }
+
+        Poll::Ready(match self.reader.next() {
+            Ok(Some(item)) => {
+                if let Some(remind) = self.remind.as_mut() {
+                    if *remind >= item.num_rows() {
+                        *remind -= item.num_rows();
+                        Some(Ok(item))
+                    } else {
+                        let len = *remind;
+                        *remind = 0;
+                        Some(Ok(RecordBatch::try_new(
+                            item.schema(),
+                            item.columns()
+                                .iter()
+                                .map(|column| column.slice(0, len))
+                                .collect(),
+                        )?))
+                    }
+                } else {
+                    Some(Ok(item))
+                }
+            }
+            Ok(None) => None,
+            Err(err) => Some(Err(err)),
+        })
+    }
+}
+
+impl<R: Read + Unpin> RecordBatchStream for NdJsonStream<R> {
+    fn schema(&self) -> SchemaRef {
+        self.reader.schema()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::StreamExt;
+
+    const TEST_DATA_BASE: &str = "tests/jsons";
+
+    #[tokio::test]
+    async fn nd_json_exec_file_without_projection() -> Result<()> {
+        use arrow::datatypes::DataType;
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let exec = NdJsonExec::try_new(&path, Default::default(), None, 1024, Some(3))?;
+        let inferred_schema = exec.schema();
+        assert_eq!(inferred_schema.fields().len(), 4);
+
+        // a,b,c,d should be inferred
+        inferred_schema.field_with_name("a").unwrap();
+        inferred_schema.field_with_name("b").unwrap();
+        inferred_schema.field_with_name("c").unwrap();
+        inferred_schema.field_with_name("d").unwrap();
+
+        assert_eq!(
+            inferred_schema.field_with_name("a").unwrap().data_type(),
+            &DataType::Int64
+        );
+        assert!(matches!(
+            inferred_schema.field_with_name("b").unwrap().data_type(),
+            DataType::List(_)
+        ));
+        assert_eq!(
+            inferred_schema.field_with_name("d").unwrap().data_type(),
+            &DataType::Utf8
+        );
+
+        let mut it = exec.execute(0).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 3);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn nd_json_exec_file_projection() -> Result<()> {
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+        let exec =
+            NdJsonExec::try_new(&path, Default::default(), Some(vec![0, 2]), 1024, None)?;
+        let inferred_schema = exec.schema();
+        assert_eq!(inferred_schema.fields().len(), 2);
+
+        inferred_schema.field_with_name("a").unwrap();
+        inferred_schema.field_with_name("b").unwrap_err();
+        inferred_schema.field_with_name("c").unwrap();
+        inferred_schema.field_with_name("d").unwrap_err();
+
+        let mut it = exec.execute(0).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 4);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn nd_json_exec_from_reader() -> Result<()> {
+        let content = r#"{"a":"aaa", "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+{"a":"bbb", "b":[2.0, 1.3, -6.1], "c":[true, true], "d":"4"}"#;
+        let cur = std::io::Cursor::new(content);
+        let mut bufrdr = std::io::BufReader::new(cur);
+        let schema =
+            arrow::json::reader::infer_json_schema_from_seekable(&mut bufrdr, None)?;
+        let exec = NdJsonExec::try_new_from_reader(
+            bufrdr,
+            NdJsonReadOptions {
+                schema: Some(Arc::new(schema)),
+                ..Default::default()
+            },
+            None,
+            1024,
+            Some(1),
+        )?;
+
+        let mut it = exec.execute(0).await?;

Review comment:
       I have the same comment about verifying the output here as above. 
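
       For example, something along these lines (a sketch only, assuming the two-row input above and `limit = Some(1)`):
   
   ```rust
   use arrow::array::StringArray;
   
   let batch = it.next().await.unwrap()?;
   assert_eq!(batch.num_rows(), 1); // limit was Some(1)
   
   // Verify the decoded value of column "a" for the first (and only) row.
   let a_idx = batch.schema().index_of("a").unwrap();
   let col_a = batch
       .column(a_idx)
       .as_any()
       .downcast_ref::<StringArray>()
       .expect("column `a` should be a StringArray");
   assert_eq!(col_a.value(0), "aaa");
   ```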







[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #404: NdJson support

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#issuecomment-846871923


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#404](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (bac1164) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/aeed776986da6813a4e1c54d20e8bf0eb363d706?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aeed776) will **decrease** coverage by `0.13%`.
   > The diff coverage is `60.75%`.
   
   > :exclamation: Current head bac1164 differs from pull request most recent head 5632a90. Consider uploading reports for the commit 5632a90 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/404/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #404      +/-   ##
   ==========================================
   - Coverage   74.94%   74.80%   -0.14%     
   ==========================================
     Files         146      148       +2     
     Lines       24344    24611     +267     
   ==========================================
   + Hits        18244    18411     +167     
   - Misses       6100     6200     +100     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Ξ” | |
   |---|---|---|
   | [datafusion/src/datasource/csv.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvZGF0YXNvdXJjZS9jc3YucnM=) | `72.81% <ΓΈ> (ΓΈ)` | |
   | [datafusion/src/physical\_plan/csv.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9jc3YucnM=) | `81.41% <ΓΈ> (+4.03%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `73.45% <42.30%> (-9.31%)` | :arrow_down: |
   | [datafusion/src/datasource/json.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvZGF0YXNvdXJjZS9qc29uLnJz) | `52.30% <52.30%> (ΓΈ)` | |
   | [datafusion/src/physical\_plan/json.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9qc29uLnJz) | `65.84% <65.84%> (ΓΈ)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [aeed776...5632a90](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r640893148



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -509,6 +514,76 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+///  Source represents where the data comes from.
+enum Source<R = Box<dyn Read + Send + Sync>> {

Review comment:
       Might this be better in a new module, e.g. sources? WDYT @alamb 
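
       For illustration, a rough sketch of what such a `source.rs` module might hold. This is hypothetical: the module path and visibility are assumptions, and the variant shapes simply mirror the `Source` enum introduced in this PR.

       ```rust
       // Hypothetical datafusion/src/physical_plan/source.rs (sketch only).
       use std::io::Read;
       use std::sync::Mutex;

       /// Where the data for a scan comes from.
       pub(crate) enum Source<R = Box<dyn Read + Send + Sync>> {
           /// A directory of partitioned files that share one schema.
           PartitionedFiles {
               path: String,
               filenames: Vec<String>,
           },
           /// An in-memory reader; it can only be consumed once, hence the
           /// `Mutex<Option<..>>` so `execute` can `take()` it exactly once.
           Reader(Mutex<Option<R>>),
       }
       ```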




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r640937844



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -509,6 +514,76 @@ pub trait Accumulator: Send + Sync + Debug {
     fn evaluate(&self) -> Result<ScalarValue>;
 }
 
+///  Source represents where the data comes from.
+enum Source<R = Box<dyn Read + Send + Sync>> {

Review comment:
       I think that would improve the code, but I also think it would be fine to move the code into its own module as a follow-on PR, depending on @heymind's preference




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] codecov-commenter edited a comment on pull request #404: NdJson support

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#issuecomment-846871923


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#404](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (97e1fe9) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/aeed776986da6813a4e1c54d20e8bf0eb363d706?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aeed776) will **decrease** coverage by `0.22%`.
   > The diff coverage is `58.51%`.
   
   > :exclamation: Current head 97e1fe9 differs from pull request most recent head d07b0b0. Consider uploading reports for the commit d07b0b0 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/404/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #404      +/-   ##
   ==========================================
   - Coverage   74.94%   74.71%   -0.23%     
   ==========================================
     Files         146      148       +2     
     Lines       24344    24782     +438     
   ==========================================
   + Hits        18244    18516     +272     
   - Misses       6100     6266     +166     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ta/rust/core/src/execution\_plans/shuffle\_reader.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9leGVjdXRpb25fcGxhbnMvc2h1ZmZsZV9yZWFkZXIucnM=) | `0.00% <0.00%> (ø)` | |
   | [datafusion-cli/src/main.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi1jbGkvc3JjL21haW4ucnM=) | `0.00% <0.00%> (ø)` | |
   | [datafusion/src/datasource/csv.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvZGF0YXNvdXJjZS9jc3YucnM=) | `72.81% <ø> (ø)` | |
   | [datafusion/src/physical\_plan/csv.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9jc3YucnM=) | `81.41% <ø> (+4.03%)` | :arrow_up: |
   | [benchmarks/src/bin/tpch.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmVuY2htYXJrcy9zcmMvYmluL3RwY2gucnM=) | `30.84% <11.11%> (+0.01%)` | :arrow_up: |
   | [datafusion/src/physical\_optimizer/pruning.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfb3B0aW1pemVyL3BydW5pbmcucnM=) | `89.73% <38.46%> (-0.88%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `73.45% <42.30%> (-9.31%)` | :arrow_down: |
   | [datafusion/src/datasource/json.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvZGF0YXNvdXJjZS9qc29uLnJz) | `52.30% <52.30%> (ø)` | |
   | [datafusion-cli/src/print\_format.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi1jbGkvc3JjL3ByaW50X2Zvcm1hdC5ycw==) | `84.44% <58.82%> (-5.97%)` | :arrow_down: |
   | [datafusion/src/scalar.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvc2NhbGFyLnJz) | `58.66% <63.26%> (+3.18%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/arrow-datafusion/pull/404/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [aeed776...d07b0b0](https://codecov.io/gh/apache/arrow-datafusion/pull/404?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #404: NdJson support

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#issuecomment-847991131


   Thank you for the contribution @heymind -- I will try to review this later today. FYI @houqp @andygrove @nevi-me 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] heymind commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
heymind commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r641913372



##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited JSON files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, ExecutionPlan, Partitioning, RecordBatchStream, Source};
+use crate::error::{DataFusionError, Result};
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    json,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::{BufReader, Read},
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited JSON read options
+#[derive(Clone)]
+pub struct NdJsonReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to 1000.
+    pub schema_infer_max_records: usize,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".json".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for NdJsonReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            schema_infer_max_records: 1000,
+            file_extension: ".json",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning NdJson data source
+#[derive(Debug)]
+pub struct NdJsonExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl NdJsonExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_new(
+        path: &str,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(NdJsonExec::try_infer_schema(
+                filenames.clone(),
+                Some(options.schema_infer_max_records),
+            )?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            file_extension,
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned JSON files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the JSON file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given JSON dataset
+    pub fn try_infer_schema(
+        mut filenames: Vec<String>,
+        max_records: Option<usize>,
+    ) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        let mut records_to_read = max_records.unwrap_or(usize::MAX);
+        while records_to_read > 0 && !filenames.is_empty() {
+            let file = File::open(filenames.pop().unwrap())?;
+            let mut reader = BufReader::new(file);
+            let iter = ValueIter::new(&mut reader, None);
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+                let shoud_take = records_to_read > 0;
+                records_to_read -= 1;
+                shoud_take
+            }))?;
+            schemas.push(schema);
+        }
+
+        Ok(Schema::try_merge(schemas)?)
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for NdJsonExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(match &self.source {
+            Source::PartitionedFiles { filenames, .. } => filenames.len(),
+            Source::Reader(_) => 1,
+        })
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        } else if let Source::PartitionedFiles { filenames, path } = &self.source {
+            Ok(Arc::new(Self {
+                source: Source::PartitionedFiles {
+                    filenames: filenames.clone(),
+                    path: path.clone(),
+                },
+                schema: self.schema.clone(),
+                projection: self.projection.clone(),
+                projected_schema: self.projected_schema.clone(),
+                batch_size: self.batch_size,
+                limit: self.limit,
+                file_extension: self.file_extension.clone(),
+            }))
+        } else {
+            Err(DataFusionError::Internal(
+                "NdJsonExec with reader source cannot be used with `with_new_children`"
+                    .to_string(),
+            ))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<super::SendableRecordBatchStream> {
+        let mut builder = json::ReaderBuilder::new()
+            .with_schema(self.schema.clone())
+            .with_batch_size(self.batch_size);
+        if let Some(proj) = &self.projection {
+            builder = builder.with_projection(
+                proj.iter()
+                    .map(|col_idx| self.schema.field(*col_idx).name())
+                    .cloned()
+                    .collect(),
+            );
+        }
+        match &self.source {
+            Source::PartitionedFiles { filenames, .. } => {
+                let file = File::open(&filenames[partition])?;
+
+                Ok(Box::pin(NdJsonStream::new(
+                    builder.build(file)?,
+                    self.limit,
+                )))
+            }
+            Source::Reader(rdr) => {
+                if partition != 0 {
+                    Err(DataFusionError::Internal(
+                        "Only partition 0 is valid when JSON comes from a reader"
+                            .to_string(),
+                    ))
+                } else if let Some(rdr) = rdr.lock().unwrap().take() {
+                    Ok(Box::pin(NdJsonStream::new(builder.build(rdr)?, self.limit)))
+                } else {
+                    Err(DataFusionError::Execution(
+                        "Error reading JSON: Data can only be read a single time when the source is a reader"
+                            .to_string(),
+                    ))
+                }
+            }
+        }
+    }
+}
+
+struct NdJsonStream<R: Read> {
+    reader: json::Reader<R>,
+    remind: Option<usize>,

Review comment:
       That's a mistake. I confused the meaning of these two words. 💔
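
       For reference, a minimal sketch with the intended field name (hypothetical; the actual rename is left to a later commit of this PR):

       ```rust
       use std::io::Read;

       use arrow::json;

       struct NdJsonStream<R: Read> {
           reader: json::Reader<R>,
           /// Rows still allowed through before the LIMIT is reached;
           /// `None` means no limit was set.
           remaining: Option<usize>,
       }
       ```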




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb merged pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #404: Support reading from NdJson formatted data sources

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r640891709



##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited JSON files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, ExecutionPlan, Partitioning, RecordBatchStream, Source};
+use crate::error::{DataFusionError, Result};
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    json,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::{BufReader, Read},
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited JSON read options
+#[derive(Clone)]
+pub struct NdJsonReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to 1000.
+    pub schema_infer_max_records: usize,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".json".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for NdJsonReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            schema_infer_max_records: 1000,
+            file_extension: ".json",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning NdJson data source
+#[derive(Debug)]
+pub struct NdJsonExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl NdJsonExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_new(
+        path: &str,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(NdJsonExec::try_infer_schema(
+                filenames.clone(),
+                Some(options.schema_infer_max_records),
+            )?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            file_extension,
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned JSON files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the JSON file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given JSON dataset
+    pub fn try_infer_schema(
+        mut filenames: Vec<String>,
+        max_records: Option<usize>,
+    ) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        let mut records_to_read = max_records.unwrap_or(usize::MAX);
+        while records_to_read > 0 && !filenames.is_empty() {
+            let file = File::open(filenames.pop().unwrap())?;
+            let mut reader = BufReader::new(file);
+            let iter = ValueIter::new(&mut reader, None);
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+                let shoud_take = records_to_read > 0;
+                records_to_read -= 1;
+                shoud_take

Review comment:
       ```suggestion
                   should_take
   ```
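
       As an aside, a hedged alternative sketch that avoids the counter juggling entirely: bounding the iterator with `take` up front means nothing is decremented after the budget reaches zero. (The `take_while` closure above decrements `records_to_read` one extra time once it hits zero, which underflows in debug builds, and in release wraps the budget back to `usize::MAX`, when a file holds more records than the remaining budget.) The surrounding loop and imports are copied from the function quoted above; this is a sketch, not part of the PR.

       ```rust
       use std::fs::File;
       use std::io::BufReader;

       use arrow::datatypes::Schema;
       use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};

       /// Sketch of an alternative schema-inference loop (assumes the same
       /// `Result` alias and error conversions as the surrounding code).
       pub fn try_infer_schema(
           mut filenames: Vec<String>,
           max_records: Option<usize>,
       ) -> crate::error::Result<Schema> {
           let mut schemas = Vec::new();
           let mut records_to_read = max_records.unwrap_or(usize::MAX);
           while records_to_read > 0 && !filenames.is_empty() {
               let file = File::open(filenames.pop().unwrap())?;
               let mut reader = BufReader::new(file);
               // Copy the budget before the closure below borrows it mutably.
               let budget = records_to_read;
               let iter = ValueIter::new(&mut reader, None);
               let schema = infer_json_schema_from_iterator(
                   // `take` enforces the budget; `inspect` records consumption.
                   iter.take(budget).inspect(|_| records_to_read -= 1),
               )?;
               schemas.push(schema);
           }
           Ok(Schema::try_merge(schemas)?)
       }
       ```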




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org