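You are viewing a plain-text version of this content. The canonical version is in the commits@arrow.apache.org mailing-list archive (the original hyperlink was not preserved in this copy).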
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/08/01 02:18:05 UTC

[arrow-datafusion] branch master updated: Rename FileReader to FileOpener (#2990) (#2991)

This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 193fc3b45 Rename FileReader to FileOpener (#2990) (#2991)
193fc3b45 is described below

commit 193fc3b4549ed40a08218b744388f9211605ab53
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Aug 1 03:18:01 2022 +0100

    Rename FileReader to FileOpener (#2990) (#2991)
---
 .../core/src/physical_plan/file_format/avro.rs     |  6 +++---
 .../core/src/physical_plan/file_format/csv.rs      |  6 +++---
 .../src/physical_plan/file_format/file_stream.rs   | 24 +++++++++++-----------
 .../core/src/physical_plan/file_format/json.rs     |  6 +++---
 .../core/src/physical_plan/file_format/parquet.rs  |  6 +++---
 5 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index f480b9d54..a1cef4a1e 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -154,7 +154,7 @@ impl ExecutionPlan for AvroExec {
 mod private {
     use super::*;
     use crate::datasource::listing::FileRange;
-    use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture};
+    use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
     use bytes::Buf;
     use futures::StreamExt;
     use object_store::{GetResult, ObjectMeta, ObjectStore};
@@ -183,13 +183,13 @@ mod private {
         pub config: Arc<AvroConfig>,
     }
 
-    impl FormatReader for AvroOpener {
+    impl FileOpener for AvroOpener {
         fn open(
             &self,
             store: Arc<dyn ObjectStore>,
             file: ObjectMeta,
             _range: Option<FileRange>,
-        ) -> ReaderFuture {
+        ) -> FileOpenFuture {
             let config = self.config.clone();
             Box::pin(async move {
                 match store.get(&file.location).await? {
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 975c3ae5f..276dd0ed6 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::{
 use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
-    FileStream, FormatReader, ReaderFuture,
+    FileOpenFuture, FileOpener, FileStream,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use arrow::csv;
@@ -197,13 +197,13 @@ struct CsvOpener {
     config: Arc<CsvConfig>,
 }
 
-impl FormatReader for CsvOpener {
+impl FileOpener for CsvOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
         file: ObjectMeta,
         _range: Option<FileRange>,
-    ) -> ReaderFuture {
+    ) -> FileOpenFuture {
         let config = self.config.clone();
         Box::pin(async move {
             match store.get(&file.location).await? {
diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs
index c3ac064e2..ae4d549b0 100644
--- a/datafusion/core/src/physical_plan/file_format/file_stream.rs
+++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs
@@ -43,20 +43,20 @@ use crate::physical_plan::metrics::BaselineMetrics;
 use crate::physical_plan::RecordBatchStream;
 
 /// A fallible future that resolves to a stream of [`RecordBatch`]
-pub type ReaderFuture =
+pub type FileOpenFuture =
     BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;
 
-pub trait FormatReader: Unpin {
+pub trait FileOpener: Unpin {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
         file: ObjectMeta,
         range: Option<FileRange>,
-    ) -> ReaderFuture;
+    ) -> FileOpenFuture;
 }
 
 /// A stream that iterates record batch by record batch, file over file.
-pub struct FileStream<F: FormatReader> {
+pub struct FileStream<F: FileOpener> {
     /// An iterator over input files.
     file_iter: VecDeque<PartitionedFile>,
     /// The stream schema (file schema including partition columns and after
@@ -85,12 +85,12 @@ enum FileStreamState {
     /// Currently performing asynchronous IO to obtain a stream of RecordBatch
     /// for a given parquet file
     Open {
-        /// A [`ReaderFuture`] returned by [`FormatReader::open`]
-        future: ReaderFuture,
+        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
+        future: FileOpenFuture,
         /// The partition values for this file
         partition_values: Vec<ScalarValue>,
     },
-    /// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`]
+    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
     /// returned by [`FormatReader::open`]
     Scan {
         /// Partitioning column values for the current batch_iter
@@ -104,7 +104,7 @@ enum FileStreamState {
     Limit,
 }
 
-impl<F: FormatReader> FileStream<F> {
+impl<F: FileOpener> FileStream<F> {
     pub fn new(
         config: &FileScanConfig,
         partition: usize,
@@ -212,7 +212,7 @@ impl<F: FormatReader> FileStream<F> {
     }
 }
 
-impl<F: FormatReader> Stream for FileStream<F> {
+impl<F: FileOpener> Stream for FileStream<F> {
     type Item = ArrowResult<RecordBatch>;
 
     fn poll_next(
@@ -227,7 +227,7 @@ impl<F: FormatReader> Stream for FileStream<F> {
     }
 }
 
-impl<F: FormatReader> RecordBatchStream for FileStream<F> {
+impl<F: FileOpener> RecordBatchStream for FileStream<F> {
     fn schema(&self) -> SchemaRef {
         self.projected_schema.clone()
     }
@@ -250,13 +250,13 @@ mod tests {
         records: Vec<RecordBatch>,
     }
 
-    impl FormatReader for TestOpener {
+    impl FileOpener for TestOpener {
         fn open(
             &self,
             _store: Arc<dyn ObjectStore>,
             _file: ObjectMeta,
             _range: Option<FileRange>,
-        ) -> ReaderFuture {
+        ) -> FileOpenFuture {
             let iterator = self.records.clone().into_iter().map(Ok);
             let stream = futures::stream::iter(iterator).boxed();
             futures::future::ready(Ok(stream)).boxed()
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index bb2f09f42..6cc864312 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -25,7 +25,7 @@ use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
-    FileStream, FormatReader, ReaderFuture,
+    FileOpenFuture, FileOpener, FileStream,
 };
 use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
 use crate::physical_plan::{
@@ -159,13 +159,13 @@ struct JsonOpener {
     file_schema: SchemaRef,
 }
 
-impl FormatReader for JsonOpener {
+impl FileOpener for JsonOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
         file: ObjectMeta,
         _range: Option<FileRange>,
-    ) -> ReaderFuture {
+    ) -> FileOpenFuture {
         let options = self.options.clone();
         let schema = self.file_schema.clone();
         Box::pin(async move {
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d79fd2673..184214d68 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -52,7 +52,7 @@ use datafusion_expr::Expr;
 use crate::datasource::file_format::parquet::fetch_parquet_metadata;
 use crate::datasource::listing::FileRange;
 use crate::physical_plan::file_format::file_stream::{
-    FileStream, FormatReader, ReaderFuture,
+    FileOpenFuture, FileOpener, FileStream,
 };
 use crate::physical_plan::metrics::BaselineMetrics;
 use crate::{
@@ -287,13 +287,13 @@ struct ParquetOpener {
     metrics: ExecutionPlanMetricsSet,
 }
 
-impl FormatReader for ParquetOpener {
+impl FileOpener for ParquetOpener {
     fn open(
         &self,
         store: Arc<dyn ObjectStore>,
         meta: ObjectMeta,
         range: Option<FileRange>,
-    ) -> ReaderFuture {
+    ) -> FileOpenFuture {
         let metrics = ParquetFileMetrics::new(
             self.partition_index,
             meta.location.as_ref(),