You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/06/21 13:57:25 UTC
[arrow] branch main updated: GH-35838: [C++] Add backpressure test for asof join node (#35874)
This is an automated email from the ASF dual-hosted git repository.
icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 732395261c GH-35838: [C++] Add backpressure test for asof join node (#35874)
732395261c is described below
commit 732395261cd13bdd3aab9cb02ce06b76fd709e25
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Wed Jun 21 16:57:18 2023 +0300
GH-35838: [C++] Add backpressure test for asof join node (#35874)
### What changes are included in this PR?
This PR passes the correct nodes to the backpressure controller of the asof join node and improves the naming and documentation of the related parameters. It also adds reusable gate classes (`Gate`, `GatedNodeOptions` and `GatedNode`) that hold all input batches until a gate is released, in order to support the more robust backpressure testing introduced in this PR.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
**This PR contains a "Critical Fix".**
* Closes: #35838
Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
Co-authored-by: Weston Pace <we...@gmail.com>
Co-authored-by: rtpsw <rt...@hotmail.com>
Signed-off-by: Li Jin <ic...@gmail.com>
---
cpp/src/arrow/acero/asof_join_node.cc | 13 ++-
cpp/src/arrow/acero/asof_join_node_test.cc | 172 ++++++++++++++++++++++++-----
cpp/src/arrow/acero/test_nodes.cc | 157 ++++++++++++++++++++++++++
cpp/src/arrow/acero/test_nodes.h | 27 +++++
4 files changed, 336 insertions(+), 33 deletions(-)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index f8dee5aac8..d3c988e18e 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -495,9 +495,9 @@ class KeyHasher {
4 * kMiniBatchLength * sizeof(uint32_t));
}
- void Invalidate() {
- batch_ = NULLPTR; // invalidate cached hashes for batch - required when it changes
- }
+ // invalidate cached hashes for batch - required when it changes
+ // only this method can be called concurrently with HashesFor
+ void Invalidate() { batch_ = NULLPTR; }
// compute and cache a hash for each row of the given batch
const std::vector<HashType>& HashesFor(const RecordBatch* batch) {
@@ -668,18 +668,19 @@ class InputState {
static Result<std::unique_ptr<InputState>> Make(
size_t index, TolType tolerance, bool must_hash, bool may_rehash,
- KeyHasher* key_hasher, ExecNode* node, AsofJoinNode* output,
+ KeyHasher* key_hasher, ExecNode* asof_input, AsofJoinNode* asof_node,
std::atomic<int32_t>& backpressure_counter,
const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
const std::vector<col_index_t>& key_col_index) {
constexpr size_t low_threshold = 4, high_threshold = 8;
std::unique_ptr<BackpressureControl> backpressure_control =
- std::make_unique<BackpressureController>(node, output, backpressure_counter);
+ std::make_unique<BackpressureController>(
+ /*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
ARROW_ASSIGN_OR_RAISE(auto handler,
BackpressureHandler::Make(low_threshold, high_threshold,
std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
- key_hasher, output, std::move(handler), schema,
+ key_hasher, asof_node, std::move(handler), schema,
time_col_index, key_col_index);
}
diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc
index b113eb86e5..c62d0c0b85 100644
--- a/cpp/src/arrow/acero/asof_join_node_test.cc
+++ b/cpp/src/arrow/acero/asof_join_node_test.cc
@@ -19,9 +19,12 @@
#include <chrono>
#include <memory>
+#include <mutex>
#include <numeric>
#include <random>
#include <string_view>
+#include "arrow/acero/exec_plan.h"
+#include "arrow/testing/future_util.h"
#ifndef NDEBUG
#include <sstream>
#endif
@@ -31,6 +34,8 @@
#ifndef NDEBUG
#include "arrow/acero/options_internal.h"
#endif
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/query_context.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/acero/test_util_internal.h"
#include "arrow/acero/util.h"
@@ -1360,9 +1365,66 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
})
+struct BackpressureCounters {
+ std::atomic<int32_t> pause_count = 0;
+ std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+ BackpressureCountingNodeOptions(BackpressureCounters* counters) : counters(counters) {}
+
+ BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+ static constexpr const char* kKindName = "BackpressureCountingNode";
+ static constexpr const char* kFactoryName = "backpressure_count";
+
+ static void Register() {
+ auto exec_reg = default_exec_factory_registry();
+ if (!exec_reg->GetFactory(kFactoryName).ok()) {
+ ASSERT_OK(exec_reg->AddFactory(kFactoryName, BackpressureCountingNode::Make));
+ }
+ }
+
+ BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::shared_ptr<Schema> output_schema,
+ const BackpressureCountingNodeOptions& options)
+ : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+ auto bp_options = static_cast<const BackpressureCountingNodeOptions&>(options);
+ return plan->EmplaceNode<BackpressureCountingNode>(
+ plan, inputs, inputs[0]->output_schema(), bp_options);
+ }
+
+ const char* kind_name() const override { return kKindName; }
+ Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+ void PauseProducing(ExecNode* output, int32_t counter) override {
+ ++counters->pause_count;
+ inputs()[0]->PauseProducing(this, counter);
+ }
+ void ResumeProducing(ExecNode* output, int32_t counter) override {
+ ++counters->resume_count;
+ inputs()[0]->ResumeProducing(this, counter);
+ }
+
+ BackpressureCounters* counters;
+};
+
+AsyncGenerator<std::optional<ExecBatch>> GetGen(
+ AsyncGenerator<std::optional<ExecBatch>> gen) {
+ return gen;
+}
+AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
+ return bws.gen(false, false);
+}
+
template <typename BatchesMaker>
-void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
- double fast_delay, double slow_delay, bool noisy = false) {
+void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
@@ -1381,36 +1443,93 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size,
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
- Declaration l_src = {
- "source", SourceNodeOptions(
- l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
- Declaration r0_src = {
- "source", SourceNodeOptions(
- r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
- Declaration r1_src = {
- "source", SourceNodeOptions(
- r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};
+ BackpressureCountingNode::Register();
+ RegisterTestNodes(); // for GatedNode
- Declaration asofjoin = {
- "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};
+ struct BackpressureSourceConfig {
+ std::string name_prefix;
+ bool is_gated;
+ std::shared_ptr<Schema> schema;
+ decltype(l_batches) batches;
+
+ std::string name() const {
+ return name_prefix + ";" + (is_gated ? "gated" : "ungated");
+ }
+ };
+
+ auto gate_ptr = Gate::Make();
+ auto& gate = *gate_ptr;
+ GatedNodeOptions gate_options(gate_ptr.get());
+
+ // Two ungated and one gated
+ std::vector<BackpressureSourceConfig> source_configs = {
+ {"0", false, l_schema, l_batches},
+ {"1", true, r0_schema, r0_batches},
+ {"2", false, r1_schema, r1_batches},
+ };
+
+ std::vector<BackpressureCounters> bp_counters(source_configs.size());
+ std::vector<Declaration> src_decls;
+ std::vector<std::shared_ptr<BackpressureCountingNodeOptions>> bp_options;
+ std::vector<Declaration::Input> bp_decls;
+ for (size_t i = 0; i < source_configs.size(); i++) {
+ const auto& config = source_configs[i];
+
+ src_decls.emplace_back("source",
+ SourceNodeOptions(config.schema, GetGen(config.batches)));
+ bp_options.push_back(
+ std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
+ std::shared_ptr<ExecNodeOptions> options = bp_options.back();
+ std::vector<Declaration::Input> bp_in = {src_decls.back()};
+ Declaration bp_decl = {BackpressureCountingNode::kFactoryName, bp_in,
+ std::move(options)};
+ if (config.is_gated) {
+ bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
+ }
+ bp_decls.push_back(bp_decl);
+ }
+
+ Declaration asofjoin = {"asofjoin", bp_decls,
+ GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
+ internal::ThreadPool::Make(1));
+ ExecContext exec_ctx(default_memory_pool(), tpool.get());
+ Future<BatchesWithCommonSchema> batches_fut =
+ DeclarationToExecBatchesAsync(asofjoin, exec_ctx);
+
+ auto has_bp_been_applied = [&] {
+ // One of the inputs is gated. The other two will eventually be paused by the asof
+ // join node
+ for (size_t i = 0; i < source_configs.size(); i++) {
+ const auto& counters = bp_counters[i];
+ if (source_configs[i].is_gated) {
+ if (counters.pause_count > 0) return false;
+ } else {
+ if (counters.pause_count != 1) return false;
+ }
+ }
+ return true;
+ };
+
+ BusyWait(10.0, has_bp_been_applied);
+ ASSERT_TRUE(has_bp_been_applied());
- ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
- DeclarationToReader(asofjoin, /*use_threads=*/false));
+ gate.ReleaseAllBatches();
+ ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);
- int64_t total_length = 0;
- for (;;) {
- ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
- if (!batch) {
- break;
+ // One of the inputs is gated. The other two will eventually be resumed by the asof
+ // join node
+ for (size_t i = 0; i < source_configs.size(); i++) {
+ const auto& counters = bp_counters[i];
+ if (!source_configs[i].is_gated) {
+ ASSERT_GE(counters.resume_count, 0);
}
- total_length += batch->num_rows();
}
- ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
}
TEST(AsofJoinTest, BackpressureWithBatches) {
- return TestBackpressure(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1,
- /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/false);
+ return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
}
template <typename BatchesMaker>
@@ -1473,10 +1592,9 @@ T GetEnvValue(const std::string& var, T default_value) {
} // namespace
TEST(AsofJoinTest, BackpressureWithBatchesGen) {
- int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
+ int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
- return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size,
- /*fast_delay=*/0.001, /*slow_delay=*/0.01);
+ return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
}
} // namespace acero
diff --git a/cpp/src/arrow/acero/test_nodes.cc b/cpp/src/arrow/acero/test_nodes.cc
index ff95f72e6e..e109afbe1b 100644
--- a/cpp/src/arrow/acero/test_nodes.cc
+++ b/cpp/src/arrow/acero/test_nodes.cc
@@ -31,6 +31,7 @@
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/tracing_internal.h"
namespace arrow {
@@ -200,12 +201,168 @@ class JitterNode : public ExecNode {
} // namespace
+class GateImpl {
+ public:
+ void ReleaseAllBatches() {
+ std::lock_guard lg(mutex_);
+ num_allowed_batches_ = -1;
+ NotifyAll();
+ }
+
+ void ReleaseOneBatch() {
+ std::lock_guard lg(mutex_);
+ DCHECK_GE(num_allowed_batches_, 0)
+ << "you can't call ReleaseOneBatch() after calling ReleaseAllBatches()";
+ num_allowed_batches_++;
+ NotifyAll();
+ }
+
+ Future<> WaitForNextReleasedBatch() {
+ std::lock_guard lg(mutex_);
+ if (current_waiter_.is_valid()) {
+ return current_waiter_;
+ }
+ Future<> fut;
+ if (num_allowed_batches_ < 0 || num_released_batches_ < num_allowed_batches_) {
+ num_released_batches_++;
+ return Future<>::MakeFinished();
+ }
+
+ current_waiter_ = Future<>::Make();
+ return current_waiter_;
+ }
+
+ private:
+ void NotifyAll() {
+ if (current_waiter_.is_valid()) {
+ Future<> to_unlock = current_waiter_;
+ current_waiter_ = {};
+ to_unlock.MarkFinished();
+ }
+ }
+
+ Future<> current_waiter_;
+ int num_released_batches_ = 0;
+ int num_allowed_batches_ = 0;
+ std::mutex mutex_;
+};
+
+std::shared_ptr<Gate> Gate::Make() { return std::make_shared<Gate>(); }
+
+Gate::Gate() : impl_(new GateImpl()) {}
+
+Gate::~Gate() { delete impl_; }
+
+void Gate::ReleaseAllBatches() { impl_->ReleaseAllBatches(); }
+
+void Gate::ReleaseOneBatch() { impl_->ReleaseOneBatch(); }
+
+Future<> Gate::WaitForNextReleasedBatch() { return impl_->WaitForNextReleasedBatch(); }
+
+namespace {
+
+struct GatedNode : public ExecNode, public TracedNode {
+ static constexpr auto kKindName = "BackpressureDelayingNode";
+ static constexpr const char* kFactoryName = "backpressure_delay";
+
+ static void Register() {
+ auto exec_reg = default_exec_factory_registry();
+ if (!exec_reg->GetFactory(kFactoryName).ok()) {
+ ASSERT_OK(exec_reg->AddFactory(kFactoryName, GatedNode::Make));
+ }
+ }
+
+ GatedNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::shared_ptr<Schema> output_schema, const GatedNodeOptions& options)
+ : ExecNode(plan, inputs, {"input"}, output_schema),
+ TracedNode(this),
+ gate_(options.gate) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+ auto gated_node_opts = static_cast<const GatedNodeOptions&>(options);
+ return plan->EmplaceNode<GatedNode>(plan, inputs, inputs[0]->output_schema(),
+ gated_node_opts);
+ }
+
+ const char* kind_name() const override { return kKindName; }
+
+ const Ordering& ordering() const override { return inputs_[0]->ordering(); }
+ Status InputFinished(ExecNode* input, int total_batches) override {
+ return output_->InputFinished(this, total_batches);
+ }
+ Status StartProducing() override {
+ NoteStartProducing(ToStringExtra());
+ return Status::OK();
+ }
+
+ void PauseProducing(ExecNode* output, int32_t counter) override {
+ inputs_[0]->PauseProducing(this, counter);
+ }
+
+ void ResumeProducing(ExecNode* output, int32_t counter) override {
+ inputs_[0]->ResumeProducing(this, counter);
+ }
+
+ Status StopProducingImpl() override { return Status::OK(); }
+
+ Status SendBatchesUnlocked(std::unique_lock<std::mutex>&& lock) {
+ while (!queued_batches_.empty()) {
+ // If we are ready to release the batch, do so immediately.
+ Future<> maybe_unlocked = gate_->WaitForNextReleasedBatch();
+ bool callback_added = maybe_unlocked.TryAddCallback([this] {
+ return [this](const Status& st) {
+ DCHECK_OK(st);
+ plan_->query_context()->ScheduleTask(
+ [this] {
+ std::unique_lock lk(mutex_);
+ return SendBatchesUnlocked(std::move(lk));
+ },
+ "GatedNode::ResumeAfterNotify");
+ };
+ });
+ if (callback_added) {
+ break;
+ }
+ // Otherwise, the future is already finished which means the gate is unlocked
+ // and we are allowed to send a batch
+ ExecBatch next = std::move(queued_batches_.front());
+ queued_batches_.pop();
+ lock.unlock();
+ ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next)));
+ lock.lock();
+ }
+ return Status::OK();
+ }
+
+ Status InputReceived(ExecNode* input, ExecBatch batch) override {
+ auto scope = TraceInputReceived(batch);
+ DCHECK_EQ(input, inputs_[0]);
+
+ // This may be called concurrently by the source and by a restart attempt. Process
+ // one at a time (this critical section should be pretty small)
+ std::unique_lock lk(mutex_);
+ queued_batches_.push(std::move(batch));
+
+ return SendBatchesUnlocked(std::move(lk));
+ }
+
+ Gate* gate_;
+ std::queue<ExecBatch> queued_batches_;
+ std::mutex mutex_;
+};
+
+} // namespace
+
void RegisterTestNodes() {
static std::once_flag registered;
std::call_once(registered, [] {
ExecFactoryRegistry* registry = default_exec_factory_registry();
DCHECK_OK(
registry->AddFactory(std::string(JitterNodeOptions::kName), JitterNode::Make));
+ DCHECK_OK(
+ registry->AddFactory(std::string(GatedNodeOptions::kName), GatedNode::Make));
});
}
diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h
index 2d1d630b3b..7e31aa31b3 100644
--- a/cpp/src/arrow/acero/test_nodes.h
+++ b/cpp/src/arrow/acero/test_nodes.h
@@ -53,6 +53,33 @@ struct JitterNodeOptions : public ExecNodeOptions {
static constexpr std::string_view kName = "jitter";
};
+class GateImpl;
+
+class Gate {
+ public:
+ static std::shared_ptr<Gate> Make();
+
+ Gate();
+ virtual ~Gate();
+
+ void ReleaseAllBatches();
+ void ReleaseOneBatch();
+ Future<> WaitForNextReleasedBatch();
+
+ private:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(Gate);
+
+ GateImpl* impl_;
+};
+
+// A node that holds all input batches until a given gate is released
+struct GatedNodeOptions : public ExecNodeOptions {
+ explicit GatedNodeOptions(Gate* gate) : gate(gate) {}
+ Gate* gate;
+
+ static constexpr std::string_view kName = "gated";
+};
+
void RegisterTestNodes();
} // namespace acero