Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/11 10:14:31 UTC

[GitHub] [arrow-datafusion] rdettai commented on a change in pull request #811: Add support for reading remote storage systems

rdettai commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686666936



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with the same schema found under a given path
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Builder for a [`SourceRootDescriptor`] under a given path
+pub trait SourceRootDescBuilder {
+    /// Construct a [`SourceRootDescriptor`] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+    ) -> Result<SourceRootDescriptor> {
+        let filenames = object_store.list_all_files(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                let pf = Self::get_file_meta(file_path, object_store.clone())?;

Review comment:
I feel we should make it optional to get the full parquet statistics right away; otherwise, creating a parquet table for a bucket/prefix with many files will be very slow.

Also, this should definitely be parallelized, because fetching the footers sequentially is really inefficient.
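
A hedged sketch of what parallel footer fetching could look like, using rayon as an assumed extra dependency (not part of this PR) and the `get_file_meta` helper this PR introduces:

```rust
use std::sync::Arc;

use rayon::prelude::*; // assumed dependency, not in this PR

/// Sketch: fetch Parquet footers concurrently instead of one by one.
fn get_file_metas_parallel(
    filenames: &[String],
    object_store: Arc<dyn ObjectStore>,
) -> Result<Vec<PartitionedFile>> {
    filenames
        .par_iter() // one footer fetch per worker thread
        .map(|path| {
            <ParquetRootDesc as SourceRootDescBuilder>::get_file_meta(
                path,
                object_store.clone(),
            )
        })
        .collect() // Result<Vec<_>>: fails fast on the first error
}
```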

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,

Review comment:
If I'm not mistaken, `partition_value` and `partition_schema` are never used or filled, so I think it would be better to leave them commented out for now.
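
i.e., something along these lines until the fields are actually populated:

```rust
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub file_path: String,
    /// Schema of the file
    pub schema: Schema,
    /// Statistics of the file
    pub statistics: Statistics,
    // Never read or written yet; re-enable once partition pruning fills them:
    // pub partition_value: Option<Vec<ScalarValue>>,
    // pub partition_schema: Option<Schema>,
}
```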

##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");

Review comment:
In my experience, it is too strict to expect parquet files to have the `.parquet` suffix.
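
One possible relaxation, as a sketch: sniff the 4-byte `PAR1` magic number instead of trusting the file name. This assumes `ObjectReader::get_reader(start, length)` yields an `std::io::Read` over that byte range, which is how the ChunkReader wrapper below uses it:

```rust
use std::io::Read;
use std::sync::Arc;

/// Sketch: identify Parquet files by their magic bytes rather than by a
/// `.parquet` suffix.
fn looks_like_parquet(reader: &Arc<dyn ObjectReader>) -> std::io::Result<bool> {
    let mut magic = [0u8; 4];
    // Every Parquet file both starts and ends with the ASCII bytes "PAR1".
    reader.get_reader(0, 4).read_exact(&mut magic)?;
    Ok(&magic == b"PAR1")
}
```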

##########
File path: datafusion/src/datasource/csv.rs
##########
@@ -64,7 +66,8 @@ impl CsvFile {
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
             None => {
-                let filenames = common::build_file_list(&path, options.file_extension)?;
+                let filenames = LocalFileSystem

Review comment:
Why not also allow csv/json files to be fetched using the object_store_registry? This would make the behavior more consistent, but it can definitely be added later.
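
As a sketch, the resolution chain could mirror what `ParquetRootDesc::new` does in this PR (both `store_for_path` and `list_all_files` are the APIs added here):

```rust
/// Sketch: resolve CSV/JSON sources through the object store registry
/// instead of hardcoding LocalFileSystem.
fn list_files_via_registry(
    context: &ExecutionContext,
    path: &str,
    ext: &str,
) -> Result<Vec<String>> {
    let object_store = context
        .state
        .lock()
        .unwrap()
        .object_store_registry
        .store_for_path(path);
    object_store.list_all_files(path, ext)
}
```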

##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");
+        Ok(Self {
+            object_store,
+            descriptor: root_desc?,
+        })
+    }
+
+    /// Get file schema for all parquet files
+    pub fn schema(&self) -> SchemaRef {
+        self.descriptor.schema.clone()
+    }
+
+    /// Get the summary statistics for all parquet files
+    pub fn statistics(&self) -> Statistics {
+        get_statistics_with_limit(&self.descriptor, None).1
+    }
+
+    fn summarize_min_max(
+        max_values: &mut Vec<Option<MaxAccumulator>>,
+        min_values: &mut Vec<Option<MinAccumulator>>,
+        fields: &[Field],
+        i: usize,
+        stat: &ParquetStatistics,
+    ) {
+        match stat {
+            ParquetStatistics::Boolean(s) => {
+                if let DataType::Boolean = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Boolean(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Boolean(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Int32(s) => {
+                if let DataType::Int32 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value.update(&[ScalarValue::Int32(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value.update(&[ScalarValue::Int32(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                if let DataType::Int64 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value.update(&[ScalarValue::Int64(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value.update(&[ScalarValue::Int64(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Float(s) => {
+                if let DataType::Float32 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Float32(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Float32(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Double(s) => {
+                if let DataType::Float64 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Float64(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Float64(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            _ => {}
+        }
+    }
+}
+
+impl SourceRootDescBuilder for ParquetRootDesc {
+    fn get_file_meta(
+        file_path: &str,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<PartitionedFile> {
+        let reader = object_store.get_reader(file_path)?;
+        let file_reader =
+            Arc::new(SerializedFileReader::new(ObjectReaderWrapper::new(reader))?);
+        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+        let file_path = file_path.to_string();
+        let schema = arrow_reader.get_schema()?;
+        let num_fields = schema.fields().len();
+        let fields = schema.fields().to_vec();
+        let meta_data = arrow_reader.get_metadata();
+
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        let mut null_counts = vec![0; num_fields];
+        let mut has_statistics = false;
+
+        let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+
+        for row_group_meta in meta_data.row_groups() {
+            num_rows += row_group_meta.num_rows();
+            total_byte_size += row_group_meta.total_byte_size();
+
+            let columns_null_counts = row_group_meta
+                .columns()
+                .iter()
+                .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+            for (i, cnt) in columns_null_counts.enumerate() {
+                null_counts[i] += cnt as usize
+            }
+
+            for (i, column) in row_group_meta.columns().iter().enumerate() {
+                if let Some(stat) = column.statistics() {
+                    has_statistics = true;
+                    ParquetRootDesc::summarize_min_max(
+                        &mut max_values,
+                        &mut min_values,
+                        &fields,
+                        i,
+                        stat,
+                    )
+                }
+            }
+        }
+
+        let column_stats = if has_statistics {
+            Some(get_col_stats(
+                &schema,
+                null_counts,
+                &mut max_values,
+                &mut min_values,
+            ))
+        } else {
+            None
+        };
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: column_stats,
+        };
+
+        Ok(PartitionedFile {
+            file_path,
+            schema,
+            statistics,
+            partition_value: None,
+            partition_schema: None,
+        })
+    }
+}
+
+/// Thin wrapper over an object reader to work with parquet file reads
+pub struct ObjectReaderWrapper {
+    reader: Arc<dyn ObjectReader>,
+}
+
+impl ObjectReaderWrapper {
+    /// Construct a wrapper over the provided object reader
+    pub fn new(reader: Arc<dyn ObjectReader>) -> Self {
+        Self { reader }
+    }
+}
+
+impl ChunkReader for ObjectReaderWrapper {
+    type T = InnerReaderWrapper;
+
+    fn get_read(&self, start: u64, length: usize) -> parquet::errors::Result<Self::T> {
+        Ok(InnerReaderWrapper {
+            inner_reader: self.reader.get_reader(start, length),
+        })
+    }
+}
+
+impl Length for ObjectReaderWrapper {
+    fn len(&self) -> u64 {
+        self.reader.length()
+    }
+}
+
+/// Thin wrapper over reader for a parquet file
+pub struct InnerReaderWrapper {

Review comment:
Might be nice to mark this as "to be removed once https://github.com/rust-lang/rfcs/pull/1598 is stabilized" (should be soon, according to https://blog.rust-lang.org/2021/08/03/GATs-stabilization-push.html).
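
For context, the GAT-based shape that would make this wrapper unnecessary looks roughly like this (hypothetical future `ChunkReader`, not today's parquet API):

```rust
// Hypothetical: with generic associated types (RFC 1598), the associated
// reader type can borrow from `self`, so no owned wrapper type is needed.
trait ChunkReader {
    type T<'a>: std::io::Read
    where
        Self: 'a;

    fn get_read(&self, start: u64, length: usize)
        -> parquet::errors::Result<Self::T<'_>>;
}
```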



