You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/06/21 13:57:25 UTC

[arrow] branch main updated: GH-35838: [C++] Add backpressure test for asof join node (#35874)

This is an automated email from the ASF dual-hosted git repository.

icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 732395261c GH-35838: [C++] Add backpressure test for asof join node (#35874)
732395261c is described below

commit 732395261cd13bdd3aab9cb02ce06b76fd709e25
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Wed Jun 21 16:57:18 2023 +0300

    GH-35838: [C++] Add backpressure test for asof join node (#35874)
    
    ### What changes are included in this PR?
    
    Pass the correct nodes to the backpressure controller and improve the parameter naming and documentation. Also add reusable gate classes (`Gate`, `GatedNodeOptions` and `GatedNode`) that hold all input batches until a gate is released, to support more robust backpressure testing in this PR.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    
    **This PR contains a "Critical Fix".**
    * Closes: #35838
    
    Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
    Co-authored-by: Weston Pace <we...@gmail.com>
    Co-authored-by: rtpsw <rt...@hotmail.com>
    Signed-off-by: Li Jin <ic...@gmail.com>
---
 cpp/src/arrow/acero/asof_join_node.cc      |  13 ++-
 cpp/src/arrow/acero/asof_join_node_test.cc | 172 ++++++++++++++++++++++++-----
 cpp/src/arrow/acero/test_nodes.cc          | 157 ++++++++++++++++++++++++++
 cpp/src/arrow/acero/test_nodes.h           |  27 +++++
 4 files changed, 336 insertions(+), 33 deletions(-)

diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index f8dee5aac8..d3c988e18e 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -495,9 +495,9 @@ class KeyHasher {
                        4 * kMiniBatchLength * sizeof(uint32_t));
   }
 
-  void Invalidate() {
-    batch_ = NULLPTR;  // invalidate cached hashes for batch - required when it changes
-  }
+  // invalidate cached hashes for batch - required when it changes
+  // only this method can be called concurrently with HashesFor
+  void Invalidate() { batch_ = NULLPTR; }
 
   // compute and cache a hash for each row of the given batch
   const std::vector<HashType>& HashesFor(const RecordBatch* batch) {
@@ -668,18 +668,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,
       std::atomic<int32_t>& backpressure_counter,
       const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
       const std::vector<col_index_t>& key_col_index) {
     constexpr size_t low_threshold = 4, high_threshold = 8;
     std::unique_ptr<BackpressureControl> backpressure_control =
-        std::make_unique<BackpressureController>(node, output, backpressure_counter);
+        std::make_unique<BackpressureController>(
+            /*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
     ARROW_ASSIGN_OR_RAISE(auto handler,
                           BackpressureHandler::Make(low_threshold, high_threshold,
                                                     std::move(backpressure_control)));
     return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
-                                        key_hasher, output, std::move(handler), schema,
+                                        key_hasher, asof_node, std::move(handler), schema,
                                         time_col_index, key_col_index);
   }
 
diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc
index b113eb86e5..c62d0c0b85 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -19,9 +19,12 @@
 
 #include <chrono>
 #include <memory>
+#include <mutex>
 #include <numeric>
 #include <random>
 #include <string_view>
+#include "arrow/acero/exec_plan.h"
+#include "arrow/testing/future_util.h"
 #ifndef NDEBUG
 #include <sstream>
 #endif
@@ -31,6 +34,8 @@
 #ifndef NDEBUG
 #include "arrow/acero/options_internal.h"
 #endif
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/query_context.h"
 #include "arrow/acero/test_nodes.h"
 #include "arrow/acero/test_util_internal.h"
 #include "arrow/acero/util.h"
@@ -1360,9 +1365,66 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count{0};   // bumped on every PauseProducing call
+  std::atomic<int32_t> resume_count{0};  // bumped on every ResumeProducing call
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  explicit BackpressureCountingNodeOptions(BackpressureCounters* counters)
+      : counters(counters) {}
+  BackpressureCounters* counters;  // tallies the node updates; not owned by the options
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  // Registers Make under kFactoryName unless that name already has a factory
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    const auto& bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+  // Tally each pause/resume request, then forward it to the single input
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+  BackpressureCounters* counters;  // supplied through the options; not owned
+};
+
+AsyncGenerator<std::optional<ExecBatch>> GetGen(
+    AsyncGenerator<std::optional<ExecBatch>> gen) {
+  return gen;  // already a generator: hand it back untouched
+}
+AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
+  return bws.gen(false, false);  // NOTE(review): flags presumably parallel/slow - confirm
+}
+
 template <typename BatchesMaker>
-void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
-                      double fast_delay, double slow_delay, bool noisy = false) {
+void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
   auto l_schema =
       schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
   auto r0_schema =
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  RegisterTestNodes();  // for GatedNode
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
+
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  auto gate_ptr = Gate::Make();
+  auto& gate = *gate_ptr;
+  GatedNodeOptions gate_options(gate_ptr.get());
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
+
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
+    }
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    for (size_t i = 0; i < source_configs.size(); i++) {
+      const auto& counters = bp_counters[i];
+      if (source_configs[i].is_gated) {
+        if (counters.pause_count > 0) return false;
+      } else {
+        if (counters.pause_count != 1) return false;
+      }
+    }
+    return true;
+  };
+
+  BusyWait(10.0, has_bp_been_applied);
+  ASSERT_TRUE(has_bp_been_applied());
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+  gate.ReleaseAllBatches();
+  ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  // One of the inputs is gated.  The other two will eventually be resumed by the asof
+  // join node
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& counters = bp_counters[i];
+    if (!source_configs[i].is_gated) {
+      ASSERT_GE(counters.resume_count, 0);
     }
-    total_length += batch->num_rows();
   }
-  ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
 }
 
 TEST(AsofJoinTest, BackpressureWithBatches) {
-  return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1,
-                          /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false);
+  return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
 }
 
 template <typename BatchesMaker>
@@ -1473,10 +1592,9 @@ T GetEnvValue(const std::string& var, T default_value) {
 }  // namespace
 
 TEST(AsofJoinTest, BackpressureWithBatchesGen) {
-  int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
+  int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
   int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
-  return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size,
-                          /*fast_delay=*/0.001, /*slow_delay=*/0.01);
+  return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
 }
 
 }  // namespace acero
diff --git a/cpp/src/arrow/acero/test_nodes.cc b/cpp/src/arrow/acero/test_nodes.cc
index ff95f72e6e..e109afbe1b 100644
--- a/cpp/src/arrow/acero/test_nodes.cc
+++ b/cpp/src/arrow/acero/test_nodes.cc
@@ -31,6 +31,7 @@
 #include "arrow/testing/random.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/tracing_internal.h"
 
 namespace arrow {
 
@@ -200,12 +201,168 @@ class JitterNode : public ExecNode {
 
 }  // namespace
 
+class GateImpl {
+ public:
+  void ReleaseAllBatches() {
+    std::lock_guard lg(mutex_);
+    num_allowed_batches_ = -1;
+    NotifyAll();
+  }
+
+  void ReleaseOneBatch() {
+    std::lock_guard lg(mutex_);
+    DCHECK_GE(num_allowed_batches_, 0)
+        << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
+    num_allowed_batches_++;
+    NotifyAll();
+  }
+
+  Future<> WaitForNextReleasedBatch() {
+    std::lock_guard lg(mutex_);
+    if (current_waiter_.is_valid()) {
+      return current_waiter_;
+    }
+    // A finished future lets the caller pass one batch now; that batch is counted
+    if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
+      num_released_batches_++;
+      return Future<>::MakeFinished();
+    }
+
+    current_waiter_ = Future<>::Make();
+    return current_waiter_;
+  }
+
+ private:
+  void NotifyAll() {
+    if (current_waiter_.is_valid()) {
+      Future<> to_unlock = current_waiter_;
+      current_waiter_ = {};
+      to_unlock.MarkFinished();
+    }
+  }
+
+  Future<> current_waiter_;       // pending future shared by waiters; invalid if none
+  int num_released_batches_ = 0;  // batches let through so far
+  int num_allowed_batches_ = 0;   // release budget; negative means unlimited
+  std::mutex mutex_;              // held by every public method
+};
+
+std::shared_ptr<Gate> Gate::Make() { return std::make_shared<Gate>(); }
+
+Gate::Gate() : impl_(new GateImpl()) {}  // Gate owns impl_; it is freed in ~Gate
+
+Gate::~Gate() { delete impl_; }  // frees the GateImpl allocated by the constructor
+
+void Gate::ReleaseAllBatches() { impl_->ReleaseAllBatches(); }
+
+void Gate::ReleaseOneBatch() { impl_->ReleaseOneBatch(); }
+
+Future<> Gate::WaitForNextReleasedBatch() { return impl_->WaitForNextReleasedBatch(); }
+
+namespace {
+
+struct GatedNode : public ExecNode, public TracedNode {
+  static constexpr const char* kKindName = "BackpressureDelayingNode";
+  static constexpr const char* kFactoryName = "backpressure_delay";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make));
+    }
+  }
+
+  GatedNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+            std::shared_ptr<Schema> output_schema, const GatedNodeOptions& options)
+      : ExecNode(plan, inputs, {"input"}, output_schema),
+        TracedNode(this),
+        gate_(options.gate) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    const auto& gated_node_opts = static_cast<const GatedNodeOptions&>(options);
+    return plan->EmplaceNode<GatedNode>(plan, inputs, inputs[0]->output_schema(),
+                                        gated_node_opts);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+
+  const Ordering& ordering() const override { return inputs_[0]->ordering(); }
+  Status InputFinished(ExecNode* input, int total_batches) override {
+    return output_->InputFinished(this, total_batches);
+  }
+  Status StartProducing() override {
+    NoteStartProducing(ToStringExtra());
+    return Status::OK();
+  }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    inputs_[0]->ResumeProducing(this, counter);
+  }
+
+  Status StopProducingImpl() override { return Status::OK(); }
+
+  // Sends queued batches while the gate allows; `lock` must hold mutex_ on entry
+  Status SendBatchesUnlocked(std::unique_lock<std::mutex>&& lock) {
+    while (!queued_batches_.empty()) {
+      // Ask the gate; if it is still closed, resume from a task once it is notified.
+      Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
+      bool callback_added = maybe_unlocked.TryAddCallback([this] {
+        return [this](const Status& st) {
+          DCHECK_OK(st);
+          plan_->query_context()->ScheduleTask(
+              [this] {
+                std::unique_lock lk(mutex_);
+                return SendBatchesUnlocked(std::move(lk));
+              },
+              "GatedNode::ResumeAfterNotify");
+        };
+      });
+      if (callback_added) {
+        break;
+      }
+      // No callback was attached, so the future had already finished: the gate admits
+      // one batch.  Deliver it downstream with the mutex released, then re-lock.
+      ExecBatch next = std::move(queued_batches_.front());
+      queued_batches_.pop();
+      lock.unlock();
+      ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next)));
+      lock.lock();
+    }
+    return Status::OK();
+  }
+
+  Status InputReceived(ExecNode* input, ExecBatch batch) override {
+    auto scope = TraceInputReceived(batch);
+    DCHECK_EQ(input, inputs_[0]);
+
+    // The source and a restart task scheduled after a gate notification may both get
+    // here concurrently; the mutex serializes them (the critical section is short).
+    std::unique_lock lk(mutex_);
+    queued_batches_.push(std::move(batch));
+
+    return SendBatchesUnlocked(std::move(lk));
+  }
+
+  Gate* gate_;  // not owned; taken from GatedNodeOptions
+  std::queue<ExecBatch> queued_batches_;  // batches held back until the gate admits them
+  std::mutex mutex_;  // guards queued_batches_
+};
+
+}  // namespace
+
 void RegisterTestNodes() {
   static std::once_flag registered;
   std::call_once(registered, [] {
     ExecFactoryRegistry* registry = default_exec_factory_registry();
     DCHECK_OK(
         registry->AddFactory(std::string(JitterNodeOptions::kName), JitterNode::Make));
+    DCHECK_OK(
+        registry->AddFactory(std::string(GatedNodeOptions::kName), GatedNode::Make));
   });
 }
 
diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h
index 2d1d630b3b..7e31aa31b3 100644
--- a/cpp/src/arrow/acero/test_nodes.h
+++ b/cpp/src/arrow/acero/test_nodes.h
@@ -53,6 +53,33 @@ struct JitterNodeOptions : public ExecNodeOptions {
   static constexpr std::string_view kName = "jitter";
 };
 
+class GateImpl;
+
+class Gate {
+ public:
+  static std::shared_ptr<Gate> Make();
+
+  Gate();
+  virtual ~Gate();
+
+  void ReleaseAllBatches();  // lift the limit; must not be followed by ReleaseOneBatch
+  void ReleaseOneBatch();    // allow one more batch through
+  Future<> WaitForNextReleasedBatch();  // finished once a batch may pass
+
+ private:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Gate);
+
+  GateImpl* impl_;  // implementation object; defined in test_nodes.cc
+};
+
+// Options for a node that holds all input batches until the given gate is released
+struct GatedNodeOptions : public ExecNodeOptions {
+  explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
+  Gate* gate;  // raw pointer, never freed by the options
+
+  static constexpr std::string_view kName = "gated";  // factory name for the node
+};
+
 void RegisterTestNodes();
 
 }  // namespace acero