You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/01/05 01:33:10 UTC
[arrow] branch master updated: GH-15176: Fix various issues introduced in the asof-join benchmark by ARROW-17980 and ARROW-15732 (#15190)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 773b5d815c GH-15176: Fix various issues introduced in the asof-join benchmark by ARROW-17980 and ARROW-15732 (#15190)
773b5d815c is described below
commit 773b5d815c8dd68a4ff1e3b90f7838ab770a9d27
Author: Weston Pace <we...@gmail.com>
AuthorDate: Wed Jan 4 17:33:03 2023 -0800
GH-15176: Fix various issues introduced in the asof-join benchmark by ARROW-17980 and ARROW-15732 (#15190)
* Closes: #15176
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/asof_join_benchmark.cc | 13 ++++++++-----
cpp/src/arrow/compute/exec/exec_plan.cc | 8 +++++++-
cpp/src/arrow/compute/exec/util.h | 15 +++++++++++++++
cpp/src/arrow/engine/substrait/serde_test.cc | 21 +++------------------
4 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/asof_join_benchmark.cc b/cpp/src/arrow/compute/exec/asof_join_benchmark.cc
index 366508e34b..7ab4ee7a7f 100644
--- a/cpp/src/arrow/compute/exec/asof_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_benchmark.cc
@@ -53,7 +53,8 @@ static void TableJoinOverhead(benchmark::State& state,
TableGenerationProperties left_table_properties,
TableGenerationProperties right_table_properties,
int batch_size, int num_right_tables,
- std::string factory_name, ExecNodeOptions& options) {
+ std::string factory_name,
+ std::shared_ptr<ExecNodeOptions> options) {
left_table_properties.column_prefix = "lt";
left_table_properties.seed = 0;
ASSERT_OK_AND_ASSIGN(TableStats left_table_stats, MakeTable(left_table_properties));
@@ -85,7 +86,9 @@ static void TableJoinOverhead(benchmark::State& state,
}
Declaration join_node{factory_name, {input_nodes}, options};
state.ResumeTiming();
- ASSERT_OK(DeclarationToStatus(std::move(join_node)));
+ // asof-join must currently be run synchronously as it relies on data arriving
+ // in-order
+ ASSERT_OK(DeclarationToStatus(std::move(join_node), /*use_threads=*/false));
}
state.counters["input_rows_per_second"] = benchmark::Counter(
@@ -112,8 +115,8 @@ AsofJoinNodeOptions GetRepeatedOptions(size_t repeat, FieldRef on_key,
static void AsOfJoinOverhead(benchmark::State& state) {
int64_t tolerance = 0;
- AsofJoinNodeOptions options =
- GetRepeatedOptions(int(state.range(4)), kTimeCol, {kKeyCol}, tolerance);
+ auto options = std::make_shared<AsofJoinNodeOptions>(
+ GetRepeatedOptions(int(state.range(4) + 1), kTimeCol, {kKeyCol}, tolerance));
TableJoinOverhead(
state,
TableGenerationProperties{int(state.range(0)), int(state.range(1)),
@@ -122,7 +125,7 @@ static void AsOfJoinOverhead(benchmark::State& state) {
TableGenerationProperties{int(state.range(5)), int(state.range(6)),
int(state.range(7)), "", kDefaultMinColumnVal,
kDefaultMaxColumnVal, 0, kDefaultStart, kDefaultEnd},
- int(state.range(3)), int(state.range(4)), "asofjoin", options);
+ int(state.range(3)), int(state.range(4)), "asofjoin", std::move(options));
}
// this generates the set of right hand tables to test on.
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index 757c0db882..b8886619d7 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -667,7 +667,13 @@ Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration
Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(exec_context));
- ARROW_RETURN_NOT_OK(declaration.AddToPlan(exec_plan.get()));
+ ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get()));
+ for (int i = 0; i < last_node->num_outputs(); i++) {
+ ARROW_RETURN_NOT_OK(
+ Declaration("consuming_sink", {last_node},
+ ConsumingSinkNodeOptions(NullSinkNodeConsumer::Make()))
+ .AddToPlan(exec_plan.get()));
+ }
ARROW_RETURN_NOT_OK(exec_plan->Validate());
ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
// Keep the exec_plan alive until it finishes
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 8f6b3de2e1..ea0c8cf36f 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -362,6 +362,21 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
util::Mutex consume_mutex_;
};
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
+ public:
+ Status Init(const std::shared_ptr<Schema>&, BackpressureControl*,
+ ExecPlan* plan) override {
+ return Status::OK();
+ }
+ Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
+ Future<> Finish() override { return Status::OK(); }
+
+ public:
+ static std::shared_ptr<NullSinkNodeConsumer> Make() {
+ return std::make_shared<NullSinkNodeConsumer>();
+ }
+};
+
/// Modify an Expression with pre-order and post-order visitation.
/// `pre` will be invoked on each Expression. `pre` will visit Calls before their
/// arguments, `post_call` will visit Calls (and no other Expressions) after their
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index afd053338b..1a966ab2fc 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -107,22 +107,7 @@ void WriteIpcData(const std::string& path,
ASSERT_OK(file_writer->Close());
}
-class NullSinkNodeConsumer : public compute::SinkNodeConsumer {
- public:
- Status Init(const std::shared_ptr<Schema>&, compute::BackpressureControl*,
- compute::ExecPlan* plan) override {
- return Status::OK();
- }
- Status Consume(compute::ExecBatch exec_batch) override { return Status::OK(); }
- Future<> Finish() override { return Status::OK(); }
-
- public:
- static std::shared_ptr<NullSinkNodeConsumer> Make() {
- return std::make_shared<NullSinkNodeConsumer>();
- }
-};
-
-const auto kNullConsumer = std::make_shared<NullSinkNodeConsumer>();
+const auto kNullConsumer = std::make_shared<compute::NullSinkNodeConsumer>();
const std::shared_ptr<Schema> kBoringSchema = schema({
field("bool", boolean()),
@@ -1124,7 +1109,7 @@ TEST(Substrait, DeserializeWithConsumerFactory) {
ASSERT_OK_AND_ASSIGN(std::string substrait_json, GetSubstraitJSON());
ASSERT_OK_AND_ASSIGN(auto buf, SerializeJsonPlan(substrait_json));
ASSERT_OK_AND_ASSIGN(auto declarations,
- DeserializePlans(*buf, NullSinkNodeConsumer::Make));
+ DeserializePlans(*buf, compute::NullSinkNodeConsumer::Make));
ASSERT_EQ(declarations.size(), 1);
compute::Declaration* decl = &declarations[0];
ASSERT_EQ(decl->factory_name, "consuming_sink");
@@ -1143,7 +1128,7 @@ TEST(Substrait, DeserializeSinglePlanWithConsumerFactory) {
ASSERT_OK_AND_ASSIGN(std::string substrait_json, GetSubstraitJSON());
ASSERT_OK_AND_ASSIGN(auto buf, SerializeJsonPlan(substrait_json));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
- DeserializePlan(*buf, NullSinkNodeConsumer::Make()));
+ DeserializePlan(*buf, compute::NullSinkNodeConsumer::Make()));
ASSERT_EQ(1, plan->sinks().size());
compute::ExecNode* sink_node = plan->sinks()[0];
ASSERT_STREQ(sink_node->kind_name(), "ConsumingSinkNode");