Posted to github@arrow.apache.org by "wiedld (via GitHub)" <gi...@apache.org> on 2024/03/11 03:39:13 UTC

[PR] WIP(do-not-merge): changes to enable ParquetSink poc [arrow-datafusion]

wiedld opened a new pull request, #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548

   **POC for Discussion only: DO NOT MERGE.**
   
   
   ## Which issue does this PR close?
   
   For discussion of https://github.com/apache/arrow-datafusion/issues/9493.
   
   ## Rationale for this change
   
   We are proposing a generalized public API that provides access to parallelized parquet writes outside of the COPY TO execution context. The code shared here is **NOT** the change we are requesting. Instead, it demonstrates the limitations that currently exist when trying to use the ParquetSink, rather than the ArrowWriter, for parquet writing.
   
   ## What changes are included in this PR?
   
   What ArrowWriter already provides, and what we had to change in order to use ParquetSink:
   * expose the FileMetaData associated with the created parquet file:
      * **ArrowWriter already provides:**
          * the FileMetaData is provided in the [ArrowWriter::close() return signature](https://github.com/apache/arrow-rs/blob/c6ba0f764a9142b74c9070db269de04d2701d112/parquet/src/arrow/arrow_writer/mod.rs#L254).
      * **ParquetSink had to be changed:**
          * because ParquetSink is intended for use inside a query execution context and writes to one or more file sinks, it does not currently return the FileMetaData associated with any sink.
          * We had to change this for the POC to work.
   * provide the appropriate schema in the kv store:
      * **ArrowWriter already provides:**
          * [ArrowWriter::try_new()](https://github.com/apache/arrow-rs/blob/c6ba0f764a9142b74c9070db269de04d2701d112/parquet/src/arrow/arrow_writer/mod.rs#L141) both serializes the schema and maps it to the appropriate key within the kv_store of the WriterProperties.
      * **ParquetSink had to be changed:**
          * ParquetSink does not include this functionality.
          * As such, we had to perform this mutation of WriterProperties in our own code (by extracting `add_encoded_arrow_schema_to_metadata()` and the associated upstream functionality); see the sketch after this list.
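
   To make the second point concrete, here is a minimal sketch of the kind of WriterProperties mutation we performed. It assumes the well-known `ARROW:SCHEMA` metadata key, and takes the base64-encoded Arrow IPC schema bytes (what the upstream `add_encoded_arrow_schema_to_metadata()` produces) as an opaque input; the helper name is ours, not part of any API:

   ```rust
   use parquet::file::properties::WriterProperties;
   use parquet::format::KeyValue;

   // Hypothetical helper: store an already-encoded Arrow schema in the
   // parquet key_value metadata, under the key that arrow-rs readers expect.
   // `encoded_schema` stands in for the base64-encoded IPC schema bytes.
   fn properties_with_arrow_schema(encoded_schema: String) -> WriterProperties {
       WriterProperties::builder()
           .set_key_value_metadata(Some(vec![KeyValue::new(
               "ARROW:SCHEMA".to_string(),
               encoded_schema,
           )]))
           .build()
   }
   ```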
   
   
   
   ## Are these changes tested?
   
   This code will not be merged.
   
   ## Are there any user-facing changes?
   
   This code will not be merged.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] feat(9493): provide access to FileMetaData for files written with ParquetSink [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548




Re: [PR] feat(9493): provide access to FileMetaData for files written with ParquetSink [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1520436449


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -717,7 +734,18 @@ impl DataSink for ParquetSink {
         while let Some(result) = file_write_tasks.join_next().await {
             match result {
                 Ok(r) => {
-                    row_count += r?;
+                    let (path, file_metadata) = r?;
+                    row_count += file_metadata.num_rows;
+                    let mut written_files = self.written.lock();
+                    written_files
+                        .try_insert(path.clone(), file_metadata)
+                        .map_err(|e| {
+                            DataFusionError::Internal(format!(
+                                "duplicate entry detected for partitioned file {}: {}",
+                                &path, e
+                            ))

Review Comment:
   Can you please use the `internal_err!` macro here instead -- something like
   
   ```suggestion
                           .map_err(|e| internal_err!("duplicate entry detected for partitioned file {path}: {e}"))
   ```





Re: [PR] WIP(do-not-merge): Proposed public parallel parquet writer API / changes to `ParquetSink` [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#issuecomment-1988486807

   I plan to take a closer look at this later this evening. Looks good at a high level though, thanks @wiedld !




Re: [PR] WIP(do-not-merge): Proposed public parallel parquet writer API / changes to `ParquetSink` [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#issuecomment-1989258358

   @wiedld  -- what do you think about changing the title and description of this PR to match the intent of merging this API?




Re: [PR] feat(9493): provide access to FileMetaData for files written with ParquetSink [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#issuecomment-1992069331

   Thanks @devinjdangelo  and @wiedld




Re: [PR] feat(9493): provide access to FileMetaData for files written with ParquetSink [arrow-datafusion]

Posted by "devinjdangelo (via GitHub)" <gi...@apache.org>.
devinjdangelo commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1520647339


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1789,4 +1816,182 @@ mod tests {
         let format = ParquetFormat::default();
         scan_format(state, &format, &testdata, file_name, projection, limit).await
     }
+
+    fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let local = Arc::new(
+            LocalFileSystem::new_with_prefix(&tmp_dir)
+                .expect("should create object store"),
+        );
+
+        let mut session = SessionConfig::default();
+        let mut parquet_opts = ParquetOptions::default();
+        parquet_opts.allow_single_file_parallelism = true;
+        session.options_mut().execution.parquet = parquet_opts;
+
+        let runtime = RuntimeEnv::default();
+        runtime
+            .object_store_registry
+            .register_store(store_url, local);
+
+        Arc::new(
+            TaskContext::default()
+                .with_session_config(session)
+                .with_runtime(Arc::new(runtime)),
+        )
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write() -> Result<()> {
+        let field_a = Field::new("a", DataType::Utf8, false);
+        let field_b = Field::new("b", DataType::Utf8, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+
+        let file_sink_config = FileSinkConfig {
+            object_store_url: object_store_url.clone(),
+            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            table_paths: vec![ListingTableUrl::parse("file:///")?],
+            output_schema: schema.clone(),
+            table_partition_cols: vec![],
+            overwrite: true,
+            file_type_writer_options: FileTypeWriterOptions::Parquet(
+                ParquetWriterOptions::new(WriterProperties::default()),
+            ),
+        };
+        let parquet_sink = Arc::new(ParquetSink::new(file_sink_config));
+
+        // create data
+        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+
+        // write stream
+        parquet_sink
+            .write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(vec![Ok(batch)]),
+                )),
+                &build_ctx(object_store_url.as_ref()),
+            )
+            .await
+            .unwrap();
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet files to be written, instead found {}",
+            written.len()
+        );
+
+        // check the file metadata
+        for (
+            path,
+            FileMetaData {
+                num_rows, schema, ..
+            },

Review Comment:
   I think that getting rid of the for loop and doing:
   
   ```rust
   let (path, FileMetaData { num_rows, schema, ..}) = written.iter().next().unwrap();
   ```
   
   makes the intention easier to understand, which is just getting the one and only element.



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -541,6 +542,8 @@ async fn fetch_statistics(
 pub struct ParquetSink {
     /// Config options for writing data
     config: FileSinkConfig,
+    /// File metadata from successfully produced parquet files.

Review Comment:
   ```suggestion
   /// File metadata from successfully produced parquet files. The Mutex is only used to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
   ```
   
   The use of a Mutex here is confusing without the context of this PR, so I think it would be a good idea to leave a comment explaining. 
   
   I think this is a fine temporary workaround, but I'm sure we can find a way to return FileMetaData in a new public interface without breaking changes to DataSink or using locks.
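
   For readers without the PR's context, here is a minimal sketch of the pattern under discussion (the struct and method names are illustrative, not the PR's exact code; it assumes `parking_lot::Mutex`, `object_store::path::Path`, and the parquet crate's `FileMetaData`):

   ```rust
   use std::collections::HashMap;

   use object_store::path::Path;
   use parking_lot::Mutex;
   use parquet::format::FileMetaData;

   struct SinkSketch {
       /// Interior mutability: `DataSink::write_all` only receives `&self`,
       /// so recording results needs a lock rather than `&mut self`.
       written: Mutex<HashMap<Path, FileMetaData>>,
   }

   impl SinkSketch {
       /// Record the metadata for one finished file.
       fn record(&self, path: Path, metadata: FileMetaData) {
           self.written.lock().insert(path, metadata);
       }

       /// Clone out a snapshot for callers, mirroring this PR's `written()` accessor.
       fn written(&self) -> HashMap<Path, FileMetaData> {
           self.written.lock().clone()
       }
   }
   ```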
   





Re: [PR] feat(9493): provide access to FileMetaData for files written with ParquetSink [arrow-datafusion]

Posted by "wiedld (via GitHub)" <gi...@apache.org>.
wiedld commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1520870685


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -541,6 +542,8 @@ async fn fetch_statistics(
 pub struct ParquetSink {
     /// Config options for writing data
     config: FileSinkConfig,
+    /// File metadata from successfully produced parquet files.

Review Comment:
   Absolutely agreed.





Re: [PR] WIP(do-not-merge): changes to enable ParquetSink poc [arrow-datafusion]

Posted by "wiedld (via GitHub)" <gi...@apache.org>.
wiedld commented on PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#issuecomment-1987582397

   Note: this code was used for a POC, where we added a single commit after the latest release commit (the one we were using at the time). This code will not be merged, and is not intended as a principled solution.




Re: [PR] WIP(do-not-merge): Proposed public parallel parquet writer API / changes to `ParquetSink` [arrow-datafusion]

Posted by "wiedld (via GitHub)" <gi...@apache.org>.
wiedld commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1520139161


##########
datafusion/execution/src/object_store.rs:
##########
@@ -60,6 +60,11 @@ impl ObjectStoreUrl {
     pub fn as_str(&self) -> &str {
         self.as_ref()
     }
+
+    /// Returns as Url
+    pub fn as_url(&self) -> &Url {

Review Comment:
   `AsRef<Url>`
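
   That is, a minimal sketch of the trait-based alternative (assuming `ObjectStoreUrl` stores its inner `url::Url` in a field named `url`):

   ```rust
   impl AsRef<Url> for ObjectStoreUrl {
       fn as_ref(&self) -> &Url {
           &self.url
       }
   }
   ```

   Callers can then write `object_store_url.as_ref()` anywhere a `&Url` is expected, without a bespoke `as_url()` method.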





Re: [PR] WIP(do-not-merge): Proposed public parallel parquet writer API / changes to `ParquetSink` [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1519644955


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -540,6 +541,8 @@ async fn fetch_statistics(
 pub struct ParquetSink {
     /// Config options for writing data
     config: FileSinkConfig,
+    /// File metadata from successfully produced parquet files.

Review Comment:
   To use this API in a more general context, we would probably need to define how these `FileMetaData` map to the original files.
   
   Maybe it could be something like this (which would also support the API below)
   
   
   ```rust
   HashMap<Path, FileMetaData>
   ```
   
   



##########
datafusion/execution/src/object_store.rs:
##########
@@ -60,6 +60,11 @@ impl ObjectStoreUrl {
     pub fn as_str(&self) -> &str {
         self.as_ref()
     }
+
+    /// Returns as Url
+    pub fn as_url(&self) -> &Url {

Review Comment:
   What is the accessor? `AsRef<str>`?



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -563,13 +566,22 @@ impl DisplayAs for ParquetSink {
 impl ParquetSink {
     /// Create from config.
     pub fn new(config: FileSinkConfig) -> Self {
-        Self { config }
+        Self {
+            config,
+            written: Default::default(),
+        }
     }
 
     /// Retrieve the inner [`FileSinkConfig`].
     pub fn config(&self) -> &FileSinkConfig {
         &self.config
     }
+
+    /// Retrieve the file metadata from the last write.
+    pub fn written(&self) -> Vec<FileMetaData> {
+        self.written.lock().clone()
+    }

Review Comment:
   I think the comments here are out of date (it returns the FileMetaData written for *all* written files, not just the last one)
   
   As mentioned above, I don't think there is any way to match up the written files with the entries in the `Vec<FileMetaData>` without some more information.
   
   However, if we changed the API to return a HashMap I think it would work well
   





Re: [PR] feat(9493): provide access to FileMetaData for written files with ParquetSink [arrow-datafusion]

Posted by "wiedld (via GitHub)" <gi...@apache.org>.
wiedld commented on PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#issuecomment-1989327557

   > @wiedld -- what do you think about changing the title and description of this PR to match the intent of merging this API?
   
   You are ahead of me. I was still writing tests. 😆 




Re: [PR] WIP(do-not-merge): changes to enable ParquetSink poc [arrow-datafusion]

Posted by "wiedld (via GitHub)" <gi...@apache.org>.
wiedld commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1519131511


##########
datafusion/execution/src/object_store.rs:
##########
@@ -60,6 +60,11 @@ impl ObjectStoreUrl {
     pub fn as_str(&self) -> &str {
         self.as_ref()
     }
+
+    /// Returns as Url
+    pub fn as_url(&self) -> &Url {

Review Comment:
   This is not required, since there is another accessor available.


