You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/05 14:08:52 UTC
[arrow-datafusion] branch maint-7.x updated: #2109 schema infer max (#2159)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch maint-7.x
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/maint-7.x by this push:
new e7a72b47b #2109 schema infer max (#2159)
e7a72b47b is described below
commit e7a72b47b5026c6c4e346e5c9a4284cdd5b17106
Author: Rich <jy...@users.noreply.github.com>
AuthorDate: Tue Apr 5 10:08:48 2022 -0400
#2109 schema infer max (#2159)
* set default schema infer max record
* fix unrelated issue "error: format argument must be a string literal" during `cargo test`
* fix clippy
same as https://github.com/apache/arrow-datafusion/pull/1885/files which is already in master
---
datafusion/fuzz-utils/src/lib.rs | 9 +++------
datafusion/src/datasource/file_format/csv.rs | 5 +++--
datafusion/src/datasource/file_format/json.rs | 13 +++++++++++--
datafusion/src/datasource/file_format/mod.rs | 3 +++
datafusion/src/execution/options.rs | 9 +++++----
datafusion/tests/sql/mod.rs | 2 +-
6 files changed, 26 insertions(+), 15 deletions(-)
diff --git a/datafusion/fuzz-utils/src/lib.rs b/datafusion/fuzz-utils/src/lib.rs
index e021f55f8..920a9bc8d 100644
--- a/datafusion/fuzz-utils/src/lib.rs
+++ b/datafusion/fuzz-utils/src/lib.rs
@@ -26,7 +26,7 @@ pub use env_logger;
pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
batches
.iter()
- .map(|batch| {
+ .flat_map(|batch| {
assert_eq!(batch.num_columns(), 1);
batch
.column(0)
@@ -35,7 +35,6 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
.unwrap()
.iter()
})
- .flatten()
.collect()
}
@@ -43,8 +42,7 @@ pub fn batches_to_vec(batches: &[RecordBatch]) -> Vec<Option<i32>> {
pub fn partitions_to_sorted_vec(partitions: &[Vec<RecordBatch>]) -> Vec<Option<i32>> {
let mut values: Vec<_> = partitions
.iter()
- .map(|batches| batches_to_vec(batches).into_iter())
- .flatten()
+ .flat_map(|batches| batches_to_vec(batches).into_iter())
.collect();
values.sort_unstable();
@@ -60,7 +58,7 @@ pub fn add_empty_batches(
batches
.into_iter()
- .map(|batch| {
+ .flat_map(|batch| {
// insert 0, or 1 empty batches before and after the current batch
let empty_batch = RecordBatch::new_empty(schema.clone());
std::iter::repeat(empty_batch.clone())
@@ -68,6 +66,5 @@ pub fn add_empty_batches(
.chain(std::iter::once(batch))
.chain(std::iter::repeat(empty_batch).take(rng.gen_range(0..2)))
})
- .flatten()
.collect()
}
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 6aa0d2123..819e62752 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
use futures::StreamExt;
use super::FileFormat;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
@@ -46,7 +47,7 @@ pub struct CsvFormat {
impl Default for CsvFormat {
fn default() -> Self {
Self {
- schema_infer_max_rec: None,
+ schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
has_header: true,
delimiter: b',',
}
@@ -55,7 +56,7 @@ impl Default for CsvFormat {
impl CsvFormat {
/// Set a limit in terms of records to scan to infer the schema
- /// - default to `None` (no limit)
+ /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
self.schema_infer_max_rec = max_rec;
self
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index bdd5ef81d..829ac56fd 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -30,6 +30,7 @@ use futures::StreamExt;
use super::FileFormat;
use super::FileScanConfig;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
@@ -40,14 +41,22 @@ use crate::physical_plan::Statistics;
/// The default file extension of json files
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
/// New line delimited JSON `FileFormat` implementation.
-#[derive(Debug, Default)]
+#[derive(Debug)]
pub struct JsonFormat {
schema_infer_max_rec: Option<usize>,
}
+impl Default for JsonFormat {
+ fn default() -> Self {
+ Self {
+ schema_infer_max_rec: Some(DEFAULT_SCHEMA_INFER_MAX_RECORD),
+ }
+ }
+}
+
impl JsonFormat {
/// Set a limit in terms of records to scan to infer the schema
- /// - defaults to `None` (no limit)
+ /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
self.schema_infer_max_rec = max_rec;
self
diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
index 21da2e1e6..46c298a36 100644
--- a/datafusion/src/datasource/file_format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -17,6 +17,9 @@
//! Module containing helper methods for the various file formats
+/// default max records to scan to infer the schema
+pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+
pub mod avro;
pub mod csv;
pub mod json;
diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs
index 79b07536a..b39ea3895 100644
--- a/datafusion/src/execution/options.rs
+++ b/datafusion/src/execution/options.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
use arrow::datatypes::{Schema, SchemaRef};
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
+use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat},
listing::ListingOptions,
@@ -40,7 +41,7 @@ pub struct CsvReadOptions<'a> {
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
- /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
+ /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
/// Defaults to ".csv".
@@ -59,7 +60,7 @@ impl<'a> CsvReadOptions<'a> {
Self {
has_header: true,
schema: None,
- schema_infer_max_records: 1000,
+ schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
delimiter: b',',
file_extension: ".csv",
}
@@ -161,7 +162,7 @@ pub struct NdJsonReadOptions<'a> {
/// The data source schema.
pub schema: Option<SchemaRef>,
- /// Max number of rows to read from CSV files for schema inference if needed. Defaults to 1000.
+ /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
fn default() -> Self {
Self {
schema: None,
- schema_infer_max_records: 1000,
+ schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
file_extension: DEFAULT_JSON_EXTENSION,
}
}
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index 468762ea0..904bc25e5 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -883,7 +883,7 @@ async fn nyc() -> Result<()> {
},
_ => unreachable!(),
},
- _ => unreachable!(false),
+ _ => unreachable!("{}", false),
}
Ok(())