You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/26 17:19:27 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #12289: ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins

westonpace commented on code in PR #12289:
URL: https://github.com/apache/arrow/pull/12289#discussion_r858853024


##########
cpp/src/arrow/compute/exec/bloom_filter.cc:
##########
@@ -114,7 +114,7 @@ Status BlockedBloomFilter::CreateEmpty(int64_t num_rows_to_insert, MemoryPool* p
 }
 
 template <typename T>
-void BlockedBloomFilter::InsertImp(int64_t num_rows, const T* hashes) {
+NO_TSAN void BlockedBloomFilter::InsertImp(int64_t num_rows, const T* hashes) {

Review Comment:
   Is this still needed after we fixed other TSAN related issues?



##########
cpp/src/arrow/compute/exec/partition_util.h:
##########
@@ -59,14 +60,14 @@ class PartitionSort {
   /// out_arr: [2, 5, 3, 5, 4, 7]
   /// prtn_ranges: [0, 1, 5, 6]
   template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>
-  static void Eval(int num_rows, int num_prtns, uint16_t* prtn_ranges,
+  static void Eval(int64_t num_rows, int num_prtns, uint16_t* prtn_ranges,

Review Comment:
   Why promote this to `int64_t` if we are going to assert it is less than `1 << 15`?



##########
cpp/src/arrow/compute/exec/bloom_filter_test.cc:
##########
@@ -32,39 +33,107 @@
 namespace arrow {
 namespace compute {
 
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
-                        MemoryPool* pool, int64_t num_rows,
-                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
-                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
-                        BlockedBloomFilter* target) {
-  constexpr int batch_size_max = 32 * 1024;
-  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
-  auto builder = BloomFilterBuilder::Make(strategy);
-
-  std::vector<uint32_t> thread_local_hashes32;
-  std::vector<uint64_t> thread_local_hashes64;
-  thread_local_hashes32.resize(batch_size_max);
-  thread_local_hashes64.resize(batch_size_max);
-
-  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
-                               bit_util::CeilDiv(num_rows, batch_size_max), target));
-
-  for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+    std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+    std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::vector<uint32_t> hashes32(kBatchSizeMax);
+  std::vector<uint64_t> hashes64(kBatchSizeMax);
+  for (int64_t i = 0; i < num_batches; i++) {
     size_t thread_index = 0;
     int batch_size = static_cast<int>(
-        std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+        std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
     if (target->NumHashBitsUsed() > 32) {
-      uint64_t* hashes = thread_local_hashes64.data();
-      get_hash64_impl(i * batch_size_max, batch_size, hashes);
+      uint64_t* hashes = hashes64.data();
+      get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     } else {
-      uint32_t* hashes = thread_local_hashes32.data();
-      get_hash32_impl(i * batch_size_max, batch_size, hashes);
+      uint32_t* hashes = hashes32.data();
+      get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     }
   }
+  return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::mutex mutex;
+  ThreadIndexer thread_indexer;
+  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+  std::condition_variable cv;
+  std::unique_lock<std::mutex> lk(mutex, std::defer_lock);
+  auto group = scheduler->RegisterTaskGroup(
+      [&](size_t thread_index, int64_t task_id) -> Status {
+        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+                                                   static_cast<int64_t>(kBatchSizeMax)));
+        if (target->NumHashBitsUsed() > 32) {
+          uint64_t* hashes = thread_local_hashes64[thread_index].data();
+          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        } else {
+          uint32_t* hashes = thread_local_hashes32[thread_index].data();
+          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        }
+        return Status::OK();
+      },
+      [&](size_t thread_index) -> Status {
+        lk.unlock();
+        cv.notify_one();
+        return Status::OK();
+      });
+  scheduler->RegisterEnd();
+  auto tp = arrow::internal::GetCpuThreadPool();
+  RETURN_NOT_OK(scheduler->StartScheduling(
+      0,
+      [&](std::function<Status(size_t)> func) -> Status {
+        return tp->Spawn([&, func] {
+          size_t tid = thread_indexer();
+          std::ignore = func(tid);

Review Comment:
   Why ignore it?  Return it if it isn't ok.



##########
cpp/src/arrow/compute/exec/hash_join_node_test.cc:
##########
@@ -1900,5 +1903,150 @@ TEST(HashJoin, TrivialResidualFilter) {
   }
 }
 
+HashJoinNodeOptions GenerateHashJoinNodeOptions(Random64Bit& rng, int num_left_cols,
+                                                int num_right_cols) {
+  HashJoinNodeOptions opts;
+  opts.join_type = static_cast<JoinType>(rng.from_range(0, 7));
+  bool is_left_join = opts.join_type == JoinType::LEFT_SEMI ||
+                      opts.join_type == JoinType::LEFT_ANTI ||
+                      opts.join_type == JoinType::LEFT_OUTER;
+  bool is_right_join = opts.join_type == JoinType::RIGHT_SEMI ||
+                       opts.join_type == JoinType::RIGHT_ANTI ||
+                       opts.join_type == JoinType::RIGHT_OUTER;
+
+  int num_keys = rng.from_range(1, std::min(num_left_cols, num_right_cols));
+  for (int i = 0; i < num_left_cols; i++) {
+    bool is_out = rng.from_range(0, 2) != 2;
+    if (is_out && !is_right_join) opts.left_output.push_back(FieldRef(i));
+  }
+  for (int i = 0; i < num_right_cols; i++) {
+    bool is_out = rng.from_range(0, 2) == 2;
+    if (is_out && !is_left_join) opts.right_output.push_back(FieldRef(i));
+  }
+  // We need at least one output
+  if (opts.right_output.empty() && opts.left_output.empty()) {
+    if (is_left_join) {
+      int col = rng.from_range(0, num_left_cols - 1);
+      opts.left_output.push_back(FieldRef(col));
+    } else if (is_right_join) {
+      int col = rng.from_range(0, num_right_cols - 1);
+      opts.right_output.push_back(FieldRef(col));
+    } else {
+      if (rng.from_range(0, 1) == 0) {
+        int col = rng.from_range(0, num_left_cols - 1);
+        opts.left_output.push_back(FieldRef(col));
+      } else {
+        int col = rng.from_range(0, num_right_cols - 1);
+        opts.right_output.push_back(FieldRef(col));
+      }
+    }
+  }
+
+  for (int i = 0; i < num_keys; i++) {
+    int left = rng.from_range(0, num_left_cols - 1);
+    int right = rng.from_range(0, num_right_cols - 1);
+    bool is_or_eq = rng.from_range(0, 1) == 0;
+    opts.left_keys.push_back(FieldRef(left));
+    opts.right_keys.push_back(FieldRef(right));
+    opts.key_cmp.push_back(is_or_eq ? JoinKeyCmp::IS : JoinKeyCmp::EQ);
+  }
+  return opts;
+}
+
+void TestSingleChainOfHashJoins(Random64Bit& rng) {
+  int num_joins = rng.from_range(2, 5);
+  std::vector<HashJoinNodeOptions> opts;
+  int num_left_cols = rng.from_range(1, 8);
+  int num_right_cols = rng.from_range(1, 8);
+  HashJoinNodeOptions first_opt =
+      GenerateHashJoinNodeOptions(rng, num_left_cols, num_right_cols);
+  opts.push_back(std::move(first_opt));
+
+  std::unordered_map<std::string, std::string> metadata_map;
+  metadata_map["min"] = "0";
+  metadata_map["max"] = "10";
+  auto metadata = key_value_metadata(metadata_map);
+  std::vector<std::shared_ptr<Field>> left_fields;
+  for (int i = 0; i < num_left_cols; i++)
+    left_fields.push_back(field(std::string("l") + std::to_string(i), int32(), metadata));
+  std::vector<std::shared_ptr<Field>> first_right_fields;
+  for (int i = 0; i < num_right_cols; i++)
+    first_right_fields.push_back(
+        field(std::string("r_0_") + std::to_string(i), int32(), metadata));
+
+  BatchesWithSchema input_left = MakeRandomBatches(schema(std::move(left_fields)));
+  std::vector<BatchesWithSchema> input_right;
+  input_right.push_back(MakeRandomBatches(schema(std::move(first_right_fields))));
+
+  for (int i = 1; i < num_joins; i++) {
+    int num_right_cols = rng.from_range(1, 8);
+    HashJoinNodeOptions opt =
+        GenerateHashJoinNodeOptions(rng,
+                                    static_cast<int>(opts[i - 1].left_output.size() +
+                                                     opts[i - 1].right_output.size()),
+                                    num_right_cols);
+    opts.push_back(std::move(opt));
+
+    std::vector<std::shared_ptr<Field>> right_fields;
+    for (int j = 0; j < num_right_cols; j++)
+      right_fields.push_back(
+          field(std::string("r_") + std::to_string(i) + "_" + std::to_string(j), int32(),
+                metadata));
+    BatchesWithSchema input = MakeRandomBatches(schema(std::move(right_fields)));
+    input_right.push_back(std::move(input));
+  }
+
+  std::vector<ExecBatch> reference;
+  for (bool bloom_filters : {false, true}) {
+    bool parallel = true;

Review Comment:
   ```suggestion
       constexpr bool kParallel = true;
   ```
   Is this variable for readability?  If so, making it a constant will make that obvious.  Or was there originally a for loop loop here and should it come back?



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -83,50 +85,75 @@ class HashJoinBasicImpl : public HashJoinImpl {
   }
 
   Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
-              size_t num_threads, HashJoinSchema* schema_mgr,
+              size_t /*num_threads*/, HashJoinSchema* schema_mgr,
               std::vector<JoinKeyCmp> key_cmp, Expression filter,
               OutputBatchCallback output_batch_callback,
               FinishedCallback finished_callback,
-              TaskScheduler::ScheduleImpl schedule_task_callback) override {
-    num_threads = std::max(num_threads, static_cast<size_t>(1));
+              TaskScheduler::ScheduleImpl schedule_task_callback,
+              HashJoinImpl* pushdown_target, std::vector<int> column_map) override {
+    // TODO(ARROW-15732)
+    // Each side of join might have an IO thread being called from.
+    // As of right now, we ignore the `num_threads` argument, so later we will have to
+    // readd `num_threads_ = num_threads;`
+    num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1;
+    // num_threads_ = GetCpuThreadPoolCapacity() + 1;

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/compute/exec/hash_join_benchmark.cc:
##########
@@ -126,6 +129,22 @@ class JoinBenchmark {
 
     join_ = *HashJoinImpl::MakeBasic();
 
+    HashJoinImpl* bloom_filter_pushdown_target = nullptr;
+    std::vector<int> key_input_map;
+
+    bool bloom_filter_does_not_apply_to_join =

Review Comment:
   So is the idea here to measure the overhead cost of building the bloom filter?



##########
cpp/src/arrow/compute/exec/hash_join_node.cc:
##########
@@ -571,24 +573,82 @@ class HashJoinNode : public ExecNode {
     }
   }
 
-  Status StartProducing() override {
-    START_SPAN(span_, std::string(kind_name()) + ":" + label(),
-               {{"node.label", label()},
-                {"node.detail", ToString()},
-                {"node.kind", kind_name()}});
-    END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
+  std::pair<HashJoinImpl*, std::vector<int>> GetPushdownTarget() {

Review Comment:
   Since we operate on these pairs everywhere why not create:
   ```
   struct BloomFilterTarget {
     HashJoinImpl* join_impl;
     std::vector<int> column_map;
   };
   ```
   
   It's also takes a bit of reading to figure out what the purpose of `column_map` is so this could be a place to briefly describe that.



##########
cpp/src/arrow/compute/exec/bloom_filter_test.cc:
##########
@@ -32,39 +33,107 @@
 namespace arrow {
 namespace compute {
 
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
-                        MemoryPool* pool, int64_t num_rows,
-                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
-                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
-                        BlockedBloomFilter* target) {
-  constexpr int batch_size_max = 32 * 1024;
-  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
-  auto builder = BloomFilterBuilder::Make(strategy);
-
-  std::vector<uint32_t> thread_local_hashes32;
-  std::vector<uint64_t> thread_local_hashes64;
-  thread_local_hashes32.resize(batch_size_max);
-  thread_local_hashes64.resize(batch_size_max);
-
-  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
-                               bit_util::CeilDiv(num_rows, batch_size_max), target));
-
-  for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+    std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+    std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::vector<uint32_t> hashes32(kBatchSizeMax);
+  std::vector<uint64_t> hashes64(kBatchSizeMax);
+  for (int64_t i = 0; i < num_batches; i++) {
     size_t thread_index = 0;
     int batch_size = static_cast<int>(
-        std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+        std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
     if (target->NumHashBitsUsed() > 32) {
-      uint64_t* hashes = thread_local_hashes64.data();
-      get_hash64_impl(i * batch_size_max, batch_size, hashes);
+      uint64_t* hashes = hashes64.data();
+      get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     } else {
-      uint32_t* hashes = thread_local_hashes32.data();
-      get_hash32_impl(i * batch_size_max, batch_size, hashes);
+      uint32_t* hashes = hashes32.data();
+      get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     }
   }
+  return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::mutex mutex;
+  ThreadIndexer thread_indexer;
+  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+  std::condition_variable cv;
+  std::unique_lock<std::mutex> lk(mutex, std::defer_lock);
+  auto group = scheduler->RegisterTaskGroup(
+      [&](size_t thread_index, int64_t task_id) -> Status {
+        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+                                                   static_cast<int64_t>(kBatchSizeMax)));
+        if (target->NumHashBitsUsed() > 32) {
+          uint64_t* hashes = thread_local_hashes64[thread_index].data();
+          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        } else {
+          uint32_t* hashes = thread_local_hashes32[thread_index].data();
+          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        }
+        return Status::OK();
+      },
+      [&](size_t thread_index) -> Status {
+        lk.unlock();
+        cv.notify_one();
+        return Status::OK();
+      });
+  scheduler->RegisterEnd();
+  auto tp = arrow::internal::GetCpuThreadPool();
+  RETURN_NOT_OK(scheduler->StartScheduling(
+      0,
+      [&](std::function<Status(size_t)> func) -> Status {
+        return tp->Spawn([&, func] {
+          size_t tid = thread_indexer();
+          std::ignore = func(tid);
+        });
+      },
+      static_cast<int>(2 * num_threads), false));

Review Comment:
   Why 2x?



##########
cpp/src/arrow/compute/exec/bloom_filter_test.cc:
##########
@@ -32,39 +33,107 @@
 namespace arrow {
 namespace compute {
 
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
-                        MemoryPool* pool, int64_t num_rows,
-                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
-                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
-                        BlockedBloomFilter* target) {
-  constexpr int batch_size_max = 32 * 1024;
-  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
-  auto builder = BloomFilterBuilder::Make(strategy);
-
-  std::vector<uint32_t> thread_local_hashes32;
-  std::vector<uint64_t> thread_local_hashes64;
-  thread_local_hashes32.resize(batch_size_max);
-  thread_local_hashes64.resize(batch_size_max);
-
-  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
-                               bit_util::CeilDiv(num_rows, batch_size_max), target));
-
-  for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+    std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+    std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::vector<uint32_t> hashes32(kBatchSizeMax);
+  std::vector<uint64_t> hashes64(kBatchSizeMax);
+  for (int64_t i = 0; i < num_batches; i++) {
     size_t thread_index = 0;
     int batch_size = static_cast<int>(
-        std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+        std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
     if (target->NumHashBitsUsed() > 32) {
-      uint64_t* hashes = thread_local_hashes64.data();
-      get_hash64_impl(i * batch_size_max, batch_size, hashes);
+      uint64_t* hashes = hashes64.data();
+      get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     } else {
-      uint32_t* hashes = thread_local_hashes32.data();
-      get_hash32_impl(i * batch_size_max, batch_size, hashes);
+      uint32_t* hashes = hashes32.data();
+      get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     }
   }
+  return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::mutex mutex;
+  ThreadIndexer thread_indexer;
+  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+  std::condition_variable cv;
+  std::unique_lock<std::mutex> lk(mutex, std::defer_lock);
+  auto group = scheduler->RegisterTaskGroup(
+      [&](size_t thread_index, int64_t task_id) -> Status {
+        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+                                                   static_cast<int64_t>(kBatchSizeMax)));
+        if (target->NumHashBitsUsed() > 32) {
+          uint64_t* hashes = thread_local_hashes64[thread_index].data();
+          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        } else {
+          uint32_t* hashes = thread_local_hashes32[thread_index].data();
+          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        }
+        return Status::OK();
+      },
+      [&](size_t thread_index) -> Status {
+        lk.unlock();

Review Comment:
   Locks are automatically unlocked when a mutex is waited on so this seems like it would trigger a double-unlock.  You should be obtaining the lock here, if anything.  Howver, you don't technically need to hold the lock to the mutex in order to notify a cv.



##########
cpp/src/arrow/compute/exec/hash_join_node_test.cc:
##########
@@ -1900,5 +1903,150 @@ TEST(HashJoin, TrivialResidualFilter) {
   }
 }
 
+HashJoinNodeOptions GenerateHashJoinNodeOptions(Random64Bit& rng, int num_left_cols,
+                                                int num_right_cols) {
+  HashJoinNodeOptions opts;
+  opts.join_type = static_cast<JoinType>(rng.from_range(0, 7));
+  bool is_left_join = opts.join_type == JoinType::LEFT_SEMI ||
+                      opts.join_type == JoinType::LEFT_ANTI ||
+                      opts.join_type == JoinType::LEFT_OUTER;
+  bool is_right_join = opts.join_type == JoinType::RIGHT_SEMI ||
+                       opts.join_type == JoinType::RIGHT_ANTI ||
+                       opts.join_type == JoinType::RIGHT_OUTER;
+
+  int num_keys = rng.from_range(1, std::min(num_left_cols, num_right_cols));
+  for (int i = 0; i < num_left_cols; i++) {
+    bool is_out = rng.from_range(0, 2) != 2;

Review Comment:
   Why a 2/3 chance here (as opposed to 1/2 or 1/3)



##########
cpp/src/arrow/compute/exec/util.h:
##########
@@ -92,7 +92,7 @@ class TempVectorStack {
   Status Init(MemoryPool* pool, int64_t size) {
     num_vectors_ = 0;
     top_ = 0;
-    buffer_size_ = size;
+    buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t);

Review Comment:
   Why are we padding here?



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -83,50 +85,75 @@ class HashJoinBasicImpl : public HashJoinImpl {
   }
 
   Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
-              size_t num_threads, HashJoinSchema* schema_mgr,
+              size_t /*num_threads*/, HashJoinSchema* schema_mgr,
               std::vector<JoinKeyCmp> key_cmp, Expression filter,
               OutputBatchCallback output_batch_callback,
               FinishedCallback finished_callback,
-              TaskScheduler::ScheduleImpl schedule_task_callback) override {
-    num_threads = std::max(num_threads, static_cast<size_t>(1));
+              TaskScheduler::ScheduleImpl schedule_task_callback,
+              HashJoinImpl* pushdown_target, std::vector<int> column_map) override {
+    // TODO(ARROW-15732)
+    // Each side of join might have an IO thread being called from.
+    // As of right now, we ignore the `num_threads` argument, so later we will have to
+    // readd `num_threads_ = num_threads;`

Review Comment:
   ```suggestion
       // read `num_threads_ = num_threads;`
   ```



##########
cpp/src/arrow/CMakeLists.txt:
##########
@@ -393,7 +393,6 @@ if(ARROW_COMPUTE)
        compute/exec/key_encode.cc
        compute/exec/key_hash.cc
        compute/exec/key_map.cc
-       compute/exec/options.cc

Review Comment:
   If this file is no longer needed could you also remove the file itself from source control?



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -83,50 +85,75 @@ class HashJoinBasicImpl : public HashJoinImpl {
   }
 
   Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
-              size_t num_threads, HashJoinSchema* schema_mgr,
+              size_t /*num_threads*/, HashJoinSchema* schema_mgr,
               std::vector<JoinKeyCmp> key_cmp, Expression filter,
               OutputBatchCallback output_batch_callback,
               FinishedCallback finished_callback,
-              TaskScheduler::ScheduleImpl schedule_task_callback) override {
-    num_threads = std::max(num_threads, static_cast<size_t>(1));
+              TaskScheduler::ScheduleImpl schedule_task_callback,
+              HashJoinImpl* pushdown_target, std::vector<int> column_map) override {
+    // TODO(ARROW-15732)
+    // Each side of join might have an IO thread being called from.
+    // As of right now, we ignore the `num_threads` argument, so later we will have to
+    // readd `num_threads_ = num_threads;`
+    num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1;
+    // num_threads_ = GetCpuThreadPoolCapacity() + 1;
 
     START_SPAN(span_, "HashJoinBasicImpl",
                {{"detail", filter.ToString()},
                 {"join.kind", ToString(join_type)},
-                {"join.threads", static_cast<uint32_t>(num_threads)}});
+                {"join.threads", static_cast<uint32_t>(num_threads_)}});
 
     ctx_ = ctx;
     join_type_ = join_type;
-    num_threads_ = num_threads;
     schema_mgr_ = schema_mgr;
     key_cmp_ = std::move(key_cmp);
     filter_ = std::move(filter);
     output_batch_callback_ = std::move(output_batch_callback);
     finished_callback_ = std::move(finished_callback);
-    // TODO(ARROW-15732)
-    // Each side of join might have an IO thread being called from.
     local_states_.resize(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);

Review Comment:
   ```suggestion
       local_states_.resize(num_threads_);
   ```



##########
cpp/src/arrow/compute/exec/hash_join_node_test.cc:
##########
@@ -1900,5 +1903,150 @@ TEST(HashJoin, TrivialResidualFilter) {
   }
 }
 
+HashJoinNodeOptions GenerateHashJoinNodeOptions(Random64Bit& rng, int num_left_cols,
+                                                int num_right_cols) {
+  HashJoinNodeOptions opts;
+  opts.join_type = static_cast<JoinType>(rng.from_range(0, 7));
+  bool is_left_join = opts.join_type == JoinType::LEFT_SEMI ||
+                      opts.join_type == JoinType::LEFT_ANTI ||
+                      opts.join_type == JoinType::LEFT_OUTER;
+  bool is_right_join = opts.join_type == JoinType::RIGHT_SEMI ||
+                       opts.join_type == JoinType::RIGHT_ANTI ||
+                       opts.join_type == JoinType::RIGHT_OUTER;
+
+  int num_keys = rng.from_range(1, std::min(num_left_cols, num_right_cols));
+  for (int i = 0; i < num_left_cols; i++) {
+    bool is_out = rng.from_range(0, 2) != 2;
+    if (is_out && !is_right_join) opts.left_output.push_back(FieldRef(i));
+  }
+  for (int i = 0; i < num_right_cols; i++) {
+    bool is_out = rng.from_range(0, 2) == 2;
+    if (is_out && !is_left_join) opts.right_output.push_back(FieldRef(i));
+  }
+  // We need at least one output
+  if (opts.right_output.empty() && opts.left_output.empty()) {
+    if (is_left_join) {
+      int col = rng.from_range(0, num_left_cols - 1);
+      opts.left_output.push_back(FieldRef(col));
+    } else if (is_right_join) {
+      int col = rng.from_range(0, num_right_cols - 1);
+      opts.right_output.push_back(FieldRef(col));
+    } else {
+      if (rng.from_range(0, 1) == 0) {
+        int col = rng.from_range(0, num_left_cols - 1);
+        opts.left_output.push_back(FieldRef(col));
+      } else {
+        int col = rng.from_range(0, num_right_cols - 1);
+        opts.right_output.push_back(FieldRef(col));
+      }
+    }
+  }
+
+  for (int i = 0; i < num_keys; i++) {
+    int left = rng.from_range(0, num_left_cols - 1);
+    int right = rng.from_range(0, num_right_cols - 1);
+    bool is_or_eq = rng.from_range(0, 1) == 0;
+    opts.left_keys.push_back(FieldRef(left));
+    opts.right_keys.push_back(FieldRef(right));
+    opts.key_cmp.push_back(is_or_eq ? JoinKeyCmp::IS : JoinKeyCmp::EQ);
+  }
+  return opts;
+}
+
+void TestSingleChainOfHashJoins(Random64Bit& rng) {
+  int num_joins = rng.from_range(2, 5);
+  std::vector<HashJoinNodeOptions> opts;
+  int num_left_cols = rng.from_range(1, 8);
+  int num_right_cols = rng.from_range(1, 8);
+  HashJoinNodeOptions first_opt =
+      GenerateHashJoinNodeOptions(rng, num_left_cols, num_right_cols);
+  opts.push_back(std::move(first_opt));
+
+  std::unordered_map<std::string, std::string> metadata_map;
+  metadata_map["min"] = "0";
+  metadata_map["max"] = "10";
+  auto metadata = key_value_metadata(metadata_map);
+  std::vector<std::shared_ptr<Field>> left_fields;
+  for (int i = 0; i < num_left_cols; i++)
+    left_fields.push_back(field(std::string("l") + std::to_string(i), int32(), metadata));
+  std::vector<std::shared_ptr<Field>> first_right_fields;
+  for (int i = 0; i < num_right_cols; i++)
+    first_right_fields.push_back(
+        field(std::string("r_0_") + std::to_string(i), int32(), metadata));
+
+  BatchesWithSchema input_left = MakeRandomBatches(schema(std::move(left_fields)));
+  std::vector<BatchesWithSchema> input_right;
+  input_right.push_back(MakeRandomBatches(schema(std::move(first_right_fields))));
+
+  for (int i = 1; i < num_joins; i++) {
+    int num_right_cols = rng.from_range(1, 8);
+    HashJoinNodeOptions opt =
+        GenerateHashJoinNodeOptions(rng,
+                                    static_cast<int>(opts[i - 1].left_output.size() +
+                                                     opts[i - 1].right_output.size()),
+                                    num_right_cols);
+    opts.push_back(std::move(opt));
+
+    std::vector<std::shared_ptr<Field>> right_fields;
+    for (int j = 0; j < num_right_cols; j++)
+      right_fields.push_back(
+          field(std::string("r_") + std::to_string(i) + "_" + std::to_string(j), int32(),
+                metadata));
+    BatchesWithSchema input = MakeRandomBatches(schema(std::move(right_fields)));
+    input_right.push_back(std::move(input));
+  }
+
+  std::vector<ExecBatch> reference;
+  for (bool bloom_filters : {false, true}) {
+    bool parallel = true;
+    ARROW_SCOPED_TRACE(bloom_filters ? "bloom filtered" : "unfiltered");
+    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
+        default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+
+    ExecNode* left_source;
+    ASSERT_OK_AND_ASSIGN(
+        left_source,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{input_left.schema,
+                                       input_left.gen(parallel, /*slow=*/false)}));
+    std::vector<ExecNode*> joins(num_joins);
+    for (int i = 0; i < num_joins; i++) {
+      opts[i].disable_bloom_filter = !bloom_filters;
+      ExecNode* right_source;
+      ASSERT_OK_AND_ASSIGN(
+          right_source,
+          MakeExecNode("source", plan.get(), {},
+                       SourceNodeOptions{input_right[i].schema,
+                                         input_right[i].gen(parallel, /*slow=*/false)}));
+
+      std::vector<ExecNode*> inputs;
+      if (i == 0)
+        inputs = {left_source, right_source};
+      else
+        inputs = {joins[i - 1], right_source};
+      ASSERT_OK_AND_ASSIGN(joins[i],
+                           MakeExecNode("hashjoin", plan.get(), inputs, opts[i]));
+    }
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {joins.back()},
+                                                   SinkNodeOptions{&sink_gen}));

Review Comment:
   ```suggestion
       ASSERT_OK(MakeExecNode("sink", plan.get(), {joins.back()},
                                                      SinkNodeOptions{&sink_gen}));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org