Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/02 05:44:11 UTC

[GitHub] [arrow-datafusion] yjshen opened a new pull request #811: Source ext for remote files read

yjshen opened a new pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811


   # Which issue does this PR close?
   Closes #616
   
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r683300110



##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I've filed #824 for this and will remove it from this PR.







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682087771



##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {
+    /// Returns the protocol handler as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;
+
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ChunkReader<T = R>>>;
+
+    fn handler_name(&self) -> String;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+pub struct ProtocolRegistry {
+    pub protocol_handlers: RwLock<HashMap<String, Arc<dyn ProtocolHandler>>>,
+}
+
+impl ProtocolRegistry {
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ProtocolHandler>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFSHandler));
+
+        Self {
+            protocol_handlers: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new handler to this registry.
+    /// If a handler of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_handler(
+        &self,
+        scheme: &str,
+        handler: Arc<dyn ProtocolHandler>,
+    ) -> Option<Arc<dyn ProtocolHandler>> {
+        let mut handlers = self.protocol_handlers.write().unwrap();
+        handlers.insert(scheme.to_string(), handler)
+    }
+
+    pub fn handler(&self, scheme: &str) -> Option<Arc<dyn ProtocolHandler>> {
+        let handlers = self.protocol_handlers.read().unwrap();
+        handlers.get(scheme).cloned()
+    }
+
+    pub fn handler_for_path(&self, path: &str) -> Arc<dyn ProtocolHandler> {

Review comment:
       It would be good to document the assumptions this function is making. It seems to assume URI-style paths (e.g. `file://blah` or `s3://blah`) and falls back to treating paths as local filenames if the path does not contain a `':'`.
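   
    A minimal sketch of the assumption being described (illustrative only; `scheme_of` is a hypothetical helper, not part of this PR):
   
    ```rust
    // Split on the first ':' to extract a scheme; otherwise treat the path as local.
    fn scheme_of(path: &str) -> &str {
        match path.split_once(':') {
            Some((scheme, _)) => scheme, // "file" from "file://blah", "s3" from "s3://blah"
            None => "file",              // plain paths fall back to the local filesystem
        }
    }
    ```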

##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {
+    /// Returns the protocol handler as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;

Review comment:
       Something that might be worth thinking about in this API is how one would traverse the paths (for example, how would one get all subdirectories of `root_path`, and how would you then concatenate them together)?
   
    This may not be an important use case, but I do think it is worth thinking about.

##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {

Review comment:
       I wonder if calling this interface `ObjectStore` might be more standard -- it is basically an abstraction for something that maps string keys to sources of "objects" (aka bytes).
   
   I also suggest adding some documentation on what a protocol handler is. Perhaps like the following
   
   ```rust
   /// A ProtocolHandler abstracts access to an underlying file/object storage.
    /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
   ```

##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I don't understand the need for a global static execution context. I think the intent is that the user of DataFusion would decide whether to create a new context or re-use an existing one.
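   
    A small sketch of the alternative being suggested (illustrative only; `ExecutionContext::new` is the existing constructor, while how contexts get shared is left entirely to the caller):
   
    ```rust
    use std::sync::Arc;
    use datafusion::execution::context::ExecutionContext;

    fn main() {
        // The caller decides: share one context across components...
        let shared = Arc::new(ExecutionContext::new());
        // ...or create a fresh one per use, with no global static involved.
        let per_use = ExecutionContext::new();
        let _ = (shared, per_use);
    }
    ```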







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902533289


   @alamb 
   
    > The alternate compromise, which you have partly implemented in this PR, is to implement both async and non-async versions. This is similar to the approach in the C/C++ filesystem API (props to @nealrichardson for the pointer), which has both synchronous and asynchronous APIs.
   
   If I understand you correctly, do you mean I should keep the sync and async implementations separate, with two distinct code paths, instead of the current wrapper approach (a sync function wrapping the async logic)?





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r692658418



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +52,304 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // Schema of partition columns
+    // pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files under a path, sharing the same schema
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Stream of `Result<PartitionedFile>`
+pub type PartitionedFileStream =
+    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
+
+/// Builder for ['SourceRootDescriptor'] inside given path
+#[async_trait]
+pub trait SourceRootDescBuilder: Sync + Send + Debug {
+    /// Construct a ['SourceRootDescriptor'] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<SourceRootDescriptor> {
+        let mut results: Vec<Result<PartitionedFile>> = Vec::new();
+        futures::executor::block_on(async {
+            match Self::get_source_desc_async(
+                path,
+                object_store,
+                ext,
+                provided_schema,
+                collect_statistics,
+            )
+            .await
+            {
+                Ok(mut stream) => {
+                    while let Some(pf) = stream.next().await {
+                        results.push(pf);
+                    }
+                }
+                Err(e) => {
+                    results.push(Err(e));
+                }
+            }
+        });
+
+        let partition_results: Result<Vec<PartitionedFile>> =
+            results.into_iter().collect();
+        let partition_files = partition_results?;
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas

Review comment:
       Yes, `get_source_desc` is used to adapt `async` to `sync`, to stop async from propagating up into the API.







[GitHub] [arrow-datafusion] houqp commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-898711031


   @yjshen @rdettai FYI @Dandandan added support for creating a parquet table with provided schema in https://github.com/apache/arrow-datafusion/pull/872.





[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-903106790


   Thank you @houqp @alamb @jorgecarleitao for your great help! 
   
   This PR initially contains several pieces of functionality related to reading data, including the core object store abstraction, a more general scan partitioning abstraction, and some refactoring of the parquet scan. At the same time, as I think more and get more valuable input, the scope becomes more extensive. Although I try to keep the PR as lean as possible, leaving out some functionality such as JSON/CSV scan, it inevitably grows huge and is hard to review.
   
   I agree we could make the current PR a proof of concept, and I'm happy to break it down into several parts to get the work finally merged. 
   
   As for the design doc for the object store API, I can write up a first draft proposal this weekend. Please help review and revise it when it's available. Thanks again @alamb for offering to help on the doc part :)





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r688299568



##########
File path: ballista/rust/core/src/utils.rs
##########
@@ -252,6 +252,11 @@ pub fn create_datafusion_context(
     ExecutionContext::with_config(config)
 }
 
+/// Create a DataFusion context that is compatible with Ballista in concurrency
+pub fn create_datafusion_context_concurrency(concurrency: usize) -> ExecutionContext {

Review comment:
       Nitpick, I think `ExecutionContext::with_concurrency(24)` is as readable as `create_datafusion_context_concurrency(24)` but incurs less indirection/abstraction, so IMHO this helper function doesn't provide a lot of value.
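   
    For reference, a sketch of the one-liner the helper presumably wraps (assuming the `ExecutionConfig::new().with_concurrency(..)` builder available in DataFusion at the time):
   
    ```rust
    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

    // Equivalent construction at the call site, without the extra helper.
    fn context_with_concurrency(concurrency: usize) -> ExecutionContext {
        ExecutionContext::with_config(ExecutionConfig::new().with_concurrency(concurrency))
    }
    ```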







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-904681619


   > Would the following structure work for delta-rs?
   
   cc @houqp  since I'm not familiar with delta-rs.





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r688299568



##########
File path: ballista/rust/core/src/utils.rs
##########
@@ -252,6 +252,11 @@ pub fn create_datafusion_context(
     ExecutionContext::with_config(config)
 }
 
+/// Create a DataFusion context that is compatible with Ballista in concurrency
+pub fn create_datafusion_context_concurrency(concurrency: usize) -> ExecutionContext {

Review comment:
       Nitpick, I think `ExecutionContext::with_concurrency(24)` is as readable as `create_datafusion_context_concurrency(24)` but incurs less indirection/abstraction, so IMHO this helper function doesn't provide a lot of value.

##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -285,24 +286,19 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let ctx = create_datafusion_context_concurrency(1);
+                let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],

Review comment:
       We are always returning a single path for the partitions field? This changes the behavior, doesn't it?

##########
File path: datafusion/src/physical_plan/parquet.rs
##########
@@ -871,8 +552,10 @@ fn build_row_group_predicate(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 fn read_files(

Review comment:
       perhaps this function should be renamed to `read_partition`.

##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {
+    /// Get reader for a part [start, start + length] in the file
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
+
+    /// Get length for the file
+    fn length(&self) -> u64;
+}
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns the object store as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns all the files with filename extension `ext` in path `prefix`
+    fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;

Review comment:
       nitpick, IMHO, `list_files` or simply `list` would be a simpler method name here.
   
    One thing I would like to point out is that for different object stores, object listing will actually give us more information than just the file path; for example, last-updated time and file size are often returned as part of the API/sys call. This extra metadata might be useful for other purposes. I don't think we need to take this into account in this PR, just something to keep in mind since we might need to change the return type here in the future.
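   
    An illustrative shape for such a richer listing result (hypothetical type, not something proposed in this PR):
   
    ```rust
    use std::time::SystemTime;

    // A listing entry that carries the extra metadata object stores usually return.
    pub struct FileMeta {
        pub path: String,
        pub size: u64,
        pub last_modified: Option<SystemTime>,
    }
    ```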

##########
File path: datafusion/src/datasource/local.rs
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       nitpick, code-organization wise, I recommend creating an object_store module, moving the existing object_store.rs code into `object_store/mod.rs`, and then moving the local module into the object_store module as a submodule.

##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -137,20 +138,20 @@ impl LogicalPlanBuilder {
     pub fn scan_parquet(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_concurrency: usize,
+        context: ExecutionContext,

Review comment:
       I don't have a better solution for how to handle this, but it strikes me as a bit odd to couple the execution context with the logical plan builder here. I will think more about this later this week.

##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {
+    /// Get reader for a part [start, start + length] in the file
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
+
+    /// Get length for the file
+    fn length(&self) -> u64;
+}
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns the object store as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns all the files with filename extension `ext` in path `prefix`
+    fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;
+
+    /// Get object reader for one file
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>>;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+/// A Registry holds all the object stores at runtime with a scheme for each store.
+/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+/// and query data inside these systems.
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// The ['LocalFileSystem'] store is registered by default to support reading from the local filesystem natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write().unwrap();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read().unwrap();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the path based on its scheme. For example:
+    /// path with prefix file:/// or no prefix will return the default LocalFS store,
+    /// path with prefix s3:/// will return the S3 store if it's registered,
+    /// and will always return LocalFS store when a prefix is not registered in the path.
+    pub fn store_for_path(&self, path: &str) -> Arc<dyn ObjectStore> {
+        if let Some((scheme, _)) = path.split_once(':') {
+            let stores = self.object_stores.read().unwrap();
+            if let Some(store) = stores.get(&*scheme.to_lowercase()) {
+                return store.clone();
+            }

Review comment:
       we should return an error here if no matching store is found for the particular scheme, right?
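   
    A sketch of what that might look like (hypothetical change to the signature shown above; the error wording is an assumption):
   
    ```rust
    // Assumes `use crate::error::DataFusionError;` alongside the existing imports.
    pub fn store_for_path(&self, path: &str) -> Result<Arc<dyn ObjectStore>> {
        if let Some((scheme, _)) = path.split_once(':') {
            let stores = self.object_stores.read().unwrap();
            // Error out instead of silently falling back when the scheme is unknown.
            return stores.get(&*scheme.to_lowercase()).cloned().ok_or_else(|| {
                DataFusionError::Plan(format!("No object store registered for scheme {}", scheme))
            });
        }
        // No scheme at all: treat the path as a local filesystem path.
        Ok(self.get(LOCAL_SCHEME).expect("local store is always registered"))
    }
    ```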

##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {
+    /// Get reader for a part [start, start + length] in the file
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
+
+    /// Get length for the file
+    fn length(&self) -> u64;
+}
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns the object store as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns all the files with filename extension `ext` in path `prefix`
+    fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;
+
+    /// Get object reader for one file
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>>;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+/// A Registry holds all the object stores at runtime with a scheme for each store.
+/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+/// and query data inside these systems.
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can be registered into.
+    /// The ['LocalFileSystem'] store is registered by default to support reading from the local filesystem natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write().unwrap();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read().unwrap();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the path based on its scheme. For example:
+    /// path with prefix file:/// or no prefix will return the default LocalFS store,
+    /// path with prefix s3:/// will return the S3 store if it's registered,
+    /// and will always return LocalFS store when a prefix is not registered in the path.
+    pub fn store_for_path(&self, path: &str) -> Arc<dyn ObjectStore> {

Review comment:
       maybe `get_by_path` is a better name here?

##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");

Review comment:
       I agree with @rdettai; I think we can also address this in a quick follow-up PR since this is also the old behavior.







[GitHub] [arrow-datafusion] rdettai commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
rdettai commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r685815721



##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {
+    /// Get reader for a part [start, start + length] in the file
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
+
+    /// Get length for the file
+    fn length(&self) -> u64;
+}
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns the object store as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns all the files with `ext` in path `prefix`

Review comment:
       Took me a while to understand that `ext` was a filter on the file extension! Maybe the naming or the documentation should be a bit more specific here :-)







[GitHub] [arrow-datafusion] ehenry2 commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
ehenry2 commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902849925


   I have a question on the ThreadSafeRead trait... is there anything prebuilt (or a recommended approach) to wrap, say, a tokio AsyncRead or bytes.Buf to easily implement the get_reader_async function? I see the example for the local filesystem using FileSource2 to wrap the File, but I'm assuming most remote implementations will approach this with some kind of in-memory buffer or stream. I had some issues figuring this out while trying to implement it for S3 (I'm still a bit new to Rust, lifetimes, etc.).





[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-901679869


   @alamb @andygrove  @Dandandan @jorgecarleitao @rdettai  On making the remote storage system object listing & data reading API async, a design choice occurs. This might be quite important, and I'd love to have your suggestions:
   
   ### To which level should I propagate async?  
   
   This matters because once we have async dir listing -> we can have async logical plans & an async table provider -> we can have an async DataFrame / context API.
   
   Two available alternatives are:
   
   1. Limit async to just `listing` / `metadata_fetch` / file `read`, wrap sync versions over these async functions, and keep most of the user-facing API untouched (keeps the PR as lean as possible).
   2. Propagate the async API all the way up and ultimately change the user-facing API, including DataFrame & ExecutionContext (which involves huge user-facing API changes).
   
   Currently, this PR takes the first approach by constructing all APIs in `ObjectStore` / `ObjectReader` / `SourceRootDescriptor` natively in async and wrapping each async function in a sync one, trying to keep the other parts of the project untouched. Great thanks to @houqp for guiding me along the way.
   
   Does approach 1 make sense to you? 
   
   ### If I take approach 1, how should the sync version function be constructed?
   
   This PR tries to make a wrapper over the async counterparts and keep a single implementation for each piece of functionality; it therefore relies on `futures::executor::block_on` to bridge the async functions to sync ones.
   
   However, this approach is flawed: `block_on` may block the only thread in tokio, so the future inside never gets a chance to run, hanging forever if the tokio runtime is not a multi-threaded one. (I temporarily changed the related tests to use `#[tokio::test(flavor = "multi_thread", worker_threads = 2)]` to avoid hanging.) Do you have any suggestions on this?
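   
    A minimal sketch of the wrapper pattern described above and its pitfall (illustrative only; the function names are placeholders, not this PR's API):
   
    ```rust
    use futures::executor::block_on;

    // The async listing is the single source of truth...
    async fn list_all_files_async(prefix: &str, ext: &str) -> Vec<String> {
        // imagine a network call to the object store here
        vec![format!("{}/part-0.{}", prefix, ext)]
    }

    // ...and a sync facade bridges to it. The caveat from the thread: if this is
    // called from inside a single-threaded tokio runtime, block_on parks the only
    // worker thread, so a future that needs that runtime can hang forever.
    fn list_all_files(prefix: &str, ext: &str) -> Vec<String> {
        block_on(list_all_files_async(prefix, ext))
    }
    ```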
   





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682247695



##########
File path: datafusion/src/datasource/datasource2.rs
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{Schema, SchemaRef};
+
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::reader::ChunkReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    pub file_path: String,
+    pub schema: Schema,
+    pub statistics: Statistics,
+    pub partition_value: Option<ScalarValue>,
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    pub index: usize,
+    pub files: Vec<PartitionedFile>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SourceDescriptor {
+    pub partition_files: Vec<PartitionedFile>,
+    pub schema: SchemaRef,
+}
+
+pub trait DataSource2: Send + Sync {
+    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
+
+    fn schema(&self) -> Result<Arc<Schema>>;
+
+    fn get_read_for_file(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<dyn ChunkReader>;
+
+    fn statistics(&self) -> &Statistics;
+}
+
+pub trait SourceDescBuilder {
+    fn get_source_desc(root_path: &str) -> Result<SourceDescriptor> {
+        let filenames = Self::get_all_files(root_path)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No Parquet files (with .parquet extension) found at path {}",
+                root_path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                let pf = Self::get_file_meta(file_path)?;
+                let schema = pf.schema.clone();
+                if schemas.is_empty() {
+                    schemas.push(schema);
+                } else if schema != schemas[0] {
+                    // we currently get the schema information from the first file rather than do
+                    // schema merging and this is a limitation.
+                    // See https://issues.apache.org/jira/browse/ARROW-11017
+                    return Err(DataFusionError::Plan(format!(
+                        "The file {} have different schema from the first file and DataFusion does \
+                        not yet support schema merging",
+                        file_path
+                    )));
+                }
+                Ok(pf)
+            }).collect::<Result<Vec<PartitionedFile>>>();
+
+        Ok(SourceDescriptor {
+            partition_files: partitioned_files?,
+            schema: Arc::new(schemas.pop().unwrap()),
+        })
+    }
+
+    fn get_all_files(root_path: &str) -> Result<Vec<String>>;
+
+    fn get_file_meta(file_path: &str) -> Result<PartitionedFile>;
+
+    fn reader_for_file_meta(file_path: &str) -> Result<dyn ChunkReader>;

Review comment:
       It's removed now.







[GitHub] [arrow-datafusion] ehenry2 edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
ehenry2 edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-899683040


   This is an awesome PR... one question for clarification: should the object reader trait have all its functions return Result? Remote storage systems have the wonderful feature of potentially hitting IO errors at any moment (since the request is made over the wire), so even innocuous functions like length() have the potential to fail. I'm viewing this through the lens of S3, as I'd assume a length() request would involve a head_object request or similar to the S3 API, or should that metadata be pre-populated when the ObjectReader is instantiated?
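   
    A sketch of the fallible variant being asked about (hypothetical trait shape, not the PR's final API):
   
    ```rust
    use datafusion::error::Result;
    use std::io::Read;

    // Every call that may touch the network reports errors instead of panicking.
    pub trait ObjectReader {
        fn get_reader(&self, start: u64, length: usize) -> Result<Box<dyn Read>>;
        fn length(&self) -> Result<u64>;
    }
    ```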





[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-903106790


   Thank you @alamb @houqp @jorgecarleitao for your great help! 
   
   This PR initially contains several pieces of functionality related to reading data, including the core object store abstraction, a more general scan partitioning abstraction, and some refactoring of the parquet scan. At the same time, as I think more and get more valuable input, the scope becomes more extensive. Although I try to keep the PR as lean as possible, leaving out some functionality such as JSON/CSV scan, it inevitably grows huge and is hard to review.
   
   I agree we could make the current PR a proof of concept, and I'm happy to break it down into several parts to get the work finally merged. 
   
   As for the design doc for the object store API, I can write up a first draft proposal this weekend. Please help review and revise it when it's available. Thanks again @alamb for offering to help on the doc part :)





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r688304741



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics

Review comment:
       Yes, that's the intention here.
    - `PartitionedFile` -> a single file (for the moment) or part of a file (later, a subset of its row groups or rows); we may even extend this to include the partition value and partition schema (see below) to support partitioned tables:
    `/path/to/table/root/p_date=20210813/p_hour=1200/xxxxx.parquet`
    - `FilePartition` -> the basic unit of parallel processing; each task is responsible for processing one `FilePartition`, which is composed of several `PartitionedFile`s (see the sketch below).
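   
    An illustrative way such a grouping could be produced (round-robin shown only as an example, not this PR's planning logic; it reuses the `PartitionedFile` / `FilePartition` structs quoted earlier):
   
    ```rust
    // Distribute files across n (> 0) partitions, one FilePartition per parallel task.
    fn split_into_partitions(files: Vec<PartitionedFile>, n: usize) -> Vec<FilePartition> {
        let mut parts: Vec<FilePartition> = (0..n)
            .map(|index| FilePartition { index, files: vec![] })
            .collect();
        for (i, file) in files.into_iter().enumerate() {
            parts[i % n].files.push(file);
        }
        parts
    }
    ```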







[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902533289


   @alamb 
   
    > The alternate compromise, which you have partly implemented in this PR, is to implement both async and non-async versions. This is similar to the approach in the C/C++ filesystem API (props to @nealrichardson for the pointer), which has both synchronous and asynchronous APIs.
   
   If I understand you correctly, do you mean I should keep the sync and async implementations separate, with two distinct code paths, instead of the current wrapper approach (a sync function wrapping the async logic)?





[GitHub] [arrow-datafusion] ehenry2 commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
ehenry2 commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-899683040


   This is an awesome PR... one question though: should the object reader trait have all its functions return Result? Remote storage systems have the wonderful feature of potentially hitting IO errors at any moment (since the request is made over the wire), so even innocuous functions like length() have the potential to fail. I'm viewing this through the lens of S3, as I'd assume a length() request would involve a head_object request or similar to the S3 API, or should that info be pre-populated when the ObjectReader is instantiated?





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682513345



##########
File path: datafusion/src/datasource/datasource2.rs
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{Schema, SchemaRef};
+
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::reader::ChunkReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    pub file_path: String,
+    pub schema: Schema,
+    pub statistics: Statistics,
+    pub partition_value: Option<ScalarValue>,
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    pub index: usize,
+    pub files: Vec<PartitionedFile>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SourceDescriptor {
+    pub partition_files: Vec<PartitionedFile>,
+    pub schema: SchemaRef,
+}
+
+pub trait DataSource2: Send + Sync {
+    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
+
+    fn schema(&self) -> Result<Arc<Schema>>;
+
+    fn get_read_for_file(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<dyn ChunkReader>;

Review comment:
       One approach might be to redefine (or copy / rename) a trait in DataFusion that is close to / the same as ChunkReader.
   
    Then we could make a new type for use with parquet; something like:
   
   ```rust
   trait ProtocolReader<R: ...> {
   }
   
   struct ParquetProtocolReader<R: ..> {
     inner: ProtocolReader<R>
   }
   
   impl ChunkReader<R> for ParquetProtocolReader<R> { 
   ...
   }
   ```
   







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-896687704


   Regarding the async part, should I just make `list_all_files` an `async fn`, and wait for parquet / csv reading to become async before proceeding?





[GitHub] [arrow-datafusion] houqp commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902873042


   > I'm assuming most remote implementations will approach this function implementation with some kind of in-memory buffer or stream.
   
   I think this would need to be handled case by case for different remote store clients. It would be helpful to share exactly what client API signatures you are trying to use within `get_reader_async`.





[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-900782441


   @ehenry2 Thanks for the advice; I will change them all to return `Result`s in the next commit.





[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-906515615


   As a result of previous discussions on this PR as well as in the [design doc](https://docs.google.com/document/d/1ZEZqvdohrot0ewtTNeaBtqczOIJ1Q0OnX9PqMMxpOF8/edit#) (updated according to the latest reviews as well), I have broken this PR down into a dedicated API PR #950 and a `PartitionedFile` abstraction PR #932, and left the parquet async integration as a follow-up.





[GitHub] [arrow-datafusion] houqp commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-906087245


   I also think constructing `ParquetTable` async before passing it to `register_table` is a good idea. This is how delta-rs implements its datafusion integration as well.
   
   With regard to the ballista table provider protobuf ser/de limitation, I think it is something we need to address in the long term; otherwise, it would be impossible to support custom table sources in ballista.





[GitHub] [arrow-datafusion] yjshen closed pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen closed pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811


   








[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r693157246



##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       I think the only real suggestion is to plumb `async` all the way through to planning (aka remove the non async API)







[GitHub] [arrow-datafusion] ehenry2 edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
ehenry2 edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-899683040


   This is an awesome PR... one question for clarification: should the object reader trait have all its functions return `Result`? Remote storage systems have the wonderful feature of potentially hitting I/O errors at any moment (as the request is being made over the wire), so even innocuous functions like `length()` have the potential to fail. I'm viewing this through the lens of S3, as I'm assuming a `length()` call would involve a head_object request or similar to the S3 API. Or should that metadata be pre-populated when the `ObjectReader` is instantiated?
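
   A sketch of the fallible variant being discussed, following the `ObjectReader` trait in this PR but with every call that may touch the network returning a `Result` (the error alias below is a stand-in for DataFusion's):
   
   ```rust
   use std::io::Read;
   
   type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
   
   pub trait ObjectReader: Send + Sync {
       /// May issue a ranged GET (or equivalent) over the wire, so it can fail.
       fn get_reader(&self, start: u64, length: usize) -> Result<Box<dyn Read>>;
   
       /// For S3 this could be a HEAD request, or a value cached when the
       /// reader was created; either way it is allowed to fail.
       fn length(&self) -> Result<u64>;
   }
   ```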





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r692439889



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +52,304 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // Schema of partition columns
+    // pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All the source files with the same schema that exist in a path
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Stream of `PartitionedFile` results
+pub type PartitionedFileStream =
+    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
+
+/// Builder for ['SourceRootDescriptor'] inside given path
+#[async_trait]
+pub trait SourceRootDescBuilder: Sync + Send + Debug {
+    /// Construct a ['SourceRootDescriptor'] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<SourceRootDescriptor> {
+        let mut results: Vec<Result<PartitionedFile>> = Vec::new();
+        futures::executor::block_on(async {

Review comment:
       As above I think you can use `tokio::runtime::Handle::current().block_on` 

##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       Since we already have `tokio` (which has a full executor), I don't think we also need the futures executor, so I would like to avoid this new dependency.
   
   I tried removing this change locally and it seems to work.

##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       I think you can use `tokio::runtime::Handle::block_on` rather than `futures::executor::block_on` as a way to play as nicely as possible with the tokio executor:   https://docs.rs/tokio/1.10.0/tokio/runtime/struct.Handle.html#method.block_on
   
   So something like
   
   ```rust
   Handle::current()
     .block_on(async { .... });
   ```

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +52,304 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // Schema of partition columns
+    // pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All the source files with the same schema that exist in a path
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Stream of `PartitionedFile` results
+pub type PartitionedFileStream =
+    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
+
+/// Builder for ['SourceRootDescriptor'] inside given path
+#[async_trait]
+pub trait SourceRootDescBuilder: Sync + Send + Debug {
+    /// Construct a ['SourceRootDescriptor'] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+        provided_schema: Option<Schema>,
+        collect_statistics: bool,
+    ) -> Result<SourceRootDescriptor> {
+        let mut results: Vec<Result<PartitionedFile>> = Vec::new();
+        futures::executor::block_on(async {
+            match Self::get_source_desc_async(
+                path,
+                object_store,
+                ext,
+                provided_schema,
+                collect_statistics,
+            )
+            .await
+            {
+                Ok(mut stream) => {
+                    while let Some(pf) = stream.next().await {
+                        results.push(pf);
+                    }
+                }
+                Err(e) => {
+                    results.push(Err(e));
+                }
+            }
+        });
+
+        let partition_results: Result<Vec<PartitionedFile>> =
+            results.into_iter().collect();
+        let partition_files = partition_results?;
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas

Review comment:
       it is strange to me that the collating of partitions doesn't happen in `get_source_desc_async` -- it seems like `get_source_desc` would just be doing the adapting of `async` --> sync code. 
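
   A sketch of that split, with stand-in types so it is self-contained: the async method does the listing, statistics gathering and collation, and the sync method is only a thin bridge (`tokio::runtime::Handle::block_on` as suggested above):
   
   ```rust
   use tokio::runtime::Handle;
   
   // Stand-ins for the sketch; the real code uses DataFusion's Result and struct.
   type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
   pub struct SourceRootDescriptor;
   
   /// Sync entry point: only bridges into the async implementation.
   fn get_source_desc(path: &str) -> Result<SourceRootDescriptor> {
       // Note: Handle::block_on panics when called from inside an async
       // context, so this bridge is only safe on threads allowed to block.
       Handle::current().block_on(get_source_desc_async(path))
   }
   
   /// Async implementation: listing, statistics and collation all live here.
   async fn get_source_desc_async(_path: &str) -> Result<SourceRootDescriptor> {
       // list files -> collect statistics -> unify schemas -> build the descriptor
       Ok(SourceRootDescriptor)
   }
   ```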

##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -269,8 +269,8 @@ mod test {
         };
     }
 
-    #[test]
-    fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
+    #[tokio::test]

Review comment:
       In case anyone else is interested, this is what happens if you don't have `tokio::test`:
   
   ```
   failures:
   
   ---- planner::test::distributed_hash_aggregate_plan stdout ----
   thread 'planner::test::distributed_hash_aggregate_plan' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:33
   stack backtrace:
      0: rust_begin_unwind
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/std/src/panicking.rs:515:5
      1: core::panicking::panic_fmt
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/panicking.rs:92:14
      2: core::option::expect_failed
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:1243:5
      3: core::option::Option<T>::expect
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:351:21
      4: tokio::runtime::blocking::pool::spawn_blocking
                at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:14
      5: tokio::fs::asyncify::{{closure}}
                at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/fs/mod.rs:119:11
      6: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
      7: tokio::fs::metadata::metadata::{{closure}}
                at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/fs/metadata.rs:46:5
      8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
      9: datafusion::datasource::object_store::local::list_all_async::{{closure}}
                at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:148:8
     10: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
                at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
     11: datafusion::datasource::object_store::local::list_all::{{closure}}
                at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:111:15
   ```







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-904677639


   @alamb I might be wrong on this: is it possible to not provide a `RemoteParquetTable`, but instead provide a `RemoteParquetTableBuilder` that uses the `ObjectStore` API for the async listing and builds a `ParquetTable` asynchronously?
   
   By doing this, we would move the async table-building logic out of the planning API and into users' hands: they construct the `ParquetTable` TableProvider themselves, then register it using `context::register_table(self, table_ref, provider)`. Does this violate idiomatic async in Rust?
   
   I am thinking of this from the perspective of ballista. Even though I'm not very familiar with the code there, it seems ballista can only [serialize/deserialize](https://github.com/apache/arrow-datafusion/blob/master/ballista/rust/core/src/serde/logical_plan/to_proto.rs#L708-L742) known TableProvider types, so a `RemoteParquetTable` outside DataFusion might not be preferable?
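
   A rough sketch of that "build asynchronously, register synchronously" flow; all the types and the `build_parquet_table` helper below are stand-ins so the snippet compiles on its own (in DataFusion the provider would be `ParquetTable` and the call is `ExecutionContext::register_table`):
   
   ```rust
   use std::sync::Arc;
   
   type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
   
   pub trait TableProvider: Send + Sync {}        // stand-in
   pub struct ParquetTable;                       // stand-in
   impl TableProvider for ParquetTable {}
   
   pub struct ExecutionContext;                   // stand-in
   impl ExecutionContext {
       pub fn register_table(&mut self, _name: &str, _t: Arc<dyn TableProvider>) -> Result<()> {
           Ok(())
       }
   }
   
   /// Async construction: object store listing, schema merging and statistics
   /// collection all happen here, before planning ever starts.
   async fn build_parquet_table(_path: &str) -> Result<ParquetTable> {
       Ok(ParquetTable)
   }
   
   async fn register(ctx: &mut ExecutionContext, path: &str) -> Result<()> {
       let table = build_parquet_table(path).await?;
       // Registration itself stays synchronous, so the planning API is untouched.
       ctx.register_table("remote_parquet", Arc::new(table))
   }
   ```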








[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r690067772



##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -285,24 +286,19 @@ impl SchedulerGrpc for SchedulerServer {
 
         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let ctx = create_datafusion_context_concurrency(1);
+                let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;
 
                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],

Review comment:
       The behavior is indeed unchanged: the original filenames all come from the root_path, and here I just use the root_path instead, to avoid touching too much code in the ballista proto definition as well as its serde (to_proto and from_proto).







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r689265728



##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {
+    /// Get reader for a part [start, start + length] in the file
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
+
+    /// Get length for the file
+    fn length(&self) -> u64;
+}
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns the object store as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns all the files with filename extension `ext` in path `prefix`
+    fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;
+
+    /// Get object reader for one file
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>>;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+/// A Registry holds all the object stores at runtime with a scheme for each store.
+/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
+/// and query data inside these systems.
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can registered into.
+    /// ['LocalFileSystem'] store is registered in by default to support read from localfs natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write().unwrap();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read().unwrap();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the path based on its scheme. For example:
+    /// path with prefix file:/// or no prefix will return the default LocalFS store,
+    /// path with prefix s3:/// will return the S3 store if it's registered,
+    /// and will always return LocalFS store when a prefix is not registered in the path.
+    pub fn store_for_path(&self, path: &str) -> Arc<dyn ObjectStore> {
+        if let Some((scheme, _)) = path.split_once(':') {
+            let stores = self.object_stores.read().unwrap();
+            if let Some(store) = stores.get(&*scheme.to_lowercase()) {
+                return store.clone();
+            }

Review comment:
       It's falling back to the local FS store here now; I think it's better to report "not found" here. Will change in the next commit.
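
   A sketch of the "report not found" behaviour, reusing the fields and helpers of `ObjectStoreRegistry` from this diff; the exact error variant (`DataFusionError::Internal` here) is an assumption:
   
   ```rust
   impl ObjectStoreRegistry {
       /// Resolve the store for a path, erroring on unknown schemes instead of
       /// silently falling back to the local filesystem.
       pub fn store_for_path(&self, path: &str) -> Result<Arc<dyn ObjectStore>> {
           if let Some((scheme, _)) = path.split_once(':') {
               let stores = self.object_stores.read().unwrap();
               return stores
                   .get(&*scheme.to_lowercase())
                   .cloned()
                   .ok_or_else(|| {
                       DataFusionError::Internal(format!(
                           "No object store is registered for scheme '{}'",
                           scheme
                       ))
                   });
           }
           // No scheme at all: treat the path as a local filesystem path.
           Ok(self.get(LOCAL_SCHEME).expect("local store registered in new()"))
       }
   }
   ```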







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682249207



##########
File path: datafusion/src/execution/context.rs
##########
@@ -840,6 +859,8 @@ pub struct ExecutionContextState {
     pub config: ExecutionConfig,
     /// Execution properties
     pub execution_props: ExecutionProps,
+    /// Protocol handlers
+    pub protocol_registry: ProtocolRegistry,

Review comment:
       I made changes after your first review. The registry is now used first to register all available handlers when the user initializes the context, and then to get a suitable protocol handler for each path based on its scheme.







[GitHub] [arrow-datafusion] houqp commented on pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-890734915


   Thank you @yjshen , this is huge! I will help review it tomorrow.





[GitHub] [arrow-datafusion] jorgecarleitao commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902982466


   Could it make sense to write a design doc like @houqp wrote some time ago for the qualified names? Does it feel like a sufficiently impactful change to design this a bit before committing?





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686713464



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,

Review comment:
       Yes, they are all set to None for now; will comment them out :)







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682513345



##########
File path: datafusion/src/datasource/datasource2.rs
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{Schema, SchemaRef};
+
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::reader::ChunkReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    pub file_path: String,
+    pub schema: Schema,
+    pub statistics: Statistics,
+    pub partition_value: Option<ScalarValue>,
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    pub index: usize,
+    pub files: Vec<PartitionedFile>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SourceDescriptor {
+    pub partition_files: Vec<PartitionedFile>,
+    pub schema: SchemaRef,
+}
+
+pub trait DataSource2: Send + Sync {
+    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
+
+    fn schema(&self) -> Result<Arc<Schema>>;
+
+    fn get_read_for_file(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<dyn ChunkReader>;

Review comment:
       One approach might be to redefine (or copy / rename) a trait in DataFusion that is close / the same as ChunkReader .
   
   Then we could make a new type for use with parquet; Something like 
   
   ```rust
   trait ProtocolReader<R: ...> {
   }
   
   struct ParquetProtocolReader<R: ..> {
     inner: ProtocolReader<R>
   }
   
   impl ChunkReader<R> for ParquetProtocolReader<R> { 
   ...
   }
   ```
   

##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I agree a global context may be interesting to add. It doesn't seem entirely related to reading external data sources, however -- I suggest proposing that change in a separate PR to make sure discussion on it isn't lost in the discussion on this PR







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r688791295



##########
File path: ballista/rust/core/src/utils.rs
##########
@@ -252,6 +252,11 @@ pub fn create_datafusion_context(
     ExecutionContext::with_config(config)
 }
 
+/// Create a DataFusion context that is compatible with Ballista in concurrency
+pub fn create_datafusion_context_concurrency(concurrency: usize) -> ExecutionContext {

Review comment:
       Agree







[GitHub] [arrow-datafusion] alamb commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-903095113


   > Could it make sense to write a design doc like @houqp wrote some time ago for the qualified names? 
   
   I think a design doc is a great idea @jorgecarleitao  -- it would let us make sure some of the larger points are clear and there is consensus (especially around adding `async` in various places). @yjshen  I am happy to help draft such a document if you think it would help.
   
   I am personally very interested in getting the ideas in this PR into DataFusion -- I think it is an important architectural step forward and since I think it will directly help IOx (the project I am working on) I can spend non trivial amounts of time working on it





[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-922262311


   I'm closing this PR since most of the functionality in it has already landed or will soon land. I am excited about the changes taking place.





[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-904677639


   @alamb I might be wrong on this: is it possible to not provide a `RemoteParquetTable`, but instead provide a `RemoteParquetTableBuilder` that uses the `ObjectStore` API for the async listing and builds a `ParquetTable` asynchronously?
   
   By doing this, we would move the async table-building logic out of the planning API and into users' hands: they construct the `ParquetTable` TableProvider themselves, then register it using `context::register_table(self, table_ref, provider)`. Does this violate idiomatic async in Rust?
   
   I am thinking of this from the perspective of ballista. Even though I'm not very familiar with the code there, it seems ballista can only [serialize/deserialize](https://github.com/apache/arrow-datafusion/blob/master/ballista/rust/core/src/serde/logical_plan/to_proto.rs#L708-L742) known TableProvider types, so a `RemoteParquetTable` outside DataFusion might not be preferable?





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686712278



##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");

Review comment:
       Or just list all files that do not start with `_`?
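
   A small sketch of that filter, assuming plain string paths (e.g. to skip `_SUCCESS` or `_metadata` markers written by other engines):
   
   ```rust
   /// Keep only data files: drop anything whose file name starts with `_`.
   fn filter_hidden(files: Vec<String>) -> Vec<String> {
       files
           .into_iter()
           .filter(|path| {
               std::path::Path::new(path)
                   .file_name()
                   .and_then(|name| name.to_str())
                   .map(|name| !name.starts_with('_'))
                   .unwrap_or(true)
           })
           .collect()
   }
   ```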







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-903136394


   I've drafted a design doc here: https://docs.google.com/document/d/1ZEZqvdohrot0ewtTNeaBtqczOIJ1Q0OnX9PqMMxpOF8/edit#. Please help to review it. Thanks!





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682306021



##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {

Review comment:
       BTW, `ProtocolHandler` or `ObjectStore` both sound fine to me, I can move to either name later. cc @houqp thoughts?







[GitHub] [arrow-datafusion] yjshen edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-904677639


   @alamb I might be wrong on this: is it possible to not provide a `RemoteParquetTable`, but instead provide a `RemoteParquetTableBuilder` that uses the `ObjectStore` API for the async listing and builds a `ParquetTable` asynchronously?
   
   By doing this, we would move the async table-building logic out of the planning API and into users' hands: they construct the `ParquetTable` TableProvider themselves, then register it using `context::register_table(self, table_ref, provider)`. Does this violate idiomatic async in Rust?
   
   I am thinking of this from the perspective of ballista. Even though I'm not very familiar with the code there, it seems ballista can only [serialize/deserialize](https://github.com/apache/arrow-datafusion/blob/master/ballista/rust/core/src/serde/logical_plan/to_proto.rs#L708-L742) known TableProvider types, so a `RemoteParquetTable` outside DataFusion might not be preferable? I think Rust doesn't support runtime reflection?





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r693233342



##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       The other thing I was thinking about: what about adding the ObjectStore interfaces in one PR, and then hooking them up to the rest of the system / rewriting the existing data sources (like Parquet, etc.) as separate PRs?
   
   I think @yjshen  has done a great job with this PR showing how everything would hook together, but I do feel like this PR is slightly beyond my ability to comprehend given its size and scope. 







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682256093



##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I made a registry (a hashmap) that stores all available protocol handlers (or ObjectStores, as you suggested above) in the context at runtime, with the LocalFS handler registered by default. In this way, I intended to make DataFusion's core protocol agnostic: different handlers are looked up according to the scheme of a path when users create new tables at runtime (based on their previous registrations), so each table gets the suitable handler.
   
   At the same time, I thought a global/sharable context would be useful when we consider controlling the total amount of memory DataFusion can use: all physical operators would register their memory usage with a global context, and try to allocate more memory or spill themselves once they have used up all allowed memory.







[GitHub] [arrow-datafusion] alamb commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-904723875


   > but provide a RemoteParquetTableBuilder that uses the ObjectStore API on the async listing but build a ParquetTable asynchronously?
   
   That is a really neat idea @yjshen  - I hadn't thought of that but it sounds very good
   
   > Then they could register ParquetTable using context::register_table(self, table_ref, provider). Does this volatiles the idiomatic async in Rust?
   
   Not in my opinion.
   
   >  I think Rust doesn't support runtime reflection?
   
   That is correct -- Rust doesn't have built in runtime reflection support -- that type of behavior needs to be added in the application logic





[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-896686361


   @houqp @alamb I'm done with the original implementation, abstracting the file listing/reading logic into `ObjectStore` and `ObjectReader`, and I think it's ready for review again.
   
   ```rust
   /// Object Reader for one file in an object store
   pub trait ObjectReader {
       /// Get reader for a part [start, start + length] in the file
       fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
   
       /// Get length for the file
       fn length(&self) -> u64;
   }
   
   /// An ObjectStore abstracts access to an underlying file/object storage.
   /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
   pub trait ObjectStore: Sync + Send + Debug {
       /// Returns the object store as [`Any`](std::any::Any)
       /// so that it can be downcast to a specific implementation.
       fn as_any(&self) -> &dyn Any;
   
       /// Returns all the files with filename extension `ext` in path `prefix`
       fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;
   
       /// Get object reader for one file
       fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>>;
   }
   ```
   
   Currently, there are several things remaining (which I suppose are not blockers for this PR; please correct me if I get something wrong):
   - Async listing (`list_all_files`) as well as async reading (`get_reader`).
   - Figure out for ballista how to register `ObjectStore` in the client and pass the registration on to executors.
   - Make JSON / CSV read from `ObjectReader` as well.
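
   For illustration, a sketch of how a custom store could plug into the traits above (`HdfsStore` / `HdfsReader` are hypothetical names, the method bodies only mark where the real client calls would go, and `Result` is the DataFusion alias used in the trait definitions):
   
   ```rust
   use std::any::Any;
   use std::io::Read;
   use std::sync::Arc;
   
   #[derive(Debug)]
   struct HdfsStore;
   
   struct HdfsReader {
       path: String,
   }
   
   impl ObjectReader for HdfsReader {
       fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read> {
           // issue a ranged read against the remote file
           unimplemented!("ranged read of {} [{}, +{}]", self.path, start, length)
       }
   
       fn length(&self) -> u64 {
           // file size from remote metadata (possibly cached at creation time)
           unimplemented!()
       }
   }
   
   impl ObjectStore for HdfsStore {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>> {
           // remote directory listing filtered by extension
           unimplemented!("list {}/*.{}", prefix, ext)
       }
   
       fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>> {
           Ok(Arc::new(HdfsReader { path: file_path.to_string() }))
       }
   }
   
   // Registration maps the scheme to the store so that `hdfs://...` paths resolve to it:
   // registry.register_store("hdfs".to_string(), Arc::new(HdfsStore));
   ```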
   





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r683230259



##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {

Review comment:
       I agree with Andrew that `ObjectStore` would have been a better name :)

##########
File path: datafusion/src/datasource/local.rs
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::protocol_registry::ProtocolHandler;
+use crate::error::DataFusionError;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::fs;
+use std::fs::{metadata, File};
+use std::sync::Arc;
+
+pub struct LocalFSHandler;
+
+impl ProtocolHandler<File> for LocalFSHandler {
+    fn as_any(&self) -> &dyn Any {
+        return self;
+    }
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>> {
+        list_all(root_path, ext)
+    }
+
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ChunkReader<T = File>>> {

Review comment:
       Just my quick thoughts on possible designs to avoid the `ChunkReader` trait here. We could declare a `read_file` method here instead that would take start and length as arguments, then return a Vector or Iterator of bytes.
   
   Then in `ParquetRootDesc`, we use a local struct to implement `ChunkReader` leveraging the `ObjectStore::read_file` method, which gets passed to `SerializedFileReader`.
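
   A self-contained sketch of that design, with stand-in types (the error alias is simplified, and in the real code the local adapter would implement parquet's `ChunkReader` so it can be handed to `SerializedFileReader`):
   
   ```rust
   use std::io::Cursor;
   use std::sync::Arc;
   
   type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
   
   pub trait ObjectStore: Send + Sync {
       /// Read `length` bytes starting at `start` from the object at `path`.
       fn read_file(&self, path: &str, start: u64, length: usize) -> Result<Vec<u8>>;
   }
   
   /// Local adapter owned by ParquetRootDesc: each ranged request becomes an
   /// in-memory cursor that the parquet reader can consume.
   pub struct ObjectChunkReader {
       store: Arc<dyn ObjectStore>,
       path: String,
   }
   
   impl ObjectChunkReader {
       pub fn get_read(&self, start: u64, length: usize) -> Result<Cursor<Vec<u8>>> {
           Ok(Cursor::new(self.store.read_file(&self.path, start, length)?))
       }
   }
   ```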

##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I agree with @alamb on this one; it would be better to have the discussion about the global static in a separate PR. This way, it will be easier for us to approve and merge the code changes specific to remote data sources.

##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {
+    /// Returns the protocol handler as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;

Review comment:
       Perhaps the name `root_path` is a bit misleading here; maybe it should be named `path` or `prefix`? To list a sub directory, I am thinking one would pass in `root_path/subdir1/subdir2`. But I think we need to be more specific on whether the return value should contain relative paths or absolute paths.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r685959075



##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {
+    /// Get reader for a part [start, start + length] in the file
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
+
+    /// Get length for the file
+    fn length(&self) -> u64;
+}
+
+/// An ObjectStore abstracts access to an underlying file/object storage.
+/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
+pub trait ObjectStore: Sync + Send + Debug {
+    /// Returns the object store as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns all the files with `ext` in path `prefix`

Review comment:
       Thanks, I will update the documentation here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682305233



##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {
+    /// Returns the protocol handler as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;
+
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ChunkReader<T = R>>>;
+
+    fn handler_name(&self) -> String;
+}
+
+static LOCAL_SCHEME: &str = "file";
+
+pub struct ProtocolRegistry {
+    pub protocol_handlers: RwLock<HashMap<String, Arc<dyn ProtocolHandler>>>,
+}
+
+impl ProtocolRegistry {
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ProtocolHandler>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFSHandler));
+
+        Self {
+            protocol_handlers: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new handler to this registry.
+    /// If a handler of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_handler(
+        &self,
+        scheme: &str,
+        handler: Arc<dyn ProtocolHandler>,
+    ) -> Option<Arc<dyn ProtocolHandler>> {
+        let mut handlers = self.protocol_handlers.write().unwrap();
+        handlers.insert(scheme.to_string(), handler)
+    }
+
+    pub fn handler(&self, scheme: &str) -> Option<Arc<dyn ProtocolHandler>> {
+        let handlers = self.protocol_handlers.read().unwrap();
+        handlers.get(scheme).cloned()
+    }
+
+    pub fn handler_for_path(&self, path: &str) -> Arc<dyn ProtocolHandler> {

Review comment:
       Thanks, I'll add this comment in the next commit

##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {

Review comment:
       Thanks, I'll add this comment in the next commit




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686856778



##########
File path: datafusion/src/datasource/object_store.rs
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object Store abstracts access to an underlying file/object storage.
+
+use crate::datasource::local::LocalFileSystem;
+use crate::error::Result;
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::io::Read;
+use std::sync::{Arc, RwLock};
+
+/// Object Reader for one file in an object store
+pub trait ObjectReader {

Review comment:
       I think the traits in this file should be `async` (or at least offer an async version) -- all the major object store implementations I know of in Rust are `async`, and that seems to be the way the ecosystem is headed.
   
   At the very least I would imagine `async fn get_reader(...) -> Box<dyn Read>`, which would allow it to make a non-blocking async network call to fetch the bytes.
   
   I am not sure if the thing returned (`Box<dyn Read>`) would benefit from being something async as well (aka an async stream) to allow streaming reads without necessarily buffering the entire `start/length` range. At the moment I am inclined to keep it as `Box<dyn Read>` for simplicity.
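   
   To make that concrete, a sketch of the async variant under discussion, using the `async-trait` crate that is already a dependency (names are illustrative, not this PR's API):
   
   ```rust
   use async_trait::async_trait;
   use std::io::Read;
   
   #[async_trait]
   pub trait AsyncObjectReader: Send + Sync {
       /// Fetch the byte range [start, start + length) without blocking an executor thread.
       async fn get_reader(&self, start: u64, length: usize) -> std::io::Result<Box<dyn Read + Send>>;
   
       /// Total length of the underlying object.
       async fn length(&self) -> std::io::Result<u64>;
   }
   ```
   
   Whether the returned value should itself be an async stream can then be decided separately, as noted above.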

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with same schema exists in a path
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Builder for ['SourceRootDescriptor'] inside given path
+pub trait SourceRootDescBuilder {
+    /// Construct a ['SourceRootDescriptor'] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+    ) -> Result<SourceRootDescriptor> {
+        let filenames = object_store.list_all_files(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                let pf = Self::get_file_meta(file_path, object_store.clone())?;

Review comment:
       +1 for making the statistics fetch optional

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {

Review comment:
       What is `FilePartition` used for?

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics

Review comment:
       Do I understand correctly that the idea here is to allow a DataFusion reader to read a set of files in a generic way (rather than having separate implementations for csv, parquet, etc.)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r683659016



##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       thank you @yjshen 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902138512


   I am starting to check this out carefully


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686711509



##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");

Review comment:
       Agreed, it was restricted to the parquet suffix in the original implementation, so I moved it here. Perhaps we could make it an argument and ask the user for it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] rdettai commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
rdettai commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-896727736


   When I talk about a catalog, I mean:
   - schema
   - list of files with statistics. 
   
   Ideally, you should be able to compose different ways of getting the list of files with different ways of reading them. For example, when reading from S3, you might get the list of file from s3.list_objects, but also from Hive Catalog or from Delta.
   
   Regarding early materialization of the file list: the use case I have in mind is a bucket with partitioned data. Most queries will be able to use only a fraction of the files. For example, if you generate 24 files per day, even with 3 years of parquet in your bucket, queries that target only 3 days of data should work fine (once partitions are detected properly). But if you need to open all the files when registering the table, you won't scale to buckets with large numbers of files (in this example you would need to open about 26k files first). I understand that partition pruning is not implemented for now, but since you created a structure called `PartitionedFile`, I guess that would be the next step, no? 😉
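   
   To make the scaling concern concrete, here is a toy pruning pass over hive-style paths (the `dt=` layout and the date filter are assumptions for the example, not something this PR implements); files outside the requested dates are dropped before any footer is opened:
   
   ```rust
   fn prune_by_partition<'a>(files: &'a [String], keep_dates: &[&str]) -> Vec<&'a String> {
       files
           .iter()
           .filter(|path| {
               // Look for a `dt=...` segment in the path.
               path.split('/')
                   .find_map(|seg| seg.strip_prefix("dt="))
                   .map(|dt| keep_dates.contains(&dt))
                   // Keep files without a partition segment, to stay conservative.
                   .unwrap_or(true)
           })
           .collect()
   }
   
   fn main() {
       let files = vec![
           "t/dt=2021-08-01/part-0.parquet".to_string(),
           "t/dt=2021-08-02/part-0.parquet".to_string(),
           "t/dt=2021-07-01/part-0.parquet".to_string(),
       ];
       let kept = prune_by_partition(&files, &["2021-08-01", "2021-08-02"]);
       assert_eq!(kept.len(), 2);
   }
   ```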


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-922450762


   Thank you again for all your work in this area @yjshen  -- the improvements to DataFusion are amazing!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r692659385



##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       While using `tokio::runtime::Handle::block_on`, I'm facing:
   > "Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks."
   
   Since `block_on` would `try_enter` an already entered runtime, I changed to `futures::executor`'s version to avoid the panic in the first place. But as I noted before, `futures::executor::block_on` is also flawed here:
   
   > However, this approach is flawed because block_on may block the only thread in tokio, so the future inside won't get a chance to run, hanging forever if the tokio runtime is not a multi-threaded one. (I temporarily changed the related test to use #[tokio::test(flavor = "multi_thread", worker_threads = 2)] to avoid hanging.)
   
   Do you have any suggestions on this?
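   
   Another pattern that might be worth trying is `tokio::task::block_in_place` combined with `Handle::block_on`, sketched below; it also requires the multi-threaded runtime, so it would not lift that restriction either:
   
   ```rust
   use std::future::Future;
   
   /// Bridge a sync call site into the current tokio runtime.
   /// Must run on a worker of a multi-threaded runtime; `block_in_place`
   /// panics on the current-thread flavor.
   fn block_on_in_runtime<F: Future>(fut: F) -> F::Output {
       tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
   }
   ```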




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-890746694


   > Thank you @yjshen , this is huge! I will help review it tomorrow.
   
   Thanks, @houqp. Currently it's just a draft and not ready for a full review. I put it out early to get your thoughts and see if it's on the right track.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] rdettai commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
rdettai commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686666936



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
+
+#[derive(Debug, Clone)]
+/// All source files with same schema exists in a path
+pub struct SourceRootDescriptor {
+    /// All source files in the path
+    pub partition_files: Vec<PartitionedFile>,
+    /// The schema of the files
+    pub schema: SchemaRef,
+}
+
+/// Builder for ['SourceRootDescriptor'] inside given path
+pub trait SourceRootDescBuilder {
+    /// Construct a ['SourceRootDescriptor'] from the provided path
+    fn get_source_desc(
+        path: &str,
+        object_store: Arc<dyn ObjectStore>,
+        ext: &str,
+    ) -> Result<SourceRootDescriptor> {
+        let filenames = object_store.list_all_files(path, ext)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No file (with .{} extension) found at path {}",
+                ext, path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                let pf = Self::get_file_meta(file_path, object_store.clone())?;

Review comment:
       I feel we should make it optional to get the full parquet statistics right away, otherwise creating a parquet table for a bucket/prefix with many files will be very slow.
   
   Also, this should definitely be parallelized, because getting the footers sequentially is really not efficient.
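   
   As a sketch of what the parallel version could look like (the `fetch_footer` helper is a placeholder, not something in this PR), the listing could be turned into a bounded stream of footer reads:
   
   ```rust
   use futures::stream::{self, StreamExt, TryStreamExt};
   
   // Placeholder for a ranged read of one parquet footer from the object store.
   async fn fetch_footer(path: &str) -> std::io::Result<Vec<u8>> {
       let _ = path;
       Ok(Vec::new())
   }
   
   async fn fetch_all_footers(paths: &[String]) -> std::io::Result<Vec<Vec<u8>>> {
       stream::iter(paths.iter().map(|p| fetch_footer(p)))
           // Keep at most 16 footer reads in flight; `buffered` preserves input order.
           .buffered(16)
           .try_collect()
           .await
   }
   ```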

##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,

Review comment:
       if I'm not mistaken, `partition_value` and `partition_schema` are never used nor filled, so I think it would be better to leave them commented out for now

##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");

Review comment:
       in my experience, it is too strict to expect parquet files to have the parquet suffix

##########
File path: datafusion/src/datasource/csv.rs
##########
@@ -64,7 +66,8 @@ impl CsvFile {
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
             None => {
-                let filenames = common::build_file_list(&path, options.file_extension)?;
+                let filenames = LocalFileSystem

Review comment:
       Why not also allow csv/json files to be fetched using the object_store_registry? This would make the behavior more consistent, but it can definitely be added later.

##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -120,14 +132,303 @@ impl TableProvider for ParquetTable {
     }
 
     fn statistics(&self) -> Statistics {
-        self.statistics.clone()
+        self.desc.statistics()
     }
 
     fn has_exact_statistics(&self) -> bool {
         true
     }
 }
 
+#[derive(Debug)]
+/// Descriptor for a parquet root path
+pub struct ParquetRootDesc {
+    /// object store for reading files inside the root path
+    pub object_store: Arc<dyn ObjectStore>,
+    /// metadata for files inside the root path
+    pub descriptor: SourceRootDescriptor,
+}
+
+impl ParquetRootDesc {
+    /// Construct a new parquet descriptor for a root path
+    pub fn new(root_path: &str, context: ExecutionContext) -> Result<Self> {
+        let object_store = context
+            .state
+            .lock()
+            .unwrap()
+            .object_store_registry
+            .store_for_path(root_path);
+        let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");
+        Ok(Self {
+            object_store,
+            descriptor: root_desc?,
+        })
+    }
+
+    /// Get file schema for all parquet files
+    pub fn schema(&self) -> SchemaRef {
+        self.descriptor.schema.clone()
+    }
+
+    /// Get the summary statistics for all parquet files
+    pub fn statistics(&self) -> Statistics {
+        get_statistics_with_limit(&self.descriptor, None).1
+    }
+
+    fn summarize_min_max(
+        max_values: &mut Vec<Option<MaxAccumulator>>,
+        min_values: &mut Vec<Option<MinAccumulator>>,
+        fields: &[Field],
+        i: usize,
+        stat: &ParquetStatistics,
+    ) {
+        match stat {
+            ParquetStatistics::Boolean(s) => {
+                if let DataType::Boolean = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Boolean(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Boolean(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Int32(s) => {
+                if let DataType::Int32 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value.update(&[ScalarValue::Int32(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value.update(&[ScalarValue::Int32(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                if let DataType::Int64 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value.update(&[ScalarValue::Int64(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value.update(&[ScalarValue::Int64(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Float(s) => {
+                if let DataType::Float32 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Float32(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Float32(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            ParquetStatistics::Double(s) => {
+                if let DataType::Float64 = fields[i].data_type() {
+                    if s.has_min_max_set() {
+                        if let Some(max_value) = &mut max_values[i] {
+                            match max_value
+                                .update(&[ScalarValue::Float64(Some(*s.max()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    max_values[i] = None;
+                                }
+                            }
+                        }
+                        if let Some(min_value) = &mut min_values[i] {
+                            match min_value
+                                .update(&[ScalarValue::Float64(Some(*s.min()))])
+                            {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    min_values[i] = None;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            _ => {}
+        }
+    }
+}
+
+impl SourceRootDescBuilder for ParquetRootDesc {
+    fn get_file_meta(
+        file_path: &str,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<PartitionedFile> {
+        let reader = object_store.get_reader(file_path)?;
+        let file_reader =
+            Arc::new(SerializedFileReader::new(ObjectReaderWrapper::new(reader))?);
+        let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
+        let file_path = file_path.to_string();
+        let schema = arrow_reader.get_schema()?;
+        let num_fields = schema.fields().len();
+        let fields = schema.fields().to_vec();
+        let meta_data = arrow_reader.get_metadata();
+
+        let mut num_rows = 0;
+        let mut total_byte_size = 0;
+        let mut null_counts = vec![0; num_fields];
+        let mut has_statistics = false;
+
+        let (mut max_values, mut min_values) = create_max_min_accs(&schema);
+
+        for row_group_meta in meta_data.row_groups() {
+            num_rows += row_group_meta.num_rows();
+            total_byte_size += row_group_meta.total_byte_size();
+
+            let columns_null_counts = row_group_meta
+                .columns()
+                .iter()
+                .flat_map(|c| c.statistics().map(|stats| stats.null_count()));
+
+            for (i, cnt) in columns_null_counts.enumerate() {
+                null_counts[i] += cnt as usize
+            }
+
+            for (i, column) in row_group_meta.columns().iter().enumerate() {
+                if let Some(stat) = column.statistics() {
+                    has_statistics = true;
+                    ParquetRootDesc::summarize_min_max(
+                        &mut max_values,
+                        &mut min_values,
+                        &fields,
+                        i,
+                        stat,
+                    )
+                }
+            }
+        }
+
+        let column_stats = if has_statistics {
+            Some(get_col_stats(
+                &schema,
+                null_counts,
+                &mut max_values,
+                &mut min_values,
+            ))
+        } else {
+            None
+        };
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows as usize),
+            total_byte_size: Some(total_byte_size as usize),
+            column_statistics: column_stats,
+        };
+
+        Ok(PartitionedFile {
+            file_path,
+            schema,
+            statistics,
+            partition_value: None,
+            partition_schema: None,
+        })
+    }
+}
+
+/// Thin wrapper over object wrapper to work with parquet file read
+pub struct ObjectReaderWrapper {
+    reader: Arc<dyn ObjectReader>,
+}
+
+impl ObjectReaderWrapper {
+    /// Construct a wrapper over the provided object reader
+    pub fn new(reader: Arc<dyn ObjectReader>) -> Self {
+        Self { reader }
+    }
+}
+
+impl ChunkReader for ObjectReaderWrapper {
+    type T = InnerReaderWrapper;
+
+    fn get_read(&self, start: u64, length: usize) -> parquet::errors::Result<Self::T> {
+        Ok(InnerReaderWrapper {
+            inner_reader: self.reader.get_reader(start, length),
+        })
+    }
+}
+
+impl Length for ObjectReaderWrapper {
+    fn len(&self) -> u64 {
+        self.reader.length()
+    }
+}
+
+/// Thin wrapper over reader for a parquet file
+pub struct InnerReaderWrapper {

Review comment:
       might be nice to mark this as "to be removed once https://github.com/rust-lang/rfcs/pull/1598 is stabilized" (should be soon according to https://blog.rust-lang.org/2021/08/03/GATs-stabilization-push.html)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-903095113


   > Could it make sense to write a design doc like @houqp wrote some time ago for the qualified names? 
   
   I think a design doc is a great idea @jorgecarleitao -- it would let us make sure some of the larger points are clear and there is consensus (especially around adding `async` in various places). @yjshen I am happy to start drafting such a document if you think it would help.
   
   I am personally very interested in getting the ideas in this PR into DataFusion -- I think it is an important architectural step forward, and since I think it will directly help IOx (the project I am working on), I can spend non-trivial amounts of time working on it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682247394



##########
File path: datafusion/src/datasource/datasource2.rs
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{Schema, SchemaRef};
+
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::reader::ChunkReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    pub file_path: String,
+    pub schema: Schema,
+    pub statistics: Statistics,
+    pub partition_value: Option<ScalarValue>,
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    pub index: usize,
+    pub files: Vec<PartitionedFile>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SourceDescriptor {
+    pub partition_files: Vec<PartitionedFile>,
+    pub schema: SchemaRef,
+}
+
+pub trait DataSource2: Send + Sync {
+    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
+
+    fn schema(&self) -> Result<Arc<Schema>>;
+
+    fn get_read_for_file(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<dyn ChunkReader>;

Review comment:
       Yes, I find it hard to come up with an abstraction over readable objects that is both storage agnostic and general enough for reading parquet / JSON / CSV alike. Therefore I used `ChunkReader` from parquet first, since it is storage agnostic. Do you have a suggestion on how to achieve this?
   
   At the same time, I'm stuck with `ChunkReader` right now: since it requires an associated type to be defined, it propagates a type parameter to its users, and in the end `ProtocolHandler` cannot have a single definition that covers all the possible type parameters.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r686718895



##########
File path: datafusion/src/datasource/csv.rs
##########
@@ -64,7 +66,8 @@ impl CsvFile {
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
             None => {
-                let filenames = common::build_file_list(&path, options.file_extension)?;
+                let filenames = LocalFileSystem

Review comment:
       I left csv/json out for now for simplicity; since their reading logic is quite different from parquet, I prefer to handle them as follow-ups.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] ehenry2 edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
ehenry2 edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-902849925


   I have a question on the ThreadSafeRead trait... is there anything prebuilt (or a recommendation) to wrap, say, a tokio AsyncRead or bytes::Buf to easily implement the get_reader_async function? I see the example for the local filesystem using the FileSource2 to wrap the File, but I'm assuming most remote implementations will approach this with some kind of in-memory buffer or stream. I had some issues figuring this out while trying to implement it for S3 (I'm still a bit new to Rust and lifetimes, etc.).
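   
   Not an authoritative answer, but one pattern that may help while figuring out the trait bounds (illustrative only; `fetch_range` stands in for whatever S3 client call is used): buffer the ranged response into memory and return it through `std::io::Cursor`, which implements `Read` and is `Send + Sync` when it wraps a `Vec<u8>`:
   
   ```rust
   use std::io::{Cursor, Read};
   
   // Stand-in for a ranged GET against S3; a real impl would call the client here.
   fn fetch_range(_path: &str, _start: u64, _length: usize) -> Vec<u8> {
       Vec::new()
   }
   
   // Buffer the object range and expose it as a plain reader.
   fn range_reader(path: &str, start: u64, length: usize) -> Box<dyn Read + Send + Sync> {
       let bytes = fetch_range(path, start, length);
       Box::new(Cursor::new(bytes))
   }
   ```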


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r688298118



##########
File path: datafusion/src/datasource/mod.rs
##########
@@ -36,3 +47,231 @@ pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Read data from a reader
     Reader(std::sync::Mutex<Option<R>>),
 }
+
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub file_path: String,
+    /// Schema of the file
+    pub schema: Schema,
+    /// Statistics of the file
+    pub statistics: Statistics,
+    /// Values of partition columns to be appended to each row
+    pub partition_value: Option<Vec<ScalarValue>>,
+    /// Schema of partition columns
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+impl From<String> for PartitionedFile {
+    fn from(file_path: String) -> Self {
+        Self {
+            file_path,
+            schema: Schema::empty(),
+            statistics: Default::default(),
+            partition_value: None,
+            partition_schema: None,
+        }
+    }
+}
+
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.file_path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+pub struct FilePartition {

Review comment:
       `FilePartition` is used as the unit of execution in a scan: a task is responsible for reading one `FilePartition` at a time, much like `ParquetPartition`, but I intended to make it more general for all kinds of scans.
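   
   As a toy illustration of that grouping (not the PR's code): distribute the listed files round-robin across `max_concurrency` groups, one group per scan task.
   
   ```rust
   fn group_files(files: Vec<String>, max_concurrency: usize) -> Vec<Vec<String>> {
       let n = max_concurrency.max(1);
       let mut partitions: Vec<Vec<String>> = vec![Vec::new(); n];
       for (i, file) in files.into_iter().enumerate() {
           // Round-robin keeps the groups roughly balanced by file count.
           partitions[i % n].push(file);
       }
       partitions
   }
   ```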




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-898300269


   > The file listing happens when we are registering a new table. Since we currently enforce all the files have the same schema, I thought this can only be achieved to read them all first? I think this could be relaxed when we can provide schema in advance and can handle parquet files with different schema inside one table.
   
   I agree on this one: in the long run, we would want to provide the schema (from a catalog) for a parquet table ahead of time to avoid detecting/merging schemas by reading file contents. That said, I think this is something we can tackle as a follow-up PR as long as we make sure the current design allows such an optimization. For example, we could simply extend `ParquetTable::try_new` to take a schema as an extra argument.
   
   > Regarding early materialization of the file list: the usecase I have in mind is the bucket with partitioned data. Most queries will be able to use only a fraction of the files. 
   
   +1. @yjshen in your mind, is `SourceRootDescriptor` the right abstraction layer to handle the early partition based file filtering?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r693238747



##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       I am on board with further reducing the scope by focusing only on the ObjectStore interface :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-903249726


   Thanks @yjshen  -- the plan you lay out sounds great


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-901777403


   > Could you describe which APIs would be affected by this? 
   
   Main API changes:
   - execution/context: read(register)_parquet / read(register)_csv / read(register)_json, etc.
   
   Other pub functions / traits touched:
   - logical_plan: scan(csv/parquet/json)
   - physical_plan: csv / parquet / json
   
   Upstream dependencies that need to change:
   - arrow parquet crate: ChunkReader / Length / ParquetReader
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r682514446



##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I agree a global context may be interesting to add. It doesn't seem entirely related to reading external data sources, however -- I suggest proposing that change in a separate PR to make sure the discussion on it isn't lost in the discussion on this PR.







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r693222812



##########
File path: datafusion/Cargo.toml
##########
@@ -56,9 +56,9 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
-futures = "0.3"
+futures = { version = "0.3", features = ["executor"] }

Review comment:
       > The alternate compromise, which you have partly implemented in this PR, is to implement both async and non async versions. This is similar to the approach in the C/C++ filesystem api (props to @nealrichardson for the pointer), which has both having synchronous and asynchronous APIs.
   
   How about this alternative to reduce the scope of this PR: implement both sync and async, but only use the sync API to migrate existing code to the new IO abstraction, then work on async propagation as a fast follow-up.
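
   For illustration, a minimal sketch of that split on the reader side could look like the following (the trait and method names here are hypothetical, not the PR's final API), with the sync method simply delegating to the async one:

       use std::io::Result;

       #[async_trait::async_trait]
       pub trait ObjectReader: Send + Sync {
           /// async byte-range read from the underlying object
           async fn get_bytes(&self, start: u64, length: usize) -> Result<Vec<u8>>;

           /// sync facade used while migrating existing code paths
           fn get_bytes_sync(&self, start: u64, length: usize) -> Result<Vec<u8>> {
               futures::executor::block_on(self.get_bytes(start, length))
           }
       }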







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r694751627



##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -137,20 +138,20 @@ impl LogicalPlanBuilder {
     pub fn scan_parquet(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_concurrency: usize,
+        context: ExecutionContext,

Review comment:
       Perhaps we could  pass in the ObjectStore / ObjectStoreRegistry reference rather than an ExecutionContext




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-897396502


   Thank you @yjshen, it's looking a lot better after this round of refactoring. I will take a close look at the code and everyone's comments tomorrow.





[GitHub] [arrow-datafusion] ehenry2 edited a comment on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
ehenry2 edited a comment on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-899683040


   This is an awesome PR...one question for clarification: should the object reader trait have all its functions return Result? Remote storage systems have the wonderful feature of potentially hitting IO errors at any moment (since the request is being made over the wire), so even innocuous functions like length() have the potential to fail. I'm viewing this through the lens of S3, where I'd assume a length() request would involve a head_object call (or similar) to the S3 API. Or should that info be pre-populated when the ObjectReader is instantiated?
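
   To make the two options concrete, a hedged sketch (ObjectReader and S3ObjectReader are illustrative names, not proposals from this PR):

       pub trait ObjectReader: Send + Sync {
           /// option (a): fallible accessor, since on S3 this may turn into a
           /// HEAD-style request that can fail over the wire
           fn length(&self) -> std::io::Result<u64>;
       }

       pub struct S3ObjectReader {
           /// option (b): size resolved once when the reader is instantiated,
           /// so later calls never pay for (or fail on) an extra round trip
           size: u64,
           // client, bucket and key omitted in this sketch
       }

       impl S3ObjectReader {
           pub fn size(&self) -> u64 {
               self.size
           }
       }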





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r681452105



##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+use super::datasource2::DataSource2;
+use crate::error::{DataFusionError, Result};
+use crate::parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::fs::File;
+
+pub trait ProtocolHandler: Sync + Send {
+    /// Returns the protocol handler as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;
+
+    fn get_reader(&self, file_path: &str) -> Result<dyn ChunkReader>;
+}
+
+pub struct LocalFSHandler;
+
+impl ProtocolHandler for LocalFSHandler {
+    fn as_any(&self) -> &dyn Any {
+        return self;
+    }
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>> {
+        let mut filenames: Vec<String> = Vec::new();
+        crate::datasource::local::list_all_files(root_path, &mut filenames, ext);
+        Ok(filenames)
+    }
+
+    fn get_reader(&self, file_path: &str) -> Result<R> {
+        Ok(File::open(file_path)?)
+    }
+}
+
+pub struct ProtocolRegistry {
+    pub protocol_handlers: RwLock<HashMap<String, Arc<dyn ProtocolHandler>>>,
+}
+
+impl ProtocolRegistry {
+    pub fn new() -> Self {
+        Self {
+            protocol_handlers: RwLock::new(HashMap::new()),
+        }
+    }
+
+    /// Adds a new handler to this registry.
+    /// If a handler of the same prefix existed before, it is replaced in the registry and returned.
+    pub fn register_handler(
+        &self,
+        prefix: &str,

Review comment:
       Nitpick: I feel like `scheme` would have been a better name than `prefix` here.

##########
File path: datafusion/src/datasource/datasource2.rs
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{Schema, SchemaRef};
+
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::reader::ChunkReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    pub file_path: String,
+    pub schema: Schema,
+    pub statistics: Statistics,
+    pub partition_value: Option<ScalarValue>,
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    pub index: usize,
+    pub files: Vec<PartitionedFile>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SourceDescriptor {
+    pub partition_files: Vec<PartitionedFile>,
+    pub schema: SchemaRef,
+}
+
+pub trait DataSource2: Send + Sync {
+    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
+
+    fn schema(&self) -> Result<Arc<Schema>>;
+
+    fn get_read_for_file(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<dyn ChunkReader>;

Review comment:
       it seems a bit odd to have an abstract data source trait coupled with a trait from the parquet crate.
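
       One hedged way to avoid the coupling, sketched with illustrative names: let the data source hand back a format-agnostic reader and adapt it to parquet's traits only inside the parquet-specific code.

           use std::io::{Read, Seek};

           /// format-agnostic reader handed out by the data source
           pub trait ObjectReader: Read + Seek + Send + Sync {}
           impl<T: Read + Seek + Send + Sync> ObjectReader for T {}

           /// the source trait no longer mentions any parquet types
           pub trait DataSource: Send + Sync {
               fn reader_for(&self, path: &str) -> std::io::Result<Box<dyn ObjectReader>>;
           }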

##########
File path: datafusion/src/execution/context.rs
##########
@@ -840,6 +859,8 @@ pub struct ExecutionContextState {
     pub config: ExecutionConfig,
     /// Execution properties
     pub execution_props: ExecutionProps,
+    /// Protocol handlers
+    pub protocol_registry: ProtocolRegistry,

Review comment:
       protocol_registry is not being used at the moment, right?

##########
File path: datafusion/src/datasource/datasource2.rs
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::datasource::{ColumnStatistics, Statistics};
+use crate::error::{DataFusionError, Result};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::{Schema, SchemaRef};
+
+use parquet::arrow::ArrowReader;
+use parquet::arrow::ParquetFileArrowReader;
+use parquet::file::reader::ChunkReader;
+use parquet::file::serialized_reader::SerializedFileReader;
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct PartitionedFile {
+    pub file_path: String,
+    pub schema: Schema,
+    pub statistics: Statistics,
+    pub partition_value: Option<ScalarValue>,
+    pub partition_schema: Option<Schema>,
+    // We may include row group range here for a more fine-grained parallel execution
+}
+
+#[derive(Debug, Clone)]
+pub struct FilePartition {
+    pub index: usize,
+    pub files: Vec<PartitionedFile>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SourceDescriptor {
+    pub partition_files: Vec<PartitionedFile>,
+    pub schema: SchemaRef,
+}
+
+pub trait DataSource2: Send + Sync {
+    fn list_partitions(&self, max_concurrency: usize) -> Result<Arc<FilePartition>>;
+
+    fn schema(&self) -> Result<Arc<Schema>>;
+
+    fn get_read_for_file(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<dyn ChunkReader>;
+
+    fn statistics(&self) -> &Statistics;
+}
+
+pub trait SourceDescBuilder {
+    fn get_source_desc(root_path: &str) -> Result<SourceDescriptor> {
+        let filenames = Self::get_all_files(root_path)?;
+        if filenames.is_empty() {
+            return Err(DataFusionError::Plan(format!(
+                "No Parquet files (with .parquet extension) found at path {}",
+                root_path
+            )));
+        }
+
+        // build a list of Parquet partitions with statistics and gather all unique schemas
+        // used in this data set
+        let mut schemas: Vec<Schema> = vec![];
+
+        let partitioned_files = filenames
+            .iter()
+            .map(|file_path| {
+                let pf = Self::get_file_meta(file_path)?;
+                let schema = pf.schema.clone();
+                if schemas.is_empty() {
+                    schemas.push(schema);
+                } else if schema != schemas[0] {
+                    // we currently get the schema information from the first file rather than do
+                    // schema merging and this is a limitation.
+                    // See https://issues.apache.org/jira/browse/ARROW-11017
+                    return Err(DataFusionError::Plan(format!(
+                        "The file {} have different schema from the first file and DataFusion does \
+                        not yet support schema merging",
+                        file_path
+                    )));
+                }
+                Ok(pf)
+            }).collect::<Result<Vec<PartitionedFile>>>();
+
+        Ok(SourceDescriptor {
+            partition_files: partitioned_files?,
+            schema: Arc::new(schemas.pop().unwrap()),
+        })
+    }
+
+    fn get_all_files(root_path: &str) -> Result<Vec<String>>;
+
+    fn get_file_meta(file_path: &str) -> Result<PartitionedFile>;
+
+    fn reader_for_file_meta(file_path: &str) -> Result<dyn ChunkReader>;

Review comment:
       this method is supposed to return a reader for the file content itself, not the file metadata, right?







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-896706081


   > Overall I would prefer (but this is just my opinion) a higher level abstraction in which we can also plug catalogs such as Delta or Iceberg
   
   Hi @rdettai, we do have `CatalogProvider` already and a `CatalogList` in the ExecutionContext, and we get a table via `CatalogProvider` -> `SchemaProvider` -> `TableProvider`. I suppose the `Catalog` you want is orthogonal to `ObjectStore` here?
   
   > But here you cannot use async because the file list and statistics are materialized at the ParquetTable creation level which is too early. This early materialization will also be problematic with buckets that have thousands of files:
   
   The file listing happens when we are registering a new table. Since we currently enforce that all the files have the same schema, I thought this could only be achieved by reading them all first. I think this could be relaxed once we can provide the schema in advance and can handle parquet files with different schemas inside one table.





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r683300110



##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I've filed #824 for this and will remove it from this PR.







[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-899268671


   > +1. @yjshen in your mind, is `SourceRootDescriptor` the right abstraction layer to handle the early partition based file filtering?
   
   My intention with `SourceRootDescriptor` is to describe a base table with all of its partitions, without taking any query-related info into account. We can register the base table first, init the root descriptor once, and then do partition filtering lazily when each query runs, based on its requirements, before generating the physical scan operator.
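
   A rough sketch of that lazy step (assuming the `PartitionedFile` struct from this PR is in scope, and using a plain predicate as a stand-in for whatever filter expressions get pushed down per query):

       /// applied per query, after the descriptor was built once at registration
       fn prune_partitions<'a>(
           all_files: &'a [PartitionedFile],
           keep: impl Fn(&PartitionedFile) -> bool + 'a,
       ) -> impl Iterator<Item = &'a PartitionedFile> + 'a {
           all_files.iter().filter(move |f| keep(*f))
       }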





[GitHub] [arrow-datafusion] yjshen commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-901679869


   @alamb @andygrove  @Dandandan @jorgecarleitao On making the remote storage system object listing & data reading API async, a design choice occurs. This might be quite important, and I'd love to have your suggestions:
   
   ### To which level should I propagate async?  
   
   This is because once we have async dir listing, we can have async logical plans and an async table provider, and then an async DataFrame / context API.
   
   Two available alternatives are:
   
   1. Limit async to just `listing` / `metadata_fetch` / file `read`, wrap a sync version over these async functions, and keep most of the user-facing API untouched (keeps the PR as lean as possible).
   2. Propagate the async API all the way up and ultimately change the user-facing API, including DataFrame & ExecutionContext (which means huge user-facing API changes).
   
   Currently, this PR takes the first approach: it constructs all APIs in `ObjectStore` / `ObjectReader` / `SourceRootDescriptor` natively as async and wraps the async functions into sync ones, trying to keep other parts of the project untouched. Great thanks to @houqp for guiding me along the way.
   
   Does approach 1 make sense to you? 
   
   ### If I take approach 1, how should the sync version function be constructed?
   
   This PR wraps the async counterparts so there is a single implementation of each piece of logic, and therefore relies on `futures::executor::block_on` to bridge the async functions to sync ones.
   
   However, this approach is flawed: `block_on` may block the only thread in tokio, so the future inside never gets a chance to run and the call hangs forever if the tokio runtime is not a multi-threaded one. (I temporarily changed the related tests to use `#[tokio::test(flavor = "multi_thread", worker_threads = 2)]` to avoid hanging.) Do you have any suggestions on this?
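
   For reference, a minimal illustration of the pattern (the listing function is hypothetical, standing in for the ObjectStore API; only `block_on` and the tokio test flavor are the real APIs in question):

       /// hypothetical async listing call
       async fn list_all_files(prefix: &str) -> std::io::Result<Vec<String>> {
           Ok(vec![format!("{}/part-0.parquet", prefix)])
       }

       /// sync facade of the kind used in this PR
       fn list_all_files_sync(prefix: &str) -> std::io::Result<Vec<String>> {
           futures::executor::block_on(list_all_files(prefix))
       }

       // `block_on` parks the current thread; if that thread is the only tokio
       // worker and the inner future needs the runtime, the call never completes,
       // hence the multi_thread flavor below
       #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
       async fn listing_does_not_hang() {
           let files = list_all_files_sync("bucket/table").unwrap();
           assert_eq!(files.len(), 1);
       }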
   





[GitHub] [arrow-datafusion] jorgecarleitao commented on pull request #811: Add support for reading remote storage systems

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#issuecomment-901687032


   Thanks a lot for taking a good look at this and for the proposal.
   
   > Propogate Async API all the way up and finally change the user-facing API: including DataFrame & ExecutionContext
   
   Could you describe which APIs would be affected by this? For example, creating a logical plan would become `async` because we have to read metadata to build a schema, correct? So, for example, things like `df = context.read_parquet(...).await?;`, right?
   
   I agree with making the planning `async`: there is no guarantee that we synchronously have all the information to build the plan in the first place, and imo we should not block because we need to read 50 metadata files from s3.
   
   I agree that this would be a major change. :)





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #811: Source ext for remote files read

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #811:
URL: https://github.com/apache/arrow-datafusion/pull/811#discussion_r683230259



##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {

Review comment:
       I agree with Andrew that `ObjectStore` would have been a better name :)

##########
File path: datafusion/src/datasource/local.rs
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::protocol_registry::ProtocolHandler;
+use crate::error::DataFusionError;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::fs;
+use std::fs::{metadata, File};
+use std::sync::Arc;
+
+pub struct LocalFSHandler;
+
+impl ProtocolHandler<File> for LocalFSHandler {
+    fn as_any(&self) -> &dyn Any {
+        return self;
+    }
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>> {
+        list_all(root_path, ext)
+    }
+
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ChunkReader<T = File>>> {

Review comment:
       Just my quick thoughts on possible designs to avoid the ChunkReader trait here. We could declare a `read_file` method here instead that takes start and length as arguments and returns a Vector or Iterator of bytes.
   
   Then in `ParquetRootDesc`, we use a local struct to implement `ChunkReader` on top of the `ObjectStore::read_file` method, and pass that struct to `SerializedFileReader`.
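
   Roughly, such an adapter could look like this (the `ObjectStore` trait below is a hypothetical, narrowed version of what is being discussed; `ChunkReader` / `Length` are the parquet crate's traits):

       use std::io::Cursor;
       use std::sync::Arc;

       use parquet::errors::{ParquetError, Result as ParquetResult};
       use parquet::file::reader::{ChunkReader, Length};

       /// hypothetical store trait, narrowed to the one method discussed above
       pub trait ObjectStore: Send + Sync {
           fn read_file(&self, path: &str, start: u64, length: usize) -> std::io::Result<Vec<u8>>;
       }

       /// local adapter living next to the parquet code, so the store trait
       /// itself never needs to know about parquet types
       struct ObjectChunkReader {
           store: Arc<dyn ObjectStore>,
           path: String,
           size: u64,
       }

       impl Length for ObjectChunkReader {
           fn len(&self) -> u64 {
               self.size
           }
       }

       impl ChunkReader for ObjectChunkReader {
           type T = Cursor<Vec<u8>>;

           fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
               let bytes = self
                   .store
                   .read_file(&self.path, start, length)
                   .map_err(|e| ParquetError::General(e.to_string()))?;
               Ok(Cursor::new(bytes))
           }
       }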

##########
File path: datafusion/src/execution/context.rs
##########
@@ -125,12 +127,26 @@ pub struct ExecutionContext {
     pub state: Arc<Mutex<ExecutionContextState>>,
 }
 
+lazy_static! {

Review comment:
       I agree with @alamb on this one; it would be better to have the discussion about a global static in a separate PR. This way, it will be easier for us to approve and merge the code changes specific to remote data sources.

##########
File path: datafusion/src/datasource/protocol_registry.rs
##########
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::local::LocalFSHandler;
+use crate::error::Result;
+use parquet::file::reader::ChunkReader;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+pub trait ProtocolHandler<R>: Sync + Send {
+    /// Returns the protocol handler as [`Any`](std::any::Any)
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    fn list_all_files(&self, root_path: &str, ext: &str) -> Result<Vec<String>>;

Review comment:
       Perhaps the name `root_path` is a bit misleading here; maybe it should be named `path` or `prefix`? To list a subdirectory, I am thinking one would pass in `root_path/subdir1/subdir2`. But I think we need to be more specific about whether the return value should contain relative or absolute paths.



