Posted to github@arrow.apache.org by "rtpsw (via GitHub)" <gi...@apache.org> on 2023/06/06 16:34:55 UTC

[GitHub] [arrow] rtpsw opened a new pull request, #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

rtpsw opened a new pull request, #35953:
URL: https://github.com/apache/arrow/pull/35953

   ### What changes are included in this PR?
   
   The execution-plan setup code is refactored to pass a stop token to the plan's serial executor; the token is stopped upon interruption.
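
As a rough illustration of the idea, the sketch below uses hypothetical `StopSource`/`StopToken` stand-ins (named after, but not the same as, Arrow's cancellation classes) and a hypothetical `RunSerially` loop. The loop runs queued tasks on the calling thread and checks the token before each one, so stopping the token lets the loop exit early instead of running on.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical token: a read-only view of a shared "stop requested" flag.
class StopToken {
 public:
  explicit StopToken(std::shared_ptr<std::atomic<bool>> flag) : flag_(std::move(flag)) {}
  bool IsStopRequested() const { return flag_->load(); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Hypothetical source: owns the flag and hands out tokens that observe it.
class StopSource {
 public:
  StopToken token() const { return StopToken(flag_); }
  void RequestStop() { flag_->store(true); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_ = std::make_shared<std::atomic<bool>>(false);
};

// Hypothetical serial loop: runs queued tasks one at a time on the calling
// thread and checks the token before each task, returning early once a stop
// has been requested. Returns the number of tasks that actually ran.
int RunSerially(std::deque<std::function<void()>>& tasks, const StopToken& token) {
  int ran = 0;
  while (!tasks.empty()) {
    if (token.IsStopRequested()) break;  // interrupted: exit instead of continuing
    auto task = std::move(tasks.front());
    tasks.pop_front();
    task();
    ++ran;
  }
  return ran;
}
```

A real serial executor would also need to wake a loop that is blocked waiting for new tasks when the token is stopped; this sketch omits that part.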
   
   ### Are these changes tested?
   
   Not yet. Tests will be added following feedback.
   
   ### Are there any user-facing changes?
   
   Yes; this will fix a hang visible to users.
   
   **This PR contains a "Critical Fix".**


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1228648531


##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -301,8 +301,10 @@ class ARROW_EXPORT SerialExecutor : public Executor {
   /// approach is to use a stop token to cause the generator to exhaust early.
   template <typename T>
   static Iterator<T> IterateGenerator(
-      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
+      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task,
+      internal::SerialExecutor** serial_executor_out = NULLPTR) {

Review Comment:
   The `StopProducing` approach ([noted here](https://github.com/apache/arrow/pull/35953/#issuecomment-1589892510)) worked in an internal experiment. Side note: the pre-PR version also called `StopProducing`, in the destructor of `BatchConverter`, but that was too late: the hang happens earlier.





[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1594187748

   In the [recent commit](https://github.com/apache/arrow/pull/35953/commits/15cc16873aea9d5270f8b8ae12650d478afa8b51), we moved the `StopProducing` call from the destructor of `BatchConverter`, where it came too late because the hang occurs earlier, into `PlanReader::Close`, before the draining loop. This required exposing the plan to the reader.
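
A minimal sketch of that ordering, using hypothetical `FakePlan` and `FakeReader` types rather than Arrow's `ExecPlan` and `PlanReader`: the reader keeps a pointer to the plan (the "exposing the plan to the reader" part), and `Close()` asks the plan to stop before draining, so the draining loop ends right away instead of consuming every remaining batch.

```cpp
#include <cassert>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical plan that would emit `remaining` batches unless stopped.
struct FakePlan {
  int remaining = 100;
  bool stopped = false;
  void StopProducing() { stopped = true; }
  // Returns the next batch id, or std::nullopt once exhausted or stopped.
  std::optional<int> Next() {
    if (stopped || remaining == 0) return std::nullopt;
    return remaining--;
  }
};

// Hypothetical reader that holds the plan so Close() can stop it.
class FakeReader {
 public:
  explicit FakeReader(std::shared_ptr<FakePlan> plan) : plan_(std::move(plan)) {}
  std::optional<int> ReadNext() { return plan_->Next(); }
  // Stop the plan *before* draining, so the drain loop cannot keep consuming
  // (or, with a real plan, waiting on) new output. Returns batches drained.
  int Close() {
    plan_->StopProducing();
    int drained = 0;
    while (plan_->Next().has_value()) ++drained;
    return drained;
  }

 private:
  std::shared_ptr<FakePlan> plan_;
};
```

If `Close()` drained first and stopped afterwards, this toy version would read all 90 remaining batches; with a real serially executed plan, that drain is where a hang could occur.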




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1237176571


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   I actually included the fix you requested, so please take a look. In any case, I'm fine with whatever is decided.





[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1584690825

   > I have now confirmed that it is not hit even in the version based on this PR. So, the next step should be to make sure it is called (in the internal experiment) and then see what the conclusion is.
   
   Sorry, the build didn't have the debug symbols where I expected them. After fixing this, I do see that the version based on this PR hits `StopProducing`.




[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235769930


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -1001,18 +1000,24 @@ struct BatchConverter {
 
 Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
     Declaration declaration, QueryOptions options,
-    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
+    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema,
+    std::shared_ptr<ExecPlan>* out_plan) {

Review Comment:
   I think it is worthwhile to at least explain that `out_plan` is used for cancellation; otherwise this API looks confusing, since it is not intuitive to return an exec plan from a method called `DeclarationToRecordBatchGenerator`.
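
One way to address this, sketched with hypothetical stand-in types (`Plan`, `BatchGenerator`, `MakeGenerator`) rather than the real Acero signature: a Doxygen-style `\param[out]` comment that says why the out-parameter exists, namely so the caller can stop the plan early.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>

// Hypothetical stand-in for ExecPlan.
struct Plan {
  bool stopped = false;
  void StopProducing() { stopped = true; }
};

using BatchGenerator = std::function<std::optional<int>()>;

/// \brief Create a batch generator driven by a new (hypothetical) plan.
/// \param[in] num_batches number of batches the plan would produce
/// \param[out] out_plan if not null, receives the plan driving the generator so
///   that the caller can cancel it early via StopProducing(); the generator
///   itself offers no way to stop the plan.
BatchGenerator MakeGenerator(int num_batches, std::shared_ptr<Plan>* out_plan) {
  auto plan = std::make_shared<Plan>();
  if (out_plan != nullptr) *out_plan = plan;
  auto remaining = std::make_shared<int>(num_batches);
  return [plan, remaining]() -> std::optional<int> {
    if (plan->stopped || *remaining == 0) return std::nullopt;  // stopped or done
    return (*remaining)--;
  };
}
```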





[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235716833


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   Reinstating this invocation requires a guarantee that `ExecPlan::StopProducing()` is idempotent; otherwise, if `PlanReader::Close()` is called, the invocation here, which will run as well, may misbehave. I don't see this guarantee in the documentation. @westonpace, WDYT?





[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1584579666

   In an internal experiment, I confirmed that #35902 is insufficient: the interrupt leads to a hang. In fact, a breakpoint I set at `StopProducing` wasn't even hit. @westonpace, WDYT?




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220185473


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -1025,21 +1031,31 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
     return Status::Invalid("Cannot use synchronous methods with a custom CPU executor");
   }
   std::shared_ptr<Schema> schema;
+  std::shared_ptr<BatchConverter> converter;
+  auto make_gen = [&](::arrow::internal::Executor* executor)
+      -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
+    ExecContext exec_ctx(options.memory_pool, executor, options.function_registry);
+    ARROW_ASSIGN_OR_RAISE(
+        converter,
+        DeclarationToBatchConverter(declaration, std::move(options), executor, &schema));
+    return [converter] { return (*converter)(); };
+  };
+  arrow::internal::SerialExecutor* ser_exec = nullptr;
   auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
-      ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
-          [&](::arrow::internal::Executor* executor)
-              -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
-            ExecContext exec_ctx(options.memory_pool, executor,
-                                 options.function_registry);
-            return DeclarationToRecordBatchGenerator(declaration, std::move(options),
-                                                     executor, &schema);
-          },
-          options.use_threads));
+      options.use_threads
+          ? ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
+                std::move(make_gen), options.use_threads)
+          : arrow::internal::SerialExecutor::IterateGenerator<
+                std::shared_ptr<RecordBatch>>(std::move(make_gen), &ser_exec));
 
   struct PlanReader : RecordBatchReader {
-    PlanReader(std::shared_ptr<Schema> schema,
-               std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator)
-        : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+    PlanReader(std::shared_ptr<Schema> schema, std::shared_ptr<BatchConverter> converter,
+               std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator,
+               arrow::internal::SerialExecutor* ser_exec)

Review Comment:
   The SerialExecutor is needed for a clean interruption in case of `use_threads=false` (see other posts here for more details). If the plan execution is not serial then [it is null](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R1043) and not used.
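
The null-when-threaded convention can be sketched as below, with hypothetical `SerialLoop` and `Reader` types standing in for `SerialExecutor` and `PlanReader` (the real classes differ): the reader holds a non-owning, possibly null pointer and only touches it on the serial path.

```cpp
#include <cassert>

// Hypothetical stand-in for a serial executor that can be told to stop.
struct SerialLoop {
  bool stop_requested = false;
  void RequestStop() { stop_requested = true; }
};

// Hypothetical reader: `serial_` is null when use_threads=true, in which case
// the serial-only interruption path is skipped.
class Reader {
 public:
  explicit Reader(SerialLoop* serial) : serial_(serial) {}
  // Returns true if a serial executor was signalled.
  bool Close() {
    if (serial_ == nullptr) return false;  // threaded plan: nothing serial to stop
    serial_->RequestStop();
    return true;
  }

 private:
  SerialLoop* serial_;  // non-owning; must outlive the reader
};
```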





[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1228662852


##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -301,8 +301,10 @@ class ARROW_EXPORT SerialExecutor : public Executor {
   /// approach is to use a stop token to cause the generator to exhaust early.
   template <typename T>
   static Iterator<T> IterateGenerator(
-      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
+      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task,
+      internal::SerialExecutor** serial_executor_out = NULLPTR) {

Review Comment:
   If I read the code correctly, it seems like the `BatchConverter` destructor is called when the `PlanReader` is destroyed (since `BatchConverter` is a member of the `batch_iterator`, which is a member of the `PlanReader`) (related code: https://github.com/apache/arrow/blob/e53db939bfad2f20e332172ab4f453add1dc680d/cpp/src/arrow/acero/exec_plan.cc#L1028)
   
   This means that the destructor doesn't get called during `Close()` / `iterator_.reset()`, which would explain why it didn't work before. @westonpace Does that sound correct?
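
One general C++ mechanism that produces this kind of delay is sketched below. It uses a hypothetical `Converter` type and does not claim to reproduce Arrow's actual ownership graph: when an object is owned through `std::shared_ptr`, calling `reset()` on one owner only drops a reference, and the destructor runs only after the last owner (here, a copy captured by a generator-like lambda) is released.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for BatchConverter that records when it is destroyed.
struct Converter {
  explicit Converter(bool* destroyed) : destroyed_(destroyed) {}
  ~Converter() { *destroyed_ = true; }
  bool* destroyed_;
};

// Returns {destroyed after the first reset, destroyed after the last owner is released}.
std::pair<bool, bool> DemonstrateSharedOwnership() {
  bool destroyed = false;
  auto owner = std::make_shared<Converter>(&destroyed);
  std::function<void()> generator = [owner] {};  // the lambda's copy keeps it alive
  owner.reset();                                   // analogous to iterator_.reset()
  bool after_first = destroyed;                    // still false: lambda holds a ref
  generator = nullptr;                             // last reference dropped here
  return {after_first, destroyed};
}
```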





[GitHub] [arrow] westonpace commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1221591136


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   I think both the stop token and `StopProducing` were valid approaches we could have taken.  I don't think we actually need to do both.  Since we spoke we made a lot more progress on `StopProducing` and I have gained more confidence in that approach.  I'd prefer to stick purely with `StopProducing` if possible.  As such, I think I am -1 on this PR at the moment, sorry.





[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1579292192

   > @rtpsw Can you add what is the rationale for this PR?
   
   The rationale is in [the issue description](https://github.com/apache/arrow/issues/35935#issue-1743619221). The PR template says that in this case it's fine not to repeat it.




[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220134591


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -1025,21 +1031,31 @@ Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration decla
     return Status::Invalid("Cannot use synchronous methods with a custom CPU executor");
   }
   std::shared_ptr<Schema> schema;
+  std::shared_ptr<BatchConverter> converter;
+  auto make_gen = [&](::arrow::internal::Executor* executor)
+      -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
+    ExecContext exec_ctx(options.memory_pool, executor, options.function_registry);
+    ARROW_ASSIGN_OR_RAISE(
+        converter,
+        DeclarationToBatchConverter(declaration, std::move(options), executor, &schema));
+    return [converter] { return (*converter)(); };
+  };
+  arrow::internal::SerialExecutor* ser_exec = nullptr;
   auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
-      ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
-          [&](::arrow::internal::Executor* executor)
-              -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
-            ExecContext exec_ctx(options.memory_pool, executor,
-                                 options.function_registry);
-            return DeclarationToRecordBatchGenerator(declaration, std::move(options),
-                                                     executor, &schema);
-          },
-          options.use_threads));
+      options.use_threads
+          ? ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
+                std::move(make_gen), options.use_threads)
+          : arrow::internal::SerialExecutor::IterateGenerator<
+                std::shared_ptr<RecordBatch>>(std::move(make_gen), &ser_exec));
 
   struct PlanReader : RecordBatchReader {
-    PlanReader(std::shared_ptr<Schema> schema,
-               std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator)
-        : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+    PlanReader(std::shared_ptr<Schema> schema, std::shared_ptr<BatchConverter> converter,
+               std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator,
+               arrow::internal::SerialExecutor* ser_exec)

Review Comment:
   Why does the plan reader take a `SerialExecutor`? What if this is not serial execution?





[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220870027


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   I imagine it's possible to make `StopProducing` be sufficient to shut down the plan even if it is serially executing, based on the facilities in this PR. However, there may be unforeseen interactions so I'd suggest deferring this to a separate discussion/issue.





[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1594226553

   > Please update the PR description and add a test (calls Reader.Close() to stop an execution)
   
   Done.
   
   




[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1584608610

   Ok, so it sounds like a new problem then. @rtpsw, can you investigate what is going on? (i.e. set breakpoints in `ExecPlan::StopProducing`, `ExecPlan::ExecProducingImpl`, etc. and see where the call chain breaks)




[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1222192926


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   Thanks @westonpace for chiming in. I would agree with you here.






[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1229968256


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   Why remove this?





[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1236656916


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   Looking at `StopProducing`, I see it has an atomic bool guarding against double execution, so I'll reinstate the call.
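
The guard described above can be sketched as follows. This assumes a hypothetical `StoppablePlan` class rather than Arrow's `ExecPlan`, whose member names and extra logic may differ: `std::atomic<bool>::exchange` returns the previous value, so only the first caller does the stop work and any later call returns immediately. Under this scheme, calling it from both `PlanReader::Close()` and the `BatchConverter` destructor would be harmless.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical plan whose StopProducing() is idempotent and thread-safe.
class StoppablePlan {
 public:
  void StopProducing() {
    // exchange() atomically sets the flag and returns its old value; only the
    // call that flips it from false to true goes on to do the actual work.
    if (stopped_.exchange(true)) return;
    ++stop_work_count_;
  }
  int stop_work_count() const { return stop_work_count_.load(); }

 private:
  std::atomic<bool> stopped_{false};
  std::atomic<int> stop_work_count_{0};
};
```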





[GitHub] [arrow] icexelloss merged pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss merged PR #35953:
URL: https://github.com/apache/arrow/pull/35953




[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1594227656

   > Can you also please add a test?
   
   Done.




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235717729


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   Reinstating this invocation requires a guarantee that `ExecPlan::StopProducing()` is idempotent; otherwise, if `PlanReader::Close()` is called, the invocation here, which will run as well, may misbehave. I don't see this guarantee in the documentation. @westonpace, WDYT?





[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235757414


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   @rtpsw Can you try to figure out whether `ExecPlan::StopProducing()` is idempotent?





[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1584592789

   @rtpsw Do you mean that `StopProducing` of the source node is not hit?



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1231853763


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -1001,18 +1000,24 @@ struct BatchConverter {
 
 Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
     Declaration declaration, QueryOptions options,
-    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
+    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema,
+    std::shared_ptr<ExecPlan>* out_plan) {

Review Comment:
   What is the policy for documenting arguments? Most arguments are not documented, especially in internal functions, and in many cases their meaning is clear. I think this is the case here too.




[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220132231


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -997,9 +1001,11 @@ struct BatchConverter {
   AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
   std::shared_ptr<Schema> schema;
   std::shared_ptr<ExecPlan> exec_plan;
+  StopSource stop_source;
+  StopToken stop_token = stop_source.token();
 };
 
-Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
+Result<std::shared_ptr<BatchConverter>> DeclarationToBatchConverter(

Review Comment:
   What is batch converter? Why do we change the API here?




[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1579105886

   cc @icexelloss @westonpace



[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220209662


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   > If you mean caller, then this option allows the caller to signal stopping via the stop-token.

   Why does the caller need to signal stopping via the stop-token? Doesn't https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/exec_plan.h#L107 already allow the user to stop the execution?




[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1580773844

   Yaron - where are a couple PR/discussion to fix cancellation in serial
   execution (or just execution in general?) and I think we should consolidate
   the discussion first, figure out what is needed to be done, instead of
   merging PR piece by piece without understanding the large picture. Is there
   a place we can consolidate discussion and figure next steps?
   
   On Wed, Jun 7, 2023 at 1:27 AM rtpsw ***@***.***> wrote:
   
   > ***@***.**** commented on this pull request.
   > ------------------------------
   >
   > In cpp/src/arrow/acero/exec_plan.h
   > <https://github.com/apache/arrow/pull/35953#discussion_r1220870027>:
   >
   > > @@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   >    /// If this field is not set then it will be treated as kWarn unless overridden
   >    /// by the ACERO_ALIGNMENT_HANDLING environment variable
   >    std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
   > +
   > +  /// \brief An optional stop-token for the query. Defaults to unstoppable.
   > +  StopToken stop_token = StopToken::Unstoppable();
   >
   > I imagine it's possible to make StopProducing be sufficient to shut down
   > the plan even if it is serially executing, based on the facilities in this
   > PR. However, there may be unforeseen interactions so I'd suggest deferring
   > this to a separate discussion/issue.
   >
   



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220182605


##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -301,8 +301,10 @@ class ARROW_EXPORT SerialExecutor : public Executor {
   /// approach is to use a stop token to cause the generator to exhaust early.
   template <typename T>
   static Iterator<T> IterateGenerator(
-      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
+      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task,
+      internal::SerialExecutor** serial_executor_out = NULLPTR) {

Review Comment:
   It is a bit strange. So far, I haven't found a better way to expose it, given that the [`SerialExecutor` constructor is private](https://github.com/apache/arrow/blob/daacbcc4c5f0e435b2158896584a4385dbf38986/cpp/src/arrow/util/thread_pool.h#L370-L371). I needed to expose it in order to [send the stop-token to it](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R1076).
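The out-parameter pattern discussed here can be sketched in isolation. A static helper creates an object with a private constructor and returns a wrapper that owns it. It can also hand the caller a non-owning pointer, so the caller can signal the object later. All names here (`ToyExecutor`, `ToyIterator`, `Iterate`, `RequestStop`) are hypothetical and only loosely mirror `SerialExecutor::IterateGenerator`.

```cpp
#include <cassert>
#include <memory>
#include <utility>

class ToyIterator;  // forward declaration

// Hypothetical executor whose constructor is private, so only its own static
// helper can create it.
class ToyExecutor {
 public:
  void RequestStop() { stop_requested_ = true; }
  bool stop_requested() const { return stop_requested_; }

  // Defined below, once ToyIterator is a complete type.
  static ToyIterator Iterate(ToyExecutor** executor_out = nullptr);

 private:
  ToyExecutor() = default;
  bool stop_requested_ = false;
};

// Stands in for the iterator returned by the helper: it owns the executor
// but offers callers no accessor for it.
class ToyIterator {
 public:
  explicit ToyIterator(std::unique_ptr<ToyExecutor> executor)
      : executor_(std::move(executor)) {}

  // Yields a value until the executor has been asked to stop.
  bool Next() const { return !executor_->stop_requested(); }

 private:
  std::unique_ptr<ToyExecutor> executor_;
};

inline ToyIterator ToyExecutor::Iterate(ToyExecutor** executor_out) {
  // The private constructor is accessible here because this is a member.
  std::unique_ptr<ToyExecutor> executor(new ToyExecutor());
  if (executor_out != nullptr) {
    *executor_out = executor.get();  // non-owning; valid while the iterator lives
  }
  return ToyIterator(std::move(executor));
}
```

One caveat of this design is that the exposed raw pointer is valid only while the returned wrapper is alive. Callers must track that lifetime themselves.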




[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1591668433

   I haven’t reviewed yet - Can you update the description?



[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235360574


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   I am not sure about this. It's not clear to me what the intent of calling `StopProducing` here was in the first place. Also, `reader->Close()` isn't called in all cases, so I think it is safer to leave this unchanged.
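The concern that `reader->Close()` is not called on every path is the usual argument for keeping a stop call in a destructor as a safety net. The sketch below shows that pattern with a hypothetical `StopOnDestroy` helper. It is not an Arrow class and not how `BatchConverter` is written.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper (not an Arrow class): runs a stop callback exactly once,
// either from an explicit Close() or, if the caller never closes, from the
// destructor.
class StopOnDestroy {
 public:
  explicit StopOnDestroy(std::function<void()> stop) : stop_(std::move(stop)) {}
  StopOnDestroy(const StopOnDestroy&) = delete;
  StopOnDestroy& operator=(const StopOnDestroy&) = delete;

  // Explicit path, analogous to a reader's Close().
  void Close() { StopOnce(); }

  // Safety net for callers that never call Close().
  ~StopOnDestroy() { StopOnce(); }

 private:
  void StopOnce() {
    // Only the first caller sees `false`, so the callback cannot run twice.
    if (!stopped_.exchange(true)) stop_();
  }

  std::function<void()> stop_;
  std::atomic<bool> stopped_{false};
};
```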




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235707314


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   `reader->Close()` returns `Status::OK()` when it is able to drain the reader, which is not the case here.
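The `Close()` semantics described here can be modelled with a toy reader: success only after the reader has been drained, a cancellation status otherwise. This sketch uses a hypothetical `ToyReader` and a simplified status enum, not Arrow's `RecordBatchReader` or `Status`. The test mirrors the one above, reading 10 of 100 batches before closing.

```cpp
#include <cassert>
#include <cstddef>

// Simplified status codes; Arrow's Status type carries much more information.
enum class ToyCode { kOk, kCancelled };

// Hypothetical reader over `num_batches` batches.
class ToyReader {
 public:
  explicit ToyReader(std::size_t num_batches) : remaining_(num_batches) {}

  // Returns true if a batch was produced, false once the stream is exhausted.
  bool Next() {
    if (remaining_ == 0) return false;
    --remaining_;
    return true;
  }

  // OK only if every batch was consumed; otherwise the remaining work is
  // abandoned and reported as a cancellation.
  ToyCode Close() const {
    return remaining_ == 0 ? ToyCode::kOk : ToyCode::kCancelled;
  }

 private:
  std::size_t remaining_;
};
```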




[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1591808430

   FYI, I am removing "This PR contains a Critical Fix" because I don't think this PR falls under any of the categories below:
   
   "
   (a) a security vulnerability, (b) a bug that caused incorrect or invalid data to be produced, or (c) a bug that causes a crash (even when the API contract is upheld)
   "



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1228659722


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   You're right.




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1236695821


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   The currently tested behavior exists pre-PR, so be aware that you're asking to change it. I don't know what the expectation in this case is because there is no documentation for `RecordBatchReader::Close()` and `PlanReader::Close()` that addresses this. There are other C++ reading APIs out there where a close operation is expected to report errors, possibly via a separate call ([like `std::ifstream::close()`](https://en.cppreference.com/w/cpp/io/basic_ifstream/close)), so I suspect people's expectations about this may vary. I'm fine with whatever behavior is in consensus, and in any case documentation of it should be added.
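For reference, `std::ifstream::close()` does not return a status. It reports failure through the stream state, which the caller has to check separately. A small illustration using only the standard library:

```cpp
#include <cassert>
#include <fstream>

// std::ifstream::close() calls rdbuf()->close(); if that returns a null
// pointer (as std::filebuf::close() does when no file is open), close()
// calls setstate(failbit). The failure is therefore visible only via fail().
bool CloseOnUnopenedStreamSetsFailbit() {
  std::ifstream in;  // default-constructed: no file associated
  in.close();        // filebuf::close() returns nullptr -> failbit is set
  return in.fail();
}
```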




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1237176571


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   I actually included [the fix you requested](https://github.com/apache/arrow/pull/35953/commits/4e028366702a9da8fd4d59e065ef39a1dc95383f#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R1075-R1079) (and added [a doc](https://github.com/apache/arrow/pull/35953/commits/968ec2177be61b0dfb69099cf85adb88b3ff0757#diff-637d4c4c9b526e4f685b90cdcdd5499b31f2ee3d81fd6241ef734df581144dcbR755-R757)), so please take a look. In any case, I'm fine with whatever is decided.




[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220137494


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   This API feels strange. Why would we want the user to pass a stop token in QueryOptions?




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220243140


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   My understanding from an offline discussion with Weston is that the stop-token is the way to shut down the scheduler when using the serial executor. If only `StopProducing` is called, then the nodes in the plan stop pushing batches, but the (serially executing) plan doesn't shut down. @westonpace should be able to explain better.




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220173894


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   What do you mean by user? If you mean end-user, then the end-user does not need to change this, because it is set by higher-level APIs. If you mean caller, then this option allows the caller to signal stopping via the stop-token. Specifically, [a stop is requested on the stop-source](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R962) from which [this stop-token was created](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R1005); then, [the stop-token is passed to the scheduler](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R194), which detects the signal via the stop-token.
   
   This approach was suggested by @westonpace, so he may be able to say more about why.
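The flow described above can be sketched with toy types. A stop is requested on a source, and a token derived from it is handed to a serially executing scheduler that checks it before each task. All types below are hypothetical stand-ins that only mimic the shape of Arrow's `StopSource`/`StopToken`, including a default "unstoppable" token like the `StopToken::Unstoppable()` default in the diff. They are not Arrow's implementation.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical token: shares a flag with its source.
class ToyStopToken {
 public:
  ToyStopToken() = default;  // no shared flag: can never be stopped ("unstoppable")
  explicit ToyStopToken(std::shared_ptr<std::atomic<bool>> flag)
      : flag_(std::move(flag)) {}
  bool IsStopRequested() const { return flag_ != nullptr && flag_->load(); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Hypothetical source: requesting a stop is visible to every token it issued.
class ToyStopSource {
 public:
  void RequestStop() { flag_->store(true); }
  ToyStopToken token() const { return ToyStopToken(flag_); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_ =
      std::make_shared<std::atomic<bool>>(false);
};

// Serial scheduler: runs queued tasks on the calling thread and checks the
// token before each one, so a stop request ends the loop even if tasks keep
// scheduling more work.
class ToySerialScheduler {
 public:
  explicit ToySerialScheduler(ToyStopToken token) : token_(std::move(token)) {}

  void Spawn(std::function<void()> task) { tasks_.push_back(std::move(task)); }

  // Returns the number of tasks actually executed.
  int RunUntilIdleOrStopped() {
    int executed = 0;
    while (!tasks_.empty() && !token_.IsStopRequested()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
      ++executed;
    }
    return executed;
  }

 private:
  ToyStopToken token_;
  std::deque<std::function<void()>> tasks_;
};
```

In this toy, a task that keeps re-spawning itself would run forever with an unstoppable token. With a real token, the loop exits as soon as `RequestStop()` is called on the source.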




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1231843754


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -32,6 +32,7 @@
 #include "arrow/compute/exec.h"
 #include "arrow/compute/ordering.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/cancel.h"

Review Comment:
   Done.




[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1580774651

   Typo: I meant “There are a couple of PRs/discussions”
   
   On Wed, Jun 7, 2023 at 9:03 AM Li Jin ***@***.***> wrote:
   
   > Yaron - where are a couple PR/discussion to fix cancellation in serial
   > execution (or just execution in general?) and I think we should consolidate
   > the discussion first, figure out what is needed to be done, instead of
   > merging PR piece by piece without understanding the large picture. Is there
   > a place we can consolidate discussion and figure next steps?
   >
   



[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1579267274

   @rtpsw Can you add the rationale for this PR?



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220237885


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -997,9 +1001,11 @@ struct BatchConverter {
   AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
   std::shared_ptr<Schema> schema;
   std::shared_ptr<ExecPlan> exec_plan;
+  StopSource stop_source;
+  StopToken stop_token = stop_source.token();
 };
 
-Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
+Result<std::shared_ptr<BatchConverter>> DeclarationToBatchConverter(

Review Comment:
   [The batch converter](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R947) is mainly responsible for [converting exec-batches to record-batches](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R965). We change the (note: internal) API here because [this new `IterateGenerator`-invoking code-branch](https://github.com/apache/arrow/pull/35953/files#diff-70e843181eb977ed11ad131f79a0f54a0bf9e30a0cb752d440658ee0cc16cc34R1048-R1049), which is specific to the `use_threads=false` case, needs access to `make_gen`, which in turn needs access to the batch converter. With this refactoring, it turned out that `DeclarationToBatchConverter` was the only API needed, so it replaced `DeclarationToRecordBatchGenerator`, and the generator was constructed on top of it.
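The refactoring described here can be sketched in a simplified, synchronous form: return the converter itself, then build the record-batch generator on top of it. Everything below is hypothetical. `std::function` stands in for Arrow's `AsyncGenerator`, and ints and strings stand in for exec-batches and record-batches. `MakeToyConverter` and `MakeToyRecordBatchGen` only play the roles of `DeclarationToBatchConverter` and the generator built on it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for exec-batches and record-batches.
using ToyExecBatch = int;
using ToyRecordBatch = std::string;

// Plays the role of the batch converter: owns the upstream generator and
// converts each exec-batch into a record-batch.
struct ToyConverter {
  std::function<std::optional<ToyExecBatch>()> exec_batch_gen;

  std::optional<ToyRecordBatch> Next() {
    std::optional<ToyExecBatch> batch = exec_batch_gen();
    if (!batch) return std::nullopt;  // end of stream
    return std::to_string(*batch);
  }
};

// Returns the converter itself, so any code path that needs it (e.g. a
// serial-execution branch) can reach it directly.
std::shared_ptr<ToyConverter> MakeToyConverter(std::vector<ToyExecBatch> batches) {
  auto converter = std::make_shared<ToyConverter>();
  auto next_index = std::make_shared<std::size_t>(0);
  converter->exec_batch_gen = [batches = std::move(batches),
                               next_index]() -> std::optional<ToyExecBatch> {
    if (*next_index >= batches.size()) return std::nullopt;
    return batches[(*next_index)++];
  };
  return converter;
}

// The record-batch generator is built on top of the converter; capturing the
// shared_ptr keeps the converter alive as long as the generator exists.
std::function<std::optional<ToyRecordBatch>()> MakeToyRecordBatchGen(
    std::shared_ptr<ToyConverter> converter) {
  return [converter]() { return converter->Next(); };
}
```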




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1222178918


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   I'd like to verify this in an internal experiment. It will take a bit of time due to other priorities. Let me get back to you about this.




[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1228681146


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   Done.



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1231844962


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   It's moved, not removed. See [this post](https://github.com/apache/arrow/pull/35953/#issuecomment-1594187748).



[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1591565461

   @icexelloss, does this look good to go? 


[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235717729


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -951,7 +951,6 @@ struct BatchConverter {
     if (exec_plan->finished().is_finished()) {
       return;
     }
-    exec_plan->StopProducing();

Review Comment:
   Reinstating this invocation requires a guarantee that `ExecPlan::StopProducing()` is idempotent; otherwise, if `PlanReader::Close()` is called, the invocation here, which will also run, may misbehave. I don't see this guarantee in the documentation. @westonpace, WDYT?
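One common way to make a stop method safe to call twice (for example, once from `Close()` and once from a destructor) is an atomic "already stopped" guard. The sketch below assumes nothing about whether Arrow's `ExecPlan::StopProducing()` already behaves this way; `SketchPlan` and `SketchReader` are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical plan whose StopProducing() is idempotent.
class SketchPlan {
 public:
  void StopProducing() {
    // exchange() returns the previous value, so only the first caller
    // performs the teardown; later calls return immediately.
    if (stopped_.exchange(true)) return;
    teardown_count_.fetch_add(1);
  }
  bool stopped() const { return stopped_.load(); }
  int teardown_count() const { return teardown_count_.load(); }

 private:
  std::atomic<bool> stopped_{false};
  std::atomic<int> teardown_count_{0};
};

// Hypothetical reader that stops the plan on Close() and, defensively, again
// on destruction. With the guard above, the second call is a no-op.
class SketchReader {
 public:
  explicit SketchReader(SketchPlan* plan) : plan_(plan) {}
  ~SketchReader() { plan_->StopProducing(); }
  void Close() { plan_->StopProducing(); }

 private:
  SketchPlan* plan_;
};
```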



[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1579106064

   In an internal experiment, I observed that the changes here, along with those in #35902, fixed the issue. I'd like to get some feedback on what tests to add in order to verify this. @westonpace, WDYT?


[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220139490


##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -301,8 +301,10 @@ class ARROW_EXPORT SerialExecutor : public Executor {
   /// approach is to use a stop token to cause the generator to exhaust early.
   template <typename T>
   static Iterator<T> IterateGenerator(
-      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
+      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task,
+      internal::SerialExecutor** serial_executor_out = NULLPTR) {

Review Comment:
   Why do we return the serial executor from `IterateGenerator`? This feels a little strange.



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1221663589


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   @westonpace, the approach here is from an offline discussion with you that was active just a few days ago, and in my experiments `StopProducing` wasn't enough. What progress with `StopProducing` are you referring to? What is the approach you are currently in favor of? If it ends up being straightforward, then I'd be in favor too, but I'd like to first validate this is the case.



[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220346253


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   Hmm... OK. It feels strange that `StopProducing` does not work with serial execution and that we need a different way to stop serial execution. I would like to understand (1) why serial execution doesn't work with `StopProducing` now and (2) why we cannot make `StopProducing` work with serial execution.



[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1579384391

   > Can you explain what is the cause of this?
   
   For a detailed answer, I'd have to repro the experiment and investigate. I suspect that, pre-PR, the scheduler got stuck before `StopProducing` could get called.


[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1581522364

   > I think we should consolidate the discussion first, figure out what is needed to be done
   
   The situation is that we have #35902 as a fix related to `StopProducing`, as [mentioned by Weston](https://github.com/apache/arrow/pull/35953/#discussion_r1221591136), and the question is [whether it is sufficient for a clean interruption](https://github.com/apache/arrow/pull/35953/#discussion_r1222178918) or otherwise this PR is needed. I'll get back with an answer for this later. In the meantime, this PR is on hold.


[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1584959609

   I got the same results (#35902 isn't sufficient for a clean interruption, but this PR is) when using an earlier version of Arrow as the base. This is good evidence that the results are correct. If this PR is not favored, there should be some middle point between #35902 and this PR that would both work and be favored. @westonpace, any insights about this possible middle point?


[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1229975979


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -1001,18 +1000,24 @@ struct BatchConverter {
 
 Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
     Declaration declaration, QueryOptions options,
-    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
+    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema,
+    std::shared_ptr<ExecPlan>* out_plan) {

Review Comment:
   Please document this argument



[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1229975384


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -32,6 +32,7 @@
 #include "arrow/compute/exec.h"
 #include "arrow/compute/ordering.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/cancel.h"

Review Comment:
   Revert?



[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1584599900

   > @rtpsw Do you mean that `StopProducing` of the source node is not hit?
   
   Correct. I have now confirmed that it is not hit even in the version based on this PR. So, the next step should be to make sure it is called (in the internal experiment) and then see what the conclusion is.


[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1594191355

   > I haven’t reviewed yet - Can you update the description?
   
   Done.


[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1594193215

   > FYI I am removing "This PR contains a Critical Fix" because I don't think this applies to one of the below:
   > 
   > " (a) a security vulnerability, (b) a bug that caused incorrect or invalid data to be produced, or (c) a bug that causes a crash (even when the API contract is upheld) "
   
   The bug causes a hang, which is even worse than a crash, when the API contract is upheld. Whatever you decide.


[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1237051420


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   We can leave this as follow up.



[GitHub] [arrow] conbench-apache-arrow[bot] commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1603426413

   Conbench analyzed the 6 benchmark runs on commit `b451b34a`.
   
   There were 6 benchmark results indicating a performance regression:
   
   - Commit Run on `ursa-thinkcentre-m75q` at [2023-06-22 13:44:32Z](http://conbench.ursa.dev/compare/runs/acea5719bcef4f76bf1b2e16e3a7f403...5f84984b47494060bb5f7ebb82571dbd/)
     - [params=<SMALL_VECTOR(int)>, source=cpp-micro, suite=arrow-small-vector-benchmark](http://conbench.ursa.dev/compare/benchmarks/0649429bc177761e8000deebd3e3d5e4...064945080bcc710d800084fef813292f)
     - [params=DecodeArrow_Dense/1024, source=cpp-micro, suite=parquet-encoding-benchmark](http://conbench.ursa.dev/compare/benchmarks/0649429e5af779e6800098c04f5ca4ef...0649450a966779ab8000061501dd61ff)
   - and 4 more (see the report linked below)
   
   The [full Conbench report](https://github.com/apache/arrow/runs/14487726817) has more details.


[GitHub] [arrow] westonpace commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1221883462


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   If I recall correctly, the problem with `StopProducing` was that we sometimes still saw a hang even after `StopProducing` was called. However, this was later revealed to be the "paused source node doesn't respect stop producing" problem. Beyond that, everything seemed to work correctly with `StopProducing`.
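The "paused source node doesn't respect stop producing" problem can be illustrated with a toy serial scheduler: a source paused by backpressure schedules no more work, so if `StopProducing()` ignores the paused state, nothing ever marks it finished and a draining loop has nothing left to run. The names below are hypothetical, and the model deliberately returns `false` where a real loop would block forever.

```cpp
#include <cassert>

// Hypothetical source node with backpressure. When paused it schedules no
// further work, so only Resume() or a stop that honors the pause can finish it.
class SketchPausedSource {
 public:
  explicit SketchPausedSource(bool honor_stop_while_paused)
      : honor_stop_while_paused_(honor_stop_while_paused) {}

  void Pause() { paused_ = true; }
  void Resume() { paused_ = false; }

  void StopProducing() {
    stop_requested_ = true;
    // The fix in this model: a paused source must still finish when asked to
    // stop, because no consumer will call Resume() once the reader is closed.
    if (paused_ && honor_stop_while_paused_) finished_ = true;
  }

  // One step of the serial scheduler; returns false if there is no runnable work.
  bool RunOneTask() {
    if (finished_ || paused_) return false;  // paused: waiting on Resume()
    if (stop_requested_ || remaining_ == 0) {
      finished_ = true;
      return true;
    }
    --remaining_;  // "emit" one batch
    return true;
  }

  bool finished() const { return finished_; }

 private:
  bool honor_stop_while_paused_;
  bool paused_ = false;
  bool stop_requested_ = false;
  bool finished_ = false;
  int remaining_ = 100;
};

// Drains the scheduler. Returns true if the source finished, false if a real
// loop would hang (no runnable work, yet the source is not finished).
bool DrainSerially(SketchPausedSource& source) {
  while (!source.finished()) {
    if (!source.RunOneTask()) return false;
  }
  return true;
}
```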



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1231853763


##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -1001,18 +1000,24 @@ struct BatchConverter {
 
 Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
     Declaration declaration, QueryOptions options,
-    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
+    ::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema,
+    std::shared_ptr<ExecPlan>* out_plan) {

Review Comment:
   What is the policy for documenting arguments? Most arguments are not documented, and in many cases their meaning is clear. I think this is the case here too.



[GitHub] [arrow] rtpsw commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1589892510

   We opted to switch to an approach in which the plan is stopped directly, when the plan reader closes. @westonpace, I believe this is along the lines you suggested.


[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1228651353


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   @rtpsw Should we revert this change?



[GitHub] [arrow] rtpsw commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1228704935


##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -301,8 +301,10 @@ class ARROW_EXPORT SerialExecutor : public Executor {
   /// approach is to use a stop token to cause the generator to exhaust early.
   template <typename T>
   static Iterator<T> IterateGenerator(
-      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
+      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task,
+      internal::SerialExecutor** serial_executor_out = NULLPTR) {

Review Comment:
   That sounds right, though an additional point is that `Close` hung in the loop that drains the remaining batches, so the destructor was never reached.
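The ordering issue described here can be modeled in a few lines: if `Close()` drains first and only the destructor calls `StopProducing()`, a plan that never ends keeps the drain loop busy and the destructor is never reached; stopping first lets the drain terminate. `SketchEndlessPlan` and `SketchDrainingReader` are hypothetical, and `max_steps` exists only so the sketch can report a hang instead of looping forever.

```cpp
#include <cassert>
#include <optional>

// Hypothetical plan that keeps producing batches until it is stopped.
class SketchEndlessPlan {
 public:
  void StopProducing() { stopped_ = true; }

  // Returns the next batch id, or std::nullopt once the plan has ended.
  std::optional<int> Next() {
    if (stopped_) return std::nullopt;
    return next_id_++;
  }

 private:
  bool stopped_ = false;
  int next_id_ = 0;
};

// Hypothetical reader; stop_before_drain selects between the two orderings.
class SketchDrainingReader {
 public:
  SketchDrainingReader(SketchEndlessPlan* plan, bool stop_before_drain)
      : plan_(plan), stop_before_drain_(stop_before_drain) {}

  // Destructor-only stop: never reached in practice if Close() never returns.
  ~SketchDrainingReader() { plan_->StopProducing(); }

  // Returns true if draining finished within max_steps iterations, false if a
  // real (unbounded) drain loop would have hung.
  bool Close(int max_steps) {
    if (stop_before_drain_) plan_->StopProducing();
    for (int step = 0; step < max_steps; ++step) {
      if (!plan_->Next().has_value()) return true;  // fully drained
    }
    return false;
  }

 private:
  SketchEndlessPlan* plan_;
  bool stop_before_drain_;
};
```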



[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1591670795

   Can you also please add a test?


[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235355709


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   Shouldn't `reader->Close()` return `Status::OK` here?



[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1235766359


##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -767,6 +767,24 @@ TEST(ExecPlanExecution, DeclarationToReader) {
                                   reader->Next());
 }
 
+TEST(ExecPlanExecution, DeclarationToReaderWithEarlyClose) {
+  auto random_data = MakeRandomBatches(schema({field("a", int8())}), /*num_batches=*/100);
+  auto plan = Declaration::Sequence(
+      {{"source", SourceNodeOptions(random_data.schema, random_data.gen(false, false))}});
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> reader,
+                       DeclarationToReader(plan, /*use_threads=*/false));
+
+  // read only a few batches
+  for (size_t i = 0; i < 10; i++) {
+    ASSERT_OK(reader->Next());
+  }
+  // then close
+  EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, HasSubstr("Plan was cancelled early"),
+                                  reader->Close());

Review Comment:
   If the expectation of "close" here is to "cancel the query gracefully", then I think the reasonable behavior is to return an OK status if the cancellation succeeds. I don't think raising a "Plan was cancelled" error is the correct behavior when the user actually wanted to cancel the plan.
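A minimal sketch of the behavior suggested here, assuming a simplified status type: when the reader itself requested the stop, a `Cancelled` outcome is the expected result of `Close()` and is reported as OK, while any other error still propagates. `SketchStatus`, `FinishAfterStop` and `CloseReader` are hypothetical; this is not Arrow's `Status` class or the PR's code.

```cpp
#include <cassert>
#include <string>

enum class SketchCode { kOk, kCancelled, kError };

// Hypothetical, simplified status type.
struct SketchStatus {
  SketchCode code;
  std::string message;
  bool ok() const { return code == SketchCode::kOk; }
};

// Hypothetical: the status a plan finishes with after StopProducing().
SketchStatus FinishAfterStop() {
  return {SketchCode::kCancelled, "Plan was cancelled early"};
}

// Hypothetical Close() policy: a cancellation the caller asked for is the
// expected outcome, so it maps to OK; every other error is passed through.
SketchStatus CloseReader(const SketchStatus& plan_status) {
  if (plan_status.code == SketchCode::kCancelled) return {SketchCode::kOk, ""};
  return plan_status;
}
```

One caveat with this policy: if cancellation can also come from elsewhere (say, an external stop token), a real implementation would need to tell a self-requested stop apart from a foreign one before masking `Cancelled`.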
   
   



[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1585143047

   @rtpsw Can you provide more details on what doesn't work after applying #35902? 


[GitHub] [arrow] icexelloss commented on pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35953:
URL: https://github.com/apache/arrow/pull/35953#issuecomment-1579331846

   > @rtpsw Not sure I understand what is going on here. 
   
   > In this case, even though the signal is handled, the source nodes continue to push batches until done.
   
   Can you explain what is the cause of this?
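The quoted symptom (the signal is handled, yet sources keep pushing batches) is what happens when the handler only records the request and nothing on the producing path ever checks it. The sketch below uses the standard `std::signal`/`std::raise` facilities and a `volatile std::sig_atomic_t` flag; `ProduceBatches` is a hypothetical stand-in for a serial source loop, not Acero code.

```cpp
#include <cassert>
#include <csignal>

// Signal handlers should only touch lock-free atomics or
// volatile std::sig_atomic_t objects, so the handler just records the request.
volatile std::sig_atomic_t g_interrupt_requested = 0;

void SketchHandleSigint(int /*signum*/) { g_interrupt_requested = 1; }

// Clears the flag and (re)installs the handler before each run.
void InstallSketchHandler() {
  g_interrupt_requested = 0;
  std::signal(SIGINT, SketchHandleSigint);
}

// Hypothetical serial source loop. It raises SIGINT itself at batch raise_at
// to simulate Ctrl-C; poll_flag controls whether it checks for the request.
int ProduceBatches(int num_batches, int raise_at, bool poll_flag) {
  int produced = 0;
  for (int i = 0; i < num_batches; ++i) {
    if (poll_flag && g_interrupt_requested) break;  // honor the interruption
    if (i == raise_at) std::raise(SIGINT);          // handler runs before raise returns
    ++produced;
  }
  return produced;
}
```

Without the poll the loop behaves like the report above: the handler runs, but all batches are still produced.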


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35953: GH-35935: [C++] Clean interruption of a Acero plan with `use_threads=false`

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35953:
URL: https://github.com/apache/arrow/pull/35953#discussion_r1220346253


##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -602,6 +603,9 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// If this field is not set then it will be treated as kWarn unless overridden
   /// by the ACERO_ALIGNMENT_HANDLING environment variable
   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
+
+  /// \brief An optional stop-token for the query. Defaults to unstoppable.
+  StopToken stop_token = StopToken::Unstoppable();

Review Comment:
   Hmm... OK. It feels strange that `StopProducing` does not work with serial execution and that we need a different way to stop serial execution. I would like to understand (1) why serial execution doesn't work with `StopProducing` now and (2) why we cannot make `StopProducing` work with serial execution.


