You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/07/08 23:17:05 UTC
[arrow] branch master updated: ARROW-16809: [C++] Add Benchmarks for FilterNode (#13366)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a2114c0605 ARROW-16809: [C++] Add Benchmarks for FilterNode (#13366)
a2114c0605 is described below
commit a2114c0605be66bb16d16ee0b25c9d81ab68f5ce
Author: Ivan Chau <iv...@gmail.com>
AuthorDate: Fri Jul 8 19:16:59 2022 -0400
ARROW-16809: [C++] Add Benchmarks for FilterNode (#13366)
Add `filter_benchmark.cc` and add to `arrow/compute/exec/CMakeLists`
Lead-authored-by: Ivan Chau <iv...@gmail.com>
Co-authored-by: Ivan Chau <iv...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/CMakeLists.txt | 14 +-
cpp/src/arrow/compute/exec/benchmark_util.cc | 149 +++++++++++++++++++++
cpp/src/arrow/compute/exec/benchmark_util.h | 46 +++++++
cpp/src/arrow/compute/exec/filter_benchmark.cc | 166 ++++++++++++++++++++++++
cpp/src/arrow/compute/exec/project_benchmark.cc | 97 ++------------
5 files changed, 382 insertions(+), 90 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt
index 3353616e97..2c8d0ab210 100644
--- a/cpp/src/arrow/compute/exec/CMakeLists.txt
+++ b/cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -48,7 +48,19 @@ add_arrow_compute_test(util_test
add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
-add_arrow_benchmark(project_benchmark PREFIX "arrow-compute")
+add_arrow_benchmark(filter_benchmark
+ PREFIX
+ "arrow-compute"
+ SOURCES
+ benchmark_util.cc
+ filter_benchmark.cc)
+
+add_arrow_benchmark(project_benchmark
+ PREFIX
+ "arrow-compute"
+ SOURCES
+ benchmark_util.cc
+ project_benchmark.cc)
add_arrow_benchmark(tpch_benchmark PREFIX "arrow-compute")
diff --git a/cpp/src/arrow/compute/exec/benchmark_util.cc b/cpp/src/arrow/compute/exec/benchmark_util.cc
new file mode 100644
index 0000000000..5bac508854
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/benchmark_util.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/benchmark_util.h"
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace compute {
+
+// Benchmarks a single node, created by `factory_name` with `options`, in isolation.
+//
+// Every benchmark iteration builds a fresh source -> node -> sink plan with timing
+// paused. The source and sink exist only so the node has an input and an output to
+// refer to; this function never starts the plan. With timing resumed, a TaskScheduler
+// running on the CPU thread pool calls InputReceived on the node once per task
+// (passing data.batches[task_id]) and then InputFinished once all tasks have run.
+//
+// \param state the benchmark state; receives the rows_per_second and
+//        batches_per_second counters
+// \param ctx execution context the plan is created with
+// \param expr not used by this function; the expression under test is expected to be
+//        carried by `options`
+// \param num_batches number of tasks started; each task indexes data.batches by its
+//        task id, so this is presumably expected to equal data.batches.size()
+// \param batch_size rows per batch; only used to compute rows_per_second
+// \param data schema and batches fed to the node
+// \param factory_name registered factory name of the node under test
+// \param options options forwarded to the node factory
+// \return the first error from plan construction or scheduling, Invalid if the node
+//         has not finished after all tasks ran, OK otherwise
+
+Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
+                                     arrow::compute::ExecContext ctx,
+                                     arrow::compute::Expression expr, int32_t num_batches,
+                                     int32_t batch_size,
+                                     arrow::compute::BatchesWithSchema data,
+                                     std::string factory_name,
+                                     arrow::compute::ExecNodeOptions& options) {
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<arrow::compute::ExecBatch>> sink_gen;
+
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                          arrow::compute::ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * source_node,
+                          MakeExecNode("source", plan.get(), {},
+                                       arrow::compute::SourceNodeOptions{
+                                           data.schema, data.gen(/*parallel=*/true,
+                                                                 /*slow=*/false)}));
+
+    ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * node,
+                          MakeExecNode(factory_name, plan.get(), {source_node}, options));
+    ARROW_RETURN_NOT_OK(MakeExecNode("sink", plan.get(), {node},
+                                     arrow::compute::SinkNodeOptions{&sink_gen}));
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler =
+        arrow::compute::TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    // Guarded by `mutex`. Set by the finish callback so the waiter below can tell a
+    // real completion from a spurious wakeup.
+    bool all_tasks_finished = false;
+
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          {
+            std::lock_guard<std::mutex> guard(mutex);
+            all_tasks_finished = true;
+          }
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+
+    arrow::compute::ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ARROW_RETURN_NOT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    ARROW_RETURN_NOT_OK(
+        scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    {
+      // The mutex is taken only after the task group has been started, so the finish
+      // callback can never block on a lock held by the thread that started it. The
+      // flag makes the wait correct whether the callback runs before or after this
+      // point, and the predicate form of wait() ignores spurious wakeups.
+      std::unique_lock<std::mutex> lk(mutex);
+      all_tasks_finished_cv.wait(lk, [&] { return all_tasks_finished; });
+    }
+    if (!node->finished().is_finished()) {
+      return Status::Invalid("All tasks were finished but the node was not finished");
+    }
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+  return Status::OK();
+}
+
+// Benchmarks a whole source -> node_declarations... -> sink pipeline.
+//
+// For every benchmark iteration a new plan is assembled with timing paused: a source
+// producing `data`, the caller's declarations in order, and a sink. Only the
+// StartAndCollect call on that plan is timed. After the loop the rows_per_second and
+// batches_per_second counters are stored on `state`, derived from the iteration
+// count, `num_batches` and `batch_size`.
+
+Status BenchmarkNodeOverhead(
+    benchmark::State& state, arrow::compute::ExecContext ctx, int32_t num_batches,
+    int32_t batch_size, arrow::compute::BatchesWithSchema data,
+    std::vector<arrow::compute::Declaration>& node_declarations) {
+  using arrow::compute::Declaration;
+
+  for (auto _ : state) {
+    state.PauseTiming();
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> exec_plan,
+                          arrow::compute::ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<arrow::compute::ExecBatch>> batch_reader;
+
+    // source, then the nodes under test, then sink.
+    std::vector<Declaration> pipeline;
+    pipeline.reserve(node_declarations.size() + 2);
+    pipeline.push_back(Declaration(
+        {"source",
+         arrow::compute::SourceNodeOptions{data.schema,
+                                           data.gen(/*parallel=*/true, /*slow=*/false)},
+         "custom_source_label"}));
+    for (const Declaration& declaration : node_declarations) {
+      pipeline.push_back(declaration);
+    }
+    pipeline.push_back(Declaration(
+        {"sink", arrow::compute::SinkNodeOptions{&batch_reader}, "custom_sink_label"}));
+
+    ARROW_RETURN_NOT_OK(Declaration::Sequence(pipeline).AddToPlan(exec_plan.get()));
+    state.ResumeTiming();
+    ARROW_RETURN_NOT_OK(StartAndCollect(exec_plan.get(), batch_reader).status());
+  }
+
+  const auto total_batches = state.iterations() * num_batches;
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(total_batches * batch_size), benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(total_batches), benchmark::Counter::kIsRate);
+  return Status::OK();
+}
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/benchmark_util.h b/cpp/src/arrow/compute/exec/benchmark_util.h
new file mode 100644
index 0000000000..7897288cb8
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/benchmark_util.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/test_util.h"
+
+namespace arrow {
+
+namespace compute {
+
+/// \brief Benchmark a source -> node_declarations -> sink plan
+///
+/// Reports rows_per_second and batches_per_second counters on `state`.
+///
+/// \param state the benchmark state to iterate and to attach counters to
+/// \param ctx execution context used to create each plan
+/// \param num_batches number of batches in `data`, used for the counters
+/// \param batch_size number of rows per batch, used for the counters
+/// \param data schema and batches produced by the source node
+/// \param node_declarations declarations placed, in order, between source and sink
+/// \return a non-OK Status if building or running a plan fails
+Status BenchmarkNodeOverhead(benchmark::State& state, arrow::compute::ExecContext ctx,
+                             int32_t num_batches, int32_t batch_size,
+                             arrow::compute::BatchesWithSchema data,
+                             std::vector<arrow::compute::Declaration>& node_declarations);
+
+/// \brief Benchmark a single node by pushing batches into it directly
+///
+/// Reports rows_per_second and batches_per_second counters on `state`.
+///
+/// \param state the benchmark state to iterate and to attach counters to
+/// \param ctx execution context used to create each plan
+/// \param expr the expression under test (NOTE(review): the implementation does not
+///        read it; the node is configured through `options`)
+/// \param num_batches number of batches pushed into the node
+/// \param batch_size number of rows per batch, used for the counters
+/// \param data schema and batches pushed into the node
+/// \param factory_name factory name of the node to create, e.g. "filter"
+/// \param options options passed to the node factory
+/// \return a non-OK Status if setup or scheduling fails or the node does not finish
+Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
+                                     arrow::compute::ExecContext ctx,
+                                     arrow::compute::Expression expr, int32_t num_batches,
+                                     int32_t batch_size,
+                                     arrow::compute::BatchesWithSchema data,
+                                     std::string factory_name,
+                                     arrow::compute::ExecNodeOptions& options);
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/filter_benchmark.cc b/cpp/src/arrow/compute/exec/filter_benchmark.cc
new file mode 100644
index 0000000000..64cf307580
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/filter_benchmark.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/exec/benchmark_util.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/random.h"
+
+namespace arrow {
+namespace compute {
+
+// Total number of rows generated per benchmark; the benchmarks below divide it by the
+// batch size to obtain the number of batches.
+constexpr int64_t kTotalBatchSize = 1000000;
+// Fixed seed handed to the random array generator.
+// NOTE(review): with a 32-bit int this literal does not fit in a signed int, so `auto`
+// deduces an unsigned type here.
+constexpr auto kSeed = 0x94378165;
+
+// Returns a single record batch with `length` rows and one random column per entry in
+// `fields`.
+//
+// null_probability controls the likelihood that an element within the batch is null,
+// across all fields. bool_true_probability controls the likelihood that an element
+// belonging to a boolean field is true; it has no effect on fields of other types.
+//
+// A new generator is constructed from kSeed on every call, so the output presumably
+// depends only on the arguments.
+
+static std::shared_ptr<arrow::RecordBatch> GetBatchesWithNullProbability(
+    const FieldVector& fields, int64_t length, double null_probability,
+    double bool_true_probability = 0.5) {
+  std::vector<std::shared_ptr<Array>> arrays(fields.size());
+  random::RandomArrayGenerator rand(kSeed);
+  for (size_t i = 0; i < fields.size(); i++) {
+    const auto& field = fields[i];
+    // Select boolean columns by type, not by the field being named "bool": a boolean
+    // field with another name must still honour bool_true_probability, and a
+    // non-boolean field that happens to be named "bool" must not receive a boolean
+    // array that contradicts the schema.
+    if (field->type()->id() == Type::BOOL) {
+      arrays[i] = rand.Boolean(length, bool_true_probability, null_probability);
+    } else {
+      arrays[i] = rand.ArrayOf(field->type(), length, null_probability);
+    }
+  }
+  return RecordBatch::Make(schema(fields), length, std::move(arrays));
+}
+
+// Builds `num_batches` exec batches of `batch_size` rows each for `schema`, using
+// GetBatchesWithNullProbability for the column data. Each exec batch additionally gets
+// its index appended as one extra trailing value. The returned object takes ownership
+// of `schema`.
+BatchesWithSchema MakeRandomBatchesWithNullProbability(
+    std::shared_ptr<Schema> schema, int num_batches, int batch_size,
+    double null_probability = 0.5, double bool_true_probability = 0.5) {
+  BatchesWithSchema result;
+  result.batches.reserve(num_batches);
+
+  for (int batch_index = 0; batch_index < num_batches; ++batch_index) {
+    const std::shared_ptr<arrow::RecordBatch> record_batch =
+        GetBatchesWithNullProbability(schema->fields(), batch_size, null_probability,
+                                      bool_true_probability);
+    ExecBatch exec_batch(*record_batch);
+    // Extra value carrying the batch index, after the schema columns.
+    exec_batch.values.emplace_back(batch_index);
+    result.batches.push_back(std::move(exec_batch));
+  }
+  result.schema = std::move(schema);
+  return result;
+}
+
+// Benchmarks a full plan containing one "filter" node per expression in
+// `expr_vector`, chained in the given order.
+//
+// Benchmark arguments: range(0) is the batch size, range(1) the null probability in
+// percent and range(2) the probability (in percent) that a boolean value is true.
+static void FilterOverhead(benchmark::State& state, std::vector<Expression> expr_vector) {
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const double null_prob = state.range(1) / 100.0;
+  const double bool_true_probability = state.range(2) / 100.0;
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  std::shared_ptr<Schema> input_schema =
+      schema({field("i64", int64()), field("bool", boolean())});
+  arrow::compute::BatchesWithSchema data = MakeRandomBatchesWithNullProbability(
+      std::move(input_schema), num_batches, batch_size, null_prob, bool_true_probability);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+
+  std::vector<arrow::compute::Declaration> filter_node_dec;
+  filter_node_dec.reserve(expr_vector.size());
+  for (const Expression& predicate : expr_vector) {
+    filter_node_dec.push_back({"filter", FilterNodeOptions(predicate)});
+  }
+  ASSERT_OK(
+      BenchmarkNodeOverhead(state, ctx, num_batches, batch_size, data, filter_node_dec));
+}
+
+// Benchmarks a lone "filter" node evaluating `expr`, using
+// BenchmarkIsolatedNodeOverhead. Only range(0), the batch size, is read from the
+// benchmark arguments; the input comes from MakeRandomBatches.
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+
+  const auto batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  std::shared_ptr<Schema> input_schema =
+      schema({field("i64", int64()), field("bool", boolean())});
+  arrow::compute::BatchesWithSchema data =
+      MakeRandomBatches(std::move(input_schema), num_batches, batch_size);
+
+  FilterNodeOptions options{expr};
+  ASSERT_OK(BenchmarkIsolatedNodeOverhead(state, ctx, expr, num_batches, batch_size, data,
+                                          "filter", options));
+}
+
+// Filter predicates used by the benchmark registrations in this file. They refer to
+// the "i64" and "bool" fields of the schema built in FilterOverhead and
+// FilterOverheadIsolated.
+
+// NOTE(review): the outer less() compares the results of two comparisons; and_() may
+// have been intended here -- confirm.
+arrow::compute::Expression complex_expression =
+ less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+// Compares the negated "i64" value against zero.
+arrow::compute::Expression simple_expression =
+ less(call("negate", {field_ref("i64")}), literal(0));
+// Uses the "bool" field directly as the filter, with no function call.
+arrow::compute::Expression ref_only_expression = field_ref("bool");
+
+// Building blocks for the selectivity and single-pass vs. multi-pass benchmarks.
+arrow::compute::Expression is_not_null_expression =
+ not_(is_null(field_ref("bool"), false));
+arrow::compute::Expression is_true_expression = equal(field_ref("bool"), literal(true));
+// Combines the two expressions above, so it must be defined after them.
+arrow::compute::Expression is_not_null_and_true_expression =
+ and_(is_not_null_expression, is_true_expression);
+
+// Registers one argument set per batch size (1000, then x10 while not above
+// kTotalBatchSize) with null_prob fixed at 0 and bool_true_prob fixed at 50, and
+// requests real-time measurement.
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  int64_t rows_per_batch = 1000;
+  while (rows_per_batch <= kTotalBatchSize) {
+    bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"});
+    bench->Args({rows_per_batch, /*null_prob=*/0, /*bool_true_prob=*/50});
+    bench->UseRealTime();
+    rows_per_batch *= 10;
+  }
+}
+
+// Registers, for every batch size (1000, then x10 while not above kTotalBatchSize),
+// one argument set per null percentage in {10, 50, 75, 100}; bool_true_prob is fixed
+// at 50. Real-time measurement is requested.
+void SetSelectivityArgs(benchmark::internal::Benchmark* bench) {
+  for (int64_t rows_per_batch = 1000; rows_per_batch <= kTotalBatchSize;
+       rows_per_batch *= 10) {
+    // Null probabilities 0.1, 0.5, 0.75 and 1.0 expressed as whole percents.
+    for (const int64_t null_percent : {10, 50, 75, 100}) {
+      bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"});
+      bench->Args({rows_per_batch, null_percent, /*bool_true_prob=*/50});
+      bench->UseRealTime();
+    }
+  }
+}
+
+// Registers, for every batch size (1000, then x10 while not above kTotalBatchSize),
+// one argument set per null probability in {0, 0.25, 0.5, 0.75}. bool_true_prob is
+// derived as 0.25 / (1 - null_prob); both probabilities are passed as percents
+// truncated to integers. Real-time measurement is requested.
+void SetMultiPassArgs(benchmark::internal::Benchmark* bench) {
+  for (int64_t rows_per_batch = 1000; rows_per_batch <= kTotalBatchSize;
+       rows_per_batch *= 10) {
+    for (const double null_fraction : {0.0, 0.25, 0.5, 0.75}) {
+      // we keep the number of selected elements constant for all benchmarks.
+      const double true_fraction = 0.25 / (1 - null_fraction);
+      const auto null_percent = static_cast<int64_t>(null_fraction * 100.0);
+      const auto true_percent = static_cast<int64_t>(true_fraction * 100);
+      bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"});
+      bench->Args({rows_per_batch, null_percent, true_percent});
+      bench->UseRealTime();
+    }
+  }
+}
+
+// Isolated filter node: one benchmark per expression, arguments from SetArgs.
+BENCHMARK_CAPTURE(FilterOverheadIsolated, complex_expression, complex_expression)
+ ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverheadIsolated, simple_expression, simple_expression)
+ ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverheadIsolated, ref_only_expression, ref_only_expression)
+ ->Apply(SetArgs);
+
+// Full plan with a single filter node, same expressions and arguments as above.
+BENCHMARK_CAPTURE(FilterOverhead, complex_expression, {complex_expression})
+ ->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverhead, simple_expression, {simple_expression})->Apply(SetArgs);
+BENCHMARK_CAPTURE(FilterOverhead, ref_only_expression, {ref_only_expression})
+ ->Apply(SetArgs);
+
+// Full plan filtering on "bool is not null" while the null probability varies.
+BENCHMARK_CAPTURE(FilterOverhead, selectivity_benchmark, {is_not_null_expression})
+ ->Apply(SetSelectivityArgs);
+
+// Two chained filter nodes (not-null, then is-true) versus one filter node with the
+// combined expression, both using the arguments from SetMultiPassArgs.
+BENCHMARK_CAPTURE(FilterOverhead, not_null_to_is_true_multipass_benchmark,
+ {is_not_null_expression, is_true_expression})
+ ->Apply(SetMultiPassArgs);
+BENCHMARK_CAPTURE(FilterOverhead, not_null_and_is_true_singlepass_benchmark,
+ {is_not_null_and_true_expression})
+ ->Apply(SetMultiPassArgs);
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/project_benchmark.cc b/cpp/src/arrow/compute/exec/project_benchmark.cc
index 542239eb62..cb4fdc4ffd 100644
--- a/cpp/src/arrow/compute/exec/project_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/project_benchmark.cc
@@ -22,6 +22,7 @@
#include "arrow/compute/cast.h"
#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/benchmark_util.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/task_util.h"
@@ -44,33 +45,10 @@ static void ProjectionOverhead(benchmark::State& state, Expression expr) {
arrow::compute::BatchesWithSchema data = MakeRandomBatches(
schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
- for (auto _ : state) {
- state.PauseTiming();
- ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
- ExecPlan::Make(&ctx));
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- ASSERT_OK(Declaration::Sequence(
- {
- {"source",
- SourceNodeOptions{data.schema,
- data.gen(/*parallel=*/true, /*slow=*/false)},
- "custom_source_label"},
- {"project", ProjectNodeOptions{{
- expr,
- }}},
- {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
- })
- .AddToPlan(plan.get()));
- state.ResumeTiming();
- ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
- }
-
- state.counters["rows_per_second"] = benchmark::Counter(
- static_cast<double>(state.iterations() * num_batches * batch_size),
- benchmark::Counter::kIsRate);
-
- state.counters["batches_per_second"] = benchmark::Counter(
- static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+ std::vector<arrow::compute::Declaration> project_node_dec = {
+ {"project", ProjectNodeOptions{{expr}}}};
+ ASSERT_OK(
+ BenchmarkNodeOverhead(state, ctx, num_batches, batch_size, data, project_node_dec));
}
static void ProjectionOverheadIsolated(benchmark::State& state, Expression expr) {
@@ -80,68 +58,9 @@ static void ProjectionOverheadIsolated(benchmark::State& state, Expression expr)
arrow::compute::BatchesWithSchema data = MakeRandomBatches(
schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
- for (auto _ : state) {
- state.PauseTiming();
-
- AsyncGenerator<util::optional<ExecBatch>> sink_gen;
- ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
- ExecPlan::Make(&ctx));
- // Source and sink nodes have no effect on the benchmark.
- // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
- ASSERT_OK_AND_ASSIGN(
- arrow::compute::ExecNode * source_node,
- MakeExecNode("source", plan.get(), {},
- SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
- /*slow=*/false)}));
- ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
- MakeExecNode("project", plan.get(), {source_node},
- ProjectNodeOptions{{
- expr,
- }}));
- ASSERT_OK(
- MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen}));
-
- std::unique_ptr<arrow::compute::TaskScheduler> scheduler = TaskScheduler::Make();
- std::condition_variable all_tasks_finished_cv;
- std::mutex mutex;
- int task_group_id = scheduler->RegisterTaskGroup(
- [&](size_t thread_id, int64_t task_id) {
- project_node->InputReceived(source_node, data.batches[task_id]);
- return Status::OK();
- },
- [&](size_t thread_id) {
- project_node->InputFinished(source_node, static_cast<int>(data.batches.size()));
- std::unique_lock<std::mutex> lk(mutex);
- all_tasks_finished_cv.notify_one();
- return Status::OK();
- });
- scheduler->RegisterEnd();
- ThreadIndexer thread_indexer;
-
- state.ResumeTiming();
- arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
- ASSERT_OK(scheduler->StartScheduling(
- thread_indexer(),
- [&](std::function<Status(size_t)> task) -> Status {
- return thread_pool->Spawn([&, task]() {
- size_t tid = thread_indexer();
- ARROW_DCHECK_OK(task(tid));
- });
- },
- thread_pool->GetCapacity(),
- /*use_sync_execution=*/false));
- std::unique_lock<std::mutex> lk(mutex);
- ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
- all_tasks_finished_cv.wait(lk);
- ASSERT_TRUE(project_node->finished().is_finished());
- }
-
- state.counters["rows_per_second"] = benchmark::Counter(
- static_cast<double>(state.iterations() * num_batches * batch_size),
- benchmark::Counter::kIsRate);
-
- state.counters["batches_per_second"] = benchmark::Counter(
- static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+ ProjectNodeOptions options = ProjectNodeOptions{{expr}};
+ ASSERT_OK(BenchmarkIsolatedNodeOverhead(state, ctx, expr, num_batches, batch_size, data,
+ "project", options));
}
arrow::compute::Expression complex_expression =