You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/14 13:22:44 UTC

[arrow-datafusion] branch master updated: Remove ObjectStore from FileStream (#4533) (#4601)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 84d3ae8ef Remove ObjectStore from FileStream (#4533) (#4601)
84d3ae8ef is described below

commit 84d3ae8efe83ca9c0a988ed23ec66c647b9b7669
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Dec 14 13:22:38 2022 +0000

    Remove ObjectStore from FileStream (#4533) (#4601)
    
    * Remove ObjectStore from FileStream (#4533)
    
    * Fix avro
---
 .../core/src/physical_plan/file_format/avro.rs     | 23 ++++++-------
 .../core/src/physical_plan/file_format/csv.rs      | 23 ++++++-------
 .../src/physical_plan/file_format/file_stream.rs   | 38 ++++------------------
 .../core/src/physical_plan/file_format/json.rs     | 21 +++++-------
 .../core/src/physical_plan/file_format/parquet.rs  |  7 +---
 5 files changed, 36 insertions(+), 76 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 7a9f11954..ec5b27ca7 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -105,20 +105,20 @@ impl ExecutionPlan for AvroExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         use super::file_stream::FileStream;
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         let config = Arc::new(private::AvroConfig {
             schema: Arc::clone(&self.base_config.file_schema),
             batch_size: context.session_config().batch_size(),
             projection: self.base_config.projected_file_column_names(),
+            object_store,
         });
         let opener = private::AvroOpener { config };
 
-        let stream = FileStream::new(
-            &self.base_config,
-            partition,
-            context,
-            opener,
-            self.metrics.clone(),
-        )?;
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
         Ok(Box::pin(stream))
     }
 
@@ -157,6 +157,7 @@ mod private {
         pub schema: SchemaRef,
         pub batch_size: usize,
         pub projection: Option<Vec<String>>,
+        pub object_store: Arc<dyn ObjectStore>,
     }
 
     impl AvroConfig {
@@ -178,14 +179,10 @@ mod private {
     }
 
     impl FileOpener for AvroOpener {
-        fn open(
-            &self,
-            store: Arc<dyn ObjectStore>,
-            file_meta: FileMeta,
-        ) -> Result<FileOpenFuture> {
+        fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
             let config = self.config.clone();
             Ok(Box::pin(async move {
-                match store.get(file_meta.location()).await? {
+                match config.object_store.get(file_meta.location()).await? {
                     GetResult::File(file, _) => {
                         let reader = config.open(file)?;
                         Ok(futures::stream::iter(reader).boxed())
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index e078b95aa..b759b7254 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -131,25 +131,25 @@ impl ExecutionPlan for CsvExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
         let config = Arc::new(CsvConfig {
             batch_size: context.session_config().batch_size(),
             file_schema: Arc::clone(&self.base_config.file_schema),
             file_projection: self.base_config.file_column_projection_indices(),
             has_header: self.has_header,
             delimiter: self.delimiter,
+            object_store,
         });
 
         let opener = CsvOpener {
             config,
             file_compression_type: self.file_compression_type.to_owned(),
         };
-        let stream = FileStream::new(
-            &self.base_config,
-            partition,
-            context,
-            opener,
-            self.metrics.clone(),
-        )?;
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
 
@@ -184,6 +184,7 @@ struct CsvConfig {
     file_projection: Option<Vec<usize>>,
     has_header: bool,
     delimiter: u8,
+    object_store: Arc<dyn ObjectStore>,
 }
 
 impl CsvConfig {
@@ -208,15 +209,11 @@ struct CsvOpener {
 }
 
 impl FileOpener for CsvOpener {
-    fn open(
-        &self,
-        store: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let config = self.config.clone();
         let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
-            match store.get(file_meta.location()).await? {
+            match config.object_store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     Ok(futures::stream::iter(config.open(decoder, true)).boxed())
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 51a911d51..91877a1d0 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -23,22 +23,18 @@
 
 use std::collections::VecDeque;
 use std::pin::Pin;
-use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Instant;
 
 use arrow::datatypes::SchemaRef;
 use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
-use object_store::ObjectStore;
-
-use datafusion_common::ScalarValue;
 
 use crate::datasource::listing::PartitionedFile;
 use crate::error::Result;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::file_format::{
     FileMeta, FileScanConfig, PartitionColumnProjector,
 };
@@ -56,11 +52,7 @@ pub type FileOpenFuture =
 pub trait FileOpener: Unpin {
     /// Asynchronously open the specified file and return a stream
     /// of [`RecordBatch`]
-    fn open(
-        &self,
-        store: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture>;
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
 }
 
 /// A stream that iterates record batch by record batch, file over file.
@@ -79,8 +71,6 @@ pub struct FileStream<F: FileOpener> {
     file_reader: F,
     /// The partition column projector
     pc_projector: PartitionColumnProjector,
-    /// the store from which to source the files.
-    object_store: Arc<dyn ObjectStore>,
     /// The stream state
     state: FileStreamState,
     /// File stream specific metrics
@@ -175,7 +165,6 @@ impl<F: FileOpener> FileStream<F> {
     pub fn new(
         config: &FileScanConfig,
         partition: usize,
-        context: Arc<TaskContext>,
         file_reader: F,
         metrics: ExecutionPlanMetricsSet,
     ) -> Result<Self> {
@@ -191,17 +180,12 @@ impl<F: FileOpener> FileStream<F> {
 
         let files = config.file_groups[partition].clone();
 
-        let object_store = context
-            .runtime_env()
-            .object_store(&config.object_store_url)?;
-
         Ok(Self {
             file_iter: files.into(),
             projected_schema,
             remain: config.limit,
             file_reader,
             pc_projector,
-            object_store,
             state: FileStreamState::Idle,
             file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
             baseline_metrics: BaselineMetrics::new(&metrics, partition),
@@ -228,7 +212,7 @@ impl<F: FileOpener> FileStream<F> {
 
                     self.file_stream_metrics.time_opening.start();
 
-                    match self.file_reader.open(self.object_store.clone(), file_meta) {
+                    match self.file_reader.open(file_meta) {
                         Ok(future) => {
                             self.state = FileStreamState::Open {
                                 future,
@@ -339,11 +323,7 @@ mod tests {
     }
 
     impl FileOpener for TestOpener {
-        fn open(
-            &self,
-            _store: Arc<dyn ObjectStore>,
-            _file_meta: FileMeta,
-        ) -> Result<FileOpenFuture> {
+        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
             let iterator = self.records.clone().into_iter().map(Ok);
             let stream = futures::stream::iter(iterator).boxed();
             Ok(futures::future::ready(Ok(stream)).boxed())
@@ -375,14 +355,8 @@ mod tests {
             output_ordering: None,
         };
 
-        let file_stream = FileStream::new(
-            &config,
-            0,
-            ctx.task_ctx(),
-            reader,
-            ExecutionPlanMetricsSet::new(),
-        )
-        .unwrap();
+        let file_stream =
+            FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();
 
         file_stream
             .map(|b| b.expect("No error expected in stream"))
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index fe5db9d86..e507c7d6d 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -119,19 +119,18 @@ impl ExecutionPlan for NdJsonExec {
             options
         };
 
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
         let opener = JsonOpener {
             file_schema,
             options,
             file_compression_type: self.file_compression_type.to_owned(),
+            object_store,
         };
 
-        let stream = FileStream::new(
-            &self.base_config,
-            partition,
-            context,
-            opener,
-            self.metrics.clone(),
-        )?;
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
 
         Ok(Box::pin(stream) as SendableRecordBatchStream)
     }
@@ -162,16 +161,14 @@ struct JsonOpener {
     options: DecoderOptions,
     file_schema: SchemaRef,
     file_compression_type: FileCompressionType,
+    object_store: Arc<dyn ObjectStore>,
 }
 
 impl FileOpener for JsonOpener {
-    fn open(
-        &self,
-        store: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let options = self.options.clone();
         let schema = self.file_schema.clone();
+        let store = self.object_store.clone();
         let file_compression_type = self.file_compression_type.to_owned();
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index edf72b523..a9815680b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -320,7 +320,6 @@ impl ExecutionPlan for ParquetExec {
         let stream = FileStream::new(
             &self.base_config,
             partition_index,
-            ctx,
             opener,
             self.metrics.clone(),
         )?;
@@ -406,11 +405,7 @@ struct ParquetOpener {
 }
 
 impl FileOpener for ParquetOpener {
-    fn open(
-        &self,
-        _: Arc<dyn ObjectStore>,
-        file_meta: FileMeta,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
         let file_range = file_meta.range.clone();
 
         let file_metrics = ParquetFileMetrics::new(