You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/01/05 01:33:10 UTC

[arrow] branch master updated: GH-15176: Fix various issues introduced in the asof-join benchmark by ARROW-17980 and ARROW-15732 (#15190)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 773b5d815c GH-15176: Fix various issues introduced in the asof-join benchmark by ARROW-17980 and ARROW-15732 (#15190)
773b5d815c is described below

commit 773b5d815c8dd68a4ff1e3b90f7838ab770a9d27
Author: Weston Pace <we...@gmail.com>
AuthorDate: Wed Jan 4 17:33:03 2023 -0800

    GH-15176: Fix various issues introduced in the asof-join benchmark by ARROW-17980 and ARROW-15732 (#15190)
    
    
    * Closes: #15176
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/asof_join_benchmark.cc | 13 ++++++++-----
 cpp/src/arrow/compute/exec/exec_plan.cc           |  8 +++++++-
 cpp/src/arrow/compute/exec/util.h                 | 15 +++++++++++++++
 cpp/src/arrow/engine/substrait/serde_test.cc      | 21 +++------------------
 4 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/asof_join_benchmark.cc b/cpp/src/arrow/compute/exec/asof_join_benchmark.cc
index 366508e34b..7ab4ee7a7f 100644
--- a/cpp/src/arrow/compute/exec/asof_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_benchmark.cc
@@ -53,7 +53,8 @@ static void TableJoinOverhead(benchmark::State& state,
                               TableGenerationProperties left_table_properties,
                               TableGenerationProperties right_table_properties,
                               int batch_size, int num_right_tables,
-                              std::string factory_name, ExecNodeOptions& options) {
+                              std::string factory_name,
+                              std::shared_ptr<ExecNodeOptions> options) {
   left_table_properties.column_prefix = "lt";
   left_table_properties.seed = 0;
   ASSERT_OK_AND_ASSIGN(TableStats left_table_stats, MakeTable(left_table_properties));
@@ -85,7 +86,9 @@ static void TableJoinOverhead(benchmark::State& state,
     }
     Declaration join_node{factory_name, {input_nodes}, options};
     state.ResumeTiming();
-    ASSERT_OK(DeclarationToStatus(std::move(join_node)));
+    // asof-join must currently be run synchronously as it relies on data arriving
+    // in-order
+    ASSERT_OK(DeclarationToStatus(std::move(join_node), /*use_threads=*/false));
   }
 
   state.counters["input_rows_per_second"] = benchmark::Counter(
@@ -112,8 +115,8 @@ AsofJoinNodeOptions GetRepeatedOptions(size_t repeat, FieldRef on_key,
 
 static void AsOfJoinOverhead(benchmark::State& state) {
   int64_t tolerance = 0;
-  AsofJoinNodeOptions options =
-      GetRepeatedOptions(int(state.range(4)), kTimeCol, {kKeyCol}, tolerance);
+  auto options = std::make_shared<AsofJoinNodeOptions>(
+      GetRepeatedOptions(int(state.range(4) + 1), kTimeCol, {kKeyCol}, tolerance));
   TableJoinOverhead(
       state,
       TableGenerationProperties{int(state.range(0)), int(state.range(1)),
@@ -122,7 +125,7 @@ static void AsOfJoinOverhead(benchmark::State& state) {
       TableGenerationProperties{int(state.range(5)), int(state.range(6)),
                                 int(state.range(7)), "", kDefaultMinColumnVal,
                                 kDefaultMaxColumnVal, 0, kDefaultStart, kDefaultEnd},
-      int(state.range(3)), int(state.range(4)), "asofjoin", options);
+      int(state.range(3)), int(state.range(4)), "asofjoin", std::move(options));
 }
 
 // this generates the set of right hand tables to test on.
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index 757c0db882..b8886619d7 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -667,7 +667,13 @@ Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration
 Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_context) {
   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
                         ExecPlan::Make(exec_context));
-  ARROW_RETURN_NOT_OK(declaration.AddToPlan(exec_plan.get()));
+  ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get()));
+  for (int i = 0; i < last_node->num_outputs(); i++) {
+    ARROW_RETURN_NOT_OK(
+        Declaration("consuming_sink", {last_node},
+                    ConsumingSinkNodeOptions(NullSinkNodeConsumer::Make()))
+            .AddToPlan(exec_plan.get()));
+  }
   ARROW_RETURN_NOT_OK(exec_plan->Validate());
   ARROW_RETURN_NOT_OK(exec_plan->StartProducing());
   // Keep the exec_plan alive until it finishes
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 8f6b3de2e1..ea0c8cf36f 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -362,6 +362,21 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
   util::Mutex consume_mutex_;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
+ public:
+  Status Init(const std::shared_ptr<Schema>&, BackpressureControl*,
+              ExecPlan* plan) override {
+    return Status::OK();
+  }
+  Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
+  Future<> Finish() override { return Status::OK(); }
+
+ public:
+  static std::shared_ptr<NullSinkNodeConsumer> Make() {
+    return std::make_shared<NullSinkNodeConsumer>();
+  }
+};
+
 /// Modify an Expression with pre-order and post-order visitation.
 /// `pre` will be invoked on each Expression. `pre` will visit Calls before their
 /// arguments, `post_call` will visit Calls (and no other Expressions) after their
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index afd053338b..1a966ab2fc 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -107,22 +107,7 @@ void WriteIpcData(const std::string& path,
   ASSERT_OK(file_writer->Close());
 }
 
-class NullSinkNodeConsumer : public compute::SinkNodeConsumer {
- public:
-  Status Init(const std::shared_ptr<Schema>&, compute::BackpressureControl*,
-              compute::ExecPlan* plan) override {
-    return Status::OK();
-  }
-  Status Consume(compute::ExecBatch exec_batch) override { return Status::OK(); }
-  Future<> Finish() override { return Status::OK(); }
-
- public:
-  static std::shared_ptr<NullSinkNodeConsumer> Make() {
-    return std::make_shared<NullSinkNodeConsumer>();
-  }
-};
-
-const auto kNullConsumer = std::make_shared<NullSinkNodeConsumer>();
+const auto kNullConsumer = std::make_shared<compute::NullSinkNodeConsumer>();
 
 const std::shared_ptr<Schema> kBoringSchema = schema({
     field("bool", boolean()),
@@ -1124,7 +1109,7 @@ TEST(Substrait, DeserializeWithConsumerFactory) {
   ASSERT_OK_AND_ASSIGN(std::string substrait_json, GetSubstraitJSON());
   ASSERT_OK_AND_ASSIGN(auto buf, SerializeJsonPlan(substrait_json));
   ASSERT_OK_AND_ASSIGN(auto declarations,
-                       DeserializePlans(*buf, NullSinkNodeConsumer::Make));
+                       DeserializePlans(*buf, compute::NullSinkNodeConsumer::Make));
   ASSERT_EQ(declarations.size(), 1);
   compute::Declaration* decl = &declarations[0];
   ASSERT_EQ(decl->factory_name, "consuming_sink");
@@ -1143,7 +1128,7 @@ TEST(Substrait, DeserializeSinglePlanWithConsumerFactory) {
   ASSERT_OK_AND_ASSIGN(std::string substrait_json, GetSubstraitJSON());
   ASSERT_OK_AND_ASSIGN(auto buf, SerializeJsonPlan(substrait_json));
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<compute::ExecPlan> plan,
-                       DeserializePlan(*buf, NullSinkNodeConsumer::Make()));
+                       DeserializePlan(*buf, compute::NullSinkNodeConsumer::Make()));
   ASSERT_EQ(1, plan->sinks().size());
   compute::ExecNode* sink_node = plan->sinks()[0];
   ASSERT_STREQ(sink_node->kind_name(), "ConsumingSinkNode");