Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/09 13:03:30 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2677: Switch to object_store crate (#2489)

alamb commented on code in PR #2677:
URL: https://github.com/apache/arrow-datafusion/pull/2677#discussion_r893456167


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -294,37 +287,50 @@ fn summarize_min_max(
     }
 }
 
-/// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result<Schema> {
-    let object_reader = store.file_reader(file.sized_file.clone())?;
-    let obj_reader = ChunkObjectReader {
-        object_reader,
-        bytes_scanned: None,
-    };
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
-    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-    let schema = arrow_reader.get_schema()?;
+async fn fetch_metadata(
+    store: &dyn ObjectStore,
+    file: &ObjectMeta,
+) -> Result<ParquetMetaData> {
+    // TODO: Fetching the entire file to get metadata is wasteful

Review Comment:
   To be clear, the entire file is fetched to read metadata before this PR too (so this PR doesn't make it better or worse).
   
   Maybe it is worth adding a link to the work in arrow-rs on avoiding this.
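   A minimal sketch (not from this PR) of what range-based metadata fetching could look like on top of the new `ObjectStore` API. It assumes the `decode_footer` / `decode_metadata` helpers in the parquet crate's `file::footer` module and the `ObjectStore::get_range` method with `usize` ranges, as in the crate versions used here:

   ```rust
   // Sketch only: fetch just the parquet footer and metadata via range requests
   // instead of downloading the whole object.
   use object_store::{ObjectMeta, ObjectStore};
   use parquet::file::footer::{decode_footer, decode_metadata};
   use parquet::file::metadata::ParquetMetaData;

   const FOOTER_SIZE: usize = 8; // 4-byte metadata length + "PAR1" magic

   async fn fetch_metadata_ranged(
       store: &dyn ObjectStore,
       file: &ObjectMeta,
   ) -> Result<ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
       // Read the fixed-size footer at the end of the file
       let footer_start = file.size - FOOTER_SIZE;
       let footer = store
           .get_range(&file.location, footer_start..file.size)
           .await?;
       let metadata_len = decode_footer(footer.as_ref().try_into()?)?;

       // Read only the serialized metadata that precedes the footer
       let metadata_start = footer_start - metadata_len;
       let metadata = store
           .get_range(&file.location, metadata_start..footer_start)
           .await?;
       Ok(decode_metadata(metadata.as_ref())?)
   }
   ```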



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -294,37 +287,50 @@ fn summarize_min_max(
     }
 }
 
-/// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(store: &dyn ObjectStore, file: &FileMeta) -> Result<Schema> {
-    let object_reader = store.file_reader(file.sized_file.clone())?;
-    let obj_reader = ChunkObjectReader {
-        object_reader,
-        bytes_scanned: None,
-    };
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
-    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-    let schema = arrow_reader.get_schema()?;
+async fn fetch_metadata(
+    store: &dyn ObjectStore,
+    file: &ObjectMeta,
+) -> Result<ParquetMetaData> {
+    // TODO: Fetching the entire file to get metadata is wasteful
+    match store.get(&file.location).await? {
+        GetResult::File(file, _) => {
+            Ok(SerializedFileReader::new(file)?.metadata().clone())
+        }
+        r @ GetResult::Stream(_) => {
+            let data = r.bytes().await?;
+            let cursor = SliceableCursor::new(data.to_vec());
+            Ok(SerializedFileReader::new(cursor)?.metadata().clone())
+        }
+    }
+}
 
+/// Read and parse the schema of the Parquet file at location `path`
+async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
+    let metadata = fetch_metadata(store, file).await?;
+    let file_metadata = metadata.file_metadata();
+    let schema = parquet_to_arrow_schema(
+        file_metadata.schema_descr(),
+        file_metadata.key_value_metadata(),
+    )?;
     Ok(schema)
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
-fn fetch_statistics(
+async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
-    file: &FileMeta,
+    file: &ObjectMeta,
 ) -> Result<Statistics> {
-    let object_reader = store.file_reader(file.sized_file.clone())?;
-    let obj_reader = ChunkObjectReader {
-        object_reader,
-        bytes_scanned: None,
-    };
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
-    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
-    let file_schema = arrow_reader.get_schema()?;
+    let metadata = fetch_metadata(store, file).await?;

Review Comment:
   It is somewhat appalling how many times a file is fetched during planning -- I can't wait to have a way to cache the metadata.
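   Illustrative only (not part of this PR), a minimal sketch of the kind of cache that comment wishes for: keyed by object path and consulted before re-fetching a file's footer during planning.

   ```rust
   // Hypothetical metadata cache sketch; a production version would also check
   // ObjectMeta::last_modified and bound the cache size.
   use std::collections::HashMap;
   use std::sync::{Arc, Mutex};

   use object_store::path::Path;
   use parquet::file::metadata::ParquetMetaData;

   #[derive(Default)]
   struct MetadataCache {
       entries: Mutex<HashMap<Path, Arc<ParquetMetaData>>>,
   }

   impl MetadataCache {
       /// Return cached metadata for `location`, if any.
       fn get(&self, location: &Path) -> Option<Arc<ParquetMetaData>> {
           self.entries.lock().unwrap().get(location).cloned()
       }

       /// Store metadata so later planning steps can skip the fetch.
       fn insert(&self, location: Path, metadata: Arc<ParquetMetaData>) {
           self.entries.lock().unwrap().insert(location, metadata);
       }
   }
   ```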



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -80,104 +71,146 @@ pub struct FileStream<F: FormatReaderOpener> {
     pc_projector: PartitionColumnProjector,
     /// the store from which to source the files.
     object_store: Arc<dyn ObjectStore>,
+    /// The stream state
+    state: FileStreamState,
 }
 
-impl<F: FormatReaderOpener> FileStream<F> {
+enum FileStreamState {
+    Idle,
+    Open {
+        future: ReaderFuture,
+        partition_values: Vec<ScalarValue>,
+    },
+    Scan {
+        /// Partitioning column values for the current batch_iter
+        partition_values: Vec<ScalarValue>,
+        /// The reader instance
+        reader: BoxStream<'static, ArrowResult<RecordBatch>>,
+    },
+    Error,
+    Limit,
+}
+
+impl<F: FormatReader> FileStream<F> {
     pub fn new(
-        object_store: Arc<dyn ObjectStore>,
-        files: Vec<PartitionedFile>,
+        config: &FileScanConfig,
+        partition: usize,
+        context: Arc<TaskContext>,
         file_reader: F,
-        projected_schema: SchemaRef,
-        limit: Option<usize>,
-        table_partition_cols: Vec<String>,
-    ) -> Self {
+    ) -> Result<Self> {
+        let (projected_schema, _) = config.project();
         let pc_projector = PartitionColumnProjector::new(
-            Arc::clone(&projected_schema),
-            &table_partition_cols,
+            projected_schema.clone(),
+            &config.table_partition_cols,
         );
 
-        Self {
-            file_iter: Box::new(files.into_iter()),
-            batch_iter: Box::new(iter::empty()),
-            partition_values: vec![],
-            remain: limit,
+        let files = config.file_groups[partition].clone();
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&config.object_store_url)?;
+
+        Ok(Self {
+            file_iter: files.into(),
             projected_schema,
+            remain: config.limit,
             file_reader,
             pc_projector,
             object_store,
-        }
+            state: FileStreamState::Idle,
+        })
     }
 
-    /// Acts as a flat_map of record batches over files. Adds the partitioning
-    /// Columns to the returned record batches.
-    fn next_batch(&mut self) -> Option<ArrowResult<RecordBatch>> {
-        match self.batch_iter.next() {
-            Some(Ok(batch)) => {
-                Some(self.pc_projector.project(batch, &self.partition_values))
-            }
-            Some(Err(e)) => Some(Err(e)),
-            None => match self.file_iter.next() {
-                Some(f) => {
-                    self.partition_values = f.partition_values;
-                    self.object_store
-                        .file_reader(f.file_meta.sized_file)
-                        .and_then(|r| r.sync_reader())
-                        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                        .and_then(|f| {
-                            self.batch_iter = (self.file_reader)(f, &self.remain);
-                            self.next_batch().transpose()
-                        })
-                        .transpose()
+    fn poll_inner(

Review Comment:
   cc @rdettai 



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -580,7 +554,7 @@ mod tests {
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
         assert_eq!(11, batches[0].num_columns());
-        assert_eq!(8, batches[0].num_rows());
+        assert_eq!(1, batches[0].num_rows());

Review Comment:
   Why is this different?



##########
datafusion/core/src/physical_plan/file_format/file_stream.rs:
##########
@@ -80,104 +71,146 @@ pub struct FileStream<F: FormatReaderOpener> {
     pc_projector: PartitionColumnProjector,
     /// the store from which to source the files.
     object_store: Arc<dyn ObjectStore>,
+    /// The stream state
+    state: FileStreamState,
 }
 
-impl<F: FormatReaderOpener> FileStream<F> {
+enum FileStreamState {
+    Idle,
+    Open {
+        future: ReaderFuture,

Review Comment:
   Perhaps we can add some docstrings -- especially for what `future` represents
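   For illustration only, the variants might be documented roughly like this -- my reading of the state machine in this hunk, reusing its types (`ReaderFuture`, `ScalarValue`, etc.), not wording from the PR:

   ```rust
   enum FileStreamState {
       /// No file is currently being read; the next poll pulls a file from
       /// `file_iter` and starts opening it.
       Idle,
       /// A file open has been issued and we are waiting for its reader.
       Open {
           /// Resolves to the record batch stream for the file being opened.
           future: ReaderFuture,
           /// Partitioning column values for the file being opened.
           partition_values: Vec<ScalarValue>,
       },
       /// A reader is open and batches are being streamed out of it.
       Scan {
           /// Partitioning column values for the current batch_iter
           partition_values: Vec<ScalarValue>,
           /// The reader instance
           reader: BoxStream<'static, ArrowResult<RecordBatch>>,
       },
       /// An error was encountered and the stream terminates.
       Error,
       /// The configured row limit was reached; remaining files are skipped.
       Limit,
   }
   ```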



##########
datafusion/core/src/datasource/listing/url.rs:
##########
@@ -102,86 +103,72 @@ impl ListingTableUrl {
             false => Url::from_directory_path(path).unwrap(),
         };
 
-        Ok(Self { url, glob })
+        Ok(Self::new(url, glob))
+    }
+
+    /// Creates a new [`ListingTableUrl`] from a url and optional glob expression
+    fn new(url: Url, glob: Option<Pattern>) -> Self {
+        // TODO: fix this upstream

Review Comment:
   Fix what? Is there a ticket?



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -560,8 +534,8 @@ mod tests {
         let _ = collect(exec.clone(), task_ctx.clone()).await?;
         let _ = collect(exec_projected.clone(), task_ctx).await?;
 
-        assert_bytes_scanned(exec, 2522);
-        assert_bytes_scanned(exec_projected, 1924);
+        assert_bytes_scanned(exec, 1851);

Review Comment:
   Can you explain why this number is different?



##########
datafusion/core/src/datasource/file_format/json.rs:
##########
@@ -71,21 +71,33 @@ impl FileFormat for JsonFormat {
     async fn infer_schema(
         &self,
         store: &Arc<dyn ObjectStore>,
-        files: &[FileMeta],
+        objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
         let mut schemas = Vec::new();
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
-        for file in files {
-            let reader = store.file_reader(file.sized_file.clone())?.sync_reader()?;
-            let mut reader = BufReader::new(reader);
-            let iter = ValueIter::new(&mut reader, None);
-            let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+        for object in objects {
+            let mut take_while = || {
                 let should_take = records_to_read > 0;
                 if should_take {
                     records_to_read -= 1;
                 }
                 should_take
-            }))?;
+            };
+
+            let schema = match store.get(&object.location).await? {

Review Comment:
   I like how this interface allows for specialized access to local files as well as streams 👍
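   A small illustration of that interface (not the PR's code, and assuming the `GetResult` variants of the object_store version used here): a consumer can special-case the local-file variant while still handling remote byte streams.

   ```rust
   // Illustrative sketch of consuming object_store's GetResult: local files are
   // handed back as std::fs::File, everything else as a byte stream.
   use object_store::{path::Path, GetResult, ObjectStore};
   use std::io::Read;

   async fn read_object(
       store: &dyn ObjectStore,
       location: &Path,
   ) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
       match store.get(location).await? {
           // Local filesystem store: read the file directly
           GetResult::File(mut file, _path) => {
               let mut buf = Vec::new();
               file.read_to_end(&mut buf)?;
               Ok(buf)
           }
           // Remote stores: collect the streamed bytes
           r @ GetResult::Stream(_) => Ok(r.bytes().await?.to_vec()),
       }
   }
   ```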



##########
datafusion/core/Cargo.toml:
##########
@@ -58,9 +58,9 @@ ahash = { version = "0.7", default-features = false }
 arrow = { version = "15.0.0", features = ["prettyprint"] }
 async-trait = "0.1.41"
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
+bytes = "1.1"
 chrono = { version = "0.4", default-features = false }
-datafusion-common = { path = "../common", version = "8.0.0", features = ["parquet"] }
-datafusion-data-access = { path = "../data-access", version = "8.0.0" }

Review Comment:
   Should we also perhaps remove the `data-access` directory as part of the same PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org