You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/08/01 02:18:05 UTC
[arrow-datafusion] branch master updated: Rename FileReader to FileOpener (#2990) (#2991)
This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 193fc3b45 Rename FileReader to FileOpener (#2990) (#2991)
193fc3b45 is described below
commit 193fc3b4549ed40a08218b744388f9211605ab53
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Aug 1 03:18:01 2022 +0100
Rename FileReader to FileOpener (#2990) (#2991)
---
.../core/src/physical_plan/file_format/avro.rs | 6 +++---
.../core/src/physical_plan/file_format/csv.rs | 6 +++---
.../src/physical_plan/file_format/file_stream.rs | 24 +++++++++++-----------
.../core/src/physical_plan/file_format/json.rs | 6 +++---
.../core/src/physical_plan/file_format/parquet.rs | 6 +++---
5 files changed, 24 insertions(+), 24 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index f480b9d54..a1cef4a1e 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -154,7 +154,7 @@ impl ExecutionPlan for AvroExec {
mod private {
use super::*;
use crate::datasource::listing::FileRange;
- use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture};
+ use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectMeta, ObjectStore};
@@ -183,13 +183,13 @@ mod private {
pub config: Arc<AvroConfig>,
}
- impl FormatReader for AvroOpener {
+ impl FileOpener for AvroOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
- ) -> ReaderFuture {
+ ) -> FileOpenFuture {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 975c3ae5f..276dd0ed6 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::{
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
- FileStream, FormatReader, ReaderFuture,
+ FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use arrow::csv;
@@ -197,13 +197,13 @@ struct CsvOpener {
config: Arc<CsvConfig>,
}
-impl FormatReader for CsvOpener {
+impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
- ) -> ReaderFuture {
+ ) -> FileOpenFuture {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index c3ac064e2..ae4d549b0 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -43,20 +43,20 @@ use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::RecordBatchStream;
/// A fallible future that resolves to a stream of [`RecordBatch`]
-pub type ReaderFuture =
+pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;
-pub trait FormatReader: Unpin {
+pub trait FileOpener: Unpin {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
range: Option<FileRange>,
- ) -> ReaderFuture;
+ ) -> FileOpenFuture;
}
/// A stream that iterates record batch by record batch, file over file.
-pub struct FileStream<F: FormatReader> {
+pub struct FileStream<F: FileOpener> {
/// An iterator over input files.
file_iter: VecDeque<PartitionedFile>,
/// The stream schema (file schema including partition columns and after
@@ -85,12 +85,12 @@ enum FileStreamState {
/// Currently performing asynchronous IO to obtain a stream of RecordBatch
/// for a given parquet file
Open {
- /// A [`ReaderFuture`] returned by [`FormatReader::open`]
- future: ReaderFuture,
+ /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
+ future: FileOpenFuture,
/// The partition values for this file
partition_values: Vec<ScalarValue>,
},
- /// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`]
+ /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
/// returned by [`FileOpener::open`]
Scan {
/// Partitioning column values for the current batch_iter
@@ -104,7 +104,7 @@ enum FileStreamState {
Limit,
}
-impl<F: FormatReader> FileStream<F> {
+impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
@@ -212,7 +212,7 @@ impl<F: FormatReader> FileStream<F> {
}
}
-impl<F: FormatReader> Stream for FileStream<F> {
+impl<F: FileOpener> Stream for FileStream<F> {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
@@ -227,7 +227,7 @@ impl<F: FormatReader> Stream for FileStream<F> {
}
}
-impl<F: FormatReader> RecordBatchStream for FileStream<F> {
+impl<F: FileOpener> RecordBatchStream for FileStream<F> {
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}
@@ -250,13 +250,13 @@ mod tests {
records: Vec<RecordBatch>,
}
- impl FormatReader for TestOpener {
+ impl FileOpener for TestOpener {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file: ObjectMeta,
_range: Option<FileRange>,
- ) -> ReaderFuture {
+ ) -> FileOpenFuture {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
futures::future::ready(Ok(stream)).boxed()
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index bb2f09f42..6cc864312 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -25,7 +25,7 @@ use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
- FileStream, FormatReader, ReaderFuture,
+ FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
@@ -159,13 +159,13 @@ struct JsonOpener {
file_schema: SchemaRef,
}
-impl FormatReader for JsonOpener {
+impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
- ) -> ReaderFuture {
+ ) -> FileOpenFuture {
let options = self.options.clone();
let schema = self.file_schema.clone();
Box::pin(async move {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d79fd2673..184214d68 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -52,7 +52,7 @@ use datafusion_expr::Expr;
use crate::datasource::file_format::parquet::fetch_parquet_metadata;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
- FileStream, FormatReader, ReaderFuture,
+ FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::{
@@ -287,13 +287,13 @@ struct ParquetOpener {
metrics: ExecutionPlanMetricsSet,
}
-impl FormatReader for ParquetOpener {
+impl FileOpener for ParquetOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
range: Option<FileRange>,
- ) -> ReaderFuture {
+ ) -> FileOpenFuture {
let metrics = ParquetFileMetrics::new(
self.partition_index,
meta.location.as_ref(),