Posted to github@arrow.apache.org by "DDtKey (via GitHub)" <gi...@apache.org> on 2023/02/06 10:28:00 UTC

[GitHub] [arrow-datafusion] DDtKey commented on a diff in pull request #5196: fix: add cancellation token to `write_csv/json/parquet`

DDtKey commented on code in PR #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196#discussion_r1097198239


##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -286,38 +288,44 @@ pub async fn plan_to_csv(
     let path = path.as_ref();
     // create directory to contain the CSV files (one per partition)
     let fs_path = Path::new(path);
-    match fs::create_dir(fs_path) {
-        Ok(()) => {
-            let mut tasks = vec![];
-            for i in 0..plan.output_partitioning().partition_count() {
-                let plan = plan.clone();
-                let filename = format!("part-{i}.csv");
-                let path = fs_path.join(filename);
-                let file = fs::File::create(path)?;
-                let mut writer = csv::Writer::new(file);
-                let task_ctx = Arc::new(TaskContext::from(state));
-                let stream = plan.execute(i, task_ctx)?;
-                let handle: JoinHandle<Result<()>> = task::spawn(async move {
-                    stream
-                        .map(|batch| writer.write(&batch?))
-                        .try_collect()
-                        .await
-                        .map_err(DataFusionError::from)
-                });
-                tasks.push(handle);
-            }
-            futures::future::join_all(tasks)
-                .await
-                .into_iter()
-                .try_for_each(|result| {
-                    result.map_err(|e| DataFusionError::Execution(format!("{e}")))?
-                })?;
-            Ok(())
-        }
-        Err(e) => Err(DataFusionError::Execution(format!(
+    if let Err(e) = fs::create_dir(fs_path) {
+        return Err(DataFusionError::Execution(format!(
             "Could not create directory {path}: {e:?}"
-        ))),
+        )));
     }
+
+    let mut tasks = vec![];
+    // Create cancellation-token to interrupt background execution on drop.
+    let cancellation_token = CancellationToken::new();

Review Comment:
   Another option is to use an abort-on-drop wrapper. I see this pattern is already used in DataFusion, and there is a built-in wrapper for it. Let me know if you think that would be better.
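
To illustrate what "interrupt background execution on drop" means in both approaches, here is a minimal std-only sketch. It is an analogue, not the code in this PR: plain threads stand in for the spawned tokio tasks, and `CancelOnDrop`, `spawn_writer`, and `workers_stop_when_guard_drops` are hypothetical names invented for this example. In the real code the same effect would presumably come from the cancellation token, or from a wrapper that calls tokio's `JoinHandle::abort` when it is dropped.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical guard: sets a shared flag when dropped so that
/// background workers can notice and stop.
pub struct CancelOnDrop {
    cancelled: Arc<AtomicBool>,
}

impl CancelOnDrop {
    pub fn new() -> Self {
        CancelOnDrop {
            cancelled: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Hands out a clone of the flag for a worker to poll.
    pub fn flag(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.cancelled)
    }
}

impl Drop for CancelOnDrop {
    fn drop(&mut self) {
        // Signal cancellation to every worker holding the flag.
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

/// Stand-in for one partition writer: counts "batches" until cancelled.
pub fn spawn_writer(
    cancelled: Arc<AtomicBool>,
    written: Arc<AtomicUsize>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        while !cancelled.load(Ordering::SeqCst) {
            written.fetch_add(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(1));
        }
    })
}

/// Spawns one worker per partition, drops the guard, and reports
/// whether every worker terminated cleanly afterwards.
pub fn workers_stop_when_guard_drops(partitions: usize) -> bool {
    let written = Arc::new(AtomicUsize::new(0));
    let handles: Vec<JoinHandle<()>> = {
        let guard = CancelOnDrop::new();
        let handles: Vec<JoinHandle<()>> = (0..partitions)
            .map(|_| spawn_writer(guard.flag(), Arc::clone(&written)))
            .collect();
        handles
        // `guard` goes out of scope here, which flips the flag.
    };
    // Joining returns only because the workers saw the flag and exited.
    handles.into_iter().all(|h| h.join().is_ok())
}
```

The difference between the two options is mostly about who does the stopping: a token is cooperative (the task has to check it), while an abort-on-drop wrapper stops the task from the outside at its next yield point. Calling `workers_stop_when_guard_drops(3)` from a `main` function exercises the cooperative variant shown here.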


