You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "rtpsw (via GitHub)" <gi...@apache.org> on 2023/06/01 16:41:16 UTC

[GitHub] [arrow] rtpsw opened a new pull request, #35874: GH-35838: [C++] Backpressure broken in asof join node

rtpsw opened a new pull request, #35874:
URL: https://github.com/apache/arrow/pull/35874

   ### What changes are included in this PR?
   
   Passing the correct nodes to the backpressure controller.
   
   ### Are these changes tested?
   
   No. This is a quick fix.
   
   ### Are there any user-facing changes?
   
   No.
   
   **This PR contains a "Critical Fix".**


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1587766408

   Per discussion, @rtpsw, let's use the gated node because it doesn't rely on timing. Update this PR and then we should be able to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1232247664


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   I don't think this is purely empirical - the asof join should apply back-pressure to all other nodes when one of the inputs is gated; otherwise it is a bug, I think.



##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   I don't think this is purely empirical - the asof join should apply back-pressure to all other nodes when one of the inputs is gated; otherwise it is a bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1573881515

   >  IMO, both (1) and (2) above could be flaky due to non-deterministic timing
   I think this can be avoided if we make sure the slow sink is blocking. i.e. 
   
   ```
   slow_sink.pause_processing() # make sure slow_sink stops processing so that we can trigger backpressure
   # validate backpressure is triggered
   slow_sink.resume_processing()
   # validate the plan exits normally and the resume counter is as expected
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1572538901

   @westonpace Are you OK with me merging this to unblock an internal issue? @rtpsw is working on adding tests now, but it might take a while.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213536821


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   @rtpsw I think it is easier to revert this line so as to unblock the internal issue first:
   https://github.com/apache/arrow/pull/34392/files#diff-5493b6ae7ea2a4d5cfb581034c076e9c4be7608382168de6d1301ef67b6c01eeR1410
   
   Then work on cleaning up changes introduced in GH-36391. The code is quite confusing now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213594678


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   I am fine with figuring out a cleaner way to do this in a follow-up to GH-36391. But for now I think it's easier to just revert the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1217677513


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,6 +1361,102 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  int32_t pause_count = 0;
+  int32_t resume_count = 0;

Review Comment:
   Confirmed by TSAN job.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1232188840


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   We are looking at two nodes: a node pushing to an as-of-join node. The code here has the perspective of the latter node, so the former node is seen as an input, which is why the code has `asof_input`. The `BackpressureController` has the perspective of the former node, which it controls, so the latter node is seen as an output. `BackpressureController` is designed to be reusable outside the context of `AsofJoinNode` (even though currently it is not exposed).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231409358


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   Not sure I fully understand, but from my understanding both the InputState and the BackpressureController take two nodes: one node represents the input table of the asof join, and the other is the asof join node itself. So my point here is that it's better to name them consistently to avoid confusion. You seem to be saying that InputState and BackpressureController have different perspectives on input and output, which I do not fully get, but perhaps I misunderstood what you mean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1593557961

   > (2) Please update the original GH issue to better reflect the change in this PR
   
   It's authored by Weston - I don't have permission to edit. I edited [the description of this PR](https://github.com/apache/arrow/pull/35874#issue-1736711712) instead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231015778


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   Can you elaborate on why the input and output perspectives are different?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229995586


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;
+  };
+
+  BusyWait(10.0, has_bp_been_applied);
+  ASSERT_TRUE(has_bp_been_applied());
+
+  gate.ReleaseAllBatches();
+  ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);
+
+  size_t total_resumed = 0;
+  for (const auto& counters : bp_counters) {
+    total_resumed += counters.resume_count;
   }
-  ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+  ASSERT_GE(total_resumed, 2);

Review Comment:
   Shouldn't this be at least 3? (three sources)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1580770533

   https://github.com/rtpsw/arrow/pull/4


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1217627690


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,6 +1361,102 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  int32_t pause_count = 0;
+  int32_t resume_count = 0;

Review Comment:
   Should these be atomic? Presumably they can be updated from multiple threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229987565


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -821,8 +817,11 @@ class InputState {
         ++batches_processed_;
         latest_ref_row_ = 0;
         have_active_batch &= !queue_.TryPop();
-        if (have_active_batch)
+        if (have_active_batch) {
           DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0);  // empty batches disallowed
+          key_hasher_->Invalidate();  // batch changed - invalidate key hasher's cache

Review Comment:
   Seems unrelated change - can you make a separate PR for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231406275


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   It looks like the resume counters for the ungated nodes are >= 1 in all cases. So I think we can check them (both resume and pause counters) individually here, instead of the total.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213533392


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   It is also confusing that 
   
   ```
       return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
                                           key_hasher, node, std::move(handler), schema,
                                           time_col_index, key_col_index);
   ```
   on line 681 passes the asof join node to the input state instead of the input node. Why is that?
   
                                           



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213557505


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   > This still looking confusing:
   > 
   > BackpressureController takes `ExecNode* node, ExecNode* output` and this one nows takes `AsofJoinNode* node, ExecNode* input` which is inconsistent
   > 
   > Can we make this consistent between the two?
   
   This is actually intended to make things clearer. The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The `ExecNode` passed to `Make` is an input of the as-of-join node while `PauseProducing` (and similarly `ResumeProducing`) sees the as-of-join node as [an output](https://github.com/apache/arrow/blob/3299d12efc91220237266bfa6f985f9eb37492f8/cpp/src/arrow/acero/exec_plan.h#L291) of the `ExecNode`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213549103


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,
       std::atomic<int32_t>& backpressure_counter,
       const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
       const std::vector<col_index_t>& key_col_index) {
     constexpr size_t low_threshold = 4, high_threshold = 8;
     std::unique_ptr<BackpressureControl> backpressure_control =
-        std::make_unique<BackpressureController>(node, output, backpressure_counter);
+        std::make_unique<BackpressureController>(input, node, backpressure_counter);

Review Comment:
   Since there's been some confusion, can you make parameter names explicit?
   ```suggestion
           std::make_unique<BackpressureController>(/*xxx=*/ input, /*yyy=*/ node, backpressure_counter);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213596635


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   > The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The ExecNode passed to Make is an input of the as-of-join node while PauseProducing (and similarly ResumeProducing) sees the as-of-join node as [an output](https://github.com/apache/arrow/blob/3299d12efc91220237266bfa6f985f9eb37492f8/cpp/src/arrow/acero/exec_plan.h#L291) of the ExecNode.
   
   We should probably fix the variable naming (i.e., decide what to call these things) in the follow-up PR to GH-36391. But for now, let's just revert to what was there before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213700921


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1458,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  ASSERT_GT(pause_count, 0);

Review Comment:
   Can we validate the pause/resume counters for all sources? I.e., the slow table should not have been paused, but the fast tables should have been.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214316493


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   Maybe use more specific names like `asof_input` and `asof_node`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214471386


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1477,20 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  std::unordered_map<bool, BackpressureCounters> counters_by_is_fast;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    BackpressureCounters& counters = counters_by_is_fast[source_configs[i].is_fast];
+    counters.pause_count += bp_counters[i].pause_count;
+    counters.resume_count += bp_counters[i].resume_count;
+  }
+  ASSERT_EQ(counters_by_is_fast.size(), 2);
+  ASSERT_GT(counters_by_is_fast[true].pause_count, 0);
+  ASSERT_GT(counters_by_is_fast[true].resume_count, 0);
+  // runs on some slow machines may not see any pause/resume, but if at least one pause is
+  // seen then at least one resume must also be seen
+  ASSERT_EQ(counters_by_is_fast[false].pause_count > 0,

Review Comment:
   I think we do. My understanding is that the intention of the pre-PR test code was to drive the as-of-join slower using a slower input to it, which should lead to backpressure from the as-of-join node toward a faster source. The additional intention of the post-PR test code is to check that indeed this backpressure happens, and this is observed at the test nodes inserted after each source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214346669


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1225578602


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   I ended up just fixing it to be single-threaded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1230000141


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,9 +1366,209 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count = 0;
+  std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+
+  BackpressureCounters* counters;
+};
+
+class Gate {
+ public:
+  void ReleaseAllBatches() {
+    std::lock_guard lg(mutex_);
+    num_allowed_batches_ = -1;
+    NotifyAll();
+  }
+
+  void ReleaseOneBatch() {
+    std::lock_guard lg(mutex_);
+    DCHECK_GE(num_allowed_batches_, 0)
+        << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
+    num_allowed_batches_++;
+    NotifyAll();
+  }
+
+  Future<> WaitForNextReleasedBatch() {
+    std::lock_guard lg(mutex_);
+    if (current_waiter_.is_valid()) {
+      return current_waiter_;
+    }
+    Future<> fut;
+    if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
+      num_released_batches_++;
+      return Future<>::MakeFinished();
+    }
+
+    current_waiter_ = Future<>::Make();
+    return current_waiter_;
+  }
+
+ private:
+  void NotifyAll() {
+    if (current_waiter_.is_valid()) {
+      Future<> to_unlock = current_waiter_;
+      current_waiter_ = {};
+      to_unlock.MarkFinished();
+    }
+  }
+
+  Future<> current_waiter_;
+  int num_released_batches_ = 0;
+  int num_allowed_batches_ = 0;
+  std::mutex mutex_;
+};
+
+struct GatedNodeOptions : public ExecNodeOptions {
+  explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
+  Gate* gate;
+};
+
+struct GatedNode : public ExecNode, public TracedNode {

Review Comment:
   Can you add documentation for this class that describes its purpose?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229979188


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -370,15 +370,10 @@ struct MemoStore {
     times_.swap(memo.times_);
   }
 
-  // Updates the current time to `ts` if it is less. A different thread may win the race
-  // to update the current time to more than `ts` but not to less. Returns whether the
-  // current time was changed from its value at the beginning of this invocation.
+  // Updates the current time to `ts` if it is less. Returns true if updated.
   bool UpdateTime(OnType ts) {
-    OnType prev_time = current_time_;

Review Comment:
   Are these changes out of the scope of this PR? If so, can you create separate PRs for them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231005463


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -821,8 +817,11 @@ class InputState {
         ++batches_processed_;
         latest_ref_row_ = 0;
         have_active_batch &= !queue_.TryPop();
-        if (have_active_batch)
+        if (have_active_batch) {
           DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0);  // empty batches disallowed
+          key_hasher_->Invalidate();  // batch changed - invalidate key hasher's cache

Review Comment:
   Created #36094 and will revert here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231399976


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   > Shouldn't total_paused be >= 3? (The gated one + the other 2)?
   
   No, only 2. The gated one is not exerting pressure.
   
   > Also, is this check robust? i.e. what happens if the gated node counter is > 1? Shouldn't we be checking that all counters are >1 here? (not the total)
   
   For some reason (any idea, @westonpace?), it cannot be tightened for the `BackpressureWithBatchesGen` test. While for the `BackpressureWithBatches` test I see that all resume counts are 1, they vary for the `BackpressureWithBatchesGen` test. Logging the resume counts per source in several runs, I got:
   ```
   i=0 gated=0 resume=1
   i=1 gated=1 resume=1
   i=2 gated=0 resume=1
   ```
   and
   ```
   i=0 gated=0 resume=1
   i=1 gated=1 resume=2
   i=2 gated=0 resume=2
   ```
   and
   ```
   i=0 gated=0 resume=1
   i=1 gated=1 resume=0
   i=2 gated=0 resume=1
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231405359


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,9 +1366,209 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count = 0;
+  std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+
+  BackpressureCounters* counters;
+};
+
+class Gate {
+ public:
+  void ReleaseAllBatches() {
+    std::lock_guard lg(mutex_);
+    num_allowed_batches_ = -1;
+    NotifyAll();
+  }
+
+  void ReleaseOneBatch() {
+    std::lock_guard lg(mutex_);
+    DCHECK_GE(num_allowed_batches_, 0)
+        << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
+    num_allowed_batches_++;
+    NotifyAll();
+  }
+
+  Future<> WaitForNextReleasedBatch() {
+    std::lock_guard lg(mutex_);
+    if (current_waiter_.is_valid()) {
+      return current_waiter_;
+    }
+    Future<> fut;
+    if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
+      num_released_batches_++;
+      return Future<>::MakeFinished();
+    }
+
+    current_waiter_ = Future<>::Make();
+    return current_waiter_;
+  }
+
+ private:
+  void NotifyAll() {
+    if (current_waiter_.is_valid()) {
+      Future<> to_unlock = current_waiter_;
+      current_waiter_ = {};
+      to_unlock.MarkFinished();
+    }
+  }
+
+  Future<> current_waiter_;
+  int num_released_batches_ = 0;
+  int num_allowed_batches_ = 0;
+  std::mutex mutex_;
+};
+
+struct GatedNodeOptions : public ExecNodeOptions {
+  explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
+  Gate* gate;
+};
+
+struct GatedNode : public ExecNode, public TracedNode {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1573688141

   > I do have a little bit of concern that this test may become flaky on CI machines
   
   Indeed it is flaky - [this](https://github.com/apache/arrow/actions/runs/5155477352/jobs/9285205333?pr=35874) and [this](https://github.com/apache/arrow/actions/runs/5155477352/jobs/9285205732?pr=35874) and [this](https://ci.appveyor.com/project/ApacheSoftwareFoundation/arrow/builds/47206703) CI jobs have failed on the new check. I'll make a quick attempt to fix this.
   
   I agree relying on timing is not good, though that can be said about the pre-PR backpressure test code too, and this PR won't be worse in this respect.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213596635


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   > The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place).
   
   We should probably fix the variable naming (i.e., decide what to call these things) in the follow-up PR to GH-36391. But for now, let's just revert to what was there before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1589631206

   > Per discussion @rtpsw let's use the gated node because it doesn't rely on timing, update this PR and then we should be able to merge.
   
   This approach ran into a problem. See [this post](https://github.com/rtpsw/arrow/pull/4#issuecomment-1589441338).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229991094


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   Shouldn't total_paused be >= 3? (The gated one + the other 2)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229983752


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   Can you make the other functions in this file consistent with the naming here? e.g.
   https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L541
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229992514


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   Also, is this check robust? i.e. what happens if the gated node counter is > 1? Shouldn't we be checking that all counters are >1 here? (not the total)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1233076581


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   @rtpsw Do we still need to change this? If so, why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1232191968


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;

Review Comment:
   Empirically, yes. I'll try this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1573862020

   > Thinking out loud here:
   > 
   > What we want to test is that if the throughput of the asof join node is lower than that of the source, then we pause the source. Two potential ways that I think we can reliably do this: (1) Add some sort of "debug options" to manipulate the behavior of asof join to make it run slower (i.e., sleep a few seconds before actually starting the work in the processing thread). (2) Add a downstream node to asof join that processes data slowly (similar to a slow data sink), i.e., process one batch per second. This way, the backpressure would be pushed from the slow sink to asof join and then to the data sources.
   > 
   > I think I prefer (2) a bit more because this represents a real-life case of a slow sink.
   > 
   > @westonpace I am not sure if the idea of GatedSourceNode is similar or different, but happy to hear
   
   While I'm not sure exactly what Weston has in mind, my understanding is that the GatedSourceNode's goal is to avoid flakiness due to non-deterministic timing. IMO, both (1) and (2) above could be flaky due to non-deterministic timing.
   
   Between (1) and (2) I also wouldn't prefer (1) because the debug-options would change the behavior of the as-of-join node being tested, and I prefer to change the code driving it instead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214351235


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1458,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  ASSERT_GT(pause_count, 0);

Review Comment:
   We may have crossed - the [previous commit](https://github.com/apache/arrow/pull/35874/commits/7d40e92c7d96e34935b2af3f84f74d4e1d8cec49) already has one pause and one resume counter for each source. The issue is that sometimes (due to non-deterministic timing of operations) no pause/resume is requested on one or both of the fast sources. Because of this, in the [recent commit](https://github.com/apache/arrow/pull/35874/commits/3e5fdf9a4a7e48500882f1592d1949c772dca358) I am attempting this test logic: there is a pause request on a fast source if and only if there is a resume one too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1591533196

   > To clarify status of this PR - we should update the test to use GatedNode instead
   
   Done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229989024


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,9 +1366,209 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count = 0;
+  std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+
+  BackpressureCounters* counters;
+};
+
+class Gate {

Review Comment:
   Can you move the Gated node to a separate file? It can be reused in other back pressure related test too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1225577069


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,6 +1361,102 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count = 0;
+  std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureDelayingNodeOptions : public ExecNodeOptions {
+  BackpressureDelayingNodeOptions(double delay_seconds, std::function<bool()> gate)
+      : delay_seconds(delay_seconds), gate(gate) {}
+
+  double delay_seconds;
+  std::function<bool()> gate;
+};
+
+struct BackpressureDelayingNode : public MapNode {
+  static constexpr auto kKindName = "BackpressureDelayingNode";
+  static constexpr const char* kFactoryName = "backpressure_delay";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make));
+    }
+  }
+
+  BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureDelayingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema),
+        gate(options.gate),
+        delay_seconds(options.delay_seconds) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureDelayingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureDelayingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
+    while (!gate()) {
+      SleepFor(delay_seconds);

Review Comment:
   I went with `SleepABit`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1233197482


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  RegisterTestNodes();  // for GatedNode
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
+
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  auto gate_ptr = Gate::Make();
+  auto& gate = *gate_ptr;
+  GatedNodeOptions gate_options(gate_ptr.get());
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
+
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
+    }
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    for (size_t i = 0; i < source_configs.size(); i++) {
+      const auto& counters = bp_counters[i];
+      if (source_configs[i].is_gated) {
+        if (counters.pause_count > 0) return false;

Review Comment:
   The logic around here checks the following expectations of correct application of backpressure: a gated node should not have been paused (checked by the current line) whereas a non-gated node should have been paused once (checked by the next if-statement).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1594672375

   > > (2) Please update the original GH issue to better reflect the change in this PR
   > 
   > It's authored by Weston - I don't have permission to edit. I edited [the description of this PR](https://github.com/apache/arrow/pull/35874#issue-1736711712) instead.
   
   Ok we can leave it as is then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213516786


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   This still looks confusing:
   
   (1) BackpressureController takes `ExecNode* node, ExecNode* output` and this one now takes `AsofJoinNode* node, ExecNode* input`, which is inconsistent.
   
   (2) InputState::Make used to take inputs[i] but now it takes "node", which is the AsofJoinNode.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229374154


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   Sorry, I only pushed it in [the recent commit](https://github.com/apache/arrow/pull/35874/commits/e29520b4974e9bbc4ef08687913ce0199ab08b8a).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1591193956

   > > Per discussion @rtpsw let's use the gated node because it doesn't rely on timing, update this PR and then we should be able to merge.
   > 
   > This approach ran into a problem. See [this post](https://github.com/rtpsw/arrow/pull/4#issuecomment-1589441338).
   > 
   > Let's decide whether to spend time on the gate approach now or leave it for later.
   
   Yeah, let's spend some time figuring it out. The main purpose of this PR is to add the test to check the asof join node (please update the PR title as well), so I think it's worthwhile. And the gated node is a reusable component in other backpressure-related tests as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1233077156


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  RegisterTestNodes();  // for GatedNode
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
+
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  auto gate_ptr = Gate::Make();
+  auto& gate = *gate_ptr;
+  GatedNodeOptions gate_options(gate_ptr.get());
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
+
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
+    }
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    for (size_t i = 0; i < source_configs.size(); i++) {
+      const auto& counters = bp_counters[i];
+      if (source_configs[i].is_gated) {
+        if (counters.pause_count > 0) return false;
+      } else {
+        if (counters.pause_count != 1) return false;

Review Comment:
   Why `!=1` here instead of `> 0`? i.e., if the non-gated node has pause_count > 1, is it considered "bp has not been applied"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1233197482


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  RegisterTestNodes();  // for GatedNode
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
+
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  auto gate_ptr = Gate::Make();
+  auto& gate = *gate_ptr;
+  GatedNodeOptions gate_options(gate_ptr.get());
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
+
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
+    }
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    for (size_t i = 0; i < source_configs.size(); i++) {
+      const auto& counters = bp_counters[i];
+      if (source_configs[i].is_gated) {
+        if (counters.pause_count > 0) return false;

Review Comment:
   The logic around here checks the following expectations of correct application of backpressure: a gated node should not have been paused (checked by the current line) whereas a non-gated node should have been paused once (checked by the next if-statement).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] conbench-apache-arrow[bot] commented on pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1603227993

   Conbench analyzed the 6 benchmark runs on commit `73239526`.
   
   There were no benchmark performance regressions. 🎉
   
   The [full Conbench report](https://github.com/apache/arrow/runs/14483295307) has more details.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1233077316


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  RegisterTestNodes();  // for GatedNode
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
+
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  auto gate_ptr = Gate::Make();
+  auto& gate = *gate_ptr;
+  GatedNodeOptions gate_options(gate_ptr.get());
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
+
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
+    }
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    for (size_t i = 0; i < source_configs.size(); i++) {
+      const auto& counters = bp_counters[i];
+      if (source_configs[i].is_gated) {
+        if (counters.pause_count > 0) return false;

Review Comment:
   The logic here is a bit confusing. If pause_count > 0 then bp has not been applied? (Shouldn't it be the other way around?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1575536003

   Prior to the [recent commit](https://github.com/apache/arrow/pull/35874/commits/6db6574432d1c5f997b2496d5f7072a03dbcb946), I noticed that some (macOS, I think) CI jobs were timing out. I figured this happened because `gate()` never returned true, which means the queue never crossed a threshold, likely due to an extreme timing of threaded operations. Since the low and high queue thresholds are 4 and 8, respectively, my guess was the number of batches in the backpressure tests (previously 10) was too small. Doubling the number of batches should therefore help, and indeed this seems to work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213561052


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   > I think it is easier to revert this line
   
   This reversion alone doesn't compile because `inputs[i]` is not of type `AsofJoinNode*`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214345360


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1458,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  ASSERT_GT(pause_count, 0);

Review Comment:
   Let me clarify my point:
   
   Currently it looks like there is one count for all the data sources. What I am asking is if we can have one count per data source so we can check slow/fast table counter separately. This is because I expect the slow table is not paused and the fast table is paused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1576871651

   In the [recent commit](https://github.com/apache/arrow/pull/35874/commits/380aa94bd04df435a0be7c6ee05a5e2581008f77), I included the changes in [this PR](https://github.com/apache/arrow/pull/35904), which are necessary to avoid TSAN errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1572700207

   @icexelloss, I added test cases. This PR can now be reviewed as a (non-quick) resolution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214369106


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1477,20 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  std::unordered_map<bool, BackpressureCounters> counters_by_is_fast;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    BackpressureCounters& counters = counters_by_is_fast[source_configs[i].is_fast];
+    counters.pause_count += bp_counters[i].pause_count;
+    counters.resume_count += bp_counters[i].resume_count;
+  }
+  ASSERT_EQ(counters_by_is_fast.size(), 2);
+  ASSERT_GT(counters_by_is_fast[true].pause_count, 0);
+  ASSERT_GT(counters_by_is_fast[true].resume_count, 0);
+  // runs on some slow machines may not see any pause/resume, but if at least one pause is
+  // seen then at least one resume must also be seen
+  ASSERT_EQ(counters_by_is_fast[false].pause_count > 0,

Review Comment:
   I don't think we really need to check the slow sources, because it is entirely possible that it is not paused at all in the expected behavior (since it is slower than asof join?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213593030


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   It's probably easier for me to do it instead of back and forth, opened:
   
   https://github.com/apache/arrow/pull/35878
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213657677


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,
       std::atomic<int32_t>& backpressure_counter,
       const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
       const std::vector<col_index_t>& key_col_index) {
     constexpr size_t low_threshold = 4, high_threshold = 8;
     std::unique_ptr<BackpressureControl> backpressure_control =
-        std::make_unique<BackpressureController>(node, output, backpressure_counter);
+        std::make_unique<BackpressureController>(input, node, backpressure_counter);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1228608997


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   > I ended up just fixing it to be single-threaded.
   
   Nice - can you point me where the fix is?



##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   > I ended up just fixing it to be single-threaded.
   
   Nice - can you point me where the fix is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229979720


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   I am somewhat confused. If this is single-threaded, then we don't need this to be atomic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1593591892

   This [CI job failure](https://github.com/apache/arrow/actions/runs/5282401504/jobs/9557256027?pr=35874) suggests that the [tightening condition](https://github.com/apache/arrow/pull/35874#discussion_r1229992514) (in [this commit](https://github.com/apache/arrow/pull/35874/commits/c458c2ee7442725d0a82a644a53dae52d51338e5)) isn't always true.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231004010


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   The location you are pointing to has a different perspective of the input and output, and should not be changed. Also note that the class there could easily be pulled out for reuse, and should not be tied to specific internals of as-of-join node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231004801


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -370,15 +370,10 @@ struct MemoStore {
     times_.swap(memo.times_);
   }
 
-  // Updates the current time to `ts` if it is less. A different thread may win the race
-  // to update the current time to more than `ts` but not to less. Returns whether the
-  // current time was changed from its value at the beginning of this invocation.
+  // Updates the current time to `ts` if it is less. Returns true if updated.
   bool UpdateTime(OnType ts) {
-    OnType prev_time = current_time_;

Review Comment:
   Created #36094 and will revert here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231400648


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;
+    std::shared_ptr<Schema> schema;
+    decltype(l_batches) batches;
 
-  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
-                       DeclarationToReader(asofjoin, /*use_threads=*/false));
+    std::string name() const {
+      return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+    }
+  };
+
+  Gate gate;
+  GatedNodeOptions gate_options(&gate);
+
+  // Two ungated and one gated
+  std::vector<BackpressureSourceConfig> source_configs = {
+      {"0", false, l_schema, l_batches},
+      {"1", true, r0_schema, r0_batches},
+      {"2", false, r1_schema, r1_batches},
+  };
 
-  int64_t total_length = 0;
-  for (;;) {
-    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
-    if (!batch) {
-      break;
+  std::vector<BackpressureCounters> bp_counters(source_configs.size());
+  std::vector<Declaration> src_decls;
+  std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+  std::vector<Declaration::Input> bp_decls;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    const auto& config = source_configs[i];
+
+    src_decls.emplace_back("source",
+                           SourceNodeOptions(config.schema, GetGen(config.batches)));
+    bp_options.push_back(
+        std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+    std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+    std::vector<Declaration::Input> bp_in = {src_decls.back()};
+    Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+                           std::move(options)};
+    if (config.is_gated) {
+      bp_decl = {GatedNode::kFactoryName, {bp_decl}, gate_options};
     }
-    total_length += batch->num_rows();
+    bp_decls.push_back(bp_decl);
+  }
+
+  Declaration asofjoin = {"asofjoin", bp_decls,
+                          GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+                       internal::ThreadPool::Make(1));
+  ExecContext exec_ctx(default_memory_pool(), tpool.get());
+  Future<BatchesWithCommonSchema> batches_fut =
+      DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+  auto has_bp_been_applied = [&] {
+    int total_paused = 0;
+    for (const auto& counters : bp_counters) {
+      total_paused += counters.pause_count;
+    }
+    // One of the inputs is gated.  The other two will eventually be paused by the asof
+    // join node
+    return total_paused >= 2;
+  };
+
+  BusyWait(10.0, has_bp_been_applied);
+  ASSERT_TRUE(has_bp_been_applied());
+
+  gate.ReleaseAllBatches();
+  ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);
+
+  size_t total_resumed = 0;
+  for (const auto& counters : bp_counters) {
+    total_resumed += counters.resume_count;
   }
-  ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+  ASSERT_GE(total_resumed, 2);

Review Comment:
   See [this post](https://github.com/apache/arrow/pull/35874#discussion_r1231399976).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231405144


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,9 +1366,209 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count = 0;
+  std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+
+  BackpressureCounters* counters;
+};
+
+class Gate {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213557505


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   > This still looking confusing:
   > 
   > BackpressureController takes `ExecNode* node, ExecNode* output` and this one nows takes `AsofJoinNode* node, ExecNode* input` which is inconsistent
   > 
   > Can we make this consistent between the two?
   
   This is actually intended to make things clearer. The two places have a different perspective of what is an input and what is an output. The `ExecNode` passed to `Make` is an input of the as-of-join node while `PauseProducing` (and similarly `ResumeProducing`) sees the as-of-join node as [an output](https://github.com/apache/arrow/blob/3299d12efc91220237266bfa6f985f9eb37492f8/cpp/src/arrow/acero/exec_plan.h#L291) of the `ExecNode`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1573724405

   Thinking out loud here:
   
   What we want to test is that if the throughput of the asof join node is slower than the source, then we would pause the source. Two potential ways that I think we can reliably do this:
   (1) Add some sort of "debug options" to manipulate the behavior of asof join to make it run slower. (i.e. Sleep a few seconds before actually starting the work in the processing thread)
   (2) Add a downstream node to asof join that processes data slowly (similar to a slow data sink), i.e., process one batch per second. This way, the backpressure would be pushed from the slow sink to asof join then to the data sources.
   
   I think I prefer (2) a bit more because this represents a real-life case of a slow sink.
   
   @westonpace I am not sure if the idea of GatedSourceNode is similar or different, but happy to hear


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214471386


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1477,20 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  std::unordered_map<bool, BackpressureCounters> counters_by_is_fast;
+  for (size_t i = 0; i < source_configs.size(); i++) {
+    BackpressureCounters& counters = counters_by_is_fast[source_configs[i].is_fast];
+    counters.pause_count += bp_counters[i].pause_count;
+    counters.resume_count += bp_counters[i].resume_count;
+  }
+  ASSERT_EQ(counters_by_is_fast.size(), 2);
+  ASSERT_GT(counters_by_is_fast[true].pause_count, 0);
+  ASSERT_GT(counters_by_is_fast[true].resume_count, 0);
+  // runs on some slow machines may not see any pause/resume, but if at least one pause is
+  // seen then at least one resume must also be seen
+  ASSERT_EQ(counters_by_is_fast[false].pause_count > 0,

Review Comment:
   I think we do. My understanding is that the intention of the pre-PR test code was to drive the as-of-join slower using a slower input to it, which should lead to backpressure from the as-of-join node. The additional intention of the post-PR test code is to check that indeed this backpressure happens, and this is observed at the test nodes inserted after each source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1576216004

   @github-actions crossbow submit test-ubuntu-20.04-cpp-thread-sanitizer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1576206201

   @github-actions crossbow submit ubuntu-cpp-thread-sanitizer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1576209367

   ```
   Unable to match any tasks for `ubuntu-cpp-thread-sanitizer`
   The Archery job run can be found at: https://github.com/apache/arrow/actions/runs/5174439156
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1218191706


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   Above it is saying "the key hasher is not thread-safe", if so, why do we care if this is atomic here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1218192781


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   Are we using the key hasher in multiple threads?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213596635


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   > The two places have a different perspective of what is an input and what is an output (which likely caused confusion in the first place). The ExecNode passed to Make is an input of the as-of-join node while PauseProducing (and similarly ResumeProducing) sees the as-of-join node as [an output](https://github.com/apache/arrow/blob/3299d12efc91220237266bfa6f985f9eb37492f8/cpp/src/arrow/acero/exec_plan.h#L291) of the ExecNode.
   
   We should probably fix the variable naming (i.e., decide how to call these things) in the follow-up PR to GH-36391. But for now let's just revert to what was before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1587616082

   Let me take a look at this later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1229993346


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1381,36 +1587,85 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
   ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
   ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
 
-  Declaration l_src = {
-      "source", SourceNodeOptions(
-                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
-  Declaration r0_src = {
-      "source", SourceNodeOptions(
-                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
-  Declaration r1_src = {
-      "source", SourceNodeOptions(
-                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+  BackpressureCountingNode::Register();
+  GatedNode::Register();
 
-  Declaration asofjoin = {
-      "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+  struct BackpressureSourceConfig {
+    std::string name_prefix;
+    bool is_gated;

Review Comment:
   Why do we need this config? I.e., when is this not gated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss merged pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss merged PR #35874:
URL: https://github.com/apache/arrow/pull/35874


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231122426


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   We discussed this [here](https://github.com/apache/arrow/pull/35874#discussion_r1213557505).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Add backpressure test for asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1231410129


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,18 +663,19 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+      KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,

Review Comment:
   (This is not a major point but I want to make sure I understand what you are saying)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1213516786


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -668,13 +668,13 @@ class InputState {
 
   static Result<std::unique_ptr<InputState>> Make(
       size_t index, TolType tolerance, bool must_hash, bool may_rehash,
-      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
+      KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* input,

Review Comment:
   This still looks confusing:
   
   BackpressureController takes `ExecNode* node, ExecNode* output` and this one now takes `AsofJoinNode* node, ExecNode* input`, which is inconsistent
   
   Can we make this consistent between the two?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1576220903

   Revision: 6db6574432d1c5f997b2496d5f7072a03dbcb946
   
   Submitted crossbow builds: [ursacomputing/crossbow @ actions-5c2212b740](https://github.com/ursacomputing/crossbow/branches/all?query=actions-5c2212b740)
   
   |Task|Status|
   |----|------|
   |test-ubuntu-20.04-cpp-thread-sanitizer|[![Github Actions](https://github.com/ursacomputing/crossbow/workflows/Crossbow/badge.svg?branch=actions-5c2212b740-github-test-ubuntu-20.04-cpp-thread-sanitizer)](https://github.com/ursacomputing/crossbow/actions/runs/5174485194/jobs/9320855945)|


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1214316831


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1406,6 +1458,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
     total_length += batch->num_rows();
   }
   ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+
+  ASSERT_GT(pause_count, 0);

Review Comment:
   > Can we validate pause resume counter for all sources?
   
   I added validation ...
   
   > i.e the slow table should not have been paused, but the fast tables should?
   
   ... but this is not the actual behavior - instead only one fast source gets paused and resumed - so this is what the recent commit validates. @westonpace, is this behavior expected?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on PR #35874:
URL: https://github.com/apache/arrow/pull/35874#issuecomment-1574273430

   I added a delaying node after the as-of-join-node and used different backpressure detection checks. I observed that the as-of-join-node calls to `Pause` and `Resume` are independent, because the former/latter occurs when crossing the high/low threshold up/down.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "rtpsw (via GitHub)" <gi...@apache.org>.
rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1218255714


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   It is queried from one thread but invalidated from another. We discussed offline that this can be simplified so that the key hasher would only be used from one thread, but this is (currently?) out of scope for this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1222189098


##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -529,7 +529,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;

Review Comment:
   I see. In that case, can you update the doc to explain how this class should be used in multi-threaded execution? Currently it looks like the doc about thread safety is not correct, i.e., it is used by multiple threads while the docstring says it's not thread-safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #35874: GH-35838: [C++] Backpressure broken in asof join node

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1220861413


##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,6 +1361,102 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })
 
+struct BackpressureCounters {
+  std::atomic<int32_t> pause_count = 0;
+  std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+  BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+  static constexpr const char* kKindName = "BackpressureCountingNode";
+  static constexpr const char* kFactoryName = "backpressure_count";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+    }
+  }
+
+  BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureCountingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureCountingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+  void PauseProducing(ExecNode* output, int32_t counter) override {
+    ++counters->pause_count;
+    inputs()[0]->PauseProducing(this, counter);
+  }
+  void ResumeProducing(ExecNode* output, int32_t counter) override {
+    ++counters->resume_count;
+    inputs()[0]->ResumeProducing(this, counter);
+  }
+
+  BackpressureCounters* counters;
+};
+
+struct BackpressureDelayingNodeOptions : public ExecNodeOptions {
+  BackpressureDelayingNodeOptions(double delay_seconds, std::function<bool()> gate)
+      : delay_seconds(delay_seconds), gate(gate) {}
+
+  double delay_seconds;
+  std::function<bool()> gate;
+};
+
+struct BackpressureDelayingNode : public MapNode {
+  static constexpr auto kKindName = "BackpressureDelayingNode";
+  static constexpr const char* kFactoryName = "backpressure_delay";
+
+  static void Register() {
+    auto exec_reg = default_exec_factory_registry();
+    if (!exec_reg->GetFactory(kFactoryName).ok()) {
+      ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureDelayingNode::Make));
+    }
+  }
+
+  BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                           std::shared_ptr<Schema> output_schema,
+                           const BackpressureDelayingNodeOptions& options)
+      : MapNode(plan, inputs, output_schema),
+        gate(options.gate),
+        delay_seconds(options.delay_seconds) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+    auto bp_options = static_cast<const BackpressureDelayingNodeOptions&>(options);
+    return plan->EmplaceNode<BackpressureDelayingNode>(
+        plan, inputs, inputs[0]->output_schema(), bp_options);
+  }
+
+  const char* kind_name() const override { return kKindName; }
+  Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
+    while (!gate()) {
+      SleepFor(delay_seconds);

Review Comment:
   You could use a condition variable instead of a loop here.  This entire test could be written without any sleeps in that case.  However, this isn't too bad.  Alternatively, could you get rid of `delay_seconds` and just use `SleepABit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org