Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/27 19:01:25 UTC

[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #404: Support reading from NdJson formatted data sources

Dandandan commented on a change in pull request #404:
URL: https://github.com/apache/arrow-datafusion/pull/404#discussion_r640891566



##########
File path: datafusion/src/physical_plan/json.rs
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading line-delimited JSON files
+use async_trait::async_trait;
+use futures::Stream;
+
+use super::{common, ExecutionPlan, Partitioning, RecordBatchStream, Source};
+use crate::error::{DataFusionError, Result};
+use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    error::Result as ArrowResult,
+    json,
+    record_batch::RecordBatch,
+};
+use std::fs::File;
+use std::{any::Any, io::Seek};
+use std::{
+    io::{BufReader, Read},
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
+
+/// Line-delimited JSON read options
+#[derive(Clone)]
+pub struct NdJsonReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<SchemaRef>,
+
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to 1000.
+    pub schema_infer_max_records: usize,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to ".json".
+    pub file_extension: &'a str,
+}
+
+impl<'a> Default for NdJsonReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            schema_infer_max_records: 1000,
+            file_extension: ".json",
+        }
+    }
+}
+
+trait SeekRead: Read + Seek {}
+
+impl<T: Seek + Read> SeekRead for T {}
+/// Execution plan for scanning NdJson data source
+#[derive(Debug)]
+pub struct NdJsonExec {
+    source: Source<Box<dyn SeekRead + Send + Sync>>,
+    schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+    projected_schema: SchemaRef,
+    file_extension: String,
+    batch_size: usize,
+    limit: Option<usize>,
+}
+
+impl NdJsonExec {
+    /// Create a new execution plan for reading from a path
+    pub fn try_new(
+        path: &str,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let file_extension = options.file_extension.to_string();
+
+        let filenames = common::build_file_list(path, &file_extension)?;
+
+        if filenames.is_empty() {
+            return Err(DataFusionError::Execution(format!(
+                "No files found at {path} with file extension {file_extension}",
+                path = path,
+                file_extension = file_extension.as_str()
+            )));
+        }
+
+        let schema = match options.schema {
+            Some(s) => s,
+            None => Arc::new(NdJsonExec::try_infer_schema(
+                filenames.clone(),
+                Some(options.schema_infer_max_records),
+            )?),
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::PartitionedFiles {
+                path: path.to_string(),
+                filenames,
+            },
+            schema,
+            file_extension,
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+    /// Create a new execution plan for reading from a reader
+    pub fn try_new_from_reader(
+        reader: impl Read + Seek + Send + Sync + 'static,
+        options: NdJsonReadOptions,
+        projection: Option<Vec<usize>>,
+        batch_size: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let schema = match options.schema {
+            Some(s) => s,
+            None => {
+                return Err(DataFusionError::Execution(
+                    "The schema must be provided in options when reading from a reader"
+                        .to_string(),
+                ));
+            }
+        };
+
+        let projected_schema = match &projection {
+            None => schema.clone(),
+            Some(p) => Arc::new(Schema::new(
+                p.iter().map(|i| schema.field(*i).clone()).collect(),
+            )),
+        };
+
+        Ok(Self {
+            source: Source::Reader(Mutex::new(Some(Box::new(reader)))),
+            schema,
+            file_extension: String::new(),
+            projection,
+            projected_schema,
+            batch_size,
+            limit,
+        })
+    }
+
+    /// Path to directory containing partitioned NdJson files with the same schema
+    pub fn path(&self) -> &str {
+        self.source.path()
+    }
+
+    /// The individual files under path
+    pub fn filenames(&self) -> &[String] {
+        self.source.filenames()
+    }
+
+    /// File extension
+    pub fn file_extension(&self) -> &str {
+        &self.file_extension
+    }
+
+    /// Get the schema of the JSON file
+    pub fn file_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Optional projection for which columns to load
+    pub fn projection(&self) -> Option<&Vec<usize>> {
+        self.projection.as_ref()
+    }
+
+    /// Batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Limit
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
+    /// Infer schema for given NdJson dataset
+    pub fn try_infer_schema(
+        mut filenames: Vec<String>,
+        max_records: Option<usize>,
+    ) -> Result<Schema> {
+        let mut schemas = Vec::new();
+        let mut records_to_read = max_records.unwrap_or(usize::MAX);
+        while records_to_read > 0 && !filenames.is_empty() {
+            let file = File::open(filenames.pop().unwrap())?;
+            let mut reader = BufReader::new(file);
+            let iter = ValueIter::new(&mut reader, None);
+            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+                let shoud_take = records_to_read > 0;

Review comment:
    ```suggestion
                let should_take = records_to_read > 0;
    ```
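For context, here is a minimal standalone sketch (not part of the PR) of the bounded-inference pattern this hunk implements, with the spelling fixed. It assumes the `ValueIter` / `infer_json_schema_from_iterator` API from arrow that the file imports above; the function name `infer_limited` and the single-file signature are illustrative only:

```rust
use std::fs::File;
use std::io::BufReader;

use arrow::datatypes::Schema;
use arrow::error::Result as ArrowResult;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};

/// Illustrative helper (hypothetical, not in the PR): infer a schema
/// from at most `max_records` lines of one NDJSON file, using the same
/// `take_while` counter pattern as the hunk above.
fn infer_limited(path: &str, max_records: usize) -> ArrowResult<Schema> {
    let file = File::open(path)?;
    let mut reader = BufReader::new(file);
    let mut records_to_read = max_records;
    // ValueIter yields ArrowResult<Value>, one per NDJSON line.
    let iter = ValueIter::new(&mut reader, None);
    infer_json_schema_from_iterator(iter.take_while(|_| {
        let should_take = records_to_read > 0;
        if should_take {
            // Only decrement while the budget is positive, avoiding
            // usize underflow once it reaches zero.
            records_to_read -= 1;
        }
        should_take
    }))
}
```

Guarding the decrement also lets the remaining budget carry across files when the caller loops over several of them, as `try_infer_schema` does above.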




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org