You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/11 14:00:26 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Add support for reading remote storage systems

alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686856778



##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Objct Reader for one file in a object store
+pub trait ObjectReader {

Review comment:
       I think the traits in this file should be `async` (or at least offer an async version) -- as all the major object store implementations I know of in Rust are `async` and it seems that is the way the ecosystem is headed.
   
   At the very least I would imagine `async fn get_reader(...) -> Box<dyn Read>` which would allow it to make a non blocking async network call to fetch the bytes. 
   
   I am not sure if the thing returned (`Box<dyn Read>`) would benefit from being something async as well (aka an async stream) to allow streaming reads without necessarily buffering the entire `start/length` range.  At the moment I am inclined to keep it as `Box<dyn Read>`) for simplicity

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with same schema exists in a path
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Builder for ['SourceRootDescriptor'] inside given path
+pub trait SourceRootDescBuilder {
+    /// Construct a ['SourceRootDescriptor'] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+    ) -> Result<SourceRootDescriptor> {
+        let filenames = object_store.list_all_files(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                let pf = Self::get_file_meta(file_path, object_store.clone())?;

Review comment:
       +1 for making the statistics fetch optional

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {

Review comment:
       What is `FilePartition` used for?

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics

Review comment:
       Do I understand it that the idea here is to allow a DataFusion reader to read a set of files in a generic way (rather than having separate implementations for csv, parquet, etc)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org