You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/05 14:08:52 UTC

[arrow-datafusion] branch maint-7.x updated: #2109 schema infer max (#2159)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch maint-7.x
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/maint-7.x by this push:
     new e7a72b47b #2109 schema infer max (#2159)
e7a72b47b is described below

commit e7a72b47b5026c6c4e346e5c9a4284cdd5b17106
Author: Rich <jy...@users.noreply.github.com>
AuthorDate: Tue Apr 5 10:08:48 2022 -0400

    #2109 schema infer max (#2159)
    
    * set default schema infer max record
    
    * fix unrelated issue "error: format argument must be a string literal" during `cargo test`
    
    * fix clippy
    
    same as https://github.com/apache/arrow-datafusion/pull/1885/files, which is already in master
---
 datafusion/fuzz-utils/src/lib.rs              |  9 +++------
 datafusion/src/datasource/file_format/csv.rs  |  5 +++--
 datafusion/src/datasource/file_format/json.rs | 13 +++++++++++--
 datafusion/src/datasource/file_format/mod.rs  |  3 +++
 datafusion/src/execution/options.rs           |  9 +++++----
 datafusion/tests/sql/mod.rs                   |  2 +-
 6 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/datafusion/fuzz-utils/src/lib.rs b/datafusion/fuzz-utils/src/lib.rs
index e021f55f8..920a9bc8d 100644
--- a/datafusion/fuzz-utils/src/lib.rs
+++ b/datafusion/fuzz-utils/src/lib.rs
@@ -26,7 +26,7 @@ pub use env_logger;
 pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
     batches
         .iter()
-        .map(|batch| {
+        .flat_map(|batch| {
             assert_eq!(batch.num_columns(), 1);
             batch
                 .column(0)
@@ -35,7 +35,6 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
                 .unwrap()
                 .iter()
         })
-        .flatten()
         .collect()
 }
 
@@ -43,8 +42,7 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
 pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
     let mut values: Vec<_> = partitions
         .iter()
-        .map(|batches| batches_to_vec(batches).into_iter())
-        .flatten()
+        .flat_map(|batches| batches_to_vec(batches).into_iter())
         .collect();
 
     values.sort_unstable();
@@ -60,7 +58,7 @@ pub fn add_empty_batches(
 
     batches
         .into_iter()
-        .map(|batch| {
+        .flat_map(|batch| {
             // insert 0, or 1 empty batches before and after the current batch
             let empty_batch = RecordBatch::new_empty(schema.clone());
             std::iter::repeat(empty_batch.clone())
@@ -68,6 +66,5 @@ pub fn add_empty_batches(
                 .chain(std::iter::once(batch))
                 .chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
         })
-        .flatten()
         .collect()
 }
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 6aa0d2123..819e62752 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use futures::StreamExt;
 
 use super::FileFormat;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -46,7 +47,7 @@ pub struct CsvFormat {
 impl Default for CsvFormat {
     fn default() -> Self {
         Self {
-            schema_infer_max_rec: None,
+            schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
             has_header: true,
             delimiter: b',',
         }
@@ -55,7 +56,7 @@ impl Default for CsvFormat {
 
 impl CsvFormat {
     /// Set a limit in terms of records to scan to infer the schema
-    /// - default to `None` (no limit)
+    /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
         self.schema_infer_max_rec = max_rec;
         self
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index bdd5ef81d..829ac56fd 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -30,6 +30,7 @@ use futures::StreamExt;
 
 use super::FileFormat;
 use super::FileScanConfig;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
 use crate::logical_plan::Expr;
@@ -40,14 +41,22 @@ use crate::physical_plan::Statistics;
 /// The default file extension of json files
 pub const DEFAULT_JSON_EXTENSION: &str = ".json";
 /// New line delimited JSON `FileFormat` implementation.
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct JsonFormat {
     schema_infer_max_rec: Option<usize>,
 }
 
+impl Default for JsonFormat {
+    fn default() -> Self {
+        Self {
+            schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
+        }
+    }
+}
+
 impl JsonFormat {
     /// Set a limit in terms of records to scan to infer the schema
-    /// - defaults to `None` (no limit)
+    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
         self.schema_infer_max_rec = max_rec;
         self
diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
index 21da2e1e6..46c298a36 100644
--- a/datafusion/src/datasource/file_format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -17,6 +17,9 @@
 
 //! Module containing helper methods for the various file formats
 
+/// default max records to scan to infer the schema
+pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+
 pub mod avro;
 pub mod csv;
 pub mod json;
diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs
index 79b07536a..b39ea3895 100644
--- a/datafusion/src/execution/options.rs
+++ b/datafusion/src/execution/options.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 use arrow::datatypes::{Schema, SchemaRef};
 
 use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat},
     listing::ListingOptions,
@@ -40,7 +41,7 @@ pub struct CsvReadOptions<'a> {
     /// An optional schema representing the CSV files. If None, CSV reader will try to infer it
     /// based on data in file.
     pub schema: Option<&'a Schema>,
-    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
+    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,
     /// File extension; only files with this extension are selected for data input.
     /// Defaults to ".csv".
@@ -59,7 +60,7 @@ impl<'a> CsvReadOptions<'a> {
         Self {
             has_header: true,
             schema: None,
-            schema_infer_max_records: 1000,
+            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
             delimiter: b',',
             file_extension: ".csv",
         }
@@ -161,7 +162,7 @@ pub struct NdJsonReadOptions<'a> {
     /// The data source schema.
     pub schema: Option<SchemaRef>,
 
-    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
+    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
     pub schema_infer_max_records: usize,
 
     /// File extension; only files with this extension are selected for data input.
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
     fn default() -> Self {
         Self {
             schema: None,
-            schema_infer_max_records: 1000,
+            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
             file_extension: DEFAULT_JSON_EXTENSION,
         }
     }
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index 468762ea0..904bc25e5 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -883,7 +883,7 @@ async fn nyc() -> Result<()> {
             },
             _ => unreachable!(),
         },
-        _ => unreachable!(false),
+        _ => unreachable!("{}", false),
     }
 
     Ok(())