You are viewing a plain text version of this content. The canonical link for it is available in the original (HTML) version of this message.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/14 13:22:44 UTC
[arrow-datafusion] branch master updated: Remove ObjectStore from FileStream (#4533) (#4601)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 84d3ae8ef Remove ObjectStore from FileStream (#4533) (#4601)
84d3ae8ef is described below
commit 84d3ae8efe83ca9c0a988ed23ec66c647b9b7669
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Dec 14 13:22:38 2022 +0000
Remove ObjectStore from FileStream (#4533) (#4601)
* Remove ObjectStore from FileStream (#4533)
* Fix avro
---
.../core/src/physical_plan/file_format/avro.rs | 23 ++++++-------
.../core/src/physical_plan/file_format/csv.rs | 23 ++++++-------
.../src/physical_plan/file_format/file_stream.rs | 38 ++++------------------
.../core/src/physical_plan/file_format/json.rs | 21 +++++-------
.../core/src/physical_plan/file_format/parquet.rs | 7 +---
5 files changed, 36 insertions(+), 76 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 7a9f11954..ec5b27ca7 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -105,20 +105,20 @@ impl ExecutionPlan for AvroExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
use super::file_stream::FileStream;
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
let config = Arc::new(private::AvroConfig {
schema: Arc::clone(&self.base_config.file_schema),
batch_size: context.session_config().batch_size(),
projection: self.base_config.projected_file_column_names(),
+ object_store,
});
let opener = private::AvroOpener { config };
- let stream = FileStream::new(
- &self.base_config,
- partition,
- context,
- opener,
- self.metrics.clone(),
- )?;
+ let stream =
+ FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream))
}
@@ -157,6 +157,7 @@ mod private {
pub schema: SchemaRef,
pub batch_size: usize,
pub projection: Option<Vec<String>>,
+ pub object_store: Arc<dyn ObjectStore>,
}
impl AvroConfig {
@@ -178,14 +179,10 @@ mod private {
}
impl FileOpener for AvroOpener {
- fn open(
- &self,
- store: Arc<dyn ObjectStore>,
- file_meta: FileMeta,
- ) -> Result<FileOpenFuture> {
+ fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
Ok(Box::pin(async move {
- match store.get(file_meta.location()).await? {
+ match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index e078b95aa..b759b7254 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -131,25 +131,25 @@ impl ExecutionPlan for CsvExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
+
let config = Arc::new(CsvConfig {
batch_size: context.session_config().batch_size(),
file_schema: Arc::clone(&self.base_config.file_schema),
file_projection: self.base_config.file_column_projection_indices(),
has_header: self.has_header,
delimiter: self.delimiter,
+ object_store,
});
let opener = CsvOpener {
config,
file_compression_type: self.file_compression_type.to_owned(),
};
- let stream = FileStream::new(
- &self.base_config,
- partition,
- context,
- opener,
- self.metrics.clone(),
- )?;
+ let stream =
+ FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
@@ -184,6 +184,7 @@ struct CsvConfig {
file_projection: Option<Vec<usize>>,
has_header: bool,
delimiter: u8,
+ object_store: Arc<dyn ObjectStore>,
}
impl CsvConfig {
@@ -208,15 +209,11 @@ struct CsvOpener {
}
impl FileOpener for CsvOpener {
- fn open(
- &self,
- store: Arc<dyn ObjectStore>,
- file_meta: FileMeta,
- ) -> Result<FileOpenFuture> {
+ fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
- match store.get(file_meta.location()).await? {
+ match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
Ok(futures::stream::iter(config.open(decoder, true)).boxed())
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index 51a911d51..91877a1d0 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -23,22 +23,18 @@
use std::collections::VecDeque;
use std::pin::Pin;
-use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use arrow::datatypes::SchemaRef;
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
-use object_store::ObjectStore;
-
-use datafusion_common::ScalarValue;
use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
-use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
@@ -56,11 +52,7 @@ pub type FileOpenFuture =
pub trait FileOpener: Unpin {
/// Asynchronously open the specified file and return a stream
/// of [`RecordBatch`]
- fn open(
- &self,
- store: Arc<dyn ObjectStore>,
- file_meta: FileMeta,
- ) -> Result<FileOpenFuture>;
+ fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}
/// A stream that iterates record batch by record batch, file over file.
@@ -79,8 +71,6 @@ pub struct FileStream<F: FileOpener> {
file_reader: F,
/// The partition column projector
pc_projector: PartitionColumnProjector,
- /// the store from which to source the files.
- object_store: Arc<dyn ObjectStore>,
/// The stream state
state: FileStreamState,
/// File stream specific metrics
@@ -175,7 +165,6 @@ impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
- context: Arc<TaskContext>,
file_reader: F,
metrics: ExecutionPlanMetricsSet,
) -> Result<Self> {
@@ -191,17 +180,12 @@ impl<F: FileOpener> FileStream<F> {
let files = config.file_groups[partition].clone();
- let object_store = context
- .runtime_env()
- .object_store(&config.object_store_url)?;
-
Ok(Self {
file_iter: files.into(),
projected_schema,
remain: config.limit,
file_reader,
pc_projector,
- object_store,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
baseline_metrics: BaselineMetrics::new(&metrics, partition),
@@ -228,7 +212,7 @@ impl<F: FileOpener> FileStream<F> {
self.file_stream_metrics.time_opening.start();
- match self.file_reader.open(self.object_store.clone(), file_meta) {
+ match self.file_reader.open(file_meta) {
Ok(future) => {
self.state = FileStreamState::Open {
future,
@@ -339,11 +323,7 @@ mod tests {
}
impl FileOpener for TestOpener {
- fn open(
- &self,
- _store: Arc<dyn ObjectStore>,
- _file_meta: FileMeta,
- ) -> Result<FileOpenFuture> {
+ fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
Ok(futures::future::ready(Ok(stream)).boxed())
@@ -375,14 +355,8 @@ mod tests {
output_ordering: None,
};
- let file_stream = FileStream::new(
- &config,
- 0,
- ctx.task_ctx(),
- reader,
- ExecutionPlanMetricsSet::new(),
- )
- .unwrap();
+ let file_stream =
+ FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();
file_stream
.map(|b| b.expect("No error expected in stream"))
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index fe5db9d86..e507c7d6d 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -119,19 +119,18 @@ impl ExecutionPlan for NdJsonExec {
options
};
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.base_config.object_store_url)?;
let opener = JsonOpener {
file_schema,
options,
file_compression_type: self.file_compression_type.to_owned(),
+ object_store,
};
- let stream = FileStream::new(
- &self.base_config,
- partition,
- context,
- opener,
- self.metrics.clone(),
- )?;
+ let stream =
+ FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}
@@ -162,16 +161,14 @@ struct JsonOpener {
options: DecoderOptions,
file_schema: SchemaRef,
file_compression_type: FileCompressionType,
+ object_store: Arc<dyn ObjectStore>,
}
impl FileOpener for JsonOpener {
- fn open(
- &self,
- store: Arc<dyn ObjectStore>,
- file_meta: FileMeta,
- ) -> Result<FileOpenFuture> {
+ fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
+ let store = self.object_store.clone();
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index edf72b523..a9815680b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -320,7 +320,6 @@ impl ExecutionPlan for ParquetExec {
let stream = FileStream::new(
&self.base_config,
partition_index,
- ctx,
opener,
self.metrics.clone(),
)?;
@@ -406,11 +405,7 @@ struct ParquetOpener {
}
impl FileOpener for ParquetOpener {
- fn open(
- &self,
- _: Arc<dyn ObjectStore>,
- file_meta: FileMeta,
- ) -> Result<FileOpenFuture> {
+ fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();
let file_metrics = ParquetFileMetrics::new(