Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/03/15 17:41:42 UTC

[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #5584: Introduce file writer strategies for Parquet writer

metesynnada commented on code in PR #5584:
URL: https://github.com/apache/arrow-datafusion/pull/5584#discussion_r1137479350


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -707,26 +709,58 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     }
 }
 
-/// Executes a query and writes the results to a partitioned Parquet file.
-pub async fn plan_to_parquet(
-    task_ctx: Arc<TaskContext>,
-    plan: Arc<dyn ExecutionPlan>,
+/// create target folder using different save mode strategy, refer to [`FileWriterSaveMode`] for more details
+fn create_target_folder(
     path: impl AsRef<str>,
-    writer_properties: Option<WriterProperties>,
-) -> Result<()> {
+    save_mode: FileWriterSaveMode,
+) -> Result<Option<std::path::PathBuf>> {
     let path = path.as_ref();
-    // create directory to contain the Parquet files (one per partition)
     let fs_path = std::path::Path::new(path);
+
+    if fs_path.exists() {
+        match save_mode {
+            FileWriterSaveMode::Overwrite => {
+                std::fs::remove_dir_all(fs_path)?;
+            }
+            FileWriterSaveMode::Ignore => {
+                return Ok(None);
+            }
+            FileWriterSaveMode::Append => {
+                return Ok(Some(fs_path.to_path_buf()));
+            }
+            _ => {}
+        };
+    }
+
     if let Err(e) = fs::create_dir(fs_path) {
         return Err(DataFusionError::Execution(format!(
             "Could not create directory {path}: {e:?}"
         )));
     }
 
+    Ok(Some(fs_path.to_path_buf()))
+}
+
+/// Executes a query and writes the results to a partitioned Parquet file.
+pub async fn plan_to_parquet(
+    task_ctx: Arc<TaskContext>,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+    writer_properties: Option<WriterProperties>,
+    save_mode: FileWriterSaveMode,
+) -> Result<()> {
+    // create directory to contain the Parquet files (one per partition)
+    let fs_path = &create_target_folder(path, save_mode)?;

Review Comment:
   There is an unnecessary copy/clone of the path when the folder is not overwritten. A different implementation could avoid the clone here.
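
   One possible shape for this, as a minimal sketch rather than a drop-in patch: the helper borrows the path and returns a `bool` telling the caller whether to write at all, so the caller keeps using its own path and nothing is cloned. The name `prepare_target_folder`, the `io::Result<bool>` return type, and the three-variant enum are assumptions for illustration. The real enum has at least one more variant (the `_` arm above), and the real code returns a `DataFusionError`.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Simplified stand-in for the enum in this PR; only the variants
/// visible in the diff are listed (assumption: others are omitted).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileWriterSaveMode {
    Overwrite,
    Append,
    Ignore,
}

/// Hypothetical replacement for `create_target_folder`.
/// Returns `Ok(true)` if the caller should go on and write files,
/// `Ok(false)` if the write should be skipped. The path is only borrowed.
pub fn prepare_target_folder(
    path: &Path,
    save_mode: FileWriterSaveMode,
) -> io::Result<bool> {
    if path.exists() {
        match save_mode {
            // Drop the old folder, then fall through and recreate it below.
            FileWriterSaveMode::Overwrite => fs::remove_dir_all(path)?,
            // Leave the existing folder alone and write nothing.
            FileWriterSaveMode::Ignore => return Ok(false),
            // Reuse the existing folder as-is.
            FileWriterSaveMode::Append => return Ok(true),
        }
    }
    fs::create_dir(path)?;
    Ok(true)
}
```

   With something like this, `plan_to_parquet` could build `std::path::Path::new(path.as_ref())` once, return early when the helper yields `false`, and otherwise keep using the same borrowed path.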



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -541,6 +541,23 @@ pub(crate) fn get_output_ordering(
         }).unwrap_or_else(|| None)
 }
 
+/// Defines strategies for the saving behavior in the case when the saved folder already exists
+#[derive(Debug, Clone, Copy)]
+pub enum FileWriterSaveMode {
+    /// Overwrite mode means that when saving a parquet file, if folder already exists, existing data is expected to be overwritten.
+    /// If folder does not exist, the folder will be created. This is default value.
+    Overwrite,
+    /// Append mode means that when saving a parquet file, if folder already exists, new data is expected to be appended on top of the existing.
+    /// If folder does not exist, the folder will be created
+    Append,

Review Comment:
   As I understand it, Parquet files are not appended to; we keep the existing directory and write new files with a new UUID.
   
   This approach is specific to the Parquet file format and may not apply to other formats such as CSV, where appending to an existing file is the expected behavior. Since this enum may be extended to support other file formats in the future, the doc comments should be updated accordingly.
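
   For example, the doc comments could describe each mode in terms of the target folder rather than a single Parquet file. The wording below is only a suggestion. The `Ignore` description is inferred from the match arm in `create_target_folder`, and variants not visible in this diff are omitted.

```rust
/// Defines how a file writer behaves when the target folder already exists.
/// If the folder does not exist, every mode creates it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileWriterSaveMode {
    /// Remove the existing folder and its contents, then write the new data.
    Overwrite,
    /// Keep the existing folder and add the new data alongside what is there.
    /// For Parquet this means writing new files into the folder; existing
    /// Parquet files are not modified. Other formats (e.g. CSV) might
    /// instead append to an existing file.
    Append,
    /// Leave the existing folder untouched and write nothing.
    Ignore,
}
```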
   



##########
datafusion/core/src/dataframe.rs:
##########
@@ -930,10 +932,11 @@ impl DataFrame {
         self,
         path: &str,
         writer_properties: Option<WriterProperties>,
+        save_mode: FileWriterSaveMode,

Review Comment:
   Spark makes the save mode optional; we could do the same and default to `append` mode. We could also merge `WriterProperties` and `FileWriterSaveMode` into a single struct, e.g. `ParquetWriterOptions`, since both are configuration. This would keep the save mode out of the immediate call signature and simplify the API.
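
   A rough sketch of what such a struct might look like. Everything here is hypothetical: `ParquetWriterOptions` and its builder methods do not exist in this PR, `WriterProperties` is replaced by an empty placeholder so the snippet compiles on its own (the real type is `parquet::file::properties::WriterProperties`), and the `Append` default simply follows the suggestion above.

```rust
/// Simplified stand-in for the enum in this PR (only the variants visible
/// in the diff). `Append` as the default is an assumption.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FileWriterSaveMode {
    Overwrite,
    #[default]
    Append,
    Ignore,
}

/// Placeholder for `parquet::file::properties::WriterProperties`,
/// used only to keep this sketch self-contained.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct WriterProperties;

/// Hypothetical options struct bundling both pieces of configuration.
#[derive(Debug, Clone, Default)]
pub struct ParquetWriterOptions {
    pub writer_properties: Option<WriterProperties>,
    pub save_mode: FileWriterSaveMode,
}

impl ParquetWriterOptions {
    /// Override the save mode; callers that do not care never see it.
    pub fn with_save_mode(mut self, save_mode: FileWriterSaveMode) -> Self {
        self.save_mode = save_mode;
        self
    }

    /// Attach explicit writer properties.
    pub fn with_writer_properties(mut self, props: WriterProperties) -> Self {
        self.writer_properties = Some(props);
        self
    }
}
```

   Callers who are happy with the defaults would pass `ParquetWriterOptions::default()`, and only those who need a different mode would chain `.with_save_mode(FileWriterSaveMode::Overwrite)`.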


