You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 08:51:18 UTC
[arrow] 22/27: ARROW-18310: [C++] Use atomic backpressure counter (#14622)
This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit e79870d46817fc6d52aad690570e2e457b8d9b92
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Mon Nov 14 18:40:05 2022 +0200
ARROW-18310: [C++] Use atomic backpressure counter (#14622)
See https://issues.apache.org/jira/browse/ARROW-18310
Authored-by: Yaron Gvili <rt...@hotmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/sink_node.cc | 3 ++-
cpp/src/arrow/compute/exec/source_node.cc | 3 ++-
cpp/src/arrow/dataset/file_base.cc | 3 ++-
3 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index 96a34bff43..1f518ef75d 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -16,6 +16,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <atomic>
#include <mutex>
#include <optional>
@@ -386,7 +387,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
AtomicCounter input_counter_;
std::shared_ptr<SinkNodeConsumer> consumer_;
std::vector<std::string> names_;
- int32_t backpressure_counter_ = 0;
+ std::atomic<int32_t> backpressure_counter_ = 0;
};
static Result<ExecNode*> MakeTableConsumingSinkNode(
compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 1d51a5c1d2..e0534b1b39 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <atomic>
#include <mutex>
#include <optional>
@@ -216,7 +217,7 @@ struct SourceNode : ExecNode {
private:
std::mutex mutex_;
- int32_t backpressure_counter_{0};
+ std::atomic<int32_t> backpressure_counter_{0};
Future<> backpressure_future_ = Future<>::MakeFinished();
bool stop_requested_{false};
bool started_ = false;
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index bd19c99a52..10b9e82d5c 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -20,6 +20,7 @@
#include <arrow/compute/exec/exec_plan.h>
#include <algorithm>
+#include <atomic>
#include <memory>
#include <unordered_map>
#include <variant>
@@ -582,7 +583,7 @@ class TeeNode : public compute::MapNode {
// only returns an unfinished future when it needs backpressure. Using a serial
// scheduler here ensures we pause while we wait for backpressure to clear
std::shared_ptr<util::AsyncTaskScheduler> serial_scheduler_;
- int32_t backpressure_counter_ = 0;
+ std::atomic<int32_t> backpressure_counter_ = 0;
};
} // namespace