You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/08 07:14:15 UTC

[GitHub] [arrow] michalursa commented on a change in pull request #12067: ARROW-15239: [C++][Compute] Adding Bloom filter implementation

michalursa commented on a change in pull request #12067:
URL: https://github.com/apache/arrow/pull/12067#discussion_r821380560



##########
File path: cpp/src/arrow/compute/exec/bloom_filter.cc
##########
@@ -0,0 +1,434 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/bloom_filter.h"
+#include <random>
+#include "arrow/compute/exec/util.h"  // PREFETCH
+#include "arrow/util/bit_util.h"      // Log2
+#include "arrow/util/bitmap_ops.h"    // CountSetBits
+
+namespace arrow {
+namespace compute {
+
+BloomFilterMasks::BloomFilterMasks() {
+  std::seed_seq seed{0, 0, 0, 0, 0, 0, 0, 0};
+  std::mt19937 re(seed);
+  std::uniform_int_distribution<uint64_t> rd;
+  auto random = [&re, &rd](int min_value, int max_value) {
+    return min_value + rd(re) % (max_value - min_value + 1);
+  };
+
+  memset(masks_, 0, kTotalBytes);
+
+  // Prepare the first mask
+  //
+  int num_bits_set = static_cast<int>(random(kMinBitsSet, kMaxBitsSet));
+  for (int i = 0; i < num_bits_set; ++i) {
+    for (;;) {
+      int bit_pos = static_cast<int>(random(0, kBitsPerMask - 1));
+      if (!bit_util::GetBit(masks_, bit_pos)) {
+        bit_util::SetBit(masks_, bit_pos);
+        break;
+      }
+    }
+  }
+
+  int64_t num_bits_total = kNumMasks + kBitsPerMask - 1;
+
+  // The value num_bits_set will be updated in each iteration of the loop to
+  // represent the number of bits set in the entire mask directly preceding the
+  // currently processed bit.
+  //
+  for (int64_t i = kBitsPerMask; i < num_bits_total; ++i) {
+    // The value of the lowest bit of the previous mask that will be removed
+    // from the current mask as we move to the next position in the bit vector
+    // of masks.
+    //
+    int bit_leaving = bit_util::GetBit(masks_, i - kBitsPerMask) ? 1 : 0;
+
+    // Next bit has to be 1 because of minimum bits in a mask requirement
+    //
+    if (bit_leaving == 1 && num_bits_set == kMinBitsSet) {
+      bit_util::SetBit(masks_, i);
+      continue;
+    }
+
+    // Next bit has to be 0 because of maximum bits in a mask requirement
+    //
+    if (bit_leaving == 0 && num_bits_set == kMaxBitsSet) {
+      continue;
+    }
+
+    // Next bit can be random. Use the expected number of bits set in a mask
+    // as a probability of 1.
+    //
+    if (random(0, kBitsPerMask * 2 - 1) < kMinBitsSet + kMaxBitsSet) {
+      bit_util::SetBit(masks_, i);
+      if (bit_leaving == 0) {
+        ++num_bits_set;
+      }
+    } else {
+      if (bit_leaving == 1) {
+        --num_bits_set;
+      }
+    }
+  }
+}
+
+BloomFilterMasks BlockedBloomFilter::masks_;
+
+Status BlockedBloomFilter::CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool) {
+  // Compute the size
+  //
+  constexpr int64_t min_num_bits_per_key = 8;
+  constexpr int64_t min_num_bits = 512;
+  int64_t desired_num_bits =
+      std::max(min_num_bits, num_rows_to_insert * min_num_bits_per_key);
+  int log_num_bits = bit_util::Log2(desired_num_bits);
+
+  log_num_blocks_ = log_num_bits - 6;
+  num_blocks_ = 1ULL << log_num_blocks_;
+
+  // Allocate and zero out bit vector
+  //
+  int64_t buffer_size = num_blocks_ * sizeof(uint64_t);
+  ARROW_ASSIGN_OR_RAISE(buf_, AllocateBuffer(buffer_size, pool));
+  blocks_ = reinterpret_cast<uint64_t*>(buf_->mutable_data());
+  memset(blocks_, 0, buffer_size);
+
+  return Status::OK();
+}
+
+template <typename T>
+void BlockedBloomFilter::InsertImp(int64_t num_rows, const T* hashes) {
+  for (int64_t i = 0; i < num_rows; ++i) {
+    Insert(hashes[i]);
+  }
+}
+
+void BlockedBloomFilter::Insert(int64_t hardware_flags, int64_t num_rows,
+                                const uint32_t* hashes) {
+  int64_t num_processed = 0;
+#if defined(ARROW_HAVE_AVX2)
+  if (hardware_flags & arrow::internal::CpuInfo::AVX2) {
+    num_processed = Insert_avx2(num_rows, hashes);
+  }
+#endif
+  InsertImp(num_rows - num_processed, hashes + num_processed);

Review comment:
       Yes. This is the pattern I have used everywhere with AVX2 code. All AVX2 functions process only a multiple of AVX2 blocks. They return the number of rows processed and the tail is processed using regular scalar code-path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org