You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/09 21:50:52 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #12289: ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins

westonpace commented on code in PR #12289:
URL: https://github.com/apache/arrow/pull/12289#discussion_r868481678


##########
cpp/src/arrow/compute/exec/bloom_filter_test.cc:
##########
@@ -32,39 +33,108 @@
 namespace arrow {
 namespace compute {
 
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
-                        MemoryPool* pool, int64_t num_rows,
-                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
-                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
-                        BlockedBloomFilter* target) {
-  constexpr int batch_size_max = 32 * 1024;
-  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
-  auto builder = BloomFilterBuilder::Make(strategy);
-
-  std::vector<uint32_t> thread_local_hashes32;
-  std::vector<uint64_t> thread_local_hashes64;
-  thread_local_hashes32.resize(batch_size_max);
-  thread_local_hashes64.resize(batch_size_max);
-
-  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
-                               bit_util::CeilDiv(num_rows, batch_size_max), target));
-
-  for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+    std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+    std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::vector<uint32_t> hashes32(kBatchSizeMax);
+  std::vector<uint64_t> hashes64(kBatchSizeMax);
+  for (int64_t i = 0; i < num_batches; i++) {
     size_t thread_index = 0;
     int batch_size = static_cast<int>(
-        std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+        std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
     if (target->NumHashBitsUsed() > 32) {
-      uint64_t* hashes = thread_local_hashes64.data();
-      get_hash64_impl(i * batch_size_max, batch_size, hashes);
+      uint64_t* hashes = hashes64.data();
+      get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     } else {
-      uint32_t* hashes = thread_local_hashes32.data();
-      get_hash32_impl(i * batch_size_max, batch_size, hashes);
+      uint32_t* hashes = hashes32.data();
+      get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     }
   }
+  return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  ThreadIndexer thread_indexer;
+  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+  std::condition_variable cv;
+  std::mutex mutex;
+  auto group = scheduler->RegisterTaskGroup(
+      [&](size_t thread_index, int64_t task_id) -> Status {
+        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+                                                   static_cast<int64_t>(kBatchSizeMax)));
+        if (target->NumHashBitsUsed() > 32) {
+          uint64_t* hashes = thread_local_hashes64[thread_index].data();
+          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        } else {
+          uint32_t* hashes = thread_local_hashes32[thread_index].data();
+          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        }
+        return Status::OK();
+      },
+      [&](size_t thread_index) -> Status {
+        {
+          std::unique_lock<std::mutex> lk(mutex);
+          cv.notify_all();
+        }
+        return Status::OK();
+      });

