You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "DDtKey (via GitHub)" <gi...@apache.org> on 2023/02/06 10:14:17 UTC

[GitHub] [arrow-datafusion] DDtKey opened a new pull request, #5196: fix: add cancellation token to `write_csv/json/parquet`

DDtKey opened a new pull request, #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196

   # Which issue does this PR close?
   
   Closes #5178
   
   # Rationale for this change
   
   Currently interruption from outside of datafusion don't stop spawned tasks and it continue to execute query.
   It's very helpful to be able to restrict these methods by `timeout` or `select` for example.
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   
   It's quite hard to cover with union-tests and such test could be too flaky (any ideas are welcome). 
   However all existed tests are passing successfully. 
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] DDtKey commented on a diff in pull request #5196: fix: make `write_csv/json/parquet` cancel-safe

Posted by "DDtKey (via GitHub)" <gi...@apache.org>.
DDtKey commented on code in PR #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196#discussion_r1097222724


##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -286,38 +288,44 @@ pub async fn plan_to_csv(
     let path = path.as_ref();
     // create directory to contain the CSV files (one per partition)
     let fs_path = Path::new(path);
-    match fs::create_dir(fs_path) {
-        Ok(()) => {
-            let mut tasks = vec![];
-            for i in 0..plan.output_partitioning().partition_count() {
-                let plan = plan.clone();
-                let filename = format!("part-{i}.csv");
-                let path = fs_path.join(filename);
-                let file = fs::File::create(path)?;
-                let mut writer = csv::Writer::new(file);
-                let task_ctx = Arc::new(TaskContext::from(state));
-                let stream = plan.execute(i, task_ctx)?;
-                let handle: JoinHandle<Result<()>> = task::spawn(async move {
-                    stream
-                        .map(|batch| writer.write(&batch?))
-                        .try_collect()
-                        .await
-                        .map_err(DataFusionError::from)
-                });
-                tasks.push(handle);
-            }
-            futures::future::join_all(tasks)
-                .await
-                .into_iter()
-                .try_for_each(|result| {
-                    result.map_err(|e| DataFusionError::Execution(format!("{e}")))?
-                })?;
-            Ok(())
-        }
-        Err(e) => Err(DataFusionError::Execution(format!(
+    if let Err(e) = fs::create_dir(fs_path) {
+        return Err(DataFusionError::Execution(format!(
             "Could not create directory {path}: {e:?}"
-        ))),
+        )));
     }
+
+    let mut tasks = vec![];
+    // Create cancellation-token to interrupt background execution on drop.
+    let cancellation_token = CancellationToken::new();

Review Comment:
   I've decided not to introduce a new pattern here and changed it to `AbortOnDropSingle` per task. 
   You still can check how it looked like before: https://github.com/apache/arrow-datafusion/pull/5196/commits/3918d49bf9f130989a61391a3bc45fbe5318a14c



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] DDtKey commented on a diff in pull request #5196: fix: add cancellation token to `write_csv/json/parquet`

Posted by "DDtKey (via GitHub)" <gi...@apache.org>.
DDtKey commented on code in PR #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196#discussion_r1097198239


##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -286,38 +288,44 @@ pub async fn plan_to_csv(
     let path = path.as_ref();
     // create directory to contain the CSV files (one per partition)
     let fs_path = Path::new(path);
-    match fs::create_dir(fs_path) {
-        Ok(()) => {
-            let mut tasks = vec![];
-            for i in 0..plan.output_partitioning().partition_count() {
-                let plan = plan.clone();
-                let filename = format!("part-{i}.csv");
-                let path = fs_path.join(filename);
-                let file = fs::File::create(path)?;
-                let mut writer = csv::Writer::new(file);
-                let task_ctx = Arc::new(TaskContext::from(state));
-                let stream = plan.execute(i, task_ctx)?;
-                let handle: JoinHandle<Result<()>> = task::spawn(async move {
-                    stream
-                        .map(|batch| writer.write(&batch?))
-                        .try_collect()
-                        .await
-                        .map_err(DataFusionError::from)
-                });
-                tasks.push(handle);
-            }
-            futures::future::join_all(tasks)
-                .await
-                .into_iter()
-                .try_for_each(|result| {
-                    result.map_err(|e| DataFusionError::Execution(format!("{e}")))?
-                })?;
-            Ok(())
-        }
-        Err(e) => Err(DataFusionError::Execution(format!(
+    if let Err(e) = fs::create_dir(fs_path) {
+        return Err(DataFusionError::Execution(format!(
             "Could not create directory {path}: {e:?}"
-        ))),
+        )));
     }
+
+    let mut tasks = vec![];
+    // Create cancellation-token to interrupt background execution on drop.
+    let cancellation_token = CancellationToken::new();

Review Comment:
   Another way is to use abort-on-drop wrapper, I see this pattern is used in datafusion and there are built-in wrapper already. Let me know if you think it would be better



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] DDtKey commented on pull request #5196: fix: make `write_csv/json/parquet` cancel-safe

Posted by "DDtKey (via GitHub)" <gi...@apache.org>.
DDtKey commented on PR #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196#issuecomment-1418903229

   Another place with similar issue: https://github.com/apache/arrow-datafusion/pull/5197


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #5196: fix: make `write_csv/json/parquet` cancel-safe

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196#issuecomment-1420587214

   Benchmark runs are scheduled for baseline = f63b97246ba9ee4a11baf15ff1001333f230b39b and contender = d5077b55ef39e5376ace93d4e9068b94b678e7f1. d5077b55ef39e5376ace93d4e9068b94b678e7f1 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/1ceb717f20dd478ea0d6718ff1c2743f...9af64835b8314e0080f3d8a008cf7def/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/0b1644b30b0347688b46fb24ef00c199...f3e5d39313f84eca9caf47b26eb3fe1b/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/3ef13f2d3c5e47608ea2398067ed7037...a262374b8c124a8aa16dd0141ef14d2a/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/12d51477eca949659c66cb0a4b16726b...2f9a9cf043214f90a23e46ecf10dea52/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #5196: fix: make `write_csv/json/parquet` cancel-safe

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #5196:
URL: https://github.com/apache/arrow-datafusion/pull/5196


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org