You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ra...@apache.org on 2023/01/18 08:34:18 UTC
[arrow] 02/10: GH-15243: [C++] fix for potential deadlock in the group-by node (#33700)
This is an automated email from the ASF dual-hosted git repository.
raulcd pushed a commit to branch maint-11.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit fa52bb02aee393e0fc078400335636b5b11b1dd2
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jan 16 07:17:02 2023 -0800
GH-15243: [C++] fix for potential deadlock in the group-by node (#33700)
* Closes: #15243
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/aggregate_node.cc | 32 ++++++++++++++++++++++------
1 file changed, 25 insertions(+), 7 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 0b70577ae7..725372700c 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -486,7 +486,7 @@ class GroupByNode : public ExecNode {
outputs_[0]->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
}
- Status OutputResult() {
+ Status DoOutputResult() {
// To simplify merging, ensure that the first grouper is nonempty
for (size_t i = 0; i < local_states_.size(); i++) {
if (local_states_[i].grouper) {
@@ -500,11 +500,28 @@ class GroupByNode : public ExecNode {
int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
- RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
- num_output_batches));
+ Status st =
+ plan_->query_context()->StartTaskGroup(output_task_group_id_, num_output_batches);
+ if (st.IsCancelled()) {
+ // This means the user has cancelled/aborted the plan. We will not send any batches
+ // and end immediately.
+ finished_.MarkFinished();
+ return Status::OK();
+ } else {
+ return st;
+ }
return Status::OK();
}
+ void OutputResult() {
+ // Wraps DoOutputResult so that an error status it returns still completes
+ // finished_ (carrying that error) rather than leaving the node unfinished.
+ Status st = DoOutputResult();
+ if (!st.ok()) {
+ finished_.MarkFinished(st);
+ }
+ }
+
void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
@@ -521,7 +538,7 @@ class GroupByNode : public ExecNode {
if (ErrorIfNotOk(Consume(ExecSpan(batch)))) return;
if (input_counter_.Increment()) {
- ErrorIfNotOk(OutputResult());
+ OutputResult();
}
}
@@ -542,7 +559,7 @@ class GroupByNode : public ExecNode {
DCHECK_EQ(input, inputs_[0]);
if (input_counter_.SetTotal(total_batches)) {
- ErrorIfNotOk(OutputResult());
+ OutputResult();
}
}
@@ -551,7 +568,6 @@ class GroupByNode : public ExecNode {
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
-
local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}
@@ -570,7 +586,9 @@ class GroupByNode : public ExecNode {
EVENT(span_, "StopProducing");
DCHECK_EQ(output, outputs_[0]);
- if (input_counter_.Cancel()) finished_.MarkFinished();
+ if (input_counter_.Cancel()) {
+ finished_.MarkFinished();
+ }
inputs_[0]->StopProducing(this);
}