Review Comment:
   ```suggestion
         [&](size_t thread_index) -> Status {
           std::unique_lock<std::mutex> lk(mutex);
           cv.notify_all();
           return Status::OK();
         });
   ```



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -651,23 +777,44 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
+  Status BuildBloomFilter_on_finished(size_t thread_index) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    ARROW_DCHECK(pushdown_target_);
+    RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
+        thread_index, std::move(bloom_filter_), std::move(column_map_)));
+    return BuildHashTable(thread_index);
+  }
+
   Status BuildHashTable_on_finished(size_t thread_index) {
     if (cancelled_) {
       return Status::Cancelled("Hash join cancelled");
     }
 
+    bool proceed;
     {
       std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      std::lock_guard<std::mutex> lock_finish(finished_mutex_);
+      left_queue_bloom_finished_ =
+          left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
+      proceed = !has_hash_table_ && left_queue_bloom_finished_;
       has_hash_table_ = true;
     }
+    if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
 
     right_batches_.clear();

Review Comment:
   I wonder if clearing `right_batches_` should be the first thing we do in this method. In most cases I don't think it will matter much, but if we're running serially, this means we're holding on to a bunch of memory while we probe the queued batches.



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -212,6 +212,13 @@ class ARROW_EXPORT ExecNode {
   // A node with multiple outputs will also need to ensure it is applying backpressure if
   // any of its outputs is asking to pause
 
+  /// \brief Steps performed immediately before StartProducing is called
+  ///
+  /// This hook performs any actions in between creation of ExecPlan and the call to
+  /// StartProducing. An example could be Bloom filter pushdown. The order of ExecNodes
+  /// that executes this method is undefined, but the calls are made synchronously.
+  virtual Status PrepareToProduce() { return Status::OK(); }

Review Comment:
   ```suggestion
     /// that executes this method is undefined, but the calls are made synchronously.
     ///
     /// At this point a node can rely on all inputs & outputs (and the input schemas)
     /// being well defined.
     virtual Status PrepareToProduce() { return Status::OK(); }
   ```



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -212,6 +212,13 @@ class ARROW_EXPORT ExecNode {
   // A node with multiple outputs will also need to ensure it is applying backpressure if
   // any of its outputs is asking to pause
 
+  /// \brief Steps performed immediately before StartProducing is called

Review Comment:
   ```suggestion
     /// \brief Perform any needed initialization
   ```



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -138,6 +162,30 @@ class HashJoinBasicImpl : public HashJoinImpl {
     scheduler_->Abort(std::move(pos_abort_callback));
   }
 
+  Status PushBloomFilter(size_t thread_index, std::unique_ptr<BlockedBloomFilter> filter,

Review Comment:
   ```suggestion
     // Called by a downstream node after they have constructed a bloom filter
     // that this node can use to filter inputs.
     Status PushBloomFilter(size_t thread_index, std::unique_ptr<BlockedBloomFilter> filter,
   ```



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -604,8 +654,84 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
+  Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) {
+    if (batch.length == 0) return Status::OK();
+    int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+    std::vector<uint8_t> selected(bit_vector_bytes);
+    std::vector<uint32_t> hashes(batch.length);
+    std::vector<uint8_t> bv(bit_vector_bytes);
+
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+    // Start with full selection for the current minibatch

Review Comment:
   ```suggestion
       // Start with full selection for the current batch
   ```
   I'm pretty sure "minibatch" has a different meaning here: it's what we call the size-1024 batches we split input into on the temp vector stack. I could be wrong, though.



##########
cpp/src/arrow/compute/exec/util.h:
##########
@@ -280,7 +282,7 @@ class AtomicCounter {
   std::atomic<bool> complete_{false};
 };
 
-class ThreadIndexer {
+class ARROW_EXPORT ThreadIndexer {

Review Comment:
   Why is this getting exported?



##########
cpp/src/arrow/compute/exec/task_util.h:
##########
@@ -53,7 +53,7 @@ class AtomicWithPadding {
 //
 // Also allows for executing next pending tasks immediately using a caller thread.
 //
-class TaskScheduler {
+class ARROW_EXPORT TaskScheduler {

Review Comment:
   Why is this being exported?



##########
cpp/src/arrow/compute/exec/hash_join_node_test.cc:
##########
@@ -1000,11 +1000,14 @@ TEST(HashJoin, Random) {
   Random64Bit rng(42);
 #if defined(THREAD_SANITIZER) || defined(ARROW_VALGRIND)
   const int num_tests = 15;
+#elsif defined(ADDRESS_SANITIZER)

Review Comment:
   ```suggestion
   #elif defined(ADDRESS_SANITIZER)
   ```



##########
cpp/src/arrow/compute/exec/task_util.h:
##########
@@ -32,7 +32,7 @@ namespace compute {
 // whenever it is modified by a concurrent thread on a different CPU core.
 //
 template <typename T>
-class AtomicWithPadding {
+class ARROW_EXPORT AtomicWithPadding {

Review Comment:
   Why is this being exported?



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -809,22 +1009,40 @@ class HashJoinBasicImpl : public HashJoinImpl {
                                       ScanHashTable_num_tasks());
   }
 
-  bool QueueBatchIfNeeded(int side, ExecBatch batch) {
+  Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& batch) {
     if (side == 0) {
-      std::lock_guard<std::mutex> lock(left_batches_mutex_);
-      if (has_hash_table_) {
-        return false;
+      // We don't want to do the filtering while holding the lock, since that can get
+      // expensive.
+      bool needs_filtering;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        bloom_filters_ready_ = bloom_filters_ready_ || num_expected_bloom_filters_ == 0;
+        needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ != 0;
       }
-      left_batches_.emplace_back(std::move(batch));
-      return true;
+      if (needs_filtering) RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));

Review Comment:
   ```suggestion
         if (needs_filtering)
         {
             RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
             return false;
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org