Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/30 11:33:57 UTC

[arrow-datafusion] branch master updated: FilePartition and PartitionedFile for scanning flexibility (#932)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a085fc  FilePartition and PartitionedFile for scanning flexibility (#932)
8a085fc is described below

commit 8a085fc2701f04381cf26bc8ea925339c68cf6e4
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Aug 30 19:33:50 2021 +0800

    FilePartition and PartitionedFile for scanning flexibility (#932)
    
    * FilePartition and partitionedFile for scanning flexibility
    
    * clippy
    
    * remove schema from partitioned file
    
    * ballista logical parquet table
    
    * ballista physical parquet exec
    
    * resolve comments
    
    * resolve comments
---
 ballista/rust/core/proto/ballista.proto            |  33 +-
 .../rust/core/src/serde/logical_plan/from_proto.rs |  72 ++-
 .../rust/core/src/serde/logical_plan/to_proto.rs   |  77 +++-
 .../core/src/serde/physical_plan/from_proto.rs     |  40 +-
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  23 +-
 ballista/rust/scheduler/src/lib.rs                 |  33 +-
 datafusion/src/datasource/mod.rs                   | 256 +++++++++++
 datafusion/src/datasource/parquet.rs               | 330 ++++++++++++--
 datafusion/src/physical_optimizer/repartition.rs   |  11 +-
 datafusion/src/physical_plan/parquet.rs            | 488 ++++-----------------
 10 files changed, 869 insertions(+), 494 deletions(-)

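The commit's central abstractions are three plain structs added in datafusion/src/datasource/mod.rs: PartitionedFile (one file plus its statistics), FilePartition (the set of files a single scan task reads), and TableDescriptor (every file under a table's root path plus the shared schema). The sketch below is not part of the commit; the round-robin grouping and the one-column schema are illustrative assumptions, but the struct fields match the diff.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::{FilePartition, PartitionedFile, TableDescriptor};

    // Describe a table rooted at `root` and round-robin its files into scan partitions.
    fn describe_and_partition(
        root: &str,
        file_paths: Vec<String>,
        max_partitions: usize,
    ) -> (TableDescriptor, Vec<FilePartition>) {
        // One PartitionedFile per path; From<String> leaves the statistics unknown.
        let partition_files: Vec<PartitionedFile> =
            file_paths.into_iter().map(PartitionedFile::from).collect();

        // Illustrative schema; in the commit this is read from the files themselves.
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int64, true)]));

        let desc = TableDescriptor {
            path: root.to_string(),
            partition_files: partition_files.clone(),
            schema,
        };

        // Spread the files over at most `max_partitions` scan partitions.
        let n = max_partitions.max(1);
        let mut partitions: Vec<FilePartition> = (0..n)
            .map(|index| FilePartition { index, files: vec![] })
            .collect();
        for (i, file) in partition_files.into_iter().enumerate() {
            partitions[i % n].files.push(file);
        }
        partitions.retain(|p| !p.files.is_empty());

        (desc, partitions)
    }
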
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index c184de3..45ff6c5 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -271,12 +271,28 @@ message CsvTableScanNode {
   repeated LogicalExprNode filters = 8;
 }
 
+message Statistics {
+  int64 num_rows = 1;
+  int64 total_byte_size = 2;
+  repeated ColumnStats column_stats = 3;
+}
+
+message PartitionedFile {
+  string path = 1;
+  Statistics statistics = 2;
+}
+
+message TableDescriptor {
+  string path = 1;
+  repeated PartitionedFile partition_files = 2;
+  Schema schema = 3;
+}
+
 message ParquetTableScanNode {
   string table_name = 1;
-  string path = 2;
+  TableDescriptor table_desc = 2;
   ProjectionColumns projection = 3;
-  Schema schema = 4;
-  repeated LogicalExprNode filters = 5;
+  repeated LogicalExprNode filters = 4;
 }
 
 message ProjectionNode {
@@ -567,10 +583,15 @@ message FilterExecNode {
   PhysicalExprNode expr = 2;
 }
 
+message ParquetPartition {
+  uint32 index = 1;
+  repeated PartitionedFile files = 2;
+}
+
 message ParquetScanExecNode {
-  repeated string filename = 1;
-  repeated uint32 projection = 2;
-  uint32 num_partitions = 3;
+  repeated ParquetPartition partitions = 1;
+  Schema schema = 2;
+  repeated uint32 projection = 3;
   uint32 batch_size = 4;
 }
 
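The proto changes above replace the flat `repeated string filename` plus `num_partitions` pair with nested messages: each ParquetPartition carries its files, and each PartitionedFile carries its own Statistics, with -1 used as the "unknown" sentinel for row and byte counts (see the to_proto conversions further down). A hedged sketch of building these prost-generated types by hand, assuming they are reachable as ballista_core::serde::protobuf; the index and paths are arbitrary:

    use ballista_core::serde::protobuf;

    // Build one scan-partition message covering the given parquet files.
    fn parquet_partition_proto(index: u32, paths: &[&str]) -> protobuf::ParquetPartition {
        let files = paths
            .iter()
            .map(|p| protobuf::PartitionedFile {
                path: p.to_string(),
                statistics: Some(protobuf::Statistics {
                    num_rows: -1,        // -1 means "unknown", matching the serializer
                    total_byte_size: -1,
                    column_stats: vec![],
                }),
            })
            .collect();
        protobuf::ParquetPartition { index, files }
    }
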
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 14fba06..fc4ac2c 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -21,6 +21,8 @@ use crate::error::BallistaError;
 use crate::serde::{from_proto_binary_op, proto_error, protobuf};
 use crate::{convert_box_required, convert_required};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
+use datafusion::datasource::{PartitionedFile, TableDescriptor};
 use datafusion::logical_plan::window_frames::{
     WindowFrame, WindowFrameBound, WindowFrameUnits,
 };
@@ -134,10 +136,11 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                 .map_err(|e| e.into())
             }
             LogicalPlanType::ParquetScan(scan) => {
+                let descriptor: TableDescriptor = convert_required!(scan.table_desc)?;
                 let projection = match scan.projection.as_ref() {
                     None => None,
                     Some(columns) => {
-                        let schema: Schema = convert_required!(scan.schema)?;
+                        let schema = descriptor.schema.clone();
                         let r: Result<Vec<usize>, _> = columns
                             .columns
                             .iter()
@@ -154,11 +157,16 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                         Some(r?)
                     }
                 };
-                LogicalPlanBuilder::scan_parquet_with_name(
-                    &scan.path,
-                    projection,
+
+                let parquet_table = ParquetTable::try_new_with_desc(
+                    Arc::new(ParquetTableDescriptor { descriptor }),
                     24,
+                    true,
+                )?;
+                LogicalPlanBuilder::scan(
                     &scan.table_name,
+                    Arc::new(parquet_table),
+                    projection,
                 )? //TODO remove hard-coded max_partitions
                 .build()
                 .map_err(|e| e.into())
@@ -301,6 +309,60 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
     }
 }
 
+impl TryInto<TableDescriptor> for &protobuf::TableDescriptor {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<TableDescriptor, Self::Error> {
+        let partition_files = self
+            .partition_files
+            .iter()
+            .map(|f| f.try_into())
+            .collect::<Result<Vec<PartitionedFile>, _>>()?;
+        let schema = convert_required!(self.schema)?;
+        Ok(TableDescriptor {
+            path: self.path.to_owned(),
+            partition_files,
+            schema: Arc::new(schema),
+        })
+    }
+}
+
+impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
+        let statistics = convert_required!(self.statistics)?;
+        Ok(PartitionedFile {
+            path: self.path.clone(),
+            statistics,
+        })
+    }
+}
+
+impl From<&protobuf::ColumnStats> for ColumnStatistics {
+    fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
+        ColumnStatistics {
+            null_count: Some(cs.null_count as usize),
+            max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
+            min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
+            distinct_count: Some(cs.distinct_count as usize),
+        }
+    }
+}
+
+impl TryInto<Statistics> for &protobuf::Statistics {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<Statistics, Self::Error> {
+        let column_statistics = self.column_stats.iter().map(|s| s.into()).collect();
+        Ok(Statistics {
+            num_rows: Some(self.num_rows as usize),
+            total_byte_size: Some(self.total_byte_size as usize),
+            column_statistics: Some(column_statistics),
+        })
+    }
+}
+
 impl From<&protobuf::Column> for Column {
     fn from(c: &protobuf::Column) -> Column {
         let c = c.clone();
@@ -1114,6 +1176,8 @@ impl TryInto<Field> for &protobuf::Field {
     }
 }
 
+use crate::serde::protobuf::ColumnStats;
+use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
 use datafusion::physical_plan::{aggregates, windows};
 use datafusion::prelude::{
     array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 5877ced..aa7a973 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -22,8 +22,11 @@
 use super::super::proto_error;
 use crate::datasource::DfTableAdapter;
 use crate::serde::{protobuf, BallistaError};
-use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
-use datafusion::datasource::CsvFile;
+use datafusion::arrow::datatypes::{
+    DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
+};
+use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
+use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
 use datafusion::logical_plan::{
     window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
     Column, Expr, JoinConstraint, JoinType, LogicalPlan,
@@ -253,6 +256,58 @@ impl TryInto<DataType> for &protobuf::ArrowType {
     }
 }
 
+impl From<&ColumnStatistics> for protobuf::ColumnStats {
+    fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
+        protobuf::ColumnStats {
+            min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
+            max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
+            null_count: cs.null_count.map(|n| n as u32).unwrap_or(0),
+            distinct_count: cs.distinct_count.map(|n| n as u32).unwrap_or(0),
+        }
+    }
+}
+
+impl From<&Statistics> for protobuf::Statistics {
+    fn from(s: &Statistics) -> protobuf::Statistics {
+        let none_value = -1_i64;
+        let column_stats = match &s.column_statistics {
+            None => vec![],
+            Some(column_stats) => column_stats.iter().map(|s| s.into()).collect(),
+        };
+        protobuf::Statistics {
+            num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value),
+            total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value),
+            column_stats,
+        }
+    }
+}
+
+impl From<&PartitionedFile> for protobuf::PartitionedFile {
+    fn from(pf: &PartitionedFile) -> protobuf::PartitionedFile {
+        protobuf::PartitionedFile {
+            path: pf.path.clone(),
+            statistics: Some((&pf.statistics).into()),
+        }
+    }
+}
+
+impl TryFrom<TableDescriptor> for protobuf::TableDescriptor {
+    type Error = BallistaError;
+
+    fn try_from(desc: TableDescriptor) -> Result<protobuf::TableDescriptor, Self::Error> {
+        let partition_files: Vec<protobuf::PartitionedFile> =
+            desc.partition_files.iter().map(|pf| pf.into()).collect();
+
+        let schema: protobuf::Schema = desc.schema.into();
+
+        Ok(protobuf::TableDescriptor {
+            path: desc.path,
+            partition_files,
+            schema: Some(schema),
+        })
+    }
+}
+
 impl TryInto<DataType> for &Box<protobuf::List> {
     type Error = BallistaError;
     fn try_into(self) -> Result<DataType, Self::Error> {
@@ -706,13 +761,14 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                     .collect::<Result<Vec<_>, _>>()?;
 
                 if let Some(parquet) = source.downcast_ref::<ParquetTable>() {
+                    let table_desc: protobuf::TableDescriptor =
+                        parquet.desc.descriptor.clone().try_into()?;
                     Ok(protobuf::LogicalPlanNode {
                         logical_plan_type: Some(LogicalPlanType::ParquetScan(
                             protobuf::ParquetTableScanNode {
                                 table_name: table_name.to_owned(),
-                                path: parquet.path().to_owned(),
+                                table_desc: Some(table_desc),
                                 projection,
-                                schema: Some(schema),
                                 filters,
                             },
                         )),
@@ -1262,6 +1318,19 @@ impl Into<protobuf::Schema> for &Schema {
     }
 }
 
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::Schema> for SchemaRef {
+    fn into(self) -> protobuf::Schema {
+        protobuf::Schema {
+            columns: self
+                .fields()
+                .iter()
+                .map(protobuf::Field::from)
+                .collect::<Vec<_>>(),
+        }
+    }
+}
+
 impl From<&datafusion::logical_plan::DFField> for protobuf::DfField {
     fn from(f: &datafusion::logical_plan::DFField) -> protobuf::DfField {
         protobuf::DfField {
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d37940e..522bac2 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -34,6 +34,8 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
+use datafusion::datasource::datasource::Statistics;
+use datafusion::datasource::FilePartition;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
@@ -44,6 +46,8 @@ use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunc
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion::physical_plan::parquet::ParquetPartition;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
@@ -129,17 +133,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 )?))
             }
             PhysicalPlanType::ParquetScan(scan) => {
+                let partitions = scan
+                    .partitions
+                    .iter()
+                    .map(|p| p.try_into())
+                    .collect::<Result<Vec<ParquetPartition>, _>>()?;
+                let schema = Arc::new(convert_required!(scan.schema)?);
                 let projection = scan.projection.iter().map(|i| *i as usize).collect();
-                let filenames: Vec<&str> =
-                    scan.filename.iter().map(|s| s.as_str()).collect();
-                Ok(Arc::new(ParquetExec::try_from_files(
-                    &filenames,
+                Ok(Arc::new(ParquetExec::new(
+                    partitions,
+                    schema,
                     Some(projection),
+                    Statistics::default(),
+                    ExecutionPlanMetricsSet::new(),
                     None,
                     scan.batch_size as usize,
-                    scan.num_partitions as usize,
                     None,
-                )?))
+                )))
             }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                 let input: Arc<dyn ExecutionPlan> =
@@ -470,6 +480,23 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
     }
 }
 
+impl TryInto<ParquetPartition> for &protobuf::ParquetPartition {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<ParquetPartition, Self::Error> {
+        let files = self
+            .files
+            .iter()
+            .map(|f| f.try_into())
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok(ParquetPartition::new(
+            files,
+            self.index as usize,
+            ExecutionPlanMetricsSet::new(),
+        ))
+    }
+}
+
 impl From<&protobuf::PhysicalColumn> for Column {
     fn from(c: &protobuf::PhysicalColumn) -> Column {
         Column::new(&c.name, c.index as usize)
@@ -620,6 +647,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
 
                 let catalog_list =
                     Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
+
                 let ctx_state = ExecutionContextState {
                     catalog_list,
                     scalar_functions: Default::default(),
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 8d8f917..e7d4ac6 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -38,7 +38,7 @@ use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::AggregateMode;
 use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use datafusion::physical_plan::parquet::ParquetExec;
+use datafusion::physical_plan::parquet::{ParquetExec, ParquetPartition};
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
 use datafusion::{
@@ -268,22 +268,19 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
-            let filenames = exec
-                .partitions()
-                .iter()
-                .flat_map(|part| part.filenames().to_owned())
-                .collect();
+            let partitions = exec.partitions().iter().map(|p| p.into()).collect();
+
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
                     protobuf::ParquetScanExecNode {
-                        filename: filenames,
+                        partitions,
+                        schema: Some(exec.schema.as_ref().into()),
                         projection: exec
                             .projection()
                             .as_ref()
                             .iter()
                             .map(|n| *n as u32)
                             .collect(),
-                        num_partitions: exec.partitions().len() as u32,
                         batch_size: exec.batch_size() as u32,
                     },
                 )),
@@ -621,6 +618,16 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
     }
 }
 
+impl From<&ParquetPartition> for protobuf::ParquetPartition {
+    fn from(p: &ParquetPartition) -> protobuf::ParquetPartition {
+        let files = p.file_partition.files.iter().map(|f| f.into()).collect();
+        protobuf::ParquetPartition {
+            index: p.file_partition.index as u32,
+            files,
+        }
+    }
+}
+
 fn try_parse_when_then_expr(
     when_expr: &Arc<dyn PhysicalExpr>,
     then_expr: &Arc<dyn PhysicalExpr>,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 00f2c98..f03d08b 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState};
 use ballista_core::config::BallistaConfig;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use datafusion::physical_plan::parquet::ParquetExec;
+use datafusion::datasource::parquet::ParquetTableDescriptor;
 use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
@@ -282,24 +282,25 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
+
+                let partitions = parquet_desc
+                    .descriptor
+                    .partition_files
+                    .iter()
+                    .map(|pf| FilePartitionMetadata {
+                        filename: vec![pf.path.clone()],
+                    })
+                    .collect();
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions,
                 }))
             }
             //TODO implement for CSV
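
In the scheduler, file metadata for a Parquet path is now obtained from a ParquetTableDescriptor instead of constructing a throwaway ParquetExec. A minimal sketch of that lookup (the function name is made up; error handling and the FilePartitionMetadata wrapping of the gRPC response are omitted):

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::parquet::ParquetTableDescriptor;
    use datafusion::error::Result;

    // List the .parquet files under `path` and report the schema they share.
    fn parquet_file_metadata(path: &str) -> Result<(Vec<String>, SchemaRef)> {
        let desc = ParquetTableDescriptor::new(path)?;
        let files = desc
            .descriptor
            .partition_files
            .iter()
            .map(|pf| pf.path.clone())
            .collect();
        Ok((files, desc.schema()))
    }
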
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 9699a99..d5e2952 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -27,6 +27,13 @@ pub mod parquet;
 pub use self::csv::{CsvFile, CsvReadOptions};
 pub use self::datasource::{TableProvider, TableType};
 pub use self::memory::MemTable;
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::build_file_list;
+use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
+use crate::physical_plan::Accumulator;
+use std::sync::Arc;
 
 /// Source for table input data
 pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
@@ -36,3 +43,252 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub path: String,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(path: String) -> Self {
+        Self {
+            path,
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with the same schema that exist in a path
+pub struct TableDescriptor {
+    /// root path of the table
+    pub path: String,
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub struct FileAndSchema {
+    file: PartitionedFile,
+    schema: Schema,
+}
+
+/// Builder for ['TableDescriptor'] inside given path
+pub trait TableDescriptorBuilder {
+    /// Construct a ['TableDescriptor'] from the provided path
+    fn build_table_desc(
+        path: &str,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<TableDescriptor> {
+        let filenames = build_file_list(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+        let mut contains_file = false;
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                contains_file = true;
+                let result = if collect_statistics {
+                    let FileAndSchema {file, schema} = Self::file_meta(file_path)?;
+                    if schemas.is_empty() {
+                        schemas.push(schema);
+                    } else if schema != schemas[0] {
+                        // we currently get the schema information from the first file rather than do
+                        // schema merging and this is a limitation.
+                        // See https://issues.apache.org/jira/browse/ARROW-11017
+                        return Err(DataFusionError::Plan(format!(
+                            "The file {} has a different schema from the first file and DataFusion does \
+                        not yet support schema merging",
+                            file_path
+                        )));
+                    }
+                    file
+                } else {
+                    PartitionedFile {
+                        path: file_path.to_owned(),
+                        statistics: Statistics::default(),
+                    }
+                };
+
+                Ok(result)
+            }).collect::<Result<Vec<PartitionedFile>>>();
+
+        if !contains_file {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        let result_schema = provided_schema.unwrap_or_else(|| schemas.pop().unwrap());
+
+        Ok(TableDescriptor {
+            path: path.to_string(),
+            partition_files: partitioned_files?,
+            schema: Arc::new(result_schema),
+        })
+    }
+
+    /// Get all metadata for a source file, including schema, statistics, partitions, etc.
+    fn file_meta(path: &str) -> Result<FileAndSchema>;
+}
+
+/// Get all files as well as the summary statistics
+/// if the optional `limit` is provided, includes only sufficient files
+/// needed to read up to `limit` number of rows
+pub fn get_statistics_with_limit(
+    table_desc: &TableDescriptor,
+    limit: Option<usize>,
+) -> (Vec<PartitionedFile>, Statistics) {
+    let mut all_files = table_desc.partition_files.clone();
+    let schema = table_desc.schema.clone();
+
+    let mut total_byte_size = 0;
+    let mut null_counts = vec![0; schema.fields().len()];
+    let mut has_statistics = false;
+    let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+
+    let mut num_rows = 0;
+    let mut num_files = 0;
+    for file in &all_files {
+        num_files += 1;
+        let file_stats = &file.statistics;
+        num_rows += file_stats.num_rows.unwrap_or(0);
+        total_byte_size += file_stats.total_byte_size.unwrap_or(0);
+        if let Some(vec) = &file_stats.column_statistics {
+            has_statistics = true;
+            for (i, cs) in vec.iter().enumerate() {
+                null_counts[i] += cs.null_count.unwrap_or(0);
+
+                if let Some(max_value) = &mut max_values[i] {
+                    if let Some(file_max) = cs.max_value.clone() {
+                        match max_value.update(&[file_max]) {
+                            Ok(_) => {}
+                            Err(_) => {
+                                max_values[i] = None;
+                            }
+                        }
+                    }
+                }
+
+                if let Some(min_value) = &mut min_values[i] {
+                    if let Some(file_min) = cs.min_value.clone() {
+                        match min_value.update(&[file_min]) {
+                            Ok(_) => {}
+                            Err(_) => {
+                                min_values[i] = None;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        if num_rows > limit.unwrap_or(usize::MAX) {
+            break;
+        }
+    }
+    all_files.truncate(num_files);
+
+    let column_stats = if has_statistics {
+        Some(get_col_stats(
+            &*schema,
+            null_counts,
+            &mut max_values,
+            &mut min_values,
+        ))
+    } else {
+        None
+    };
+
+    let statistics = Statistics {
+        num_rows: Some(num_rows as usize),
+        total_byte_size: Some(total_byte_size as usize),
+        column_statistics: column_stats,
+    };
+    (all_files, statistics)
+}
+
+fn create_max_min_accs(
+    schema: &Schema,
+) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
+    let max_values: Vec<Option<MaxAccumulator>> = schema
+        .fields()
+        .iter()
+        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
+        .collect::<Vec<_>>();
+    let min_values: Vec<Option<MinAccumulator>> = schema
+        .fields()
+        .iter()
+        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
+        .collect::<Vec<_>>();
+    (max_values, min_values)
+}
+
+fn get_col_stats(
+    schema: &Schema,
+    null_counts: Vec<usize>,
+    max_values: &mut Vec<Option<MaxAccumulator>>,
+    min_values: &mut Vec<Option<MinAccumulator>>,
+) -> Vec<ColumnStatistics> {
+    (0..schema.fields().len())
+        .map(|i| {
+            let max_value = match &max_values[i] {
+                Some(max_value) => max_value.evaluate().ok(),
+                None => None,
+            };
+            let min_value = match &min_values[i] {
+                Some(min_value) => min_value.evaluate().ok(),
+                None => None,
+            };
+            ColumnStatistics {
+                null_count: Some(null_counts[i] as usize),
+                max_value,
+                min_value,
+                distinct_count: None,
+            }
+        })
+        .collect()
+}
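
datafusion/src/datasource/mod.rs also gains get_statistics_with_limit, which walks a descriptor's files, sums their statistics, and stops adding files once the accumulated row count covers an optional limit. A small sketch of calling it; the paths and one-column schema are invented, and files built via From<String> carry default (unknown) statistics, so none are skipped here:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::{get_statistics_with_limit, PartitionedFile, TableDescriptor};

    fn example() {
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int64, true)]));
        let desc = TableDescriptor {
            path: "/data/t".to_string(),
            partition_files: vec![
                PartitionedFile::from("/data/t/part-0.parquet".to_string()),
                PartitionedFile::from("/data/t/part-1.parquet".to_string()),
            ],
            schema,
        };

        // Keep only as many files as are needed to read up to 1000 rows,
        // together with the statistics summed over exactly those files.
        let (files, stats) = get_statistics_with_limit(&desc, Some(1000));
        assert_eq!(files.len(), 2); // default statistics report 0 rows, so no file is dropped
        assert_eq!(stats.num_rows, Some(0));
    }
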
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index a9b4c0f..c11aade 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -18,25 +18,32 @@
 //! Parquet data source
 
 use std::any::Any;
-use std::string::String;
+use std::fs::File;
 use std::sync::Arc;
 
-use arrow::datatypes::*;
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::file::statistics::Statistics as ParquetStatistics;
 
+use super::datasource::TableProviderFilterPushDown;
+use crate::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use crate::datasource::datasource::Statistics;
-use crate::datasource::TableProvider;
+use crate::datasource::{
+    create_max_min_accs, get_col_stats, get_statistics_with_limit, FileAndSchema,
+    PartitionedFile, TableDescriptor, TableDescriptorBuilder, TableProvider,
+};
 use crate::error::Result;
 use crate::logical_plan::{combine_filters, Expr};
+use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::parquet::ParquetExec;
-use crate::physical_plan::ExecutionPlan;
-
-use super::datasource::TableProviderFilterPushDown;
+use crate::physical_plan::{Accumulator, ExecutionPlan};
+use crate::scalar::ScalarValue;
 
 /// Table-based representation of a `ParquetFile`.
 pub struct ParquetTable {
-    path: String,
-    schema: SchemaRef,
-    statistics: Statistics,
+    /// Descriptor of the table, including schema, files, etc.
+    pub desc: Arc<ParquetTableDescriptor>,
     max_partitions: usize,
     enable_pruning: bool,
 }
@@ -45,12 +52,9 @@ impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
     pub fn try_new(path: impl Into<String>, max_partitions: usize) -> Result<Self> {
         let path = path.into();
-        let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
-        let schema = parquet_exec.schema();
+        let table_desc = ParquetTableDescriptor::new(path.as_str());
         Ok(Self {
-            path,
-            schema,
-            statistics: parquet_exec.statistics().to_owned(),
+            desc: Arc::new(table_desc?),
             max_partitions,
             enable_pruning: true,
         })
@@ -65,29 +69,34 @@ impl ParquetTable {
         collect_statistics: bool,
     ) -> Result<Self> {
         let path = path.into();
-        if collect_statistics {
-            let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
-            Ok(Self {
-                path,
-                schema: Arc::new(schema),
-                statistics: parquet_exec.statistics().to_owned(),
-                max_partitions,
-                enable_pruning: true,
-            })
-        } else {
-            Ok(Self {
-                path,
-                schema: Arc::new(schema),
-                statistics: Statistics::default(),
-                max_partitions,
-                enable_pruning: true,
-            })
-        }
+        let table_desc = ParquetTableDescriptor::new_with_schema(
+            path.as_str(),
+            Some(schema),
+            collect_statistics,
+        );
+        Ok(Self {
+            desc: Arc::new(table_desc?),
+            max_partitions,
+            enable_pruning: true,
+        })
+    }
+
+    /// Attempt to initialize a new `ParquetTable` from a table descriptor.
+    pub fn try_new_with_desc(
+        desc: Arc<ParquetTableDescriptor>,
+        max_partitions: usize,
+        enable_pruning: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            desc,
+            max_partitions,
+            enable_pruning,
+        })
     }
 
     /// Get the path for the Parquet file(s) represented by this ParquetTable instance
     pub fn path(&self) -> &str {
-        &self.path
+        &self.desc.descriptor.path
     }
 
     /// Get parquet pruning option
@@ -109,7 +118,7 @@ impl TableProvider for ParquetTable {
 
     /// Get the schema for this parquet file.
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        self.desc.schema()
     }
 
     fn supports_filter_pushdown(
@@ -136,8 +145,8 @@ impl TableProvider for ParquetTable {
         } else {
             None
         };
-        Ok(Arc::new(ParquetExec::try_from_path(
-            &self.path,
+        Ok(Arc::new(ParquetExec::try_new(
+            self.desc.clone(),
             projection.clone(),
             predicate,
             limit
@@ -149,7 +158,7 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
@@ -157,6 +166,253 @@ impl TableProvider for ParquetTable {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Descriptor for a parquet root path
+pub struct ParquetTableDescriptor {
+    /// metadata for files inside the root path
+    pub descriptor: TableDescriptor,
+}
+
+impl ParquetTableDescriptor {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str) -> Result<Self> {
+        let table_desc = Self::build_table_desc(root_path, "parquet", None, true);
+        Ok(Self {
+            descriptor: table_desc?,
+        })
+    }
+
+    /// Construct a new parquet descriptor for a root path with known schema
+    pub fn new_with_schema(
+        root_path: &str,
+        schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<Self> {
+        let table_desc =
+            Self::build_table_desc(root_path, "parquet", schema, collect_statistics);
+        Ok(Self {
+            descriptor: table_desc?,
+        })
+    }
+
+    /// Get file schema for all parquet files
+    pub fn schema(&self) -> SchemaRef {
+        self.descriptor.schema.clone()
+    }
+
+    /// Get the summary statistics for all parquet files
+    pub fn statistics(&self) -> Statistics {
+        get_statistics_with_limit(&self.descriptor, None).1
+    }
+
+    fn summarize_min_max(
+        max_values: &mut Vec<Option<MaxAccumulator>>,
+        min_values: &mut Vec<Option<MinAccumulator>>,
+        fields: &[Field],
+        i: usize,
+        stat: &ParquetStatistics,
+    ) {
+        match stat {
+            ParquetStatistics::Boolean(s) => {
+                if let DataType::Boolean = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Boolean(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Boolean(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Int32(s) => {
+                if let DataType::Int32 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value.update(&[ScalarValue::Int32(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value.update(&[ScalarValue::Int32(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                if let DataType::Int64 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value.update(&[ScalarValue::Int64(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value.update(&[ScalarValue::Int64(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Float(s) => {
+                if let DataType::Float32 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Float32(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Float32(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Double(s) => {
+                if let DataType::Float64 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Float64(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Float64(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            _ => {}
+        }
+    }
+}
+
+impl TableDescriptorBuilder for ParquetTableDescriptor {
+    fn file_meta(path: &str) -> Result<FileAndSchema> {
+        let file = File::open(path)?;
+        let file_reader = Arc::new(SerializedFileReader::new(file)?);
+        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+        let path = path.to_string();
+        let schema = arrow_reader.get_schema()?;
+        let num_fields = schema.fields().len();
+        let fields = schema.fields().to_vec();
+        let meta_data = arrow_reader.get_metadata();
+
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        let mut null_counts = vec![0; num_fields];
+        let mut has_statistics = false;
+
+        let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+
+        for row_group_meta in meta_data.row_groups() {
+            num_rows += row_group_meta.num_rows();
+            total_byte_size += row_group_meta.total_byte_size();
+
+            let columns_null_counts = row_group_meta
+                .columns()
+                .iter()
+                .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+            for (i, cnt) in columns_null_counts.enumerate() {
+                null_counts[i] += cnt as usize
+            }
+
+            for (i, column) in row_group_meta.columns().iter().enumerate() {
+                if let Some(stat) = column.statistics() {
+                    has_statistics = true;
+                    ParquetTableDescriptor::summarize_min_max(
+                        &mut max_values,
+                        &mut min_values,
+                        &fields,
+                        i,
+                        stat,
+                    )
+                }
+            }
+        }
+
+        let column_stats = if has_statistics {
+            Some(get_col_stats(
+                &schema,
+                null_counts,
+                &mut max_values,
+                &mut min_values,
+            ))
+        } else {
+            None
+        };
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: column_stats,
+        };
+
+        Ok(FileAndSchema {
+            file: PartitionedFile { path, statistics },
+            schema,
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
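
Putting the datasource pieces together: a logical plan can now be built from a pre-computed descriptor rather than a bare path, which is what the ballista deserialization code earlier in this diff does via ParquetTable::try_new_with_desc. A minimal sketch; max_partitions = 8 and the projection of None are arbitrary choices here:

    use std::sync::Arc;

    use datafusion::datasource::parquet::{ParquetTable, ParquetTableDescriptor};
    use datafusion::error::Result;
    use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};

    fn scan_plan(path: &str, table_name: &str) -> Result<LogicalPlan> {
        // Resolve files, schema and statistics once, up front.
        let desc = Arc::new(ParquetTableDescriptor::new(path)?);
        // Reuse the descriptor for the table provider instead of re-listing the path.
        let table = ParquetTable::try_new_with_desc(desc, 8, true)?;
        LogicalPlanBuilder::scan(table_name, Arc::new(table), None)?.build()
    }
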
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index a983af4..fd86504 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -110,6 +110,7 @@ mod tests {
 
     use super::*;
     use crate::datasource::datasource::Statistics;
+    use crate::datasource::PartitionedFile;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
     use crate::physical_plan::projection::ProjectionExec;
@@ -122,12 +123,13 @@ mod tests {
             vec![],
             Arc::new(ParquetExec::new(
                 vec![ParquetPartition::new(
-                    vec!["x".to_string()],
-                    Statistics::default(),
+                    vec![PartitionedFile::from("x".to_string())],
+                    0,
                     metrics.clone(),
                 )],
                 schema,
                 None,
+                Statistics::default(),
                 metrics,
                 None,
                 2048,
@@ -162,12 +164,13 @@ mod tests {
                 vec![],
                 Arc::new(ParquetExec::new(
                     vec![ParquetPartition::new(
-                        vec!["x".to_string()],
-                        Statistics::default(),
+                        vec![PartitionedFile::from("x".to_string())],
+                        0,
                         metrics.clone(),
                     )],
                     schema,
                     None,
+                    Statistics::default(),
                     metrics,
                     None,
                     2048,
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index ac1655b..eb8f927 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -27,14 +27,14 @@ use crate::{
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
-        common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+        DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
     },
     scalar::ScalarValue,
 };
 
 use arrow::{
     array::ArrayRef,
-    datatypes::{DataType, Schema, SchemaRef},
+    datatypes::{Schema, SchemaRef},
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
@@ -53,21 +53,21 @@ use tokio::{
     task,
 };
 
-use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::datasource::datasource::Statistics;
 use async_trait::async_trait;
 
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::stream::RecordBatchReceiverStream;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::Accumulator;
+use crate::datasource::parquet::ParquetTableDescriptor;
+use crate::datasource::{get_statistics_with_limit, FilePartition, PartitionedFile};
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
 pub struct ParquetExec {
     /// Parquet partitions to read
-    partitions: Vec<ParquetPartition>,
+    pub partitions: Vec<ParquetPartition>,
     /// Schema after projection is applied
-    schema: SchemaRef,
+    pub schema: SchemaRef,
     /// Projection for which columns to load
     projection: Vec<usize>,
     /// Batch size
@@ -94,9 +94,7 @@ pub struct ParquetExec {
 #[derive(Debug, Clone)]
 pub struct ParquetPartition {
     /// The Parquet filename for this partition
-    pub filenames: Vec<String>,
-    /// Statistics for this partition
-    pub statistics: Statistics,
+    pub file_partition: FilePartition,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -104,7 +102,7 @@ pub struct ParquetPartition {
 /// Stores metrics about the parquet execution for a particular parquet file
 #[derive(Debug, Clone)]
 struct ParquetFileMetrics {
-    /// Numer of times the predicate could not be evaluated
+    /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: metrics::Count,
     /// Number of row groups pruned using
     pub row_groups_pruned: metrics::Count,
@@ -123,290 +121,42 @@ impl ParquetExec {
     ) -> Result<Self> {
         // build a list of filenames from the specified path, which could be a single file or
         // a directory containing one or more parquet files
-        let filenames = common::build_file_list(path, ".parquet")?;
-        if filenames.is_empty() {
-            Err(DataFusionError::Plan(format!(
-                "No Parquet files (with .parquet extension) found at path {}",
-                path
-            )))
-        } else {
-            let filenames = filenames
-                .iter()
-                .map(|filename| filename.as_str())
-                .collect::<Vec<&str>>();
-            Self::try_from_files(
-                &filenames,
-                projection,
-                predicate,
-                batch_size,
-                max_partitions,
-                limit,
-            )
-        }
+        let table_desc = ParquetTableDescriptor::new(path)?;
+        Self::try_new(
+            Arc::new(table_desc),
+            projection,
+            predicate,
+            batch_size,
+            max_partitions,
+            limit,
+        )
     }
 
-    /// Create a new Parquet reader execution plan based on the specified list of Parquet
-    /// files
-    pub fn try_from_files(
-        filenames: &[&str],
+    /// Create a new Parquet reader execution plan with root descriptor, provided partitions and schema
+    pub fn try_new(
+        desc: Arc<ParquetTableDescriptor>,
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
         max_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
-        debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
-               filenames, projection, predicate, limit);
-        // build a list of Parquet partitions with statistics and gather all unique schemas
-        // used in this data set
-        let metrics = ExecutionPlanMetricsSet::new();
-        let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(max_partitions);
-        let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
-        let chunks = split_files(&filenames, max_partitions);
-        let mut num_rows = 0;
-        let mut num_fields = 0;
-        let mut fields = Vec::new();
-        let mut total_byte_size = 0;
-        let mut null_counts = Vec::new();
-        let mut max_values: Vec<Option<MaxAccumulator>> = Vec::new();
-        let mut min_values: Vec<Option<MinAccumulator>> = Vec::new();
-        let mut limit_exhausted = false;
-        for chunk in chunks {
-            let mut filenames: Vec<String> =
-                chunk.iter().map(|x| x.to_string()).collect();
-            let mut total_files = 0;
-            for filename in &filenames {
-                total_files += 1;
-                let file = File::open(filename)?;
-                let file_reader = Arc::new(SerializedFileReader::new(file)?);
-                let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-                let meta_data = arrow_reader.get_metadata();
-                // collect all the unique schemas in this data set
-                let schema = arrow_reader.get_schema()?;
-                if schemas.is_empty() || schema != schemas[0] {
-                    fields = schema.fields().to_vec();
-                    num_fields = schema.fields().len();
-                    null_counts = vec![0; num_fields];
-                    max_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    min_values = schema
-                        .fields()
-                        .iter()
-                        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-                        .collect::<Vec<_>>();
-                    schemas.push(schema);
-                }
-
-                for row_group_meta in meta_data.row_groups() {
-                    num_rows += row_group_meta.num_rows();
-                    total_byte_size += row_group_meta.total_byte_size();
+        debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
+               desc, projection, predicate, limit);
 
-                    // Currently assumes every Parquet file has same schema
-                    // https://issues.apache.org/jira/browse/ARROW-11017
-                    let columns_null_counts = row_group_meta
-                        .columns()
-                        .iter()
-                        .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
-
-                    for (i, cnt) in columns_null_counts.enumerate() {
-                        null_counts[i] += cnt
-                    }
-
-                    for (i, column) in row_group_meta.columns().iter().enumerate() {
-                        if let Some(stat) = column.statistics() {
-                            match stat {
-                                ParquetStatistics::Boolean(s) => {
-                                    if let DataType::Boolean = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Boolean(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int32(s) => {
-                                    if let DataType::Int32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Int64(s) => {
-                                    if let DataType::Int64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Int64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Float(s) => {
-                                    if let DataType::Float32 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float32(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                ParquetStatistics::Double(s) => {
-                                    if let DataType::Float64 = fields[i].data_type() {
-                                        if s.has_min_max_set() {
-                                            if let Some(max_value) = &mut max_values[i] {
-                                                match max_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.max())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        max_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                            if let Some(min_value) = &mut min_values[i] {
-                                                match min_value.update(&[
-                                                    ScalarValue::Float64(Some(*s.min())),
-                                                ]) {
-                                                    Ok(_) => {}
-                                                    Err(_) => {
-                                                        min_values[i] = None;
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                                _ => {}
-                            }
-                        }
-                    }
+        let metrics = ExecutionPlanMetricsSet::new();
+        let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit);
+        let schema = desc.schema();
 
-                    if limit.map(|x| num_rows >= x as i64).unwrap_or(false) {
-                        limit_exhausted = true;
-                        break;
-                    }
-                }
-            }
-            let column_stats = (0..num_fields)
-                .map(|i| {
-                    let max_value = match &max_values[i] {
-                        Some(max_value) => max_value.evaluate().ok(),
-                        None => None,
-                    };
-                    let min_value = match &min_values[i] {
-                        Some(min_value) => min_value.evaluate().ok(),
-                        None => None,
-                    };
-                    ColumnStatistics {
-                        null_count: Some(null_counts[i] as usize),
-                        max_value,
-                        min_value,
-                        distinct_count: None,
-                    }
-                })
-                .collect();
-
-            let statistics = Statistics {
-                num_rows: Some(num_rows as usize),
-                total_byte_size: Some(total_byte_size as usize),
-                column_statistics: Some(column_stats),
-            };
-            // remove files that are not needed in case of limit
-            filenames.truncate(total_files);
+        let mut partitions = Vec::with_capacity(max_partitions);
+        let chunked_files = split_files(&all_files, max_partitions);
+        for (index, group) in chunked_files.iter().enumerate() {
             partitions.push(ParquetPartition::new(
-                filenames,
-                statistics,
+                Vec::from(*group),
+                index,
                 metrics.clone(),
             ));
-            if limit_exhausted {
-                break;
-            }
-        }
-
-        // we currently get the schema information from the first file rather than do
-        // schema merging and this is a limitation.
-        // See https://issues.apache.org/jira/browse/ARROW-11017
-        if schemas.len() > 1 {
-            return Err(DataFusionError::Plan(format!(
-                "The Parquet files have {} different schemas and DataFusion does \
-                not yet support schema merging",
-                schemas.len()
-            )));
         }
-        let schema = Arc::new(schemas.pop().unwrap());
 
         let metrics = ExecutionPlanMetricsSet::new();
         let predicate_creation_errors =
@@ -430,6 +180,7 @@ impl ParquetExec {
             partitions,
             schema,
             projection,
+            statistics,
             metrics,
             predicate_builder,
             batch_size,
@@ -438,10 +189,12 @@ impl ParquetExec {
     }
 
     /// Create a new Parquet reader execution plan with provided partitions and schema
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         partitions: Vec<ParquetPartition>,
         schema: SchemaRef,
         projection: Option<Vec<usize>>,
+        statistics: Statistics,
         metrics: ExecutionPlanMetricsSet,
         predicate_builder: Option<PruningPredicate>,
         batch_size: usize,
@@ -459,94 +212,20 @@ impl ParquetExec {
                 .collect(),
         );
 
-        // sum the statistics
-        let mut num_rows: Option<usize> = None;
-        let mut total_byte_size: Option<usize> = None;
-        let mut null_counts: Vec<usize> = vec![0; schema.fields().len()];
-        let mut has_statistics = false;
-        let mut max_values = schema
-            .fields()
-            .iter()
-            .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-            .collect::<Vec<_>>();
-        let mut min_values = schema
-            .fields()
-            .iter()
-            .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-            .collect::<Vec<_>>();
-        for part in &partitions {
-            if let Some(n) = part.statistics.num_rows {
-                num_rows = Some(num_rows.unwrap_or(0) + n)
-            }
-            if let Some(n) = part.statistics.total_byte_size {
-                total_byte_size = Some(total_byte_size.unwrap_or(0) + n)
+        let new_column_statistics = statistics.column_statistics.map(|stats| {
+            let mut projected_stats = Vec::with_capacity(projection.len());
+            for proj in &projection {
+                projected_stats.push(stats[*proj].clone());
             }
-            if let Some(x) = &part.statistics.column_statistics {
-                let part_nulls: Vec<Option<usize>> =
-                    x.iter().map(|c| c.null_count).collect();
-                has_statistics = true;
-
-                let part_max_values: Vec<Option<ScalarValue>> =
-                    x.iter().map(|c| c.max_value.clone()).collect();
-                let part_min_values: Vec<Option<ScalarValue>> =
-                    x.iter().map(|c| c.min_value.clone()).collect();
-
-                for &i in projection.iter() {
-                    null_counts[i] = part_nulls[i].unwrap_or(0);
-                    if let Some(part_max_value) = part_max_values[i].clone() {
-                        if let Some(max_value) = &mut max_values[i] {
-                            match max_value.update(&[part_max_value]) {
-                                Ok(_) => {}
-                                Err(_) => {
-                                    max_values[i] = None;
-                                }
-                            }
-                        }
-                    }
-                    if let Some(part_min_value) = part_min_values[i].clone() {
-                        if let Some(min_value) = &mut min_values[i] {
-                            match min_value.update(&[part_min_value]) {
-                                Ok(_) => {}
-                                Err(_) => {
-                                    min_values[i] = None;
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        let column_stats = if has_statistics {
-            Some(
-                (0..schema.fields().len())
-                    .map(|i| {
-                        let max_value = match &max_values[i] {
-                            Some(max_value) => max_value.evaluate().ok(),
-                            None => None,
-                        };
-                        let min_value = match &min_values[i] {
-                            Some(min_value) => min_value.evaluate().ok(),
-                            None => None,
-                        };
-                        ColumnStatistics {
-                            null_count: Some(null_counts[i] as usize),
-                            max_value,
-                            min_value,
-                            distinct_count: None,
-                        }
-                    })
-                    .collect(),
-            )
-        } else {
-            None
-        };
+            projected_stats
+        });
 
         let statistics = Statistics {
-            num_rows,
-            total_byte_size,
-            column_statistics: column_stats,
+            num_rows: statistics.num_rows,
+            total_byte_size: statistics.total_byte_size,
+            column_statistics: new_column_statistics,
         };
+
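The hunk above replaces the per-partition statistics merge with a straight projection of the pre-computed statistics. As a minimal sketch of that index-based projection (plain strings stand in for ColumnStatistics so the snippet stays self-contained):

    let stats = vec!["stats_a", "stats_b", "stats_c"];
    let projection = vec![2usize, 0];
    // Keep only the projected columns, in projection order.
    let projected: Vec<_> = projection.iter().map(|i| stats[*i]).collect();
    assert_eq!(vec!["stats_c", "stats_a"], projected);

num_rows and total_byte_size are carried through unchanged, since a column projection affects neither.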
         Self {
             partitions,
             schema: Arc::new(projected_schema),
@@ -583,26 +262,15 @@ impl ParquetExec {
 impl ParquetPartition {
     /// Create a new parquet partition
     pub fn new(
-        filenames: Vec<String>,
-        statistics: Statistics,
+        files: Vec<PartitionedFile>,
+        index: usize,
         metrics: ExecutionPlanMetricsSet,
     ) -> Self {
         Self {
-            filenames,
-            statistics,
+            file_partition: FilePartition { index, files },
             metrics,
         }
     }
-
-    /// The Parquet filename for this partition
-    pub fn filenames(&self) -> &[String] {
-        &self.filenames
-    }
-
-    /// Statistics for this partition
-    pub fn statistics(&self) -> &Statistics {
-        &self.statistics
-    }
 }
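A partition now carries a FilePartition (an index plus its files) instead of plain file names and per-partition statistics. A minimal construction sketch, assuming PartitionedFile: From<String> as exercised in the tests at the end of this file (paths are illustrative):

    let files = vec![
        PartitionedFile::from("/tmp/part-0.parquet".to_string()),
        PartitionedFile::from("/tmp/part-1.parquet".to_string()),
    ];
    let metrics = ExecutionPlanMetricsSet::new();
    // The index identifies this partition within ParquetExec's partition list.
    let partition = ParquetPartition::new(files, 0, metrics.clone());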
 
 impl ParquetFileMetrics {
@@ -662,7 +330,7 @@ impl ExecutionPlan for ParquetExec {
         }
     }
 
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+    async fn execute(&self, partition_index: usize) -> Result<SendableRecordBatchStream> {
         // because the parquet implementation is not thread-safe, it is necessary to execute
         // on a thread and communicate with channels
         let (response_tx, response_rx): (
@@ -670,8 +338,7 @@ impl ExecutionPlan for ParquetExec {
             Receiver<ArrowResult<RecordBatch>>,
         ) = channel(2);
 
-        let parquet_partition = &self.partitions[partition];
-        let filenames = parquet_partition.filenames.clone();
+        let partition = self.partitions[partition_index].clone();
         let metrics = self.metrics.clone();
         let projection = self.projection.clone();
         let predicate_builder = self.predicate_builder.clone();
@@ -679,9 +346,9 @@ impl ExecutionPlan for ParquetExec {
         let limit = self.limit;
 
         task::spawn_blocking(move || {
-            if let Err(e) = read_files(
+            if let Err(e) = read_partition(
+                partition_index,
                 partition,
-                &filenames,
                 metrics,
                 &projection,
                 &predicate_builder,
@@ -706,9 +373,7 @@ impl ExecutionPlan for ParquetExec {
                 let files: Vec<_> = self
                     .partitions
                     .iter()
-                    .map(|pp| pp.filenames.iter())
-                    .flatten()
-                    .map(|s| s.as_str())
+                    .map(|pp| format!("{}", pp.file_partition))
                     .collect();
 
                 write!(
@@ -838,7 +503,7 @@ fn build_row_group_predicate(
     match predicate_values {
         Ok(values) => {
             // NB: false means don't scan row group
-            let num_pruned = values.iter().filter(|&v| !v).count();
+            let num_pruned = values.iter().filter(|&v| !*v).count();
             metrics.row_groups_pruned.add(num_pruned);
             Box::new(move |_, i| values[i])
         }
@@ -853,9 +518,9 @@ fn build_row_group_predicate(
 }
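As context for the hunk above: the pruning predicate yields one boolean per row group, and false means the row group can be skipped. A small, self-contained illustration of the counting and index lookup:

    // One entry per row group: keep groups 0 and 2, prune group 1.
    let values = vec![true, false, true];
    let num_pruned = values.iter().filter(|&v| !*v).count();
    assert_eq!(1, num_pruned);
    // The returned closure simply indexes into the evaluated values.
    let keep = |i: usize| values[i];
    assert!(keep(0) && !keep(1) && keep(2));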
 
 #[allow(clippy::too_many_arguments)]
-fn read_files(
-    partition: usize,
-    filenames: &[String],
+fn read_partition(
+    partition_index: usize,
+    partition: ParquetPartition,
     metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     predicate_builder: &Option<PruningPredicate>,
@@ -864,9 +529,11 @@ fn read_files(
     limit: Option<usize>,
 ) -> Result<()> {
     let mut total_rows = 0;
-    'outer: for filename in filenames {
-        let file_metrics = ParquetFileMetrics::new(partition, filename, &metrics);
-        let file = File::open(&filename)?;
+    let all_files = partition.file_partition.files;
+    'outer: for partitioned_file in all_files {
+        let file_metrics =
+            ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics);
+        let file = File::open(partitioned_file.path.as_str())?;
         let mut file_reader = SerializedFileReader::new(file)?;
         if let Some(predicate_builder) = predicate_builder {
             let row_group_predicate = build_row_group_predicate(
@@ -894,7 +561,7 @@ fn read_files(
                 Some(Err(e)) => {
                     let err_msg = format!(
                         "Error reading batch from {}: {}",
-                        filename,
+                        partitioned_file,
                         e.to_string()
                     );
                     // send error to operator
@@ -914,12 +581,15 @@ fn read_files(
     Ok(())
 }
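Note that the error message in the hunk above formats partitioned_file with `{}` (and fmt_as does the same with pp.file_partition earlier), so PartitionedFile and FilePartition need Display implementations, presumably alongside the struct definitions in datasource/mod.rs. Purely as an illustrative sketch of the shape such an impl takes, not the actual implementation:

    use std::fmt;

    impl fmt::Display for PartitionedFile {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Illustrative only: show the file path; the real impl may also include statistics.
            write!(f, "{}", self.path)
        }
    }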
 
-fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {
-    let mut chunk_size = filenames.len() / n;
-    if filenames.len() % n > 0 {
+fn split_files(
+    partitioned_files: &[PartitionedFile],
+    n: usize,
+) -> Vec<&[PartitionedFile]> {
+    let mut chunk_size = partitioned_files.len() / n;
+    if partitioned_files.len() % n > 0 {
         chunk_size += 1;
     }
-    filenames.chunks(chunk_size).collect()
+    partitioned_files.chunks(chunk_size).collect()
 }
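One consequence of the ceiling division above: asking for n partitions can yield fewer than n chunks when the file count does not divide evenly. A quick check (file names are illustrative):

    let files: Vec<PartitionedFile> = ["a", "b", "c", "d", "e"]
        .iter()
        .map(|s| PartitionedFile::from(s.to_string()))
        .collect();
    // 5 files, 4 requested partitions: chunk_size = ceil(5 / 4) = 2,
    // giving chunks of [2, 2, 1] -- i.e. only 3 partitions.
    let chunks = split_files(&files, 4);
    assert_eq!(3, chunks.len());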
 
 #[cfg(test)]
@@ -935,24 +605,24 @@ mod tests {
 
     #[test]
     fn test_split_files() {
-        let filenames = vec![
-            "a".to_string(),
-            "b".to_string(),
-            "c".to_string(),
-            "d".to_string(),
-            "e".to_string(),
+        let files = vec![
+            PartitionedFile::from("a".to_string()),
+            PartitionedFile::from("b".to_string()),
+            PartitionedFile::from("c".to_string()),
+            PartitionedFile::from("d".to_string()),
+            PartitionedFile::from("e".to_string()),
         ];
 
-        let chunks = split_files(&filenames, 1);
+        let chunks = split_files(&files, 1);
         assert_eq!(1, chunks.len());
         assert_eq!(5, chunks[0].len());
 
-        let chunks = split_files(&filenames, 2);
+        let chunks = split_files(&files, 2);
         assert_eq!(2, chunks.len());
         assert_eq!(3, chunks[0].len());
         assert_eq!(2, chunks[1].len());
 
-        let chunks = split_files(&filenames, 5);
+        let chunks = split_files(&files, 5);
         assert_eq!(5, chunks.len());
         assert_eq!(1, chunks[0].len());
         assert_eq!(1, chunks[1].len());
@@ -960,7 +630,7 @@ mod tests {
         assert_eq!(1, chunks[3].len());
         assert_eq!(1, chunks[4].len());
 
-        let chunks = split_files(&filenames, 123);
+        let chunks = split_files(&files, 123);
         assert_eq!(5, chunks.len());
         assert_eq!(1, chunks[0].len());
         assert_eq!(1, chunks[1].len());