You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/30 11:33:57 UTC
[arrow-datafusion] branch master updated: FilePartition and PartitionedFile for scanning flexibility (#932)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8a085fc FilePartition and PartitionedFile for scanning flexibility (#932)
8a085fc is described below
commit 8a085fc2701f04381cf26bc8ea925339c68cf6e4
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Aug 30 19:33:50 2021 +0800
FilePartition and PartitionedFile for scanning flexibility (#932)
* FilePartition and PartitionedFile for scanning flexibility
* clippy
* remove schema from partitioned file
* ballista logical parquet table
* ballista physical parquet exec
* resolve comments
* resolve comments
---
ballista/rust/core/proto/ballista.proto | 33 +-
.../rust/core/src/serde/logical_plan/from_proto.rs | 72 ++-
.../rust/core/src/serde/logical_plan/to_proto.rs | 77 +++-
.../core/src/serde/physical_plan/from_proto.rs | 40 +-
.../rust/core/src/serde/physical_plan/to_proto.rs | 23 +-
ballista/rust/scheduler/src/lib.rs | 33 +-
datafusion/src/datasource/mod.rs | 256 +++++++++++
datafusion/src/datasource/parquet.rs | 330 ++++++++++++--
datafusion/src/physical_optimizer/repartition.rs | 11 +-
datafusion/src/physical_plan/parquet.rs | 488 ++++-----------------
10 files changed, 869 insertions(+), 494 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index c184de3..45ff6c5 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -271,12 +271,28 @@ message CsvTableScanNode {
repeated LogicalExprNode filters = 8;
}
+// Statistics of a table or file. The Rust encoder writes -1 for an unknown
+// num_rows / total_byte_size and an empty column_stats list when no per-column
+// statistics exist.
+message Statistics {
+ int64 num_rows = 1;
+ int64 total_byte_size = 2;
+ repeated ColumnStats column_stats = 3;
+}
+
+// A single file to scan, together with its statistics.
+message PartitionedFile {
+ string path = 1;
+ Statistics statistics = 2;
+}
+
+// The files found under a table's root path and the schema they share.
+message TableDescriptor {
+ string path = 1;
+ repeated PartitionedFile partition_files = 2;
+ Schema schema = 3;
+}
+
message ParquetTableScanNode {
string table_name = 1;
- string path = 2;
+ // NOTE(review): field 2 was `string path` before this change; reusing the number
+ // with a message type means plans encoded by older versions may be mis-decoded.
+ TableDescriptor table_desc = 2;
ProjectionColumns projection = 3;
- Schema schema = 4;
- repeated LogicalExprNode filters = 5;
+ // NOTE(review): field 4 carried `Schema schema` before this change; reusing the
+ // number for `filters` means plans encoded by older versions may be mis-decoded.
+ repeated LogicalExprNode filters = 4;
}
message ProjectionNode {
@@ -567,10 +583,15 @@ message FilterExecNode {
PhysicalExprNode expr = 2;
}
+// Files read together by one task of a Parquet scan.
+message ParquetPartition {
+ // Position of this partition among all partitions of the scan.
+ uint32 index = 1;
+ repeated PartitionedFile files = 2;
+}
+
message ParquetScanExecNode {
- repeated string filename = 1;
- repeated uint32 projection = 2;
- uint32 num_partitions = 3;
+ // NOTE(review): field numbers 1-3 are reused with different types/meanings,
+ // so plans encoded before this change may be mis-decoded.
+ repeated ParquetPartition partitions = 1;
+ Schema schema = 2;
+ repeated uint32 projection = 3;
uint32 batch_size = 4;
}
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 14fba06..fc4ac2c 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -21,6 +21,8 @@ use crate::error::BallistaError;
use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use crate::{convert_box_required, convert_required};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
+use datafusion::datasource::{PartitionedFile, TableDescriptor};
use datafusion::logical_plan::window_frames::{
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
@@ -134,10 +136,11 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.map_err(|e| e.into())
}
LogicalPlanType::ParquetScan(scan) => {
+ let descriptor: TableDescriptor = convert_required!(scan.table_desc)?;
let projection = match scan.projection.as_ref() {
None => None,
Some(columns) => {
- let schema: Schema = convert_required!(scan.schema)?;
+ let schema = descriptor.schema.clone();
let r: Result<Vec<usize>, _> = columns
.columns
.iter()
@@ -154,11 +157,16 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
Some(r?)
}
};
- LogicalPlanBuilder::scan_parquet_with_name(
- &scan.path,
- projection,
+
+ let parquet_table = ParquetTable::try_new_with_desc(
+ Arc::new(ParquetTableDescriptor { descriptor }),
24,
+ true,
+ )?;
+ LogicalPlanBuilder::scan(
&scan.table_name,
+ Arc::new(parquet_table),
+ projection,
)? //TODO remove hard-coded max_partitions
.build()
.map_err(|e| e.into())
@@ -301,6 +309,60 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
}
}
+/// Decodes a protobuf table descriptor. Fails if any file entry cannot be
+/// decoded or if the (required) schema is missing or invalid.
+impl TryInto<TableDescriptor> for &protobuf::TableDescriptor {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<TableDescriptor, Self::Error> {
+        let mut partition_files = Vec::with_capacity(self.partition_files.len());
+        for proto_file in &self.partition_files {
+            let file: PartitionedFile = proto_file.try_into()?;
+            partition_files.push(file);
+        }
+        let schema: Schema = convert_required!(self.schema)?;
+        Ok(TableDescriptor {
+            path: self.path.clone(),
+            partition_files,
+            schema: Arc::new(schema),
+        })
+    }
+}
+
+/// Decodes a single file entry. Its statistics are required.
+impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
+        Ok(PartitionedFile {
+            statistics: convert_required!(self.statistics)?,
+            path: self.path.to_owned(),
+        })
+    }
+}
+
+/// Decodes per-column statistics.
+///
+/// The encoder writes `0` for an unknown `distinct_count`, so `0` is decoded as
+/// "unknown" (`None`) rather than claiming the column has no distinct values.
+/// Min/max values that cannot be decoded are dropped instead of panicking:
+/// statistics are optional hints, and a malformed value must not abort planning.
+impl From<&protobuf::ColumnStats> for ColumnStatistics {
+    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
+        ColumnStatistics {
+            null_count: Some(cs.null_count as usize),
+            max_value: cs.max_value.as_ref().and_then(|m| m.try_into().ok()),
+            min_value: cs.min_value.as_ref().and_then(|m| m.try_into().ok()),
+            distinct_count: match cs.distinct_count {
+                0 => None,
+                n => Some(n as usize),
+            },
+        }
+    }
+}
+
+/// Decodes table or file statistics.
+///
+/// The encoder writes `-1` for an unknown `num_rows` / `total_byte_size` and an
+/// empty `column_stats` list when no column statistics exist. Both are mapped back
+/// to `None`. Casting instead would turn `-1` into `usize::MAX`.
+impl TryInto<Statistics> for &protobuf::Statistics {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<Statistics, Self::Error> {
+        let column_statistics = if self.column_stats.is_empty() {
+            None
+        } else {
+            Some(self.column_stats.iter().map(|s| s.into()).collect())
+        };
+        Ok(Statistics {
+            // Negative values (the `-1` sentinel) fail the conversion and become `None`.
+            num_rows: self.num_rows.try_into().ok(),
+            total_byte_size: self.total_byte_size.try_into().ok(),
+            column_statistics,
+        })
+    }
+}
+
impl From<&protobuf::Column> for Column {
fn from(c: &protobuf::Column) -> Column {
let c = c.clone();
@@ -1114,6 +1176,8 @@ impl TryInto<Field> for &protobuf::Field {
}
}
+use crate::serde::protobuf::ColumnStats;
+use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 5877ced..aa7a973 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -22,8 +22,11 @@
use super::super::proto_error;
use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};
-use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
-use datafusion::datasource::CsvFile;
+use datafusion::arrow::datatypes::{
+ DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
+};
+use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
+use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, Expr, JoinConstraint, JoinType, LogicalPlan,
@@ -253,6 +256,58 @@ impl TryInto<DataType> for &protobuf::ArrowType {
}
}
+/// Encodes per-column statistics.
+///
+/// Unknown counts are written as `0`. Counts above `u32::MAX` saturate instead of
+/// silently wrapping. Min/max values with no protobuf representation are omitted
+/// rather than panicking.
+impl From<&ColumnStatistics> for protobuf::ColumnStats {
+    fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
+        let to_u32 = |n: usize| n.min(u32::MAX as usize) as u32;
+        protobuf::ColumnStats {
+            min_value: cs.min_value.as_ref().and_then(|m| m.try_into().ok()),
+            max_value: cs.max_value.as_ref().and_then(|m| m.try_into().ok()),
+            null_count: cs.null_count.map(to_u32).unwrap_or(0),
+            distinct_count: cs.distinct_count.map(to_u32).unwrap_or(0),
+        }
+    }
+}
+
+/// Encodes table or file statistics. An unknown `num_rows` / `total_byte_size` is
+/// written as `-1`, and missing column statistics as an empty list.
+impl From<&Statistics> for protobuf::Statistics {
+    fn from(s: &Statistics) -> protobuf::Statistics {
+        const UNKNOWN: i64 = -1;
+        let encode = |v: Option<usize>| match v {
+            Some(n) => n as i64,
+            None => UNKNOWN,
+        };
+        protobuf::Statistics {
+            num_rows: encode(s.num_rows),
+            total_byte_size: encode(s.total_byte_size),
+            column_stats: s
+                .column_statistics
+                .as_ref()
+                .map(|stats| stats.iter().map(protobuf::ColumnStats::from).collect())
+                .unwrap_or_default(),
+        }
+    }
+}
+
+/// Encodes a single file entry together with its statistics.
+impl From<&PartitionedFile> for protobuf::PartitionedFile {
+    fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile {
+        let statistics = protobuf::Statistics::from(&pf.statistics);
+        protobuf::PartitionedFile {
+            path: pf.path.to_owned(),
+            statistics: Some(statistics),
+        }
+    }
+}
+
+/// Encodes a table descriptor: root path, per-file entries and the shared schema.
+impl TryFrom<TableDescriptor> for protobuf::TableDescriptor {
+    type Error = BallistaError;
+
+    fn try_from(desc: TableDescriptor) -> Result<protobuf::TableDescriptor, Self::Error> {
+        let TableDescriptor {
+            path,
+            partition_files,
+            schema,
+        } = desc;
+        Ok(protobuf::TableDescriptor {
+            path,
+            partition_files: partition_files
+                .iter()
+                .map(protobuf::PartitionedFile::from)
+                .collect(),
+            schema: Some(schema.into()),
+        })
+    }
+}
+
impl TryInto<DataType> for &Box<protobuf::List> {
type Error = BallistaError;
fn try_into(self) -> Result<DataType, Self::Error> {
@@ -706,13 +761,14 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
.collect::<Result<Vec<_>, _>>()?;
if let Some(parquet) = source.downcast_ref::<ParquetTable>() {
+ let table_desc: protobuf::TableDescriptor =
+ parquet.desc.descriptor.clone().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ParquetScan(
protobuf::ParquetTableScanNode {
table_name: table_name.to_owned(),
- path: parquet.path().to_owned(),
+ table_desc: Some(table_desc),
projection,
- schema: Some(schema),
filters,
},
)),
@@ -1262,6 +1318,19 @@ impl Into<protobuf::Schema> for &Schema {
}
}
+// `Into` (rather than `From`) mirrors the `&Schema` conversion in this file.
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::Schema> for SchemaRef {
+    /// Encodes every field of the shared schema, preserving field order.
+    fn into(self) -> protobuf::Schema {
+        let columns: Vec<protobuf::Field> =
+            self.fields().iter().map(protobuf::Field::from).collect();
+        protobuf::Schema { columns }
+    }
+}
+
impl From<&datafusion::logical_plan::DFField> for protobuf::DfField {
fn from(f: &datafusion::logical_plan::DFField) -> protobuf::DfField {
protobuf::DfField {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d37940e..522bac2 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -34,6 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
+use datafusion::datasource::datasource::Statistics;
+use datafusion::datasource::FilePartition;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
@@ -44,6 +46,8 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::parquet::ParquetPartition;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
@@ -129,17 +133,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
)?))
}
PhysicalPlanType::ParquetScan(scan) => {
+ let partitions = scan
+ .partitions
+ .iter()
+ .map(|p| p.try_into())
+ .collect::<Result<Vec<ParquetPartition>, _>>()?;
+ let schema = Arc::new(convert_required!(scan.schema)?);
let projection = scan.projection.iter().map(|i| *i as usize).collect();
- let filenames: Vec<&str> =
- scan.filename.iter().map(|s| s.as_str()).collect();
- Ok(Arc::new(ParquetExec::try_from_files(
- &filenames,
+ Ok(Arc::new(ParquetExec::new(
+ partitions,
+ schema,
Some(projection),
+ Statistics::default(),
+ ExecutionPlanMetricsSet::new(),
None,
scan.batch_size as usize,
- scan.num_partitions as usize,
None,
- )?))
+ )))
}
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
@@ -470,6 +480,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
}
+/// Decodes one Parquet scan partition. Every file entry must decode. The partition
+/// gets a fresh metrics set because metrics are not part of the message.
+impl TryInto<ParquetPartition> for &protobuf::ParquetPartition {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<ParquetPartition, Self::Error> {
+        let index = self.index as usize;
+        let files = self
+            .files
+            .iter()
+            .map(TryInto::try_into)
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok(ParquetPartition::new(
+            files,
+            index,
+            ExecutionPlanMetricsSet::new(),
+        ))
+    }
+}
+
impl From<&protobuf::PhysicalColumn> for Column {
fn from(c: &protobuf::PhysicalColumn) -> Column {
Column::new(&c.name, c.index as usize)
@@ -620,6 +647,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
+
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 8d8f917..e7d4ac6 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::AggregateMode;
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use datafusion::physical_plan::parquet::ParquetExec;
+use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::{
@@ -268,22 +268,19 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
)),
})
} else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
- let filenames = exec
- .partitions()
- .iter()
- .flat_map(|part| part.filenames().to_owned())
- .collect();
+ let partitions = exec.partitions().iter().map(|p| p.into()).collect();
+
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
- filename: filenames,
+ partitions,
+ schema: Some(exec.schema.as_ref().into()),
projection: exec
.projection()
.as_ref()
.iter()
.map(|n| *n as u32)
.collect(),
- num_partitions: exec.partitions().len() as u32,
batch_size: exec.batch_size() as u32,
},
)),
@@ -621,6 +618,16 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
}
}
+/// Encodes one Parquet scan partition: its index and the files it reads.
+impl From<&ParquetPartition> for protobuf::ParquetPartition {
+    fn from(p: &ParquetPartition) -> protobuf::ParquetPartition {
+        let partition = &p.file_partition;
+        protobuf::ParquetPartition {
+            index: partition.index as u32,
+            files: partition
+                .files
+                .iter()
+                .map(protobuf::PartitionedFile::from)
+                .collect(),
+        }
+    }
+}
+
fn try_parse_when_then_expr(
when_expr: &Arc<dyn PhysicalExpr>,
then_expr: &Arc<dyn PhysicalExpr>,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 00f2c98..f03d08b 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState};
use ballista_core::config::BallistaConfig;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use datafusion::physical_plan::parquet::ParquetExec;
+use datafusion::datasource::parquet::ParquetTableDescriptor;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -282,24 +282,25 @@ impl SchedulerGrpc for SchedulerServer {
match file_type {
FileType::Parquet => {
- let parquet_exec =
- ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
- .map_err(|e| {
- let msg = format!("Error opening parquet files: {}", e);
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?;
+ let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| {
+ let msg = format!("Error opening parquet files: {}", e);
+ error!("{}", msg);
+ tonic::Status::internal(msg)
+ })?;
+
+ let partitions = parquet_desc
+ .descriptor
+ .partition_files
+ .iter()
+ .map(|pf| FilePartitionMetadata {
+ filename: vec![pf.path.clone()],
+ })
+ .collect();
//TODO include statistics and any other info needed to reconstruct ParquetExec
Ok(Response::new(GetFileMetadataResult {
- schema: Some(parquet_exec.schema().as_ref().into()),
- partitions: parquet_exec
- .partitions()
- .iter()
- .map(|part| FilePartitionMetadata {
- filename: part.filenames().to_vec(),
- })
- .collect(),
+ schema: Some(parquet_desc.schema().as_ref().into()),
+ partitions,
}))
}
//TODO implement for CSV
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 9699a99..d5e2952 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -27,6 +27,13 @@ pub mod parquet;
pub use self::csv::{CsvFile, CsvReadOptions};
pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::build_file_list;
+use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
+use crate::physical_plan::Accumulator;
+use std::sync::Arc;
/// Source for table input data
pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
@@ -36,3 +43,252 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Read data from a reader
Reader(std::sync::Mutex<Option<R>>),
}
+
+/// A single file that should be read, along with its statistics.
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub path: String,
+    /// Statistics of the file; `Statistics::default()` when they were not collected
+    pub statistics: Statistics,
+    // TODO: values of partition columns to be appended to each row, and an
+    // optional row-group range for more fine-grained parallel execution.
+}
+
+impl From<String> for PartitionedFile {
+    /// Wraps a bare path with no statistics.
+    fn from(path: String) -> Self {
+        Self {
+            path,
+            statistics: Statistics::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    /// Displays the file path only.
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.write_str(&self.path)
+    }
+}
+
+/// A group of files that are read by a single task.
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    /// Position of this partition among all partitions of the scan
+    pub index: usize,
+    /// Files belonging to this partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    /// Displays the contained file paths separated by `", "`.
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        for (i, file) in self.files.iter().enumerate() {
+            if i > 0 {
+                f.write_str(", ")?;
+            }
+            write!(f, "{}", file)?;
+        }
+        Ok(())
+    }
+}
+
+/// All source files, sharing one schema, found under a root path.
+#[derive(Debug, Clone)]
+pub struct TableDescriptor {
+    /// Root path of the table
+    pub path: String,
+    /// All source files found under the root path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema shared by all files
+    pub schema: SchemaRef,
+}
+
+/// A file's path and statistics together with its schema, as returned by
+/// [`TableDescriptorBuilder::file_meta`].
+///
+/// The fields are public so that `TableDescriptorBuilder` can also be implemented
+/// outside `crate::datasource`. With private fields, no such implementation could
+/// construct the value `file_meta` must return.
+pub struct FileAndSchema {
+    /// The file path and its statistics
+    pub file: PartitionedFile,
+    /// The schema read from the file
+    pub schema: Schema,
+}
+
+/// Builder for a [`TableDescriptor`] covering the files inside a given path.
+pub trait TableDescriptorBuilder {
+    /// Construct a [`TableDescriptor`] from the files with extension `ext` under `path`.
+    ///
+    /// When `collect_statistics` is true, every file is opened via
+    /// [`Self::file_meta`], and all files must share the schema of the first one.
+    /// The table schema is `provided_schema` if given. Otherwise it is the schema of
+    /// the first file, which is opened just for that purpose when statistics are
+    /// not being collected.
+    ///
+    /// # Errors
+    /// Returns a `Plan` error if no matching file exists or the files' schemas
+    /// differ. Propagates any error from listing or reading the files.
+    fn build_table_desc(
+        path: &str,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<TableDescriptor> {
+        let filenames = build_file_list(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // Schema of the first file read; every other file is checked against it.
+        let mut first_schema: Option<Schema> = None;
+        let mut partition_files = Vec::with_capacity(filenames.len());
+        for file_path in &filenames {
+            if !collect_statistics {
+                partition_files.push(PartitionedFile {
+                    path: file_path.to_owned(),
+                    statistics: Statistics::default(),
+                });
+                continue;
+            }
+
+            // Propagate read errors immediately (previously a failure on the first
+            // file could panic on the empty schema list before being returned).
+            let FileAndSchema { file, schema } = Self::file_meta(file_path)?;
+            if let Some(expected) = &first_schema {
+                if *expected != schema {
+                    // we currently get the schema information from the first file rather than do
+                    // schema merging and this is a limitation.
+                    // See https://issues.apache.org/jira/browse/ARROW-11017
+                    return Err(DataFusionError::Plan(format!(
+                        "The file {} have different schema from the first file and DataFusion does \
+                         not yet support schema merging",
+                        file_path
+                    )));
+                }
+            } else {
+                first_schema = Some(schema);
+            }
+            partition_files.push(file);
+        }
+
+        let schema = match provided_schema.or(first_schema) {
+            Some(schema) => schema,
+            // No schema was provided and statistics were not collected, so no file
+            // has been opened yet: read the schema from the first (existing) file.
+            None => Self::file_meta(&filenames[0])?.schema,
+        };
+
+        Ok(TableDescriptor {
+            path: path.to_string(),
+            partition_files,
+            schema: Arc::new(schema),
+        })
+    }
+
+    /// Get all metadata for a source file, including schema, statistics, partitions, etc.
+    fn file_meta(path: &str) -> Result<FileAndSchema>;
+}
+
+/// Get all files as well as the summary statistics.
+///
+/// If the optional `limit` is provided, only the leading files needed to read at
+/// least `limit` rows (according to their known row counts) are returned, and the
+/// statistics summarize just those files.
+///
+/// The summary's `num_rows` / `total_byte_size` are `None` unless every included
+/// file reports them. Missing per-file statistics are therefore never presented
+/// as a total of zero.
+pub fn get_statistics_with_limit(
+    table_desc: &TableDescriptor,
+    limit: Option<usize>,
+) -> (Vec<PartitionedFile>, Statistics) {
+    let schema = table_desc.schema.clone();
+    let num_fields = schema.fields().len();
+
+    // Running totals over the files that report a value. Each `*_complete` flag
+    // stays true only while every file seen so far reported that value.
+    let mut num_rows = 0;
+    let mut num_rows_complete = true;
+    let mut total_byte_size = 0;
+    let mut total_byte_size_complete = true;
+    let mut null_counts = vec![0; num_fields];
+    let mut has_statistics = false;
+    let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+
+    let mut num_files = 0;
+    for file in &table_desc.partition_files {
+        num_files += 1;
+        let file_stats = &file.statistics;
+        match file_stats.num_rows {
+            Some(n) => num_rows += n,
+            None => num_rows_complete = false,
+        }
+        match file_stats.total_byte_size {
+            Some(n) => total_byte_size += n,
+            None => total_byte_size_complete = false,
+        }
+        if let Some(col_stats) = &file_stats.column_statistics {
+            has_statistics = true;
+            // `take` guards against statistics describing more columns than the schema.
+            for (i, cs) in col_stats.iter().enumerate().take(num_fields) {
+                null_counts[i] += cs.null_count.unwrap_or(0);
+
+                if let Some(file_max) = cs.max_value.clone() {
+                    let failed = max_values[i]
+                        .as_mut()
+                        .map_or(false, |acc| acc.update(&[file_max]).is_err());
+                    if failed {
+                        max_values[i] = None;
+                    }
+                }
+
+                if let Some(file_min) = cs.min_value.clone() {
+                    let failed = min_values[i]
+                        .as_mut()
+                        .map_or(false, |acc| acc.update(&[file_min]).is_err());
+                    if failed {
+                        min_values[i] = None;
+                    }
+                }
+            }
+        }
+        // The files taken so far already hold at least `limit` rows: stop here
+        // instead of reading one more file.
+        if limit.map_or(false, |limit| num_rows >= limit) {
+            break;
+        }
+    }
+
+    let column_stats = if has_statistics {
+        Some(get_col_stats(
+            &*schema,
+            null_counts,
+            &mut max_values,
+            &mut min_values,
+        ))
+    } else {
+        None
+    };
+
+    let statistics = Statistics {
+        num_rows: if num_rows_complete { Some(num_rows) } else { None },
+        total_byte_size: if total_byte_size_complete {
+            Some(total_byte_size)
+        } else {
+            None
+        },
+        column_statistics: column_stats,
+    };
+    (table_desc.partition_files[..num_files].to_vec(), statistics)
+}
+
+/// Creates one max and one min accumulator per schema field, in field order.
+/// A slot is `None` when no accumulator can be built for the field's data type.
+fn create_max_min_accs(
+    schema: &Schema,
+) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
+    let fields = schema.fields();
+    let mut max_values = Vec::with_capacity(fields.len());
+    let mut min_values = Vec::with_capacity(fields.len());
+    for field in fields {
+        max_values.push(MaxAccumulator::try_new(field.data_type()).ok());
+        min_values.push(MinAccumulator::try_new(field.data_type()).ok());
+    }
+    (max_values, min_values)
+}
+
+/// Builds one `ColumnStatistics` per schema field from the summed null counts and
+/// the max/min accumulators. A bound is `None` when its accumulator is missing or
+/// fails to evaluate. Distinct counts are not tracked.
+fn get_col_stats(
+    schema: &Schema,
+    null_counts: Vec<usize>,
+    max_values: &mut Vec<Option<MaxAccumulator>>,
+    min_values: &mut Vec<Option<MinAccumulator>>,
+) -> Vec<ColumnStatistics> {
+    (0..schema.fields().len())
+        .map(|i| ColumnStatistics {
+            null_count: Some(null_counts[i]),
+            max_value: max_values[i].as_ref().and_then(|acc| acc.evaluate().ok()),
+            min_value: min_values[i].as_ref().and_then(|acc| acc.evaluate().ok()),
+            distinct_count: None,
+        })
+        .collect()
+}
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index a9b4c0f..c11aade 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -18,25 +18,32 @@
//! Parquet data source
use std::any::Any;
-use std::string::String;
+use std::fs::File;
use std::sync::Arc;
-use arrow::datatypes::*;
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::file::statistics::Statistics as ParquetStatistics;
+use super::datasource::TableProviderFilterPushDown;
+use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use crate::datasource::datasource::Statistics;
-use crate::datasource::TableProvider;
+use crate::datasource::{
+ create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema,
+ PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider,
+};
use crate::error::Result;
use crate::logical_plan::{combine_filters, Expr};
+use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::parquet::ParquetExec;
-use crate::physical_plan::ExecutionPlan;
-
-use super::datasource::TableProviderFilterPushDown;
+use crate::physical_plan::{Accumulator, ExecutionPlan};
+use crate::scalar::ScalarValue;
/// Table-based representation of a `ParquetFile`.
pub struct ParquetTable {
- path: String,
- schema: SchemaRef,
- statistics: Statistics,
+ /// Descriptor of the table, including schema, files, etc.
+ pub desc: Arc<ParquetTableDescriptor>,
+ /// Partition limit given to the constructors (presumably caps the number of
+ /// scan partitions — confirm against `scan`).
max_partitions: usize,
+ /// Parquet pruning option: `true` from `try_new` / `try_new_with_schema`,
+ /// caller-chosen in `try_new_with_desc`.
enable_pruning: bool,
}
@@ -45,12 +52,9 @@ impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
pub fn try_new(path: impl Into<String>, max_partitions: usize) -> Result<Self> {
let path = path.into();
- let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
- let schema = parquet_exec.schema();
+ let table_desc = ParquetTableDescriptor::new(path.as_str());
Ok(Self {
- path,
- schema,
- statistics: parquet_exec.statistics().to_owned(),
+ desc: Arc::new(table_desc?),
max_partitions,
enable_pruning: true,
})
@@ -65,29 +69,34 @@ impl ParquetTable {
collect_statistics: bool,
) -> Result<Self> {
let path = path.into();
- if collect_statistics {
- let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
- Ok(Self {
- path,
- schema: Arc::new(schema),
- statistics: parquet_exec.statistics().to_owned(),
- max_partitions,
- enable_pruning: true,
- })
- } else {
- Ok(Self {
- path,
- schema: Arc::new(schema),
- statistics: Statistics::default(),
- max_partitions,
- enable_pruning: true,
- })
- }
+ let table_desc = ParquetTableDescriptor::new_with_schema(
+ path.as_str(),
+ Some(schema),
+ collect_statistics,
+ );
+ Ok(Self {
+ desc: Arc::new(table_desc?),
+ max_partitions,
+ enable_pruning: true,
+ })
+ }
+
+ /// Attempt to initialize a new `ParquetTable` from a table descriptor.
+ pub fn try_new_with_desc(
+ desc: Arc<ParquetTableDescriptor>,
+ max_partitions: usize,
+ enable_pruning: bool,
+ ) -> Result<Self> {
+ Ok(Self {
+ desc,
+ max_partitions,
+ enable_pruning,
+ })
}
/// Get the path for the Parquet file(s) represented by this ParquetTable instance
pub fn path(&self) -> &str {
- &self.path
+ &self.desc.descriptor.path
}
/// Get parquet pruning option
@@ -109,7 +118,7 @@ impl TableProvider for ParquetTable {
/// Get the schema for this parquet file.
fn schema(&self) -> SchemaRef {
- self.schema.clone()
+ self.desc.schema()
}
fn supports_filter_pushdown(
@@ -136,8 +145,8 @@ impl TableProvider for ParquetTable {
} else {
None
};
- Ok(Arc::new(ParquetExec::try_from_path(
- &self.path,
+ Ok(Arc::new(ParquetExec::try_new(
+ self.desc.clone(),
projection.clone(),
predicate,
limit
@@ -149,7 +158,7 @@ impl TableProvider for ParquetTable {
}
fn statistics(&self) -> Statistics {
- self.statistics.clone()
+ self.desc.statistics()
}
fn has_exact_statistics(&self) -> bool {
@@ -157,6 +166,253 @@ impl TableProvider for ParquetTable {
}
}
+/// Descriptor for a parquet root path.
+#[derive(Debug, Clone)]
+pub struct ParquetTableDescriptor {
+    /// Metadata for the files inside the root path
+    pub descriptor: TableDescriptor,
+}
+
+impl ParquetTableDescriptor {
+    /// Construct a new parquet descriptor for a root path. Reads the schema and
+    /// statistics of every `.parquet` file found.
+    pub fn new(root_path: &str) -> Result<Self> {
+        let descriptor = Self::build_table_desc(root_path, "parquet", None, true)?;
+        Ok(Self { descriptor })
+    }
+
+    /// Construct a new parquet descriptor for a root path with a known schema.
+    /// Per-file statistics are only read when `collect_statistics` is true.
+    pub fn new_with_schema(
+        root_path: &str,
+        schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<Self> {
+        let descriptor =
+            Self::build_table_desc(root_path, "parquet", schema, collect_statistics)?;
+        Ok(Self { descriptor })
+    }
+
+    /// Get file schema for all parquet files
+    pub fn schema(&self) -> SchemaRef {
+        self.descriptor.schema.clone()
+    }
+
+    /// Get the summary statistics for all parquet files
+    pub fn statistics(&self) -> Statistics {
+        get_statistics_with_limit(&self.descriptor, None).1
+    }
+
+    /// Folds the min/max of the parquet column statistics `stat` into the
+    /// accumulators of Arrow field `i`.
+    ///
+    /// Only Boolean, Int32, Int64, Float32 and Float64 fields are summarized, and
+    /// only when the parquet statistics variant matches the field's Arrow type and
+    /// min/max are set. An accumulator that rejects a value is discarded, so the
+    /// column reports no bound rather than a wrong one. An out-of-range `i` is
+    /// ignored instead of panicking.
+    fn summarize_min_max(
+        max_values: &mut Vec<Option<MaxAccumulator>>,
+        min_values: &mut Vec<Option<MinAccumulator>>,
+        fields: &[Field],
+        i: usize,
+        stat: &ParquetStatistics,
+    ) {
+        let data_type = match fields.get(i) {
+            Some(field) => field.data_type(),
+            None => return,
+        };
+        let (max, min) = match (stat, data_type) {
+            (ParquetStatistics::Boolean(s), DataType::Boolean) if s.has_min_max_set() => (
+                ScalarValue::Boolean(Some(*s.max())),
+                ScalarValue::Boolean(Some(*s.min())),
+            ),
+            (ParquetStatistics::Int32(s), DataType::Int32) if s.has_min_max_set() => (
+                ScalarValue::Int32(Some(*s.max())),
+                ScalarValue::Int32(Some(*s.min())),
+            ),
+            (ParquetStatistics::Int64(s), DataType::Int64) if s.has_min_max_set() => (
+                ScalarValue::Int64(Some(*s.max())),
+                ScalarValue::Int64(Some(*s.min())),
+            ),
+            (ParquetStatistics::Float(s), DataType::Float32) if s.has_min_max_set() => (
+                ScalarValue::Float32(Some(*s.max())),
+                ScalarValue::Float32(Some(*s.min())),
+            ),
+            (ParquetStatistics::Double(s), DataType::Float64) if s.has_min_max_set() => (
+                ScalarValue::Float64(Some(*s.max())),
+                ScalarValue::Float64(Some(*s.min())),
+            ),
+            _ => return,
+        };
+        if let Some(slot) = max_values.get_mut(i) {
+            Self::update_accumulator(slot, max);
+        }
+        if let Some(slot) = min_values.get_mut(i) {
+            Self::update_accumulator(slot, min);
+        }
+    }
+
+    /// Feeds `value` into the accumulator in `slot`. Drops the accumulator (leaving
+    /// `None`) if the update fails.
+    fn update_accumulator<A: Accumulator>(slot: &mut Option<A>, value: ScalarValue) {
+        let failed = slot
+            .as_mut()
+            .map_or(false, |acc| acc.update(&[value]).is_err());
+        if failed {
+            *slot = None;
+        }
+    }
+}
+
+impl TableDescriptorBuilder for ParquetTableDescriptor {
+    /// Reads the parquet footer of `path`. Returns its Arrow schema plus the row
+    /// count and byte size summed over all row groups, and per-column null counts
+    /// and min/max merged over all row groups.
+    fn file_meta(path: &str) -> Result<FileAndSchema> {
+        let file = File::open(path)?;
+        let file_reader = Arc::new(SerializedFileReader::new(file)?);
+        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+        let path = path.to_string();
+        let schema = arrow_reader.get_schema()?;
+        let num_fields = schema.fields().len();
+        let fields = schema.fields().to_vec();
+        let meta_data = arrow_reader.get_metadata();
+
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        let mut null_counts = vec![0; num_fields];
+        let mut has_statistics = false;
+
+        let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+
+        for row_group_meta in meta_data.row_groups() {
+            num_rows += row_group_meta.num_rows();
+            total_byte_size += row_group_meta.total_byte_size();
+
+            // Parquet statistics are per leaf column, while the accumulators are per
+            // top-level Arrow field. Indices only line up when the counts match (each
+            // field is a single leaf). For nested schemas, skip column statistics
+            // rather than attributing them to the wrong field or indexing out of bounds.
+            let columns = row_group_meta.columns();
+            if columns.len() != num_fields {
+                continue;
+            }
+            for (i, column) in columns.iter().enumerate() {
+                if let Some(stat) = column.statistics() {
+                    has_statistics = true;
+                    // Indexed by column position, so a column without statistics
+                    // does not shift the null counts of the columns after it.
+                    null_counts[i] += stat.null_count() as usize;
+                    ParquetTableDescriptor::summarize_min_max(
+                        &mut max_values,
+                        &mut min_values,
+                        &fields,
+                        i,
+                        stat,
+                    )
+                }
+            }
+        }
+
+        let column_stats = if has_statistics {
+            Some(get_col_stats(
+                &schema,
+                null_counts,
+                &mut max_values,
+                &mut min_values,
+            ))
+        } else {
+            None
+        };
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: column_stats,
+        };
+
+        Ok(FileAndSchema {
+            file: PartitionedFile { path, statistics },
+            schema,
+        })
+    }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index a983af4..fd86504 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,6 +110,7 @@ mod tests {
use super::*;
use crate::datasource::datasource::Statistics;
+ use crate::datasource::PartitionedFile;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
use crate::physical_plan::projection::ProjectionExec;
@@ -122,12 +123,13 @@ mod tests {
vec![],
Arc::new(ParquetExec::new(
vec![ParquetPartition::new(
- vec!["x".to_string()],
- Statistics::default(),
+ vec![PartitionedFile::from("x".to_string())],
+ 0,
metrics.clone(),
)],
schema,
None,
+ Statistics::default(),
metrics,
None,
2048,
@@ -162,12 +164,13 @@ mod tests {
vec![],
Arc::new(ParquetExec::new(
vec![ParquetPartition::new(
- vec!["x".to_string()],
- Statistics::default(),
+ vec![PartitionedFile::from("x".to_string())],
+ 0,
metrics.clone(),
)],
schema,
None,
+ Statistics::default(),
metrics,
None,
2048,
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index ac1655b..eb8f927 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -27,14 +27,14 @@ use crate::{
logical_plan::{Column, Expr},
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_plan::{
- common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
},
scalar::ScalarValue,
};
use arrow::{
array::ArrayRef,
- datatypes::{DataType, Schema, SchemaRef},
+ datatypes::{Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
@@ -53,21 +53,21 @@ use tokio::{
task,
};
-use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::datasource::datasource::Statistics;
use async_trait::async_trait;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::stream::RecordBatchReceiverStream;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::Accumulator;
+use crate::datasource::parquet::ParquetTableDescriptor;
+use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile};
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Parquet partitions to read
- partitions: Vec<ParquetPartition>,
+ pub partitions: Vec<ParquetPartition>,
/// Schema after projection is applied
- schema: SchemaRef,
+ pub schema: SchemaRef,
/// Projection for which columns to load
projection: Vec<usize>,
/// Batch size
@@ -94,9 +94,7 @@ pub struct ParquetExec {
#[derive(Debug, Clone)]
pub struct ParquetPartition {
-    /// The Parquet filename for this partition
-    pub filenames: Vec<String>,
-    /// Statistics for this partition
-    pub statistics: Statistics,
+    /// The files (each with its path and statistics) read by this partition
+    pub file_partition: FilePartition,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -104,7 +102,7 @@ pub struct ParquetPartition {
/// Stores metrics about the parquet execution for a particular parquet file
#[derive(Debug, Clone)]
struct ParquetFileMetrics {
- /// Numer of times the predicate could not be evaluated
+ /// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: metrics::Count,
/// Number of row groups pruned using
pub row_groups_pruned: metrics::Count,
@@ -123,290 +121,42 @@ impl ParquetExec {
) -> Result<Self> {
// build a list of filenames from the specified path, which could be a single file or
// a directory containing one or more parquet files
- let filenames = common::build_file_list(path, ".parquet")?;
- if filenames.is_empty() {
- Err(DataFusionError::Plan(format!(
- "No Parquet files (with .parquet extension) found at path {}",
- path
- )))
- } else {
- let filenames = filenames
- .iter()
- .map(|filename| filename.as_str())
- .collect::<Vec<&str>>();
- Self::try_from_files(
- &filenames,
- projection,
- predicate,
- batch_size,
- max_partitions,
- limit,
- )
- }
+ let table_desc = ParquetTableDescriptor::new(path)?;
+ Self::try_new(
+ Arc::new(table_desc),
+ projection,
+ predicate,
+ batch_size,
+ max_partitions,
+ limit,
+ )
}
- /// Create a new Parquet reader execution plan based on the specified list of Parquet
- /// files
- pub fn try_from_files(
- filenames: &[&str],
+ /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema
+ pub fn try_new(
+ desc: Arc<ParquetTableDescriptor>,
projection: Option<Vec<usize>>,
predicate: Option<Expr>,
batch_size: usize,
max_partitions: usize,
limit: Option<usize>,
) -> Result<Self> {
- debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
- filenames, projection, predicate, limit);
- // build a list of Parquet partitions with statistics and gather all unique schemas
- // used in this data set
- let metrics = ExecutionPlanMetricsSet::new();
- let mut schemas: Vec<Schema> = vec![];
- let mut partitions = Vec::with_capacity(max_partitions);
- let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
- let chunks = split_files(&filenames, max_partitions);
- let mut num_rows = 0;
- let mut num_fields = 0;
- let mut fields = Vec::new();
- let mut total_byte_size = 0;
- let mut null_counts = Vec::new();
- let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
- let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
- let mut limit_exhausted = false;
- for chunk in chunks {
- let mut filenames: Vec<String> =
- chunk.iter().map(|x| x.to_string()).collect();
- let mut total_files = 0;
- for filename in &filenames {
- total_files += 1;
- let file = File::open(filename)?;
- let file_reader = Arc::new(SerializedFileReader::new(file)?);
- let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
- let meta_data = arrow_reader.get_metadata();
- // collect all the unique schemas in this data set
- let schema = arrow_reader.get_schema()?;
- if schemas.is_empty() || schema != schemas[0] {
- fields = schema.fields().to_vec();
- num_fields = schema.fields().len();
- null_counts = vec![0; num_fields];
- max_values = schema
- .fields()
- .iter()
- .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
- .collect::<Vec<_>>();
- min_values = schema
- .fields()
- .iter()
- .map(|field| MinAccumulator::try_new(field.data_type()).ok())
- .collect::<Vec<_>>();
- schemas.push(schema);
- }
-
- for row_group_meta in meta_data.row_groups() {
- num_rows += row_group_meta.num_rows();
- total_byte_size += row_group_meta.total_byte_size();
+ debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
+ desc, projection, predicate, limit);
- // Currently assumes every Parquet file has same schema
- // https://issues.apache.org/jira/browse/ARROW-11017
- let columns_null_counts = row_group_meta
- .columns()
- .iter()
- .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
-
- for (i, cnt) in columns_null_counts.enumerate() {
- null_counts[i] += cnt
- }
-
- for (i, column) in row_group_meta.columns().iter().enumerate() {
- if let Some(stat) = column.statistics() {
- match stat {
- ParquetStatistics::Boolean(s) => {
- if let DataType::Boolean = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update(&[
- ScalarValue::Boolean(Some(*s.max())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update(&[
- ScalarValue::Boolean(Some(*s.min())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- }
- }
- }
- ParquetStatistics::Int32(s) => {
- if let DataType::Int32 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update(&[
- ScalarValue::Int32(Some(*s.max())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update(&[
- ScalarValue::Int32(Some(*s.min())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- }
- }
- }
- ParquetStatistics::Int64(s) => {
- if let DataType::Int64 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update(&[
- ScalarValue::Int64(Some(*s.max())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update(&[
- ScalarValue::Int64(Some(*s.min())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- }
- }
- }
- ParquetStatistics::Float(s) => {
- if let DataType::Float32 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update(&[
- ScalarValue::Float32(Some(*s.max())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update(&[
- ScalarValue::Float32(Some(*s.min())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- }
- }
- }
- ParquetStatistics::Double(s) => {
- if let DataType::Float64 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update(&[
- ScalarValue::Float64(Some(*s.max())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update(&[
- ScalarValue::Float64(Some(*s.min())),
- ]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- }
- }
- }
- _ => {}
- }
- }
- }
+ let metrics = ExecutionPlanMetricsSet::new();
+ let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit);
+ let schema = desc.schema();
- if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
- limit_exhausted = true;
- break;
- }
- }
- }
- let column_stats = (0..num_fields)
- .map(|i| {
- let max_value = match &max_values[i] {
- Some(max_value) => max_value.evaluate().ok(),
- None => None,
- };
- let min_value = match &min_values[i] {
- Some(min_value) => min_value.evaluate().ok(),
- None => None,
- };
- ColumnStatistics {
- null_count: Some(null_counts[i] as usize),
- max_value,
- min_value,
- distinct_count: None,
- }
- })
- .collect();
-
- let statistics = Statistics {
- num_rows: Some(num_rows as usize),
- total_byte_size: Some(total_byte_size as usize),
- column_statistics: Some(column_stats),
- };
- // remove files that are not needed in case of limit
- filenames.truncate(total_files);
+ let mut partitions = Vec::with_capacity(max_partitions);
+ let chunked_files = split_files(&all_files, max_partitions);
+ for (index, group) in chunked_files.iter().enumerate() {
partitions.push(ParquetPartition::new(
- filenames,
- statistics,
+ Vec::from(*group),
+ index,
metrics.clone(),
));
- if limit_exhausted {
- break;
- }
- }
-
- // we currently get the schema information from the first file rather than do
- // schema merging and this is a limitation.
- // See https://issues.apache.org/jira/browse/ARROW-11017
- if schemas.len() > 1 {
- return Err(DataFusionError::Plan(format!(
- "The Parquet files have {} different schemas and DataFusion does \
- not yet support schema merging",
- schemas.len()
- )));
}
- let schema = Arc::new(schemas.pop().unwrap());
let metrics = ExecutionPlanMetricsSet::new();
let predicate_creation_errors =
@@ -430,6 +180,7 @@ impl ParquetExec {
partitions,
schema,
projection,
+ statistics,
metrics,
predicate_builder,
batch_size,
@@ -438,10 +189,12 @@ impl ParquetExec {
}
/// Create a new Parquet reader execution plan with provided partitions and schema
+ #[allow(clippy::too_many_arguments)]
pub fn new(
partitions: Vec<ParquetPartition>,
schema: SchemaRef,
projection: Option<Vec<usize>>,
+ statistics: Statistics,
metrics: ExecutionPlanMetricsSet,
predicate_builder: Option<PruningPredicate>,
batch_size: usize,
@@ -459,94 +212,20 @@ impl ParquetExec {
.collect(),
);
- // sum the statistics
- let mut num_rows: Option<usize> = None;
- let mut total_byte_size: Option<usize> = None;
- let mut null_counts: Vec<usize> = vec![0; schema.fields().len()];
- let mut has_statistics = false;
- let mut max_values = schema
- .fields()
- .iter()
- .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
- .collect::<Vec<_>>();
- let mut min_values = schema
- .fields()
- .iter()
- .map(|field| MinAccumulator::try_new(field.data_type()).ok())
- .collect::<Vec<_>>();
- for part in &partitions {
- if let Some(n) = part.statistics.num_rows {
- num_rows = Some(num_rows.unwrap_or(0) + n)
- }
- if let Some(n) = part.statistics.total_byte_size {
- total_byte_size = Some(total_byte_size.unwrap_or(0) + n)
+ let new_column_statistics = statistics.column_statistics.map(|stats| {
+ let mut projected_stats = Vec::with_capacity(projection.len());
+ for proj in &projection {
+ projected_stats.push(stats[*proj].clone());
}
- if let Some(x) = &part.statistics.column_statistics {
- let part_nulls: Vec<Option<usize>> =
- x.iter().map(|c| c.null_count).collect();
- has_statistics = true;
-
- let part_max_values: Vec<Option<ScalarValue>> =
- x.iter().map(|c| c.max_value.clone()).collect();
- let part_min_values: Vec<Option<ScalarValue>> =
- x.iter().map(|c| c.min_value.clone()).collect();
-
- for &i in projection.iter() {
- null_counts[i] = part_nulls[i].unwrap_or(0);
- if let Some(part_max_value) = part_max_values[i].clone() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update(&[part_max_value]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- }
- if let Some(part_min_value) = part_min_values[i].clone() {
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update(&[part_min_value]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- }
- }
- }
- }
-
- let column_stats = if has_statistics {
- Some(
- (0..schema.fields().len())
- .map(|i| {
- let max_value = match &max_values[i] {
- Some(max_value) => max_value.evaluate().ok(),
- None => None,
- };
- let min_value = match &min_values[i] {
- Some(min_value) => min_value.evaluate().ok(),
- None => None,
- };
- ColumnStatistics {
- null_count: Some(null_counts[i] as usize),
- max_value,
- min_value,
- distinct_count: None,
- }
- })
- .collect(),
- )
- } else {
- None
- };
+ projected_stats
+ });
let statistics = Statistics {
- num_rows,
- total_byte_size,
- column_statistics: column_stats,
+ num_rows: statistics.num_rows,
+ total_byte_size: statistics.total_byte_size,
+ column_statistics: new_column_statistics,
};
+
Self {
partitions,
schema: Arc::new(projected_schema),
@@ -583,26 +262,15 @@ impl ParquetExec {
impl ParquetPartition {
/// Create a new parquet partition
+    ///
+    /// `files` are the files read by this partition and `index` is its
+    /// position among the partitions of the owning `ParquetExec`.
pub fn new(
-        filenames: Vec<String>,
-        statistics: Statistics,
+        files: Vec<PartitionedFile>,
+        index: usize,
metrics: ExecutionPlanMetricsSet,
) -> Self {
Self {
-            filenames,
-            statistics,
+            file_partition: FilePartition { index, files },
metrics,
}
}
-
-    /// The Parquet filename for this partition
-    pub fn filenames(&self) -> &[String] {
-        &self.filenames
-    }
-
-    /// Statistics for this partition
-    pub fn statistics(&self) -> &Statistics {
-        &self.statistics
-    }
}
impl ParquetFileMetrics {
@@ -662,7 +330,7 @@ impl ExecutionPlan for ParquetExec {
}
}
- async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+ async fn execute(&self, partition_index: usize) -> Result<SendableRecordBatchStream> {
// because the parquet implementation is not thread-safe, it is necessary to execute
// on a thread and communicate with channels
let (response_tx, response_rx): (
@@ -670,8 +338,7 @@ impl ExecutionPlan for ParquetExec {
Receiver<ArrowResult<RecordBatch>>,
) = channel(2);
- let parquet_partition = &self.partitions[partition];
- let filenames = parquet_partition.filenames.clone();
+ let partition = self.partitions[partition_index].clone();
let metrics = self.metrics.clone();
let projection = self.projection.clone();
let predicate_builder = self.predicate_builder.clone();
@@ -679,9 +346,9 @@ impl ExecutionPlan for ParquetExec {
let limit = self.limit;
task::spawn_blocking(move || {
- if let Err(e) = read_files(
+ if let Err(e) = read_partition(
+ partition_index,
partition,
- &filenames,
metrics,
&projection,
&predicate_builder,
@@ -706,9 +373,7 @@ impl ExecutionPlan for ParquetExec {
let files: Vec<_> = self
.partitions
.iter()
- .map(|pp| pp.filenames.iter())
- .flatten()
- .map(|s| s.as_str())
+ .map(|pp| format!("{}", pp.file_partition))
.collect();
write!(
@@ -838,7 +503,7 @@ fn build_row_group_predicate(
match predicate_values {
Ok(values) => {
// NB: false means don't scan row group
- let num_pruned = values.iter().filter(|&v| !v).count();
+ let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
Box::new(move |_, i| values[i])
}
@@ -853,9 +518,9 @@ fn build_row_group_predicate(
}
#[allow(clippy::too_many_arguments)]
-fn read_files(
- partition: usize,
- filenames: &[String],
+fn read_partition(
+ partition_index: usize,
+ partition: ParquetPartition,
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
@@ -864,9 +529,11 @@ fn read_files(
limit: Option<usize>,
) -> Result<()> {
let mut total_rows = 0;
- 'outer: for filename in filenames {
- let file_metrics = ParquetFileMetrics::new(partition, filename, &metrics);
- let file = File::open(&filename)?;
+ let all_files = partition.file_partition.files;
+ 'outer: for partitioned_file in all_files {
+ let file_metrics =
+ ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics);
+ let file = File::open(partitioned_file.path.as_str())?;
let mut file_reader = SerializedFileReader::new(file)?;
if let Some(predicate_builder) = predicate_builder {
let row_group_predicate = build_row_group_predicate(
@@ -894,7 +561,7 @@ fn read_files(
Some(Err(e)) => {
let err_msg = format!(
"Error reading batch from {}: {}",
- filename,
+ partitioned_file,
e.to_string()
);
// send error to operator
@@ -914,12 +581,15 @@ fn read_files(
Ok(())
}
-fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {
-    let mut chunk_size = filenames.len() / n;
-    if filenames.len() % n > 0 {
+/// Splits `partitioned_files` into at most `n` contiguous groups of roughly
+/// equal size (the last group may be smaller). Returns no groups when there
+/// are no files, because `slice::chunks` panics on a chunk size of zero.
+fn split_files(
+    partitioned_files: &[PartitionedFile],
+    n: usize,
+) -> Vec<&[PartitionedFile]> {
+    if partitioned_files.is_empty() {
+        return vec![];
+    }
+    let mut chunk_size = partitioned_files.len() / n;
+    if partitioned_files.len() % n > 0 {
         chunk_size += 1;
     }
-    filenames.chunks(chunk_size).collect()
+    partitioned_files.chunks(chunk_size).collect()
}
#[cfg(test)]
@@ -935,24 +605,24 @@ mod tests {
#[test]
fn test_split_files() {
- let filenames = vec![
- "a".to_string(),
- "b".to_string(),
- "c".to_string(),
- "d".to_string(),
- "e".to_string(),
+ let files = vec![
+ PartitionedFile::from("a".to_string()),
+ PartitionedFile::from("b".to_string()),
+ PartitionedFile::from("c".to_string()),
+ PartitionedFile::from("d".to_string()),
+ PartitionedFile::from("e".to_string()),
];
- let chunks = split_files(&filenames, 1);
+ let chunks = split_files(&files, 1);
assert_eq!(1, chunks.len());
assert_eq!(5, chunks[0].len());
- let chunks = split_files(&filenames, 2);
+ let chunks = split_files(&files, 2);
assert_eq!(2, chunks.len());
assert_eq!(3, chunks[0].len());
assert_eq!(2, chunks[1].len());
- let chunks = split_files(&filenames, 5);
+ let chunks = split_files(&files, 5);
assert_eq!(5, chunks.len());
assert_eq!(1, chunks[0].len());
assert_eq!(1, chunks[1].len());
@@ -960,7 +630,7 @@ mod tests {
assert_eq!(1, chunks[3].len());
assert_eq!(1, chunks[4].len());
- let chunks = split_files(&filenames, 123);
+ let chunks = split_files(&files, 123);
assert_eq!(5, chunks.len());
assert_eq!(1, chunks[0].len());
assert_eq!(1, chunks[1].len());