You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/11/15 08:51:18 UTC

[arrow] 22/27: ARROW-18310: [C++] Use atomic backpressure counter (#14622)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit e79870d46817fc6d52aad690570e2e457b8d9b92
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Mon Nov 14 18:40:05 2022 +0200

    ARROW-18310: [C++] Use atomic backpressure counter (#14622)
    
    See https://issues.apache.org/jira/browse/ARROW-18310
    
    Authored-by: Yaron Gvili <rt...@hotmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/sink_node.cc   | 3 ++-
 cpp/src/arrow/compute/exec/source_node.cc | 3 ++-
 cpp/src/arrow/dataset/file_base.cc        | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index 96a34bff43..1f518ef75d 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -16,6 +16,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <mutex>
 #include <optional>
 
@@ -386,7 +387,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
   AtomicCounter input_counter_;
   std::shared_ptr<SinkNodeConsumer> consumer_;
   std::vector<std::string> names_;
-  int32_t backpressure_counter_ = 0;
+  std::atomic<int32_t> backpressure_counter_ = 0;
 };
 static Result<ExecNode*> MakeTableConsumingSinkNode(
     compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 1d51a5c1d2..e0534b1b39 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <mutex>
 #include <optional>
 
@@ -216,7 +217,7 @@ struct SourceNode : ExecNode {
 
  private:
   std::mutex mutex_;
-  int32_t backpressure_counter_{0};
+  std::atomic<int32_t> backpressure_counter_{0};
   Future<> backpressure_future_ = Future<>::MakeFinished();
   bool stop_requested_{false};
   bool started_ = false;
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index bd19c99a52..10b9e82d5c 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -20,6 +20,7 @@
 #include <arrow/compute/exec/exec_plan.h>
 
 #include <algorithm>
+#include <atomic>
 #include <memory>
 #include <unordered_map>
 #include <variant>
@@ -582,7 +583,7 @@ class TeeNode : public compute::MapNode {
   // only returns an unfinished future when it needs backpressure.  Using a serial
   // scheduler here ensures we pause while we wait for backpressure to clear
   std::shared_ptr<util::AsyncTaskScheduler> serial_scheduler_;
-  int32_t backpressure_counter_ = 0;
+  std::atomic<int32_t> backpressure_counter_ = 0;
 };
 
 }  // namespace