You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/07/08 23:17:05 UTC

[arrow] branch master updated: ARROW-16809: [C++] Add Benchmarks for FilterNode (#13366)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a2114c0605 ARROW-16809: [C++] Add Benchmarks for FilterNode (#13366)
a2114c0605 is described below

commit a2114c0605be66bb16d16ee0b25c9d81ab68f5ce
Author: Ivan Chau <iv...@gmail.com>
AuthorDate: Fri Jul 8 19:16:59 2022 -0400

    ARROW-16809: [C++] Add Benchmarks for FilterNode (#13366)
    
    Add `filter_benchmark.cc` and add to `arrow/compute/exec/CMakeLists`
    
    Lead-authored-by: Ivan Chau <iv...@gmail.com>
    Co-authored-by: Ivan Chau <iv...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/CMakeLists.txt       |  14 +-
 cpp/src/arrow/compute/exec/benchmark_util.cc    | 149 +++++++++++++++++++++
 cpp/src/arrow/compute/exec/benchmark_util.h     |  46 +++++++
 cpp/src/arrow/compute/exec/filter_benchmark.cc  | 166 ++++++++++++++++++++++++
 cpp/src/arrow/compute/exec/project_benchmark.cc |  97 ++------------
 5 files changed, 382 insertions(+), 90 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt
index 3353616e97..2c8d0ab210 100644
--- a/cpp/src/arrow/compute/exec/CMakeLists.txt
+++ b/cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -48,7 +48,19 @@ add_arrow_compute_test(util_test
 
 add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
 
-add_arrow_benchmark(project_benchmark PREFIX "arrow-compute")
+add_arrow_benchmark(filter_benchmark
+                    PREFIX
+                    "arrow-compute"
+                    SOURCES
+                    benchmark_util.cc
+                    filter_benchmark.cc)
+
+add_arrow_benchmark(project_benchmark
+                    PREFIX
+                    "arrow-compute"
+                    SOURCES
+                    benchmark_util.cc
+                    project_benchmark.cc)
 
 add_arrow_benchmark(tpch_benchmark PREFIX "arrow-compute")
 
diff --git a/cpp/src/arrow/compute/exec/benchmark_util.cc b/cpp/src/arrow/compute/exec/benchmark_util.cc
new file mode 100644
index 0000000000..5bac508854
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/benchmark_util.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/benchmark_util.h"
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace compute {
+
+/// \brief Benchmark a single exec node in isolation.
+///
+/// On every benchmark iteration a fresh plan (source -> `factory_name` node -> sink) is
+/// built with timing paused. The source and sink only exist so the node under test has
+/// valid neighbours: the batches in `data` are delivered by calling InputReceived on
+/// the node directly from TaskScheduler tasks spawned on the CPU thread pool, followed
+/// by a single InputFinished call from the task group's completion callback.
+///
+/// After the loop, `rows_per_second` and `batches_per_second` rate counters are stored
+/// in `state.counters`.
+///
+/// \param state google-benchmark state driving the iterations and receiving counters
+/// \param ctx execution context the plans are created with
+/// \param expr not referenced by this function
+/// \param num_batches task count passed to StartTaskGroup (task `i` delivers
+///        `data.batches[i]`); also used for the counters
+/// \param batch_size only used to compute the rows_per_second counter
+/// \param data schema and batches fed to the node
+/// \param factory_name factory name of the node under test, passed to MakeExecNode
+/// \param options options of the node under test, passed to MakeExecNode
+/// \return the first error raised while building or driving the plan,
+///         Status::Invalid if the node is not finished once all tasks completed,
+///         Status::OK otherwise
+Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
+                                     arrow::compute::ExecContext ctx,
+                                     arrow::compute::Expression expr, int32_t num_batches,
+                                     int32_t batch_size,
+                                     arrow::compute::BatchesWithSchema data,
+                                     std::string factory_name,
+                                     arrow::compute::ExecNodeOptions& options) {
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<arrow::compute::ExecBatch>> sink_gen;
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                          arrow::compute::ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * source_node,
+                          MakeExecNode("source", plan.get(), {},
+                                       arrow::compute::SourceNodeOptions{
+                                           data.schema, data.gen(/*parallel=*/true,
+                                                                 /*slow=*/false)}));
+
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * node,
+                          MakeExecNode(factory_name, plan.get(), {source_node}, options));
+    ARROW_RETURN_NOT_OK(MakeExecNode("sink", plan.get(), {node},
+                                     arrow::compute::SinkNodeOptions{&sink_gen}));
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler =
+        arrow::compute::TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    // Guarded by `mutex`. Set by the completion callback so that the wait below can
+    // tell a real completion from a spurious wakeup of the condition variable.
+    bool all_tasks_finished = false;
+
+    int task_group_id = scheduler->RegisterTaskGroup(
+        // Per-task callback: hand one batch to the node under test.
+        [&](size_t thread_id, int64_t task_id) {
+          node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        // Completion callback: close the input and wake up the benchmark thread.
+        [&](size_t thread_id) {
+          node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished = true;
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+
+    arrow::compute::ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ARROW_RETURN_NOT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    std::unique_lock<std::mutex> lk(mutex);
+    ARROW_RETURN_NOT_OK(
+        scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    // Wait with a predicate: a bare wait() may return on a spurious wakeup, i.e.
+    // before the completion callback has run.
+    all_tasks_finished_cv.wait(lk, [&] { return all_tasks_finished; });
+    if (!node->finished().is_finished()) {
+      return Status::Invalid("All tasks were finished but the node was not finished");
+    }
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+  return Status::OK();
+}
+
+/// \brief Benchmark a complete source -> `node_declarations` -> sink plan.
+///
+/// Each benchmark iteration builds a new plan with timing paused: a "source"
+/// declaration fed from `data`, the caller supplied declarations in their given order,
+/// and a "sink" declaration. Timing is resumed just before the plan is run through
+/// StartAndCollect. Once the loop is over, the `rows_per_second` and
+/// `batches_per_second` rate counters are stored in `state.counters`.
+///
+/// \param state google-benchmark state driving the iterations and receiving counters
+/// \param ctx execution context the plans are created with
+/// \param num_batches number of batches per iteration, used for the counters
+/// \param batch_size only used to compute the rows_per_second counter
+/// \param data schema and batch generator for the source node
+/// \param node_declarations declarations placed between the source and the sink
+/// \return the first error raised while building or running a plan, else Status::OK
+Status BenchmarkNodeOverhead(
+    benchmark::State& state, arrow::compute::ExecContext ctx, int32_t num_batches,
+    int32_t batch_size, arrow::compute::BatchesWithSchema data,
+    std::vector<arrow::compute::Declaration>& node_declarations) {
+  for (auto _ : state) {
+    // Plan construction is kept out of the measured region.
+    state.PauseTiming();
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    // source, then the declarations under test, then sink.
+    std::vector<Declaration> pipeline;
+    pipeline.reserve(node_declarations.size() + 2);
+    pipeline.push_back(Declaration(
+        {"source",
+         SourceNodeOptions{data.schema, data.gen(/*parallel=*/true, /*slow=*/false)},
+         "custom_source_label"}));
+    pipeline.insert(pipeline.end(), node_declarations.begin(), node_declarations.end());
+    pipeline.push_back(
+        Declaration({"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"}));
+    ARROW_RETURN_NOT_OK(Declaration::Sequence(pipeline).AddToPlan(plan.get()));
+
+    state.ResumeTiming();
+    ARROW_RETURN_NOT_OK(StartAndCollect(plan.get(), sink_gen).status());
+  }
+
+  const auto total_batches = state.iterations() * num_batches;
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(total_batches * batch_size), benchmark::Counter::kIsRate);
+  state.counters["batches_per_second"] =
+      benchmark::Counter(static_cast<double>(total_batches), benchmark::Counter::kIsRate);
+  return Status::OK();
+}
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/benchmark_util.h b/cpp/src/arrow/compute/exec/benchmark_util.h
new file mode 100644
index 0000000000..7897288cb8
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/benchmark_util.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/test_util.h"
+
+namespace arrow {
+
+namespace compute {
+
+/// \brief Benchmark a source -> `node_declarations` -> sink plan.
+///
+/// Sets the `rows_per_second` and `batches_per_second` counters on `state`, computed
+/// from `num_batches` and `batch_size`.
+///
+/// \param state google-benchmark state driving the iterations
+/// \param ctx execution context used to create the plans
+/// \param num_batches number of batches per iteration, used for the counters
+/// \param batch_size rows per batch, used for the rows_per_second counter
+/// \param data schema and batches fed to the source node
+/// \param node_declarations declarations placed between the source and the sink
+Status BenchmarkNodeOverhead(benchmark::State& state, arrow::compute::ExecContext ctx,
+                             int32_t num_batches, int32_t batch_size,
+                             arrow::compute::BatchesWithSchema data,
+                             std::vector<arrow::compute::Declaration>& node_declarations);
+
+/// \brief Benchmark one node created through `factory_name` in isolation, by calling
+/// its InputReceived / InputFinished methods directly.
+///
+/// Sets the `rows_per_second` and `batches_per_second` counters on `state`, computed
+/// from `num_batches` and `batch_size`.
+///
+/// \param state google-benchmark state driving the iterations
+/// \param ctx execution context used to create the plans
+/// \param expr expression associated with the benchmark
+/// \param num_batches number of batches delivered per iteration
+/// \param batch_size rows per batch, used for the rows_per_second counter
+/// \param data schema and batches delivered to the node
+/// \param factory_name factory name of the node under test
+/// \param options options the node under test is created with
+Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
+                                     arrow::compute::ExecContext ctx,
+                                     arrow::compute::Expression expr, int32_t num_batches,
+                                     int32_t batch_size,
+                                     arrow::compute::BatchesWithSchema data,
+                                     std::string factory_name,
+                                     arrow::compute::ExecNodeOptions& options);
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/filter_benchmark.cc b/cpp/src/arrow/compute/exec/filter_benchmark.cc
new file mode 100644
index 0000000000..64cf307580
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/filter_benchmark.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/exec/benchmark_util.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/random.h"
+
+namespace arrow {
+namespace compute {
+
+// Row budget of one benchmark run: the number of batches is derived as
+// kTotalBatchSize / batch_size.
+constexpr int64_t kTotalBatchSize = 1000 * 1000;
+// Seed handed to the random array generator.
+constexpr auto kSeed = 0x94378165U;
+
+// Builds a single RecordBatch of `length` rows with one randomly generated column per
+// entry of `fields`. null_probability is applied to every column. A field whose name is
+// "bool" is generated with RandomArrayGenerator::Boolean so that bool_true_probability
+// can be passed along; every other field is generated with ArrayOf from the field's
+// type. A new generator seeded with kSeed is created on each call.
+
+static std::shared_ptr<arrow::RecordBatch> GetBatchesWithNullProbability(
+    const FieldVector& fields, int64_t length, double null_probability,
+    double bool_true_probability = 0.5) {
+  random::RandomArrayGenerator generator(kSeed);
+  std::vector<std::shared_ptr<Array>> columns;
+  columns.reserve(fields.size());
+  for (const auto& field : fields) {
+    const bool is_bool_column = field->name() == "bool";
+    columns.push_back(
+        is_bool_column
+            ? generator.Boolean(length, bool_true_probability, null_probability)
+            : generator.ArrayOf(field->type(), length, null_probability));
+  }
+  return RecordBatch::Make(schema(fields), length, std::move(columns));
+}
+
+// Creates `num_batches` ExecBatches of `batch_size` rows each for the given schema,
+// using GetBatchesWithNullProbability for the column data. The index of each batch is
+// appended to its `values` as one extra entry. The schema is moved into the result.
+BatchesWithSchema MakeRandomBatchesWithNullProbability(
+    std::shared_ptr<Schema> schema, int num_batches, int batch_size,
+    double null_probability = 0.5, double bool_true_probability = 0.5) {
+  BatchesWithSchema result;
+  result.batches.reserve(num_batches);
+
+  for (int batch_index = 0; batch_index < num_batches; ++batch_index) {
+    std::shared_ptr<RecordBatch> record_batch = GetBatchesWithNullProbability(
+        schema->fields(), batch_size, null_probability, bool_true_probability);
+    ExecBatch exec_batch(*record_batch);
+    exec_batch.values.emplace_back(batch_index);
+    result.batches.push_back(std::move(exec_batch));
+  }
+  result.schema = std::move(schema);
+  return result;
+}
+
+// Benchmarks a source -> filter(s) -> sink plan through BenchmarkNodeOverhead, with one
+// "filter" declaration per entry of expr_vector, in the given order.
+//
+// Benchmark arguments:
+//   range(0): batch_size; the number of batches is kTotalBatchSize / batch_size
+//   range(1): null probability, in percent
+//   range(2): probability of `true` for the "bool" column, in percent
+static void FilterOverhead(benchmark::State& state, std::vector<Expression> expr_vector) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const double null_prob = state.range(1) / 100.0;
+  const double bool_true_probability = state.range(2) / 100.0;
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatchesWithNullProbability(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size,
+      null_prob, bool_true_probability);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  std::vector<arrow::compute::Declaration> filter_node_dec;
+  filter_node_dec.reserve(expr_vector.size());
+  // Iterate by const reference: FilterNodeOptions takes its own copy of the expression,
+  // so an additional by-value loop variable is not needed.
+  for (const Expression& expr : expr_vector) {
+    filter_node_dec.push_back({"filter", FilterNodeOptions(expr)});
+  }
+  ASSERT_OK(
+      BenchmarkNodeOverhead(state, ctx, num_batches, batch_size, data, filter_node_dec));
+}
+
+// Benchmarks a lone "filter" node built from `expr` through
+// BenchmarkIsolatedNodeOverhead. Only range(0) (batch_size) is read from the benchmark
+// arguments; the input batches come from MakeRandomBatches.
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const auto num_batches = static_cast<int32_t>(kTotalBatchSize / batch_size);
+
+  auto input_schema = schema({field("i64", int64()), field("bool", boolean())});
+  arrow::compute::BatchesWithSchema data =
+      MakeRandomBatches(input_schema, num_batches, batch_size);
+
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  FilterNodeOptions options{expr};
+  ASSERT_OK(BenchmarkIsolatedNodeOverhead(state, ctx, expr, num_batches, batch_size, data,
+                                          "filter", options));
+}
+
+// Filter expressions captured by the benchmark registrations in this file.
+
+// Nested comparison on "i64": less(i64 < 20, i64 > 0).
+Expression complex_expression =
+    less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+// Single function call followed by a comparison: negate(i64) < 0.
+Expression simple_expression = less(call("negate", {field_ref("i64")}), literal(0));
+// Bare reference to the "bool" column.
+Expression ref_only_expression = field_ref("bool");
+
+// Building blocks of the selectivity and multi-pass benchmarks, all on the "bool"
+// column.
+Expression is_not_null_expression = not_(is_null(field_ref("bool"), false));
+Expression is_true_expression = equal(field_ref("bool"), literal(true));
+// Conjunction of the two expressions above, evaluated by a single filter node.
+Expression is_not_null_and_true_expression =
+    and_(is_not_null_expression, is_true_expression);
+
+// Adds one argument set per batch size: starting at 1000 and multiplying by 10 while
+// the size does not exceed kTotalBatchSize. null_prob is fixed to 0 and bool_true_prob
+// to 50 (both in percent). Real time measurement is requested.
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  int batch_size = 1000;
+  while (batch_size <= kTotalBatchSize) {
+    bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"});
+    bench->Args({batch_size, 0, 50});
+    bench->UseRealTime();
+    batch_size *= 10;
+  }
+}
+
+// Adds one argument set per (batch size, null probability) pair. Batch sizes start at
+// 1000 and grow by a factor of 10 up to kTotalBatchSize; null probabilities are 0.1,
+// 0.5, 0.75 and 1.0, passed in percent. bool_true_prob is fixed to 50 percent.
+void SetSelectivityArgs(benchmark::internal::Benchmark* bench) {
+  constexpr double kNullProbabilities[] = {0.1, 0.5, 0.75, 1.0};
+  for (int batch_size = 1000; batch_size <= kTotalBatchSize; batch_size *= 10) {
+    for (const double null_prob : kNullProbabilities) {
+      const auto null_percent = static_cast<int64_t>(null_prob * 100.0);
+      bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"});
+      bench->Args({batch_size, null_percent, 50});
+      bench->UseRealTime();
+    }
+  }
+}
+
+// Adds one argument set per (batch size, null probability) pair, for null
+// probabilities 0.0, 0.25, 0.5 and 0.75. bool_true_prob is derived from null_prob as
+// 0.25 / (1 - null_prob), so that (1 - null_prob) * bool_true_prob stays at 0.25 for
+// every argument set. Both probabilities are passed in percent, truncated to integers.
+void SetMultiPassArgs(benchmark::internal::Benchmark* bench) {
+  constexpr double kNullProbabilities[] = {0.0, 0.25, 0.5, 0.75};
+  for (int batch_size = 1000; batch_size <= kTotalBatchSize; batch_size *= 10) {
+    for (const double null_prob : kNullProbabilities) {
+      // Keep the fraction of selected (non-null and true) elements constant.
+      const double bool_true_prob = 0.25 / (1 - null_prob);
+      const auto null_percent = static_cast<int64_t>(null_prob * 100.0);
+      const auto bool_true_percent = static_cast<int64_t>(bool_true_prob * 100);
+      bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"});
+      bench->Args({batch_size, null_percent, bool_true_percent});
+      bench->UseRealTime();
+    }
+  }
+}
+
+// Isolated filter node, one registration per expression.
+BENCHMARK_CAPTURE(FilterOverheadIsolated, complex_expression, complex_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverheadIsolated, simple_expression, simple_expression)
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverheadIsolated, ref_only_expression, ref_only_expression)
+    ->Apply(SetArgs);
+
+// Full source -> filter -> sink plan, one registration per expression.
+BENCHMARK_CAPTURE(FilterOverhead, complex_expression, {complex_expression})
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverhead, simple_expression, {simple_expression})
+    ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverhead, ref_only_expression, {ref_only_expression})
+    ->Apply(SetArgs);
+
+// Single is-not-null filter over inputs with varying null probability.
+BENCHMARK_CAPTURE(FilterOverhead, selectivity_benchmark, {is_not_null_expression})
+    ->Apply(SetSelectivityArgs);
+
+// Two chained filter nodes versus one filter node evaluating the conjunction.
+BENCHMARK_CAPTURE(FilterOverhead, not_null_to_is_true_multipass_benchmark,
+                  {is_not_null_expression, is_true_expression})
+    ->Apply(SetMultiPassArgs);
+BENCHMARK_CAPTURE(FilterOverhead, not_null_and_is_true_singlepass_benchmark,
+                  {is_not_null_and_true_expression})
+    ->Apply(SetMultiPassArgs);
+
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/project_benchmark.cc b/cpp/src/arrow/compute/exec/project_benchmark.cc
index 542239eb62..cb4fdc4ffd 100644
--- a/cpp/src/arrow/compute/exec/project_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/project_benchmark.cc
@@ -22,6 +22,7 @@
 
 #include "arrow/compute/cast.h"
 #include "arrow/compute/exec.h"
+#include "arrow/compute/exec/benchmark_util.h"
 #include "arrow/compute/exec/expression.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/task_util.h"
@@ -44,33 +45,10 @@ static void ProjectionOverhead(benchmark::State& state, Expression expr) {
   arrow::compute::BatchesWithSchema data = MakeRandomBatches(
       schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
   ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
-  for (auto _ : state) {
-    state.PauseTiming();
-    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
-                         ExecPlan::Make(&ctx));
-    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-    ASSERT_OK(Declaration::Sequence(
-                  {
-                      {"source",
-                       SourceNodeOptions{data.schema,
-                                         data.gen(/*parallel=*/true, /*slow=*/false)},
-                       "custom_source_label"},
-                      {"project", ProjectNodeOptions{{
-                                      expr,
-                                  }}},
-                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
-                  })
-                  .AddToPlan(plan.get()));
-    state.ResumeTiming();
-    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
-  }
-
-  state.counters["rows_per_second"] = benchmark::Counter(
-      static_cast<double>(state.iterations() * num_batches * batch_size),
-      benchmark::Counter::kIsRate);
-
-  state.counters["batches_per_second"] = benchmark::Counter(
-      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+  std::vector<arrow::compute::Declaration> project_node_dec = {
+      {"project", ProjectNodeOptions{{expr}}}};
+  ASSERT_OK(
+      BenchmarkNodeOverhead(state, ctx, num_batches, batch_size, data, project_node_dec));
 }
 
 static void ProjectionOverheadIsolated(benchmark::State& state, Expression expr) {
@@ -80,68 +58,9 @@ static void ProjectionOverheadIsolated(benchmark::State& state, Expression expr)
   arrow::compute::BatchesWithSchema data = MakeRandomBatches(
       schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
   ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
-  for (auto _ : state) {
-    state.PauseTiming();
-
-    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
-    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
-                         ExecPlan::Make(&ctx));
-    // Source and sink nodes have no effect on the benchmark.
-    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
-    ASSERT_OK_AND_ASSIGN(
-        arrow::compute::ExecNode * source_node,
-        MakeExecNode("source", plan.get(), {},
-                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
-                                                             /*slow=*/false)}));
-    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
-                         MakeExecNode("project", plan.get(), {source_node},
-                                      ProjectNodeOptions{{
-                                          expr,
-                                      }}));
-    ASSERT_OK(
-        MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen}));
-
-    std::unique_ptr<arrow::compute::TaskScheduler> scheduler = TaskScheduler::Make();
-    std::condition_variable all_tasks_finished_cv;
-    std::mutex mutex;
-    int task_group_id = scheduler->RegisterTaskGroup(
-        [&](size_t thread_id, int64_t task_id) {
-          project_node->InputReceived(source_node, data.batches[task_id]);
-          return Status::OK();
-        },
-        [&](size_t thread_id) {
-          project_node->InputFinished(source_node, static_cast<int>(data.batches.size()));
-          std::unique_lock<std::mutex> lk(mutex);
-          all_tasks_finished_cv.notify_one();
-          return Status::OK();
-        });
-    scheduler->RegisterEnd();
-    ThreadIndexer thread_indexer;
-
-    state.ResumeTiming();
-    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
-    ASSERT_OK(scheduler->StartScheduling(
-        thread_indexer(),
-        [&](std::function<Status(size_t)> task) -> Status {
-          return thread_pool->Spawn([&, task]() {
-            size_t tid = thread_indexer();
-            ARROW_DCHECK_OK(task(tid));
-          });
-        },
-        thread_pool->GetCapacity(),
-        /*use_sync_execution=*/false));
-    std::unique_lock<std::mutex> lk(mutex);
-    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
-    all_tasks_finished_cv.wait(lk);
-    ASSERT_TRUE(project_node->finished().is_finished());
-  }
-
-  state.counters["rows_per_second"] = benchmark::Counter(
-      static_cast<double>(state.iterations() * num_batches * batch_size),
-      benchmark::Counter::kIsRate);
-
-  state.counters["batches_per_second"] = benchmark::Counter(
-      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+  ProjectNodeOptions options = ProjectNodeOptions{{expr}};
+  ASSERT_OK(BenchmarkIsolatedNodeOverhead(state, ctx, expr, num_batches, batch_size, data,
+                                          "project", options));
 }
 
 arrow::compute::Expression complex_expression =