Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/10 17:48:32 UTC

[GitHub] [arrow] iChauster opened a new pull request, #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

iChauster opened a new pull request, #13366:
URL: https://github.com/apache/arrow/pull/13366

   Add `filter_benchmark.cc` and add to `arrow/compute/exec/CMakeLists`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1152598215

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.




[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1163390116

   > > For a filtering operation I think there is an extra parameter which is the selectivity (what percentage of rows are kept). I think it would be valuable to add that as a parameter but it would make test data generation more complicated.
   > 
   > @westonpace one idea I had to benchmark selectivity is perhaps using the existing 'null_percent/proportion' generators we already have, and then using `is_null` as the filter. Let me know if you think that would be the right approach.
   
   After thinking about it some more, maybe we should use a numerical array with a uniform distribution, which would allow us to test multiple filter passes. With the other approach, I think we can only get two (filter out nulls, then filter by true / false).
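
The uniform-distribution idea above can be sketched without Arrow. This is a minimal illustration, not the PR's code: `kMaxValue`, `MakeUniformColumn`, `ThresholdForSelectivity`, and `MeasuredSelectivity` are hypothetical names, and a plain `std::vector<int64_t>` stands in for the `i64` column. If values are uniform in `[0, kMaxValue]`, a `value < threshold` filter keeps roughly `threshold / (kMaxValue + 1)` of the rows, so the threshold acts as a selectivity knob.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// Hypothetical upper bound of the generated values (inclusive).
constexpr int64_t kMaxValue = 99;

// Hypothetical helper: `length` values drawn uniformly from [0, kMaxValue].
std::vector<int64_t> MakeUniformColumn(std::size_t length, uint32_t seed) {
  std::mt19937_64 rng(seed);
  std::uniform_int_distribution<int64_t> dist(0, kMaxValue);
  std::vector<int64_t> values(length);
  for (auto& v : values) v = dist(rng);
  return values;
}

// Threshold such that `value < threshold` keeps about `selectivity` of the rows.
int64_t ThresholdForSelectivity(double selectivity) {
  assert(selectivity >= 0.0 && selectivity <= 1.0);
  return static_cast<int64_t>(selectivity * (kMaxValue + 1));
}

// Fraction of rows that actually pass the `value < threshold` filter.
double MeasuredSelectivity(const std::vector<int64_t>& values, int64_t threshold) {
  std::size_t kept = 0;
  for (int64_t v : values) {
    if (v < threshold) ++kept;
  }
  return values.empty() ? 0.0 : static_cast<double>(kept) / values.size();
}
```

In the benchmark itself, the equivalent filter would presumably be an expression of the form `less(field_ref("i64"), literal(threshold))`, with the threshold derived from a selectivity benchmark argument; that mapping is an assumption, not something the PR confirms.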




[GitHub] [arrow] westonpace commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r899691871


##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+static void FilterOverhead(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/true, /*slow=*/false)},
+                       "custom_source_label"},
+                      {"filter",
+                       FilterNodeOptions{
+                           expr,
+                       }},
+                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
+                                                             /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
+                         MakeExecNode("filter", plan.get(), {source_node},
+                                      FilterNodeOptions{
+                                          expr,
+                                      }));
+    MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen});
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler = TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          project_node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          project_node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+    ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ASSERT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    std::unique_lock<std::mutex> lk(mutex);
+    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    all_tasks_finished_cv.wait(lk);
+    ASSERT_TRUE(project_node->finished().is_finished());
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+arrow::compute::Expression complex_expression =
+    less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+arrow::compute::Expression simple_expression =
+    less(call("negate", {field_ref("i64")}), literal(0));
+arrow::compute::Expression zero_copy_expression = is_valid((call(
+    "cast", {field_ref("i64")}, compute::CastOptions::Safe(timestamp(TimeUnit::NANO)))));
+arrow::compute::Expression ref_only_expression = less(field_ref("i64"), literal(0));

Review Comment:
   I think that's fine.  I'm not sure `ref_only_expression` is the same thing though?  Perhaps `ref_only_expression` could be something like `field_ref("bool")` where `bool` is a randomly generated boolean array?



##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+static void FilterOverhead(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/true, /*slow=*/false)},
+                       "custom_source_label"},
+                      {"filter",
+                       FilterNodeOptions{
+                           expr,
+                       }},
+                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
+                                                             /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
+                         MakeExecNode("filter", plan.get(), {source_node},
+                                      FilterNodeOptions{
+                                          expr,
+                                      }));
+    MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen});
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler = TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          project_node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          project_node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+    ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ASSERT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    std::unique_lock<std::mutex> lk(mutex);
+    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    all_tasks_finished_cv.wait(lk);
+    ASSERT_TRUE(project_node->finished().is_finished());
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+arrow::compute::Expression complex_expression =
+    less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+arrow::compute::Expression simple_expression =
+    less(call("negate", {field_ref("i64")}), literal(0));
+arrow::compute::Expression zero_copy_expression = is_valid((call(
+    "cast", {field_ref("i64")}, compute::CastOptions::Safe(timestamp(TimeUnit::NANO)))));

Review Comment:
   I don't think we need zero copy for the filter benchmark.  We can probably just get rid of this one.



##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+static void FilterOverhead(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/true, /*slow=*/false)},
+                       "custom_source_label"},
+                      {"filter",
+                       FilterNodeOptions{
+                           expr,
+                       }},
+                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
+                                                             /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,

Review Comment:
   ```suggestion
       ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * filter_node,
   ```
   
   Alternatively, we might want to see if we can create a benchmark_util.cc/h that we can use to share some of this logic that will be identical between different benchmarks.  Though maybe filter & project are the only ones simple enough to share the same basic structure.
   
   Also, long term, I think the `XyzIsolated` benchmarks will be less useful, as we address the scheduling issues, so maybe we don't need to spend too much time optimizing the case.  Still, it would be kind of nice to have less code that feels copy/pasted between the benchmarks.





[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r904166211


##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+constexpr auto kSeed = 0x94378165;
+
+static std::shared_ptr<arrow::RecordBatch> GetBatchesWithNullProbability(
+    const FieldVector& fields, int64_t length, double null_probability,
+    double bool_true_probability = 0.5) {
+  std::vector<std::shared_ptr<Array>> arrays(fields.size());
+  auto rand = random::RandomArrayGenerator(kSeed);
+  for (size_t i = 0; i < fields.size(); i++) {
+    const auto& field = fields[i];
+    if (field.get()->name() == "bool") {
+      arrays[i] = rand.Boolean(length, bool_true_probability, null_probability);
+    } else {

Review Comment:
   Because `ArrayOf` doesn't give us much control over how the array for a given datatype is generated, we need to add this special case here. This makes it a bit more difficult to generalize as a utility function that can be used elsewhere. Any ideas on how to resolve this?
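
One possible direction for the question above, sketched without Arrow and under stated assumptions: instead of branching on the field name inside the helper, the caller passes per-field generator overrides and a default generator. Everything here (`Column`, `ColumnGenerator`, `UniformWithNulls`, `GenerateColumns`) is hypothetical and only illustrates the shape of such a utility; it is not the approach the PR adopted.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <random>
#include <string>
#include <vector>

// Stand-in for an Arrow array: values plus a validity flag per slot.
struct Column {
  std::vector<double> values;
  std::vector<bool> valid;
};

using ColumnGenerator = std::function<Column(std::size_t length, std::mt19937& rng)>;

// Default generator: uniform doubles in [0, 1) with the given null probability.
ColumnGenerator UniformWithNulls(double null_probability) {
  assert(null_probability >= 0.0 && null_probability <= 1.0);
  return [null_probability](std::size_t length, std::mt19937& rng) {
    std::uniform_real_distribution<double> value_dist(0.0, 1.0);
    std::bernoulli_distribution null_dist(null_probability);
    Column col;
    for (std::size_t i = 0; i < length; ++i) {
      col.values.push_back(value_dist(rng));
      col.valid.push_back(!null_dist(rng));
    }
    return col;
  };
}

// Generates one column per field name. Fields listed in `overrides` use their
// own generator; every other field falls back to `default_gen`.
std::vector<Column> GenerateColumns(
    const std::vector<std::string>& field_names, std::size_t length,
    const ColumnGenerator& default_gen,
    const std::map<std::string, ColumnGenerator>& overrides, unsigned seed) {
  std::mt19937 rng(seed);
  std::vector<Column> columns;
  for (const auto& name : field_names) {
    auto it = overrides.find(name);
    const ColumnGenerator& gen = (it != overrides.end()) ? it->second : default_gen;
    columns.push_back(gen(length, rng));
  }
  return columns;
}
```

With this shape, the `bool` special case becomes an entry in `overrides` supplied by the filter benchmark, and the helper itself stays free of benchmark-specific field names.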





[GitHub] [arrow] westonpace commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1169377521

   Just a few minor nits.  This looks pretty good.




[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1169981636

   Hi @westonpace, was wondering if you could publish the changes you requested -- I don't think I can see them yet.




[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1156462898

   Hi @westonpace, let me know if you've gotten a chance to take a look at this!




[GitHub] [arrow] westonpace commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r910249487


##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -15,10 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <condition_variable>
+#include <mutex>

Review Comment:
   Ah, this is going to be a problem.  We can't include `mutex` in public headers due to a Windows compatibility issue.  Can you create a `benchmark_util.cc` and move the definition of these methods there?
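
A minimal sketch of the header/source split requested above, shown as one block for brevity. The file names and `RunTasksAndWait` are hypothetical and do not reflect the actual contents of `benchmark_util.h`. The point is only that the header declares the function and includes nothing from `<mutex>` or `<condition_variable>`, while the `.cc` file holds the definition and the synchronization includes.

```cpp
// ---- task_runner.h (hypothetical public header) ----
// Only the declaration lives here; no <mutex> or <condition_variable>.
#include <functional>

// Runs task(0) ... task(num_tasks - 1) on separate threads, blocks until all
// of them have finished, and returns the number of completed tasks.
int RunTasksAndWait(int num_tasks, const std::function<void(int)>& task);

// ---- task_runner.cc (hypothetical implementation file) ----
// The synchronization headers are confined to the implementation file.
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

int RunTasksAndWait(int num_tasks, const std::function<void(int)>& task) {
  assert(num_tasks >= 0);
  std::mutex mutex;
  std::condition_variable all_finished_cv;
  int finished = 0;

  std::vector<std::thread> workers;
  workers.reserve(static_cast<std::size_t>(num_tasks));
  for (int i = 0; i < num_tasks; ++i) {
    workers.emplace_back([&, i] {
      task(i);
      std::lock_guard<std::mutex> lock(mutex);
      if (++finished == num_tasks) all_finished_cv.notify_one();
    });
  }
  {
    // Waiting on a predicate guards against spurious wakeups and against the
    // notification arriving before the wait starts.
    std::unique_lock<std::mutex> lock(mutex);
    all_finished_cv.wait(lock, [&] { return finished == num_tasks; });
  }
  for (auto& worker : workers) worker.join();
  return finished;
}
```

Callers that include only the header never pull in `<mutex>`, which is the property the review comment asks for.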





[GitHub] [arrow] github-actions[bot] commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1152598209

   https://issues.apache.org/jira/browse/ARROW-16809




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r894772961


##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+static void FilterOverhead(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/true, /*slow=*/false)},
+                       "custom_source_label"},
+                      {"filter",
+                       FilterNodeOptions{
+                           expr,
+                       }},
+                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
+                                                             /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
+                         MakeExecNode("filter", plan.get(), {source_node},
+                                      FilterNodeOptions{
+                                          expr,
+                                      }));
+    MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen});
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler = TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          project_node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          project_node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+    ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ASSERT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    std::unique_lock<std::mutex> lk(mutex);
+    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    all_tasks_finished_cv.wait(lk);
+    ASSERT_TRUE(project_node->finished().is_finished());
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+arrow::compute::Expression complex_expression =
+    less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+arrow::compute::Expression simple_expression =
+    less(call("negate", {field_ref("i64")}), literal(0));
+arrow::compute::Expression zero_copy_expression = is_valid((call(
+    "cast", {field_ref("i64")}, compute::CastOptions::Safe(timestamp(TimeUnit::NANO)))));
+arrow::compute::Expression ref_only_expression = less(field_ref("i64"), literal(0));

Review Comment:
   I'm not sure how we want to express the complexity of filter expressions, so I used an approach similar to `ExpressionOverhead` / `ProjectBenchmarks`. `zero_copy` seems to perform a bit better than the others because it uses `is_valid` (I cannot apply the `less` operation to timestamps).
   
   Is there something I can change to make these more consistent?





[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1176288462

   @westonpace, that looks great, thanks for the help! I can run all the benchmarks on my system as well -- I think this is ready to go.




[GitHub] [arrow] westonpace commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r915306761


##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -19,9 +19,8 @@
 #include <cstdint>

Review Comment:
   Can you revert the changes to this file?
   
   * We like to keep 3rd party includes in their own group and not clustered with arrow includes.
   * Closing namespaces don't need the `;`
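
   A small sketch of the two conventions above, written so that it compiles without any third-party dependency. The `sketch::util` namespace and the `TotalRows` helper are hypothetical names used only for illustration, and the third-party and project includes are shown as comments for that reason.

```cpp
// Group 1: C++ standard library headers.
#include <cstdint>
#include <vector>

// Group 2: third-party headers, kept in their own group.
// (Left as a comment so this sketch builds without the dependency.)
// #include "benchmark/benchmark.h"

// Group 3: the project's own headers, not mixed with the group above.
// #include "arrow/util/benchmark_util.h"

namespace sketch {
namespace util {

// Hypothetical helper: sums the row counts of a list of batches.
inline std::int64_t TotalRows(const std::vector<std::int64_t>& batch_sizes) {
  std::int64_t total = 0;
  for (std::int64_t size : batch_sizes) {
    total += size;
  }
  return total;
}

}  // namespace util   <- no trailing ';' after the closing brace
}  // namespace sketch
```

   The closing braces carry a comment naming the namespace, and nothing else follows them.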





[GitHub] [arrow] westonpace commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1179423462

   The build failure appears unrelated.  I will merge.  Thanks again.




[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1152601282

   Hi @westonpace, this PR is similar to #13314; I just switched out the `ProjectNode` for a `FilterNode`.
   
   I had some questions about the expression complexity and whether we would want to adjust it for these benchmarks!




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r915875396


##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -19,9 +19,8 @@
 #include <cstdint>

Review Comment:
   Sorry about that! Should be good here.





[GitHub] [arrow] westonpace commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r914033572


##########
cpp/src/arrow/CMakeLists.txt:
##########
@@ -193,6 +193,7 @@ set(ARROW_SRCS
     io/transform.cc
     util/async_util.cc
     util/basic_decimal.cc
+    # util/benchmark_util.cc

Review Comment:
   Why is this commented out?  If I uncomment this line then everything compiles on my system.  Can you expand a bit more on the error you were getting when you included this?





[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1163546329

   @westonpace, I think this is ready for another round of review. I've packaged some of the helpful benchmarking code into `benchmark_util.h` and expanded our filter benchmarks. Given some of your notes, most of these use the `FilterOverhead` version, which runs the full source, filter, and sink pipeline.
   
   Regarding the multi-pass benchmarks, there are two parameters: `null probability` (the probability that any given element in the batch is null) and `bool_true_probability` (the probability that a boolean in the array is true).
   
   Here are some of my notes:
   - The expressions we are using seem to have far less impact on throughput than they do for projections. All results are of the same order of magnitude, ~100M rows/s.
   - Interestingly, selectivity can cause some big performance differences, possibly up to a 50x speedup, reaching 2G rows/s.
   - Because of this, multi-pass filter operations may actually be advantageous in some cases.
   > FilterOverhead/not_null_to_is_true_multipass_benchmark/batch_size:100000/null_prob:100/bool_true_prob:25/real_time        400338 ns        56020 ns         1819 batches_per_second=24.9789k/s rows_per_second=2.49789G/s
   > FilterOverhead/not_null_and_is_true_singlepass_benchmark/batch_size:100000/null_prob:100/bool_true_prob:25/real_time     1226684 ns        78326 ns          581 batches_per_second=8.15206k/s rows_per_second=815.206M/s
   This is probably because the first pass (checking for not null) really shrinks the table, making the second pass (checking for truth) considerably quicker. One caveat is that the passes have to be in the correct order. For example, if we instead check for truth first and then for not null, we do not observe the speedup; instead, performance roughly matches single pass.
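
   To make the "first pass shrinks the table" reasoning concrete, here is a minimal standard-library sketch. It does not use Arrow or the real `FilterNode`; `Cell`, `FilterInPasses`, `NotNull`, `IsTrue` and `NotNullAndIsTrue` are hypothetical names, and the sketch only counts how many rows each pass has to examine. It says nothing about actual kernel timings.

```cpp
#include <cstddef>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Stand-in for one nullable boolean column value (assumption, not an Arrow type).
using Cell = std::optional<bool>;
using Predicate = std::function<bool(const Cell&)>;

struct PassStats {
  std::vector<Cell> survivors;             // rows left after the last pass
  std::vector<std::size_t> rows_examined;  // input size of each pass, in order
};

// Applies each predicate as its own filter pass over the survivors of the
// previous pass, recording how many rows every pass had to look at.
PassStats FilterInPasses(std::vector<Cell> rows, const std::vector<Predicate>& passes) {
  PassStats stats;
  for (const Predicate& keep : passes) {
    stats.rows_examined.push_back(rows.size());
    std::vector<Cell> next;
    for (const Cell& cell : rows) {
      if (keep(cell)) next.push_back(cell);
    }
    rows = std::move(next);
  }
  stats.survivors = std::move(rows);
  return stats;
}

inline bool NotNull(const Cell& cell) { return cell.has_value(); }
// A null cell is treated as "not true" here; this is a simplification.
inline bool IsTrue(const Cell& cell) { return cell.value_or(false); }
inline bool NotNullAndIsTrue(const Cell& cell) { return NotNull(cell) && IsTrue(cell); }
```

   Calling `FilterInPasses(rows, {NotNull, IsTrue})` on mostly-null input gives a second entry in `rows_examined` that is much smaller than the first, while `FilterInPasses(rows, {NotNullAndIsTrue})` examines every row once and produces the same survivors.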




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r910402222


##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -15,10 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <condition_variable>
+#include <mutex>

Review Comment:
   @westonpace , sorry if this is trivial, but I'm having some trouble here.
   
   I committed my attempt -- it seems like the compiler is having trouble finding third party dependencies imported in `benchmark_util.h` (like benchmarks, gtest) after including `benchmark_util.cc` in the `src/arrow/CMakeLists.txt`. (the edit in cmake is currently commented out)
   
   I'm not too familiar with how to specify that those are needed in `CMake`, and if that would be the right solution.
   
   Let me know if I'm missing a step!






[GitHub] [arrow] westonpace merged pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace merged PR #13366:
URL: https://github.com/apache/arrow/pull/13366




[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1162283321

   > For a filtering operation I think there is an extra parameter which is the selectivity (what percentage of rows are kept). I think it would be valuable to add that as a parameter but it would make test data generation more complicated.
   
   @westonpace one idea I had to benchmark selectivity is perhaps using the existing 'null_percent/proportion' generators we already have, and then using `is_null` as the filter. Let me know if you think that would be the right approach. 
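
   A rough sketch of that idea using only the standard library, not Arrow's generators: build a nullable column with a chosen null probability, then measure what fraction of rows an `is_null`-style filter keeps. `MakeNullableColumn` and `IsNullSelectivity` are hypothetical helpers, and the seed and value range are arbitrary assumptions.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <random>
#include <vector>

using NullableInt64 = std::optional<std::int64_t>;

// Generates `n` values; each one is null with probability `null_prob`.
std::vector<NullableInt64> MakeNullableColumn(std::size_t n, double null_prob,
                                              std::uint32_t seed) {
  std::mt19937 rng(seed);
  std::bernoulli_distribution is_null(null_prob);
  std::uniform_int_distribution<std::int64_t> value(0, 100);
  std::vector<NullableInt64> column;
  column.reserve(n);
  for (std::size_t i = 0; i < n; ++i) {
    if (is_null(rng)) {
      column.push_back(std::nullopt);
    } else {
      column.push_back(value(rng));
    }
  }
  return column;
}

// Fraction of rows an `is_null` filter would keep (its selectivity).
double IsNullSelectivity(const std::vector<NullableInt64>& column) {
  if (column.empty()) return 0.0;
  std::size_t kept = 0;
  for (const NullableInt64& v : column) {
    if (!v.has_value()) ++kept;
  }
  return static_cast<double>(kept) / static_cast<double>(column.size());
}
```

   With this setup the null probability acts directly as the selectivity knob: a column generated with `null_prob = 0.25` should keep roughly a quarter of its rows under the `is_null` filter.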
   




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r894772961


##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+static void FilterOverhead(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/true, /*slow=*/false)},
+                       "custom_source_label"},
+                      {"filter",
+                       FilterNodeOptions{
+                           expr,
+                       }},
+                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
+                                                             /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,
+                         MakeExecNode("filter", plan.get(), {source_node},
+                                      FilterNodeOptions{
+                                          expr,
+                                      }));
+    MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen});
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler = TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          project_node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          project_node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+    ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ASSERT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    std::unique_lock<std::mutex> lk(mutex);
+    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    all_tasks_finished_cv.wait(lk);
+    ASSERT_TRUE(project_node->finished().is_finished());
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+arrow::compute::Expression complex_expression =
+    less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+arrow::compute::Expression simple_expression =
+    less(call("negate", {field_ref("i64")}), literal(0));
+arrow::compute::Expression zero_copy_expression = is_valid((call(
+    "cast", {field_ref("i64")}, compute::CastOptions::Safe(timestamp(TimeUnit::NANO)))));
+arrow::compute::Expression ref_only_expression = less(field_ref("i64"), literal(0));

Review Comment:
   I'm not sure how we want to express the complexity of filter expressions, so I used an approach similar to `ExpressionOverhead` / `ProjectBenchmarks`. `zero_copy` seems to perform a bit better than the others because it uses `is_valid` (I'm not sure why the `less` operation isn't working on timestamps).
   
   Is there something I can change to make these more consistent?





[GitHub] [arrow] westonpace commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1175595939

   How about this: https://github.com/apache/arrow/commit/f7109966366f389ba3bace571aa55571f70b2aec
   
   * moves the benchmark utilities for exec plans into a new benchmark_util that is in the compute/exec folder.  This change isn't strictly necessary but helps keep the code separated.
   * removes any dependency on googletest and instead changes the functions to return a status.  Then only the benchmark needs to depend on googletest (and can do ASSERT_OK in one spot).
   * changes the build slightly so that the benchmark_util.cc file is only built when building benchmarks so that this code doesn't get included with the libarrow.so (or libarrow_compute.so) files.
   
   Locally, tests seem to pass on my fork.
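
   The second bullet (utilities return a status and only the benchmark asserts) could look roughly like the sketch below. It is an illustration only: `SketchStatus` is a hypothetical stand-in, not `arrow::Status`, and `ComputeNumBatches` is an invented helper, not a function from the linked commit.

```cpp
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical minimal status type standing in for a real one such as arrow::Status.
class SketchStatus {
 public:
  static SketchStatus OK() { return SketchStatus(true, ""); }
  static SketchStatus Invalid(std::string message) {
    return SketchStatus(false, std::move(message));
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  SketchStatus(bool ok, std::string message) : ok_(ok), message_(std::move(message)) {}
  bool ok_;
  std::string message_;
};

// Utility code reports problems through its return value, so it needs no
// test-framework headers. Only the caller decides whether to assert.
SketchStatus ComputeNumBatches(std::int64_t total_rows, std::int64_t batch_size,
                               std::int64_t* num_batches) {
  if (batch_size <= 0) {
    return SketchStatus::Invalid("batch_size must be positive");
  }
  if (total_rows % batch_size != 0) {
    return SketchStatus::Invalid("batch_size must evenly divide total_rows");
  }
  *num_batches = total_rows / batch_size;
  return SketchStatus::OK();
}
```

   The benchmark body would then be the single place that checks the returned status, which is what lets the utility file build without googletest.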




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r904131551


##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+
+static void FilterOverhead(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source",
+                       SourceNodeOptions{data.schema,
+                                         data.gen(/*parallel=*/true, /*slow=*/false)},
+                       "custom_source_label"},
+                      {"filter",
+                       FilterNodeOptions{
+                           expr,
+                       }},
+                      {"sink", SinkNodeOptions{&sink_gen}, "custom_sink_label"},
+                  })
+                  .AddToPlan(plan.get()));
+    state.ResumeTiming();
+    ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(
+        arrow::compute::ExecNode * source_node,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{data.schema, data.gen(/*parallel=*/true,
+                                                             /*slow=*/false)}));
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * project_node,

Review Comment:
   Moved most of the logic into `benchmark_util.h`, reflected in `project_benchmark.cc` as well.





[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1167652804

   Hi @westonpace, let me know if you've gotten a chance to look at this!
   
   Thanks!




[GitHub] [arrow] iChauster commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1170274922

   Excellent, if there's nothing else I think this is ready to go @westonpace!




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r910402222


##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -15,10 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <condition_variable>
+#include <mutex>

Review Comment:
   @westonpace, sorry if this is trivial, but I'm having some trouble here.
   
   I committed some changes, but the compiler seems unable to find the third-party dependencies included in `benchmark_util.h` (such as benchmark and gtest) after I added `benchmark_util.cc` to `src/arrow/CMakeLists.txt`. Let me know if I'm missing a step.





[GitHub] [arrow] westonpace commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r909068110


##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+constexpr auto kSeed = 0x94378165;
+
+/*
+    Will return batches of size length, with fields as specified.
+    null_probability controls the likelihood that an element within the batch is null,
+   across all fields. bool_true_probability controls the likelihood that an element
+   belonging to a boolean field is true.
+*/
+static std::shared_ptr<arrow::RecordBatch> GetBatchesWithNullProbability(
+    const FieldVector& fields, int64_t length, double null_probability,
+    double bool_true_probability = 0.5) {
+  std::vector<std::shared_ptr<Array>> arrays(fields.size());
+  auto rand = random::RandomArrayGenerator(kSeed);
+  for (size_t i = 0; i < fields.size(); i++) {
+    const auto& field = fields[i];
+    if (field.get()->name() == "bool") {
+      arrays[i] = rand.Boolean(length, bool_true_probability, null_probability);
+    } else {
+      arrays[i] = rand.ArrayOf(field.get()->type(), length, null_probability);
+    }
+  }
+  return RecordBatch::Make(schema(fields), length, std::move(arrays));
+}
+
+BatchesWithSchema MakeRandomBatchesWithNullProbability(
+    std::shared_ptr<Schema> schema, int num_batches, int batch_size,
+    double null_probability = 0.5, double bool_true_probability = 0.5) {
+  BatchesWithSchema out;
+  out.batches.resize(num_batches);
+
+  for (int i = 0; i < num_batches; ++i) {
+    out.batches[i] = ExecBatch(*GetBatchesWithNullProbability(
+        schema->fields(), batch_size, null_probability, bool_true_probability));
+    out.batches[i].values.emplace_back(i);
+  }
+  out.schema = schema;
+  return out;
+}
+
+static void FilterOverhead(benchmark::State& state, std::vector<Expression> expr_vector) {
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const double null_prob = state.range(1) / 100.0;
+  const double bool_true_probability = state.range(2) / 100.0;
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+
+  arrow::compute::BatchesWithSchema data = MakeRandomBatchesWithNullProbability(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size,
+      null_prob, bool_true_probability);
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  std::vector<arrow::compute::Declaration> filter_node_dec;
+  for (Expression expr : expr_vector) {
+    filter_node_dec.push_back({"filter", FilterNodeOptions(expr)});
+  }
+  BenchmarkNodeOverhead(state, ctx, num_batches, batch_size, data, filter_node_dec);
+}
+
+static void FilterOverheadIsolated(benchmark::State& state, Expression expr) {
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  const int32_t batch_size = static_cast<int32_t>(state.range(0));
+  const int32_t num_batches = kTotalBatchSize / batch_size;
+  arrow::compute::BatchesWithSchema data = MakeRandomBatches(
+      schema({field("i64", int64()), field("bool", boolean())}), num_batches, batch_size);
+  FilterNodeOptions options = FilterNodeOptions{expr};
+  BenchmarkIsolatedNodeOverhead(state, ctx, expr, num_batches, batch_size, data, "filter",
+                                options);
+}
+
+arrow::compute::Expression complex_expression =
+    less(less(field_ref("i64"), literal(20)), greater(field_ref("i64"), literal(0)));
+arrow::compute::Expression simple_expression =
+    less(call("negate", {field_ref("i64")}), literal(0));
+arrow::compute::Expression ref_only_expression = field_ref("bool");
+
+arrow::compute::Expression is_not_null_expression =
+    not_(is_null(field_ref("bool"), false));
+arrow::compute::Expression is_true_expression = equal(field_ref("bool"), literal(true));
+arrow::compute::Expression is_not_null_and_true_expression =
+    and_(is_not_null_expression, is_true_expression);
+
+void SetArgs(benchmark::internal::Benchmark* bench) {
+  for (int batch_size = 1000; batch_size <= kTotalBatchSize; batch_size *= 10) {
+    bench->ArgNames({"batch_size", "null_prob", "bool_true_prob"})
+        ->Args({batch_size, 0, 50})
+        ->UseRealTime();
+  }
+}
+
+void SelectivityArgs(benchmark::internal::Benchmark* bench) {

Review Comment:
   Nit: `SetSelectivityArgs`



##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+constexpr auto kSeed = 0x94378165;

Review Comment:
   These should probably both be static or neither be static.  For a benchmark file I don't think it really matters (so I'd probably just drop the `static`).
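   
   As background on why dropping `static` is harmless here: at namespace scope a `constexpr` variable is implicitly `const`, and a non-`extern` `const` variable has internal linkage in C++, so the two declarations end up with the same linkage either way. A minimal sketch, assuming nothing beyond the standard library (the `demo` namespace and the `NumBatches` helper are made up for illustration and are not part of this PR):
   
   ```cpp
   #include <cstdint>
   #include <type_traits>
   
   namespace demo {
   
   // Both constants have internal linkage: a namespace-scope constexpr variable
   // is implicitly const, and a non-extern const variable is internal by default.
   static constexpr int64_t kWithStatic = 1000000;
   constexpr int64_t kWithoutStatic = 1000000;
   
   // Hypothetical helper: how many batches of `batch_size` rows make up the total.
   constexpr int64_t NumBatches(int64_t batch_size) {
     return kWithoutStatic / batch_size;
   }
   
   }  // namespace demo
   
   // Compile-time checks: constexpr implies const, and both spellings agree.
   static_assert(std::is_const<decltype(demo::kWithoutStatic)>::value,
                 "constexpr implies const");
   static_assert(demo::kWithStatic == demo::kWithoutStatic,
                 "same value with or without static");
   ```
   
   The only practical difference in a single `.cc` benchmark file is consistency of style, which is why either "both" or "neither" reads fine.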



##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+constexpr auto kSeed = 0x94378165;
+
+/*
+    Will return batches of size length, with fields as specified.
+    null_probability controls the likelihood that an element within the batch is null,
+   across all fields. bool_true_probability controls the likelihood that an element
+   belonging to a boolean field is true.
+*/
+static std::shared_ptr<arrow::RecordBatch> GetBatchesWithNullProbability(
+    const FieldVector& fields, int64_t length, double null_probability,
+    double bool_true_probability = 0.5) {
+  std::vector<std::shared_ptr<Array>> arrays(fields.size());
+  auto rand = random::RandomArrayGenerator(kSeed);
+  for (size_t i = 0; i < fields.size(); i++) {
+    const auto& field = fields[i];
+    if (field.get()->name() == "bool") {
+      arrays[i] = rand.Boolean(length, bool_true_probability, null_probability);
+    } else {
+      arrays[i] = rand.ArrayOf(field.get()->type(), length, null_probability);
+    }
+  }
+  return RecordBatch::Make(schema(fields), length, std::move(arrays));
+}
+
+BatchesWithSchema MakeRandomBatchesWithNullProbability(
+    std::shared_ptr<Schema> schema, int num_batches, int batch_size,
+    double null_probability = 0.5, double bool_true_probability = 0.5) {
+  BatchesWithSchema out;
+  out.batches.resize(num_batches);
+
+  for (int i = 0; i < num_batches; ++i) {
+    out.batches[i] = ExecBatch(*GetBatchesWithNullProbability(
+        schema->fields(), batch_size, null_probability, bool_true_probability));
+    out.batches[i].values.emplace_back(i);
+  }
+  out.schema = schema;

Review Comment:
   ```suggestion
     out.schema = std::move(schema);
   ```
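   
   The reasoning behind the suggestion, as I understand it: `schema` is taken by value, so the function already owns its own `shared_ptr`; moving it into `out.schema` hands that ownership over instead of paying for another atomic reference-count increment. A rough sketch with stand-in types (`FakeSchema`, `Holder`, `MakeByCopy`, and `MakeByMove` are hypothetical names for illustration, not Arrow APIs):
   
   ```cpp
   #include <cassert>
   #include <memory>
   #include <utility>
   
   struct FakeSchema {};  // stand-in for arrow::Schema
   
   struct Holder {
     std::shared_ptr<FakeSchema> schema;
   };
   
   // Copies the by-value parameter: the parameter and the member both hold a
   // reference until the function returns. Returns the use count seen inside.
   long MakeByCopy(std::shared_ptr<FakeSchema> schema, Holder* out) {
     out->schema = schema;
     return out->schema.use_count();
   }
   
   // Moves the by-value parameter: ownership is transferred to the member and
   // the parameter is left empty, so no extra reference is ever created.
   long MakeByMove(std::shared_ptr<FakeSchema> schema, Holder* out) {
     out->schema = std::move(schema);
     return out->schema.use_count();
   }
   ```
   
   When called with a temporary, `MakeByCopy` briefly holds two references while `MakeByMove` holds one throughout. For a benchmark helper the cost is negligible; the move mainly documents that the parameter is not used afterwards.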



##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -135,4 +149,114 @@ struct RegressionArgs {
   bool size_is_bytes_;
 };
 
-}  // namespace arrow
+/*
+  Generates batches from data, then benchmark rows_per_second and batches_per_second for
+  an isolated node. We do this by passing in batches through a task scheduler, and calling
+  InputFinished and InputReceived.
+*/

Review Comment:
   Prefer `//` over `/*`



##########
cpp/src/arrow/compute/exec/filter_benchmark.cc:
##########
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <condition_variable>
+#include <mutex>
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/expression.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/record_batch.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/type.h"
+#include "arrow/util/benchmark_util.h"
+
+namespace arrow {
+namespace compute {
+
+static constexpr int64_t kTotalBatchSize = 1000000;
+constexpr auto kSeed = 0x94378165;
+
+/*
+    Will return batches of size length, with fields as specified.
+    null_probability controls the likelihood that an element within the batch is null,
+   across all fields. bool_true_probability controls the likelihood that an element
+   belonging to a boolean field is true.
+*/

Review Comment:
   Can you use `//` for internal comments?



##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -135,4 +149,114 @@ struct RegressionArgs {
   bool size_is_bytes_;
 };
 
-}  // namespace arrow
+/*
+  Generates batches from data, then benchmark rows_per_second and batches_per_second for
+  an isolated node. We do this by passing in batches through a task scheduler, and calling
+  InputFinished and InputReceived.
+*/
+void BenchmarkIsolatedNodeOverhead(benchmark::State& state,
+                                   arrow::compute::ExecContext ctx,
+                                   arrow::compute::Expression expr, int32_t num_batches,
+                                   int32_t batch_size,
+                                   arrow::compute::BatchesWithSchema data,
+                                   std::string factory_name,
+                                   arrow::compute::ExecNodeOptions& options) {
+  for (auto _ : state) {
+    state.PauseTiming();
+    AsyncGenerator<util::optional<arrow::compute::ExecBatch>> sink_gen;
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
+                         arrow::compute::ExecPlan::Make(&ctx));
+    // Source and sink nodes have no effect on the benchmark.
+    // Used for dummy purposes as they are referenced in InputReceived and InputFinished.
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * source_node,
+                         MakeExecNode("source", plan.get(), {},
+                                      arrow::compute::SourceNodeOptions{
+                                          data.schema, data.gen(/*parallel=*/true,
+                                                                /*slow=*/false)}));
+
+    ASSERT_OK_AND_ASSIGN(arrow::compute::ExecNode * node,
+                         MakeExecNode(factory_name, plan.get(), {source_node}, options));
+    MakeExecNode("sink", plan.get(), {node}, arrow::compute::SinkNodeOptions{&sink_gen});
+
+    std::unique_ptr<arrow::compute::TaskScheduler> scheduler =
+        arrow::compute::TaskScheduler::Make();
+    std::condition_variable all_tasks_finished_cv;
+    std::mutex mutex;
+
+    int task_group_id = scheduler->RegisterTaskGroup(
+        [&](size_t thread_id, int64_t task_id) {
+          node->InputReceived(source_node, data.batches[task_id]);
+          return Status::OK();
+        },
+        [&](size_t thread_id) {
+          node->InputFinished(source_node, static_cast<int>(data.batches.size()));
+          std::unique_lock<std::mutex> lk(mutex);
+          all_tasks_finished_cv.notify_one();
+          return Status::OK();
+        });
+    scheduler->RegisterEnd();
+
+    arrow::compute::ThreadIndexer thread_indexer;
+
+    state.ResumeTiming();
+    arrow::internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool();
+    ASSERT_OK(scheduler->StartScheduling(
+        thread_indexer(),
+        [&](std::function<Status(size_t)> task) -> Status {
+          return thread_pool->Spawn([&, task]() {
+            size_t tid = thread_indexer();
+            ARROW_DCHECK_OK(task(tid));
+          });
+        },
+        thread_pool->GetCapacity(),
+        /*use_sync_execution=*/false));
+    std::unique_lock<std::mutex> lk(mutex);
+    ASSERT_OK(scheduler->StartTaskGroup(thread_indexer(), task_group_id, num_batches));
+    all_tasks_finished_cv.wait(lk);
+    ASSERT_TRUE(node->finished().is_finished());
+  }
+  state.counters["rows_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches * batch_size),
+      benchmark::Counter::kIsRate);
+
+  state.counters["batches_per_second"] = benchmark::Counter(
+      static_cast<double>(state.iterations() * num_batches), benchmark::Counter::kIsRate);
+}
+
+/*

Review Comment:
   Prefer `//` over `/*`





[GitHub] [arrow] westonpace commented on pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13366:
URL: https://github.com/apache/arrow/pull/13366#issuecomment-1170211854

   > Hi @westonpace, was wondering if you could publish the changes you requested -- I don't think I can see them yet.
   
   @iChauster Done, sorry :facepalm: 




[GitHub] [arrow] iChauster commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
iChauster commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r913799935


##########
cpp/src/arrow/util/benchmark_util.h:
##########
@@ -15,10 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <condition_variable>
+#include <mutex>

Review Comment:
   Hi @westonpace, have you had a chance to take a look at this?





[GitHub] [arrow] westonpace commented on a diff in pull request #13366: ARROW-16809: [C++] Add Benchmarks for FilterNode

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13366:
URL: https://github.com/apache/arrow/pull/13366#discussion_r914034008


##########
cpp/src/arrow/CMakeLists.txt:
##########
@@ -193,6 +193,7 @@ set(ARROW_SRCS
     io/transform.cc
     util/async_util.cc
     util/basic_decimal.cc
+    # util/benchmark_util.cc

Review Comment:
   Ah, never mind, I think I see what you mean.


