Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/23 14:11:05 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request #932: FilePartition and PartitionedFile for scanning flexibility

yjshen opened a new pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932


   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
   1. To enable finer-grained readers that can parallelize the reading of even a single file, or balance the workload between scanning threads even when input file sizes vary greatly. To quote @andygrove [here](https://docs.google.com/document/d/1ZEZqvdohrot0ewtTNeaBtqczOIJ1Q0OnX9PqMMxpOF8/edit?disco=AAAAN5XUero):
   > One of the current issues IMO with DataFusion is that we use "file" as the default unit of partitioning. We would be able to scale better if we had finer-grained readers such as reading Parquet row groups instead. This way we can have multiple threads reading from the same file concurrently and avoid the need to repartition first to increase concurrency.  
   
   2. To refactor the logic in ParquetExec and the Parquet datasource. It's strange to call `ParquetExec::try_from_path` to get planning-related metadata.
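
To make the workload-balancing point in item 1 concrete, here is a minimal sketch (not code from this PR) of greedily assigning files to scan threads by size. The helper name `assign_by_size` and the `(path, size)` tuples are hypothetical; a finer-grained reader would apply the same idea to row groups or byte ranges rather than whole files.

```rust
/// Greedy "largest first" assignment: sort files by size (descending) and
/// always hand the next file to the currently least-loaded bucket.
/// Returns, for each bucket, the list of file paths assigned to it.
fn assign_by_size(files: &[(String, u64)], buckets: usize) -> Vec<Vec<String>> {
    assert!(buckets > 0, "need at least one bucket");

    // Stable sort, largest file first.
    let mut sorted: Vec<&(String, u64)> = files.iter().collect();
    sorted.sort_by(|a, b| b.1.cmp(&a.1));

    let mut loads = vec![0u64; buckets];
    let mut out: Vec<Vec<String>> = vec![Vec::new(); buckets];

    for (path, size) in sorted {
        // Index of the least-loaded bucket (the first one wins on ties).
        let (idx, _) = loads
            .iter()
            .enumerate()
            .min_by_key(|(_, load)| **load)
            .unwrap();
        loads[idx] += *size;
        out[idx].push(path.clone());
    }
    out
}
```

With one large file and several small ones, the large file ends up alone in its bucket while the small files share the other, which is the kind of balance a plain one-file-per-partition split cannot give.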
   
   # What changes are included in this PR?
   1. PartitionedFile -> A single file (for the moment) or part of a file (later, a subset of its row groups or rows). We may even extend this to include partition values and a partition schema to support partitioned tables, e.g.:
   /path/to/table/root/p_date=20210813/p_hour=1200/xxxxx.parquet
   
   2. FilePartition -> The basic unit of parallel processing. Each task is responsible for processing one FilePartition, which is composed of several PartitionedFiles.
   
   3. Separating the planning-related code from `ParquetExec`.
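
The relationship between the two new types can be sketched as follows. This is a simplified illustration rather than the PR's actual code: the structs keep only the fields needed here (`statistics` is omitted), and `group_into_partitions` is a hypothetical helper name.

```rust
/// Simplified stand-in for the PR's `PartitionedFile` (statistics omitted).
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionedFile {
    pub file_path: String,
}

/// Simplified stand-in for the PR's `FilePartition`: the files one task reads.
#[derive(Debug, Clone, PartialEq)]
pub struct FilePartition {
    pub index: usize,
    pub files: Vec<PartitionedFile>,
}

/// Split `files` into at most `max_partitions` groups of near-equal length.
/// Hypothetical helper, shown only to illustrate the two types.
pub fn group_into_partitions(
    files: Vec<PartitionedFile>,
    max_partitions: usize,
) -> Vec<FilePartition> {
    if files.is_empty() || max_partitions == 0 {
        return vec![];
    }
    // Ceiling division, so no more than `max_partitions` chunks are produced.
    let chunk_size = (files.len() + max_partitions - 1) / max_partitions;
    files
        .chunks(chunk_size)
        .enumerate()
        .map(|(index, chunk)| FilePartition {
            index,
            files: chunk.to_vec(),
        })
        .collect()
}
```

Because a task works on a `FilePartition` rather than on a file, a `PartitionedFile` can later shrink to a row-group range without changing how tasks are scheduled.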
   
   # Are there any user-facing changes?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#issuecomment-907801177


   @houqp @alamb I've resolved the comments, PTAL, thanks.





[GitHub] [arrow-datafusion] yjshen commented on pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#issuecomment-908345442


   Thanks @houqp @alamb for your great help!





[GitHub] [arrow-datafusion] alamb merged pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932


   





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r697951379



##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -282,24 +282,18 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],

Review comment:
       I changed it to a vector of all the files.
   
   However, after searching the project for a while, I found that this method may not actually be used, and the intention of this RPC is hard to understand as well. Perhaps it's deprecated and we should remove it later?
   
   ```
    rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}
   ```







[GitHub] [arrow-datafusion] houqp commented on pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#issuecomment-908415921


   Thank you @yjshen for being patient and driving through this big change step by step :)





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r698013181



##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -162,14 +162,15 @@ impl LogicalPlanBuilder {
     ) -> Result<Self> {
         let table_schema = Arc::new(table_schema.clone());
         let provider = Arc::new(EmptyTable::new(table_schema));
-        Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection)
+        Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection, None)
     }
 
     /// Convert a table provider into a builder with a TableScan
     pub fn scan(
         table_name: impl Into<String>,
         provider: Arc<dyn TableProvider>,
         projection: Option<Vec<usize>>,
+        filters: Option<Vec<Expr>>,

Review comment:
       Removed. The filters are still not deserialized for Ballista, as before.







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r696268779



##########
File path: ballista/rust/core/src/serde/logical_plan/to_proto.rs
##########
@@ -253,6 +256,44 @@ impl TryInto<DataType> for &protobuf::ArrowType {
     }
 }
 
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::Statistics> for Statistics {
+    fn into(self) -> protobuf::Statistics {
+        let none_value = -1_i64;
+        protobuf::Statistics {
+            num_rows: self.num_rows.map(|n| n as i64).unwrap_or(none_value),
+            total_byte_size: self.total_byte_size.map(|n| n as i64).unwrap_or(none_value),
+            column_stats: vec![],

Review comment:
       What's the reason for always setting `column_stats` to an empty vector here?

##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -282,24 +282,18 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],

Review comment:
       I remember we discussed this in the original PR. After taking a second look at the code, I am still not fully following the change here. The old behavior set `FilePartitionMetadata.filename` to a vector of file paths returned from a directory listing, while the new behavior always sets it to a single-entry vector containing the root path of the table.
   
   Shouldn't we use `parquet_desc.descriptor.descriptor` to build the filename vector here instead?
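
A minimal sketch of building the vector from the listed files, assuming a descriptor shaped like the `TableDescriptor` added in this PR. The structs are reduced to the fields used here, and the `filenames` helper is hypothetical.

```rust
/// Simplified stand-in for the PR's `PartitionedFile` (statistics omitted).
pub struct PartitionedFile {
    pub file_path: String,
}

/// Simplified stand-in for the PR's `TableDescriptor` (schema omitted).
pub struct TableDescriptor {
    /// Root path of the table.
    pub path: String,
    /// All source files found under the root path.
    pub partition_files: Vec<PartitionedFile>,
}

/// Collect the path of every listed file, instead of returning only the
/// table's root path. Hypothetical helper for illustration.
pub fn filenames(desc: &TableDescriptor) -> Vec<String> {
    desc.partition_files
        .iter()
        .map(|f| f.file_path.clone())
        .collect()
}
```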

##########
File path: ballista/rust/core/src/serde/logical_plan/from_proto.rs
##########
@@ -301,6 +315,48 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
     }
 }
 
+impl TryInto<TableDescriptor> for &protobuf::TableDescriptor {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<TableDescriptor, Self::Error> {
+        let partition_files = self
+            .partition_files
+            .iter()
+            .map(|f| f.try_into())
+            .collect::<Result<Vec<PartitionedFile>, _>>()?;
+        let schema = convert_required!(self.schema)?;
+        Ok(TableDescriptor {
+            path: self.path.to_owned(),
+            partition_files,
+            schema: Arc::new(schema),
+        })
+    }
+}
+
+impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<PartitionedFile, Self::Error> {
+        let statistics = convert_required!(self.statistics)?;
+        Ok(PartitionedFile {
+            file_path: self.path.clone(),
+            statistics,
+        })
+    }
+}
+
+impl TryInto<Statistics> for &protobuf::Statistics {
+    type Error = BallistaError;
+
+    fn try_into(self) -> Result<Statistics, Self::Error> {
+        Ok(Statistics {
+            num_rows: Some(self.num_rows as usize),
+            total_byte_size: Some(self.total_byte_size as usize),
+            column_statistics: None,

Review comment:
       what's the reason for always setting column_statistics to None here?
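
A related observation on the two hunks quoted above: `to_proto` writes `-1` when a count is `None`, while `from_proto` wraps the raw value in `Some(.. as usize)`, so as quoted a missing count would not come back as `None`. The following is a minimal sketch of a symmetric round trip. The helper names `encode_count` and `decode_count` are hypothetical, and the sketch deals only with the two scalar fields, not with column statistics.

```rust
/// Sentinel used on the wire for "value unknown"; mirrors `none_value` in the
/// `to_proto` hunk quoted in this thread.
const NONE_VALUE: i64 = -1;

/// Encode an optional count as an i64, using the sentinel for `None`.
fn encode_count(v: Option<usize>) -> i64 {
    v.map(|n| n as i64).unwrap_or(NONE_VALUE)
}

/// Decode an i64 back into an optional count. The sentinel (and any other
/// negative value) maps to `None` instead of being cast to `usize`.
fn decode_count(v: i64) -> Option<usize> {
    if v < 0 {
        None
    } else {
        Some(v as usize)
    }
}
```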







[GitHub] [arrow-datafusion] yjshen commented on pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#issuecomment-905674626


   cc @houqp @alamb @andygrove  for review





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r697989669



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with same schema exists in a path
+pub struct TableDescriptor {
+    /// root path of the table
+    pub path: String,
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub type FileAndSchema = (PartitionedFile, Schema);
+
+/// Builder for ['TableDescriptor'] inside given path
+pub trait TableDescriptorBuilder {
+    /// Construct a ['TableDescriptor'] from the provided path
+    fn build_table_desc(
+        path: &str,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<TableDescriptor> {
+        let filenames = build_file_list(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+        let mut contains_file = false;
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                contains_file = true;
+                let result = if collect_statistics {
+                    let (pf, schema) = Self::file_meta(file_path)?;
+                    if schemas.is_empty() {
+                        schemas.push(schema);
+                    } else if schema != schemas[0] {
+                        // we currently get the schema information from the first file rather than do
+                        // schema merging and this is a limitation.
+                        // See https://issues.apache.org/jira/browse/ARROW-11017
+                        return Err(DataFusionError::Plan(format!(
+                            "The file {} have different schema from the first file and DataFusion does \
+                        not yet support schema merging",
+                            file_path
+                        )));
+                    }
+                    pf
+                } else {
+                    PartitionedFile {
+                        file_path: file_path.to_owned(),
+                        statistics: Statistics::default(),
+                    }
+                };
+
+                Ok(result)
+            }).collect::<Result<Vec<PartitionedFile>>>();
+
+        if !contains_file {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        let result_schema = provided_schema.unwrap_or_else(|| schemas.pop().unwrap());
+
+        Ok(TableDescriptor {
+            path: path.to_string(),
+            partition_files: partitioned_files?,
+            schema: Arc::new(result_schema),
+        })
+    }
+
+    /// Get all metadata for a source file, including schema, statistics, partitions, etc.
+    fn file_meta(file_path: &str) -> Result<FileAndSchema>;
+}
+
+/// Get all files as well as the summary statistics when a limit is provided

Review comment:
       ```suggestion
   /// Get all files as well as the summary statistic
   /// if the optional `limit` is provided, includes only sufficient files 
   /// needed to read up to `limit` number of rows
   ```
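
The behavior described in the suggested doc comment can be sketched roughly as below. This is an illustration under stated assumptions, not the PR's implementation: `FileWithRows` and `files_up_to_limit` are hypothetical names, and a file whose row count is unknown is assumed to add nothing to the running total, so scanning continues past it.

```rust
/// Simplified stand-in: a file path plus an optional row count taken from
/// its statistics.
#[derive(Debug, Clone, PartialEq)]
pub struct FileWithRows {
    pub file_path: String,
    pub num_rows: Option<usize>,
}

/// Return the leading files needed to cover `limit` rows.
/// With `limit == None`, every file is returned.
pub fn files_up_to_limit(files: &[FileWithRows], limit: Option<usize>) -> Vec<FileWithRows> {
    let mut selected = Vec::new();
    let mut rows_so_far = 0usize;
    for file in files {
        selected.push(file.clone());
        // Assumption: an unknown row count contributes zero rows.
        rows_so_far = rows_so_far.saturating_add(file.num_rows.unwrap_or(0));
        if let Some(limit) = limit {
            if rows_so_far >= limit {
                break;
            }
        }
    }
    selected
}
```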

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,

Review comment:
       I wondered about calling this `file_path` as opposed to `uri`, but given `path` is the term used in https://github.com/apache/arrow-datafusion/pull/950/files#diff-45af7ea371f36434653f767a522a50aabf41bf34e4e11117103532363305793dR73 I like the consistency

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row

Review comment:
       I think in order to take full advantage of partition values (which might span multiple columns, for example), more information about the partitioning scheme will be needed (e.g. what expression is used to generate partitioning values).  Adding partitioning support to DataFusion's planning / execution is probably worth its own discussion
   
   (that is to say I agree with postponing adding anything partition specific)
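
For reference, the simplest case, the `p_date=20210813/p_hour=1200` directory layout from the PR description, could be handled along these lines. This is only a sketch: `partition_values` is a hypothetical helper, values stay as strings, and nothing here covers expression-based partitioning schemes.

```rust
/// Extract `key=value` directory segments that sit between the table root
/// and the file name. Segments without `=` are ignored.
pub fn partition_values(root: &str, file_path: &str) -> Vec<(String, String)> {
    // Work on the part of the path below the table root, if it matches.
    let relative = file_path.strip_prefix(root).unwrap_or(file_path);
    let mut segments: Vec<&str> = relative
        .split('/')
        .filter(|s| !s.is_empty())
        .collect();
    // The last segment is the file name itself, not a partition directory.
    segments.pop();
    segments
        .into_iter()
        .filter_map(|seg| seg.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```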

##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -27,14 +27,14 @@ use crate::{
     logical_plan::{Column, Expr},
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
     physical_plan::{
-        common, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,

Review comment:
       I really like how the statistics and schema related code has been moved out of `physical_plan` and into `datasource`

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with same schema exists in a path
+pub struct TableDescriptor {
+    /// root path of the table
+    pub path: String,
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub type FileAndSchema = (PartitionedFile, Schema);

Review comment:
       ```suggestion
   pub struct FileAndSchema {
     file: PartitionedFile, 
     schema: Schema
   };
   ```
   
   I personally find `struct`s with named fields easier to work with than trying to remember what `input.0` means (aka having to remember what `0` and `1` tuple offsets mean)
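
A small side-by-side sketch of the difference at a call site. The `Schema` and `PartitionedFile` types here are simplified stand-ins, and the two `describe_*` functions are hypothetical.

```rust
/// Simplified stand-in for an Arrow schema.
#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
    pub field_names: Vec<String>,
}

/// Simplified stand-in for the PR's `PartitionedFile`.
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionedFile {
    pub file_path: String,
}

/// Tuple alias: readers must remember that `.0` is the file and `.1` the schema.
pub type FileAndSchemaTuple = (PartitionedFile, Schema);

/// Named-field alternative: the field names document themselves.
pub struct FileAndSchema {
    pub file: PartitionedFile,
    pub schema: Schema,
}

pub fn describe_tuple(input: &FileAndSchemaTuple) -> String {
    format!("{} ({} columns)", input.0.file_path, input.1.field_names.len())
}

pub fn describe_struct(input: &FileAndSchema) -> String {
    format!("{} ({} columns)", input.file.file_path, input.schema.field_names.len())
}
```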

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Statistics of the file
+    pub statistics: Statistics,

Review comment:
       I think it makes sense to keep this as `Statistics` rather than, say, `Option<Statistics>`, since all the fields are already `Option`.

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with same schema exists in a path
+pub struct TableDescriptor {
+    /// root path of the table
+    pub path: String,
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Returned partitioned file with its schema
+pub type FileAndSchema = (PartitionedFile, Schema);
+
+/// Builder for ['TableDescriptor'] inside given path
+pub trait TableDescriptorBuilder {
+    /// Construct a ['TableDescriptor'] from the provided path
+    fn build_table_desc(
+        path: &str,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<TableDescriptor> {
+        let filenames = build_file_list(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas

Review comment:
       ```suggestion
           // build a list of partitions with statistics and gather all unique schemas
   ```

##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -162,14 +162,15 @@ impl LogicalPlanBuilder {
     ) -> Result<Self> {
         let table_schema = Arc::new(table_schema.clone());
         let provider = Arc::new(EmptyTable::new(table_schema));
-        Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection)
+        Self::scan(name.unwrap_or(UNNAMED_TABLE), provider, projection, None)
     }
 
     /// Convert a table provider into a builder with a TableScan
     pub fn scan(
         table_name: impl Into<String>,
         provider: Arc<dyn TableProvider>,
         projection: Option<Vec<usize>>,
+        filters: Option<Vec<Expr>>,

Review comment:
       I think this argument is likely to confuse users and should be removed.
   
   For example, as a user of `LogicalPlanBuilder` I would probably assume that the following plan returns only rows with `a < 5`:
   
   ```rust
     // Build a plan that looks like it would keep only rows with `a < 5`
     let plan = LogicalPlanBuilder::scan("table", provider, None, Some(vec![col("a").lt(lit(5))]));
   ```
   
   However, I am pretty sure it could (and often would) return rows with `a >= 5`. This is because `filters` added to a `TableScan` node are optional, in the sense that the provider may filter out rows that do not pass the predicate but is not required to. Indeed, even for the Parquet provider, the filters are only used for row group pruning, which may or may not be able to filter out rows.
   
   I think we could solve this in one of two ways:
   1. Leave the `scan` signature alone and rely on the predicate pushdown optimization to push filters down to the scan as appropriate (my preference, as it is simpler for users)
   2. Rename this argument to something like `optional_filters_for_performance` and document more carefully what it does. I think it would be challenging to explain, as it might or might not do anything depending on how the data is laid out.
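The difference between a best-effort scan filter and an exact filter can be shown with a toy model. This is a sketch under stated assumptions: `RowGroup`, `scan_with_pruning` and `exact_filter` are hypothetical names invented here, not DataFusion APIs, and the single integer column stands in for `a`.

```rust
// Hypothetical, simplified model; these are NOT DataFusion types.

/// A row group holding values of a single integer column `a`, plus the
/// minimum statistic that a Parquet-like format would store for it.
struct RowGroup {
    min: i64,
    values: Vec<i64>,
}

impl RowGroup {
    fn new(values: Vec<i64>) -> Self {
        let min = *values.iter().min().expect("row group must not be empty");
        RowGroup { min, values }
    }
}

/// Scan with a best-effort `a < upper` filter: a whole row group is skipped
/// when its statistics prove that no row can match, but the rows of a
/// surviving row group are returned unfiltered.
fn scan_with_pruning(groups: &[RowGroup], upper: i64) -> Vec<i64> {
    groups
        .iter()
        .filter(|g| g.min < upper) // keep groups that MAY contain a match
        .flat_map(|g| g.values.iter().copied())
        .collect()
}

/// The exact filter that a separate filter step above the scan would apply.
fn exact_filter(rows: Vec<i64>, upper: i64) -> Vec<i64> {
    rows.into_iter().filter(|a| *a < upper).collect()
}
```

With row groups `[1, 7]` and `[8, 9]` and the predicate `a < 5`, pruning drops only the second group, so the scan still yields `7`; only the exact filter on top reduces the result to `[1]`.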




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#issuecomment-908415921


   Thank you @yjshen for being patient and driving through this big change step by step :)





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r697951379



##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -282,24 +282,18 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],

Review comment:
       I changed it to all the files inside. 
   
   After searching the code paths for a while, I found that this method may not actually be used, and the intent of this RPC is hard to understand as well. Perhaps it's deprecated and we should remove it later?
   
   ```
    rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}
   ```







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r697952169



##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -282,24 +282,18 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],

Review comment:
       I had the same question when I was going through the code base yesterday. I noticed it's only mentioned in `ballista/docs/architecture.md`. @andygrove do you know if this RPC method is still needed?







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #932: FilePartition and PartitionedFile for scanning flexibility

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #932:
URL: https://github.com/apache/arrow-datafusion/pull/932#discussion_r698038625



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +43,247 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,

Review comment:
       I think the convention is to use `uri` for strings that could contain a URI scheme component such as `s3://` or `gcs://`. The object store uses `path` because the URI scheme is already implied by the object store implementation, so it is no longer needed. On top of that, it avoids the need to perform URI validation in the object store, for example rejecting URIs with a `gcs://` scheme in the S3 object store implementation.
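A rough sketch of that convention, assuming the scheme is stripped once before a store is chosen so that each store only ever sees a path. The functions `split_scheme`, `store_for` and `resolve` and the store names are hypothetical, introduced only for illustration; they are not part of DataFusion.

```rust
/// Split `uri` into an optional scheme and the remaining path.
/// A string without `://` is treated as a plain path with no scheme.
fn split_scheme(uri: &str) -> (Option<&str>, &str) {
    match uri.split_once("://") {
        Some((scheme, rest)) if !scheme.is_empty() => (Some(scheme), rest),
        _ => (None, uri),
    }
}

/// Hypothetical registry lookup: map a scheme to the name of the object
/// store that handles it. Validation of the scheme happens here, once.
fn store_for(scheme: Option<&str>) -> Result<&'static str, String> {
    match scheme {
        None | Some("file") => Ok("local"),
        Some("s3") => Ok("s3"),
        Some("gcs") => Ok("gcs"),
        Some(other) => Err(format!(
            "no object store registered for scheme `{}`",
            other
        )),
    }
}

/// Resolve a URI into (store name, scheme-less path). The chosen store
/// receives only the path and never needs to inspect the scheme.
fn resolve(uri: &str) -> Result<(&'static str, &str), String> {
    let (scheme, path) = split_scheme(uri);
    Ok((store_for(scheme)?, path))
}
```

Under this split, a field that may hold `s3://bucket/data/a.parquet` would be named `uri`, while the value handed to the S3 store would be the path `bucket/data/a.parquet`.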



