You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to commits@arrow.apache.org by ra...@apache.org on 2023/01/18 08:34:18 UTC

[arrow] 02/10: GH-15243: [C++] fix for potential deadlock in the group-by node (#33700)

This is an automated email from the ASF dual-hosted git repository.

raulcd pushed a commit to branch maint-11.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit fa52bb02aee393e0fc078400335636b5b11b1dd2
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jan 16 07:17:02 2023 -0800

    GH-15243: [C++] fix for potential deadlock in the group-by node (#33700)
    
    
    * Closes: #15243
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/aggregate_node.cc | 32 ++++++++++++++++++++++------
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 0b70577ae7..725372700c 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -486,7 +486,7 @@ class GroupByNode : public ExecNode {
     outputs_[0]->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
   }
 
-  Status OutputResult() {
+  Status DoOutputResult() {
     // To simplify merging, ensure that the first grouper is nonempty
     for (size_t i = 0; i < local_states_.size(); i++) {
       if (local_states_[i].grouper) {
@@ -500,11 +500,28 @@ class GroupByNode : public ExecNode {
 
     int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
     outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
-    RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
-                                                         num_output_batches));
+    Status st =
+        plan_->query_context()->StartTaskGroup(output_task_group_id_, num_output_batches);
+    if (st.IsCancelled()) {
+      // This means the user has cancelled/aborted the plan.  We will not send any batches
+      // and end immediately.
+      finished_.MarkFinished();
+      return Status::OK();
+    } else {
+      return st;
+    }
     return Status::OK();
   }
 
+  void OutputResult() {
+    // Delegates to DoOutputResult(); if that fails, the error is recorded by
+    // marking finished_ with it, so the node still finishes rather than hanging.
+    Status st = DoOutputResult();
+    if (!st.ok()) {
+      finished_.MarkFinished(st);
+    }
+  }
+
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
     util::tracing::Span span;
@@ -521,7 +538,7 @@ class GroupByNode : public ExecNode {
     if (ErrorIfNotOk(Consume(ExecSpan(batch)))) return;
 
     if (input_counter_.Increment()) {
-      ErrorIfNotOk(OutputResult());
+      OutputResult();
     }
   }
 
@@ -542,7 +559,7 @@ class GroupByNode : public ExecNode {
     DCHECK_EQ(input, inputs_[0]);
 
     if (input_counter_.SetTotal(total_batches)) {
-      ErrorIfNotOk(OutputResult());
+      OutputResult();
     }
   }
 
@@ -551,7 +568,6 @@ class GroupByNode : public ExecNode {
                        {{"node.label", label()},
                         {"node.detail", ToString()},
                         {"node.kind", kind_name()}});
-
     local_states_.resize(plan_->query_context()->max_concurrency());
     return Status::OK();
   }
@@ -570,7 +586,9 @@ class GroupByNode : public ExecNode {
     EVENT(span_, "StopProducing");
     DCHECK_EQ(output, outputs_[0]);
 
-    if (input_counter_.Cancel()) finished_.MarkFinished();
+    if (input_counter_.Cancel()) {
+      finished_.MarkFinished();
+    }
     inputs_[0]->StopProducing(this);
   }