You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/18 04:02:00 UTC
[arrow] branch master updated: ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0742f78a27 ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins
0742f78a27 is described below
commit 0742f78a27fa9d7882c7a840c117824e03bee82d
Author: Sasha Krassovsky <kr...@gmail.com>
AuthorDate: Tue May 17 18:01:46 2022 -1000
ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins
This adds Bloom filter pushdown between hash join nodes.
Closes #12289 from save-buffer/sasha_bloom_pushdown
Lead-authored-by: Sasha Krassovsky <kr...@gmail.com>
Co-authored-by: michalursa <mi...@ursacomputing.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/CMakeLists.txt | 1 -
cpp/src/arrow/compute/exec/bloom_filter.cc | 37 ++-
cpp/src/arrow/compute/exec/bloom_filter.h | 22 +-
cpp/src/arrow/compute/exec/bloom_filter_test.cc | 116 +++++++--
cpp/src/arrow/compute/exec/exec_plan.cc | 3 +
cpp/src/arrow/compute/exec/exec_plan.h | 10 +
cpp/src/arrow/compute/exec/hash_join.cc | 302 +++++++++++++++++++---
cpp/src/arrow/compute/exec/hash_join.h | 8 +-
cpp/src/arrow/compute/exec/hash_join_benchmark.cc | 44 +++-
cpp/src/arrow/compute/exec/hash_join_graphs.py | 2 +-
cpp/src/arrow/compute/exec/hash_join_node.cc | 135 +++++++++-
cpp/src/arrow/compute/exec/hash_join_node_test.cc | 155 ++++++++++-
cpp/src/arrow/compute/exec/key_encode.h | 2 +
cpp/src/arrow/compute/exec/key_hash.cc | 27 ++
cpp/src/arrow/compute/exec/key_hash.h | 8 +
cpp/src/arrow/compute/exec/options.h | 28 +-
cpp/src/arrow/compute/exec/partition_util.cc | 25 +-
cpp/src/arrow/compute/exec/partition_util.h | 22 +-
cpp/src/arrow/compute/exec/util.cc | 71 ++++-
cpp/src/arrow/compute/exec/util.h | 4 +-
cpp/src/arrow/compute/light_array.cc | 5 +-
cpp/src/arrow/compute/light_array.h | 6 +-
22 files changed, 889 insertions(+), 144 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 77f9959fdb..ec6cada1cd 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -393,7 +393,6 @@ if(ARROW_COMPUTE)
compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
- compute/exec/options.cc
compute/exec/order_by_impl.cc
compute/exec/partition_util.cc
compute/exec/options.cc
diff --git a/cpp/src/arrow/compute/exec/bloom_filter.cc b/cpp/src/arrow/compute/exec/bloom_filter.cc
index 6103172545..7b348ff687 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter.cc
+++ b/cpp/src/arrow/compute/exec/bloom_filter.cc
@@ -311,21 +311,22 @@ Status BloomFilterBuilder_SingleThreaded::Begin(size_t /*num_threads*/,
}
Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
- int num_rows,
+ int64_t num_rows,
const uint32_t* hashes) {
PushNextBatchImp(num_rows, hashes);
return Status::OK();
}
Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
- int num_rows,
+ int64_t num_rows,
const uint64_t* hashes) {
PushNextBatchImp(num_rows, hashes);
return Status::OK();
}
template <typename T>
-void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int num_rows, const T* hashes) {
+void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int64_t num_rows,
+ const T* hashes) {
build_target_->Insert(hardware_flags_, num_rows, hashes);
}
@@ -340,29 +341,40 @@ Status BloomFilterBuilder_Parallel::Begin(size_t num_threads, int64_t hardware_f
log_num_prtns_ = std::min(kMaxLogNumPrtns, bit_util::Log2(num_threads));
thread_local_states_.resize(num_threads);
- prtn_locks_.Init(1 << log_num_prtns_);
+ prtn_locks_.Init(num_threads, 1 << log_num_prtns_);
RETURN_NOT_OK(build_target->CreateEmpty(num_rows, pool));
return Status::OK();
}
-Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
+Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
const uint32_t* hashes) {
PushNextBatchImp(thread_id, num_rows, hashes);
return Status::OK();
}
-Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
+Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
const uint64_t* hashes) {
PushNextBatchImp(thread_id, num_rows, hashes);
return Status::OK();
}
template <typename T>
-void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_rows,
+void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int64_t num_rows,
const T* hashes) {
- int num_prtns = 1 << log_num_prtns_;
+ // Partition IDs are calculated using the higher bits of the block ID. This
+ // ensures that each block is contained entirely within a partition and prevents
+ // concurrent access to a block.
+ constexpr int kLogBlocksKeptTogether = 7;
+ constexpr int kPrtnIdBitOffset =
+ BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
+
+ const int log_num_prtns_max =
+ std::max(0, build_target_->log_num_blocks() - kLogBlocksKeptTogether);
+ const int log_num_prtns_mod = std::min(log_num_prtns_, log_num_prtns_max);
+ int num_prtns = 1 << log_num_prtns_mod;
+
ThreadLocalState& local_state = thread_local_states_[thread_id];
local_state.partition_ranges.resize(num_prtns + 1);
local_state.partitioned_hashes_64.resize(num_rows);
@@ -373,13 +385,10 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
PartitionSort::Eval(
num_rows, num_prtns, partition_ranges,
- [hashes, num_prtns](int row_id) {
- constexpr int kLogBlocksKeptTogether = 7;
- constexpr int kPrtnIdBitOffset =
- BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
+ [=](int64_t row_id) {
return (hashes[row_id] >> (kPrtnIdBitOffset)) & (num_prtns - 1);
},
- [hashes, partitioned_hashes](int row_id, int output_pos) {
+ [=](int64_t row_id, int output_pos) {
partitioned_hashes[output_pos] = hashes[row_id];
});
@@ -393,7 +402,7 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
while (num_unprocessed_partitions > 0) {
int locked_prtn_id;
int locked_prtn_id_pos;
- prtn_locks_.AcquirePartitionLock(num_unprocessed_partitions,
+ prtn_locks_.AcquirePartitionLock(thread_id, num_unprocessed_partitions,
unprocessed_partition_ids,
/*limit_retries=*/false, /*max_retries=*/-1,
&locked_prtn_id, &locked_prtn_id_pos);
diff --git a/cpp/src/arrow/compute/exec/bloom_filter.h b/cpp/src/arrow/compute/exec/bloom_filter.h
index b89343373a..06920c6c14 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter.h
+++ b/cpp/src/arrow/compute/exec/bloom_filter.h
@@ -261,49 +261,51 @@ class ARROW_EXPORT BloomFilterBuilder {
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) = 0;
virtual int64_t num_tasks() const { return 0; }
- virtual Status PushNextBatch(size_t thread_index, int num_rows,
+ virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
const uint32_t* hashes) = 0;
- virtual Status PushNextBatch(size_t thread_index, int num_rows,
+ virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
const uint64_t* hashes) = 0;
virtual void CleanUp() {}
static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
};
-class BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
+class ARROW_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
public:
Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) override;
- Status PushNextBatch(size_t /*thread_index*/, int num_rows,
+ Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
const uint32_t* hashes) override;
- Status PushNextBatch(size_t /*thread_index*/, int num_rows,
+ Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
const uint64_t* hashes) override;
private:
template <typename T>
- void PushNextBatchImp(int num_rows, const T* hashes);
+ void PushNextBatchImp(int64_t num_rows, const T* hashes);
int64_t hardware_flags_;
BlockedBloomFilter* build_target_;
};
-class BloomFilterBuilder_Parallel : public BloomFilterBuilder {
+class ARROW_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
public:
Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) override;
- Status PushNextBatch(size_t thread_id, int num_rows, const uint32_t* hashes) override;
+ Status PushNextBatch(size_t thread_id, int64_t num_rows,
+ const uint32_t* hashes) override;
- Status PushNextBatch(size_t thread_id, int num_rows, const uint64_t* hashes) override;
+ Status PushNextBatch(size_t thread_id, int64_t num_rows,
+ const uint64_t* hashes) override;
void CleanUp() override;
private:
template <typename T>
- void PushNextBatchImp(size_t thread_id, int num_rows, const T* hashes);
+ void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);
int64_t hardware_flags_;
BlockedBloomFilter* build_target_;
diff --git a/cpp/src/arrow/compute/exec/bloom_filter_test.cc b/cpp/src/arrow/compute/exec/bloom_filter_test.cc
index a3b5cded15..5e1dd0218c 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter_test.cc
+++ b/cpp/src/arrow/compute/exec/bloom_filter_test.cc
@@ -24,6 +24,7 @@
#include <unordered_set>
#include "arrow/compute/exec/bloom_filter.h"
#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/compute/exec/util.h"
#include "arrow/util/bitmap_ops.h"
@@ -32,39 +33,106 @@
namespace arrow {
namespace compute {
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
- MemoryPool* pool, int64_t num_rows,
- std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
- std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
- BlockedBloomFilter* target) {
- constexpr int batch_size_max = 32 * 1024;
- int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
- auto builder = BloomFilterBuilder::Make(strategy);
-
- std::vector<uint32_t> thread_local_hashes32;
- std::vector<uint64_t> thread_local_hashes64;
- thread_local_hashes32.resize(batch_size_max);
- thread_local_hashes64.resize(batch_size_max);
-
- RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
- bit_util::CeilDiv(num_rows, batch_size_max), target));
-
- for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+ std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+ std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+ std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+ BlockedBloomFilter* target) {
+ std::vector<uint32_t> hashes32(kBatchSizeMax);
+ std::vector<uint64_t> hashes64(kBatchSizeMax);
+ for (int64_t i = 0; i < num_batches; i++) {
size_t thread_index = 0;
int batch_size = static_cast<int>(
- std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+ std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
if (target->NumHashBitsUsed() > 32) {
- uint64_t* hashes = thread_local_hashes64.data();
- get_hash64_impl(i * batch_size_max, batch_size, hashes);
+ uint64_t* hashes = hashes64.data();
+ get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
} else {
- uint32_t* hashes = thread_local_hashes32.data();
- get_hash32_impl(i * batch_size_max, batch_size, hashes);
+ uint32_t* hashes = hashes32.data();
+ get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
}
}
+ return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+ std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+ int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+ std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+ BlockedBloomFilter* target) {
+ ThreadIndexer thread_indexer;
+ std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+ std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+ std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+ for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+ for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+ std::condition_variable cv;
+ std::mutex mutex;
+ auto group = scheduler->RegisterTaskGroup(
+ [&](size_t thread_index, int64_t task_id) -> Status {
+ int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+ static_cast<int64_t>(kBatchSizeMax)));
+ if (target->NumHashBitsUsed() > 32) {
+ uint64_t* hashes = thread_local_hashes64[thread_index].data();
+ get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+ RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+ } else {
+ uint32_t* hashes = thread_local_hashes32[thread_index].data();
+ get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+ RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+ }
+ return Status::OK();
+ },
+ [&](size_t thread_index) -> Status {
+ std::unique_lock<std::mutex> lk(mutex);
+ cv.notify_all();
+ return Status::OK();
+ });
+ scheduler->RegisterEnd();
+ auto tp = arrow::internal::GetCpuThreadPool();
+ RETURN_NOT_OK(scheduler->StartScheduling(
+ 0,
+ [&](std::function<Status(size_t)> func) -> Status {
+ return tp->Spawn([&, func]() {
+ size_t tid = thread_indexer();
+ ARROW_DCHECK_OK(func(tid));
+ });
+ },
+ static_cast<int>(num_threads), false));
+ {
+ std::unique_lock<std::mutex> lk(mutex);
+ RETURN_NOT_OK(scheduler->StartTaskGroup(0, group, num_batches));
+ cv.wait(lk);
+ }
+ return Status::OK();
+}
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
+ MemoryPool* pool, int64_t num_rows,
+ std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+ std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+ BlockedBloomFilter* target) {
+ int64_t num_batches = bit_util::CeilDiv(num_rows, kBatchSizeMax);
+
+ bool is_serial = strategy == BloomFilterBuildStrategy::SINGLE_THREADED;
+ auto builder = BloomFilterBuilder::Make(strategy);
+ size_t num_threads = is_serial ? 1 : arrow::GetCpuThreadPoolCapacity();
+ RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+ bit_util::CeilDiv(num_rows, kBatchSizeMax), target));
+
+ if (is_serial)
+ RETURN_NOT_OK(BuildBloomFilter_Serial(builder, num_rows, num_batches,
+ std::move(get_hash32_impl),
+ std::move(get_hash64_impl), target));
+ else
+ RETURN_NOT_OK(BuildBloomFilter_Parallel(builder, num_threads, num_rows, num_batches,
+ std::move(get_hash32_impl),
+ std::move(get_hash64_impl), target));
builder->CleanUp();
return Status::OK();
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index b7a9c7e1bb..bb197f8db8 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -98,6 +98,9 @@ struct ExecPlanImpl : public ExecPlan {
// producers precede consumers
sorted_nodes_ = TopoSort();
+ for (ExecNode* node : sorted_nodes_) {
+ RETURN_NOT_OK(node->PrepareToProduce());
+ }
std::vector<Future<>> futures;
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h
index be2f23ad24..dcf271bd36 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -212,6 +212,16 @@ class ARROW_EXPORT ExecNode {
// A node with multiple outputs will also need to ensure it is applying backpressure if
// any of its outputs is asking to pause
+ /// \brief Perform any needed initialization
+ ///
+ /// This hook performs any actions in between creation of ExecPlan and the call to
+ /// StartProducing. An example could be Bloom filter pushdown. The order in which
+ /// ExecNodes execute this method is undefined, but the calls are made synchronously.
+ ///
+ /// At this point a node can rely on all inputs & outputs (and the input schemas)
+ /// being well defined.
+ virtual Status PrepareToProduce() { return Status::OK(); }
+
/// \brief Start producing
///
/// This must only be called once. If this fails, then other lifecycle
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 3207bb9698..15a006c81d 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -26,6 +26,7 @@
#include <vector>
#include "arrow/compute/exec/hash_join_dict.h"
+#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/kernels/row_encoder.h"
@@ -39,13 +40,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
struct ThreadLocalState;
public:
+ HashJoinBasicImpl() : num_expected_bloom_filters_(0) {}
+
Status InputReceived(size_t thread_index, int side, ExecBatch batch) override {
if (cancelled_) {
return Status::Cancelled("Hash join cancelled");
}
EVENT(span_, "InputReceived");
- if (QueueBatchIfNeeded(side, batch)) {
+ ARROW_ASSIGN_OR_RAISE(bool queued, QueueBatchIfNeeded(thread_index, side, batch));
+ if (queued) {
return Status::OK();
} else {
ARROW_DCHECK(side == 0);
@@ -62,7 +66,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
bool proceed;
{
std::lock_guard<std::mutex> lock(finished_mutex_);
- proceed = !left_side_finished_ && left_queue_finished_;
+ proceed = !left_side_finished_ && left_queue_probe_finished_;
left_side_finished_ = true;
}
if (proceed) {
@@ -83,50 +87,70 @@ class HashJoinBasicImpl : public HashJoinImpl {
}
Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
- size_t num_threads, HashJoinSchema* schema_mgr,
+ size_t /*num_threads*/, HashJoinSchema* schema_mgr,
std::vector<JoinKeyCmp> key_cmp, Expression filter,
OutputBatchCallback output_batch_callback,
FinishedCallback finished_callback,
- TaskScheduler::ScheduleImpl schedule_task_callback) override {
- num_threads = std::max(num_threads, static_cast<size_t>(1));
+ TaskScheduler::ScheduleImpl schedule_task_callback,
+ HashJoinImpl* pushdown_target, std::vector<int> column_map) override {
+ // TODO(ARROW-15732)
+ // Each side of the join might be called from an IO thread.
+ // As of right now, we ignore the `num_threads` argument, so later we will have to
+ // re-add `num_threads_ = num_threads;`
+ num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1;
START_COMPUTE_SPAN(span_, "HashJoinBasicImpl",
{{"detail", filter.ToString()},
{"join.kind", ToString(join_type)},
- {"join.threads", static_cast<uint32_t>(num_threads)}});
+ {"join.threads", static_cast<uint32_t>(num_threads_)}});
ctx_ = ctx;
join_type_ = join_type;
- num_threads_ = num_threads;
schema_mgr_ = schema_mgr;
key_cmp_ = std::move(key_cmp);
filter_ = std::move(filter);
output_batch_callback_ = std::move(output_batch_callback);
finished_callback_ = std::move(finished_callback);
- // TODO(ARROW-15732)
- // Each side of join might have an IO thread being called from.
- local_states_.resize(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);
+ local_states_.resize(num_threads_);
for (size_t i = 0; i < local_states_.size(); ++i) {
local_states_[i].is_initialized = false;
local_states_[i].is_has_match_initialized = false;
}
- dict_probe_.Init(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);
+ dict_probe_.Init(num_threads_);
+
+ pushdown_target_ = pushdown_target;
+ column_map_ = std::move(column_map);
+ if (pushdown_target_) pushdown_target_->ExpectBloomFilter();
+
+ right_input_row_count_ = 0;
has_hash_table_ = false;
num_batches_produced_.store(0);
cancelled_ = false;
right_side_finished_ = false;
left_side_finished_ = false;
- left_queue_finished_ = false;
+ bloom_filters_ready_ = false;
+ left_queue_bloom_finished_ = false;
+ left_queue_probe_finished_ = false;
scheduler_ = TaskScheduler::Make();
+ if (pushdown_target_) {
+ bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+ bloom_filter_builder_ = BloomFilterBuilder::Make(
+ use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+ : BloomFilterBuildStrategy::PARALLEL);
+ }
+
+ RegisterBuildBloomFilter();
RegisterBuildHashTable();
+ RegisterBloomFilterQueuedBatches();
RegisterProbeQueuedBatches();
RegisterScanHashTable();
scheduler_->RegisterEnd();
+
RETURN_NOT_OK(scheduler_->StartScheduling(
0 /*thread index*/, std::move(schedule_task_callback),
- static_cast<int>(2 * num_threads) /*concurrent tasks*/, use_sync_execution));
+ static_cast<int>(2 * num_threads_) /*concurrent tasks*/, use_sync_execution));
return Status::OK();
}
@@ -138,6 +162,32 @@ class HashJoinBasicImpl : public HashJoinImpl {
scheduler_->Abort(std::move(pos_abort_callback));
}
+ // Called by a downstream node after it has constructed a Bloom filter
+ // that this node can use to filter inputs.
+ Status PushBloomFilter(size_t thread_index, std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) override {
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> lock_bloom(bloom_filters_mutex_);
+ pushed_bloom_filters_.emplace_back(std::move(filter));
+ bloom_filter_column_maps_.emplace_back(std::move(column_map));
+ proceed = pushed_bloom_filters_.size() == num_expected_bloom_filters_;
+ ARROW_DCHECK(pushed_bloom_filters_.size() <= num_expected_bloom_filters_);
+ }
+ if (proceed) {
+ size_t num_batches;
+ {
+ std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ num_batches = left_batches_.size();
+ bloom_filters_ready_ = true;
+ }
+ RETURN_NOT_OK(BloomFilterQueuedBatches(thread_index, num_batches));
+ }
+ return Status::OK();
+ }
+
+ void ExpectBloomFilter() override { num_expected_bloom_filters_ += 1; }
+
private:
void InitEncoder(int side, HashJoinProjection projection_handle, RowEncoder* encoder) {
std::vector<ValueDescr> data_types;
@@ -152,7 +202,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
encoder->Clear();
}
- void InitLocalStateIfNeeded(size_t thread_index) {
+ Status InitLocalStateIfNeeded(size_t thread_index) {
DCHECK_LT(thread_index, local_states_.size());
ThreadLocalState& local_state = local_states_[thread_index];
if (!local_state.is_initialized) {
@@ -162,9 +212,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
if (has_payload) {
InitEncoder(0, HashJoinProjection::PAYLOAD, &local_state.exec_batch_payloads);
}
-
+ RETURN_NOT_OK(local_state.temp_stack.Init(
+ ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint32_t)));
local_state.is_initialized = true;
}
+ return Status::OK();
}
Status EncodeBatch(int side, HashJoinProjection projection_handle, RowEncoder* encoder,
@@ -432,7 +484,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
(schema_mgr_->proj_maps[1].num_cols(HashJoinProjection::PAYLOAD) > 0);
ThreadLocalState& local_state = local_states_[thread_index];
- InitLocalStateIfNeeded(thread_index);
+ RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
ExecBatch left_key;
ExecBatch left_payload;
@@ -544,7 +596,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
Status ProbeBatch(size_t thread_index, const ExecBatch& batch) {
ThreadLocalState& local_state = local_states_[thread_index];
- InitLocalStateIfNeeded(thread_index);
+ RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
local_state.exec_batch_keys.Clear();
@@ -604,8 +656,84 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::OK();
}
+ Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) {
+ if (batch.length == 0) return Status::OK();
+ int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+ std::vector<uint8_t> selected(bit_vector_bytes);
+ std::vector<uint32_t> hashes(batch.length);
+ std::vector<uint8_t> bv(bit_vector_bytes);
+
+ RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+ // Start with full selection for the current batch
+ memset(selected.data(), 0xff, bit_vector_bytes);
+ for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++) {
+ std::vector<Datum> keys(bloom_filter_column_maps_[ifilter].size());
+ for (size_t i = 0; i < keys.size(); i++) {
+ int input_idx = bloom_filter_column_maps_[ifilter][i];
+ keys[i] = batch[input_idx];
+ }
+ ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys)));
+ RETURN_NOT_OK(Hashing32::HashBatch(
+ key_batch, hashes.data(), ctx_->cpu_info()->hardware_flags(),
+ &local_states_[thread_index].temp_stack, 0, key_batch.length));
+
+ pushed_bloom_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+ key_batch.length, hashes.data(), bv.data());
+ arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0,
+ selected.data());
+ }
+ auto selected_buffer =
+ arrow::internal::make_unique<Buffer>(selected.data(), bit_vector_bytes);
+ ArrayData selected_arraydata(boolean(), batch.length,
+ {nullptr, std::move(selected_buffer)});
+ Datum selected_datum(selected_arraydata);
+ FilterOptions options;
+ size_t first_nonscalar = batch.values.size();
+ for (size_t i = 0; i < batch.values.size(); i++) {
+ if (!batch.values[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(batch.values[i],
+ Filter(batch.values[i], selected_datum, options, ctx_));
+ first_nonscalar = std::min(first_nonscalar, i);
+ ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length());
+ }
+ }
+ // If they're all Scalar, then the length of the batch is the number of set bits
+ if (first_nonscalar == batch.values.size())
+ batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length);
+ else
+ batch.length = batch.values[first_nonscalar].length();
+ return Status::OK();
+ }
+
int64_t BuildHashTable_num_tasks() { return 1; }
+ Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id) {
+ const ExecBatch& input_batch = right_batches_[task_id];
+ SchemaProjectionMap key_to_in =
+ schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ std::vector<Datum> key_columns(key_to_in.num_cols);
+ for (size_t i = 0; i < key_columns.size(); i++) {
+ int input_idx = key_to_in.get(static_cast<int>(i));
+ key_columns[i] = input_batch[input_idx];
+ }
+ ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));
+
+ RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+ ThreadLocalState& tls = local_states_[thread_index];
+ util::TempVectorHolder<uint32_t> hash_holder(&tls.temp_stack,
+ util::MiniBatch::kMiniBatchLength);
+ uint32_t* hashes = hash_holder.mutable_data();
+ for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) {
+ int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
+ static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
+ RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes,
+ ctx_->cpu_info()->hardware_flags(),
+ &tls.temp_stack, i, length));
+ RETURN_NOT_OK(bloom_filter_builder_->PushNextBatch(thread_index, length, hashes));
+ }
+ return Status::OK();
+ }
+
Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
const std::vector<ExecBatch>& batches = right_batches_;
if (batches.empty()) {
@@ -651,23 +779,45 @@ class HashJoinBasicImpl : public HashJoinImpl {
return Status::OK();
}
+ Status BuildBloomFilter_on_finished(size_t thread_index) {
+ if (cancelled_) return Status::Cancelled("Hash join cancelled");
+ ARROW_DCHECK(pushdown_target_);
+ RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
+ thread_index, std::move(bloom_filter_), std::move(column_map_)));
+ return BuildHashTable(thread_index);
+ }
+
Status BuildHashTable_on_finished(size_t thread_index) {
if (cancelled_) {
return Status::Cancelled("Hash join cancelled");
}
+ right_batches_.clear();
+
+ bool proceed;
{
std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ std::lock_guard<std::mutex> lock_finish(finished_mutex_);
+ left_queue_bloom_finished_ =
+ left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
+ proceed = !has_hash_table_ && left_queue_bloom_finished_;
has_hash_table_ = true;
}
-
- right_batches_.clear();
-
- RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
+ if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
return Status::OK();
}
+ void RegisterBuildBloomFilter() {
+ task_group_bloom_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) -> Status {
+ return BuildBloomFilter_exec_task(thread_index, task_id);
+ },
+ [this](size_t thread_index) -> Status {
+ return BuildBloomFilter_on_finished(thread_index);
+ });
+ }
+
void RegisterBuildHashTable() {
task_group_build_ = scheduler_->RegisterTaskGroup(
[this](size_t thread_index, int64_t task_id) -> Status {
@@ -678,11 +828,63 @@ class HashJoinBasicImpl : public HashJoinImpl {
});
}
+ Status BuildBloomFilter(size_t thread_index) {
+ RETURN_NOT_OK(bloom_filter_builder_->Begin(
+ num_threads_, ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(),
+ right_input_row_count_, right_batches_.size(), bloom_filter_.get()));
+
+ return scheduler_->StartTaskGroup(thread_index, task_group_bloom_,
+ right_batches_.size());
+ }
+
Status BuildHashTable(size_t thread_index) {
return scheduler_->StartTaskGroup(thread_index, task_group_build_,
BuildHashTable_num_tasks());
}
+ Status BloomFilterQueuedBatches_exec_task(size_t thread_index, int64_t task_id) {
+ if (cancelled_) return Status::Cancelled("Hash join cancelled");
+ ExecBatch batch;
+ {
+ std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ batch = std::move(left_batches_[task_id]);
+ ARROW_DCHECK(!batch.values.empty());
+ }
+ RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
+ {
+ std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ left_batches_[task_id] = std::move(batch);
+ }
+ return Status::OK();
+ }
+
+ Status BloomFilterQueuedBatches_on_finished(size_t thread_index) {
+ if (cancelled_) return Status::Cancelled("Hash join cancelled");
+ bool proceed;
+ {
+ std::lock_guard<std::mutex> lock(finished_mutex_);
+ proceed = !left_queue_bloom_finished_ && has_hash_table_;
+ left_queue_bloom_finished_ = true;
+ }
+ if (proceed) return ProbeQueuedBatches(thread_index);
+ return Status::OK();
+ }
+
+ void RegisterBloomFilterQueuedBatches() {
+ task_group_bloom_filter_queued_ = scheduler_->RegisterTaskGroup(
+ [this](size_t thread_index, int64_t task_id) -> Status {
+ return BloomFilterQueuedBatches_exec_task(thread_index, task_id);
+ },
+ [this](size_t thread_index) -> Status {
+ return BloomFilterQueuedBatches_on_finished(thread_index);
+ });
+ }
+
+ Status BloomFilterQueuedBatches(size_t thread_index, size_t num_batches) {
+ return scheduler_->StartTaskGroup(thread_index, task_group_bloom_filter_queued_,
+ num_batches);
+ }
+
int64_t ProbeQueuedBatches_num_tasks() {
return static_cast<int64_t>(left_batches_.size());
}
@@ -704,8 +906,9 @@ class HashJoinBasicImpl : public HashJoinImpl {
bool proceed;
{
std::lock_guard<std::mutex> lock(finished_mutex_);
- proceed = left_side_finished_ && !left_queue_finished_;
- left_queue_finished_ = true;
+ ARROW_DCHECK(left_queue_bloom_finished_);
+ proceed = left_side_finished_ && !left_queue_probe_finished_;
+ left_queue_probe_finished_ = true;
}
if (proceed) {
RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
@@ -751,7 +954,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
hash_table_scan_unit_ * (task_id + 1)));
ThreadLocalState& local_state = local_states_[thread_index];
- InitLocalStateIfNeeded(thread_index);
+ RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
std::vector<int32_t>& id_left = local_state.no_match;
std::vector<int32_t>& id_right = local_state.match;
@@ -809,22 +1012,40 @@ class HashJoinBasicImpl : public HashJoinImpl {
ScanHashTable_num_tasks());
}
- bool QueueBatchIfNeeded(int side, ExecBatch batch) {
+ Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& batch) {
if (side == 0) {
- std::lock_guard<std::mutex> lock(left_batches_mutex_);
- if (has_hash_table_) {
- return false;
+ // We don't want to do the filtering while holding the lock, since that can get
+ // expensive.
+ bool needs_filtering;
+ {
+ std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ bloom_filters_ready_ = bloom_filters_ready_ || num_expected_bloom_filters_ == 0;
+ needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ != 0;
}
- left_batches_.emplace_back(std::move(batch));
- return true;
+ if (needs_filtering) RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
+
+ bool queued;
+ {
+ std::lock_guard<std::mutex> lock(left_batches_mutex_);
+ queued = !bloom_filters_ready_ || !has_hash_table_;
+ if (queued) left_batches_.emplace_back(std::move(batch));
+ }
+ return queued;
} else {
std::lock_guard<std::mutex> lock(right_batches_mutex_);
+ right_input_row_count_ += batch.length;
right_batches_.emplace_back(std::move(batch));
return true;
}
}
- Status OnRightSideFinished(size_t thread_index) { return BuildHashTable(thread_index); }
+ Status OnRightSideFinished(size_t thread_index) {
+ if (pushdown_target_ == nullptr) {
+ return BuildHashTable(thread_index);
+ } else {
+ return BuildBloomFilter(thread_index);
+ }
+ }
Status OnLeftSideAndQueueFinished(size_t thread_index) {
return ScanHashTable(thread_index);
@@ -875,7 +1096,9 @@ class HashJoinBasicImpl : public HashJoinImpl {
std::vector<JoinKeyCmp> key_cmp_;
Expression filter_;
std::unique_ptr<TaskScheduler> scheduler_;
+ int task_group_bloom_;
int task_group_build_;
+ int task_group_bloom_filter_queued_;
int task_group_queued_;
int task_group_scan_;
@@ -896,6 +1119,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
std::vector<int32_t> match_right;
bool is_has_match_initialized;
std::vector<uint8_t> has_match;
+ util::TempVectorStack temp_stack;
};
std::vector<ThreadLocalState> local_states_;
@@ -916,15 +1140,29 @@ class HashJoinBasicImpl : public HashJoinImpl {
bool has_hash_table_;
std::mutex left_batches_mutex_;
+ size_t right_input_row_count_; // Sum of the lengths of ExecBatches in right_batches_
std::vector<ExecBatch> right_batches_;
std::mutex right_batches_mutex_;
+ // Bloom filter stuff
+ //
+ std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_;
+ std::unique_ptr<BlockedBloomFilter> bloom_filter_;
+ std::vector<int> column_map_;
+ std::vector<std::unique_ptr<BlockedBloomFilter>> pushed_bloom_filters_;
+ std::vector<std::vector<int>> bloom_filter_column_maps_;
+ std::mutex bloom_filters_mutex_;
+ size_t num_expected_bloom_filters_;
+ HashJoinImpl* pushdown_target_;
+
std::atomic<int64_t> num_batches_produced_;
bool cancelled_;
+ bool bloom_filters_ready_;
bool right_side_finished_;
bool left_side_finished_;
- bool left_queue_finished_;
+ bool left_queue_bloom_finished_;
+ bool left_queue_probe_finished_;
std::mutex finished_mutex_;
};
diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h
index 12455f0c6d..9739cbc643 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -21,6 +21,7 @@
#include <memory>
#include <vector>
+#include "arrow/compute/exec/bloom_filter.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/schema_util.h"
#include "arrow/compute/exec/task_util.h"
@@ -107,7 +108,12 @@ class HashJoinImpl {
std::vector<JoinKeyCmp> key_cmp, Expression filter,
OutputBatchCallback output_batch_callback,
FinishedCallback finished_callback,
- TaskScheduler::ScheduleImpl schedule_task_callback) = 0;
+ TaskScheduler::ScheduleImpl schedule_task_callback,
+ HashJoinImpl* pushdown_target, std::vector<int> column_map) = 0;
+ virtual void ExpectBloomFilter() = 0;
+ virtual Status PushBloomFilter(size_t thread_index,
+ std::unique_ptr<BlockedBloomFilter> filter,
+ std::vector<int> column_map) = 0;
virtual Status InputReceived(size_t thread_index, int side, ExecBatch batch) = 0;
virtual Status InputFinished(size_t thread_index, int side) = 0;
virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0;
diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
index 3d4271b6cb..8d8be7f904 100644
--- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
@@ -36,6 +36,7 @@
namespace arrow {
namespace compute {
struct BenchmarkSettings {
+ bool bloom_filter = false;
int num_threads = 1;
JoinType join_type = JoinType::INNER;
int batch_size = 1024;
@@ -57,6 +58,7 @@ class JoinBenchmark {
SchemaBuilder l_schema_builder, r_schema_builder;
std::vector<FieldRef> left_keys, right_keys;
+ std::vector<JoinKeyCmp> key_cmp;
for (size_t i = 0; i < settings.key_types.size(); i++) {
std::string l_name = "lk" + std::to_string(i);
std::string r_name = "rk" + std::to_string(i);
@@ -93,6 +95,7 @@ class JoinBenchmark {
left_keys.push_back(FieldRef(l_name));
right_keys.push_back(FieldRef(r_name));
+ key_cmp.push_back(JoinKeyCmp::EQ);
}
for (size_t i = 0; i < settings.build_payload_types.size(); i++) {
@@ -126,6 +129,22 @@ class JoinBenchmark {
join_ = *HashJoinImpl::MakeBasic();
+ HashJoinImpl* bloom_filter_pushdown_target = nullptr;
+ std::vector<int> key_input_map;
+
+ bool bloom_filter_does_not_apply_to_join =
+ settings.join_type == JoinType::LEFT_ANTI ||
+ settings.join_type == JoinType::LEFT_OUTER ||
+ settings.join_type == JoinType::FULL_OUTER;
+ if (settings.bloom_filter && !bloom_filter_does_not_apply_to_join) {
+ bloom_filter_pushdown_target = join_.get();
+ SchemaProjectionMap probe_key_to_input = schema_mgr_->proj_maps[0].map(
+ HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ int num_keys = probe_key_to_input.num_cols;
+ for (int i = 0; i < num_keys; i++)
+ key_input_map.push_back(probe_key_to_input.get(i));
+ }
+
omp_set_num_threads(settings.num_threads);
auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
#pragma omp task
@@ -135,8 +154,9 @@ class JoinBenchmark {
DCHECK_OK(join_->Init(
ctx_.get(), settings.join_type, !is_parallel, settings.num_threads,
- schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), [](ExecBatch) {},
- [](int64_t x) {}, schedule_callback));
+ schema_mgr_.get(), std::move(key_cmp), std::move(filter), [](ExecBatch) {},
+ [](int64_t x) {}, schedule_callback, bloom_filter_pushdown_target,
+ std::move(key_input_map)));
}
void RunJoin() {
@@ -268,6 +288,16 @@ static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) {
HashJoinBasicBenchmarkImpl(st, settings);
}
+
+// Benchmarks the basic hash join with the Bloom filter switched on or off.
+// Benchmark argument 0 is the selectivity in percent; argument 1 is the number of
+// build batches, which is also used as the number of probe batches.
+static void BM_HashJoinBasic_BloomFilter(benchmark::State& st, bool bloom_filter) {
+  const double selectivity = static_cast<double>(st.range(0)) / 100.0;
+  const int num_batches = static_cast<int>(st.range(1));
+
+  BenchmarkSettings settings;
+  settings.bloom_filter = bloom_filter;
+  settings.selectivity = selectivity;
+  settings.num_build_batches = num_batches;
+  settings.num_probe_batches = num_batches;
+
+  HashJoinBasicBenchmarkImpl(st, settings);
+}
#endif
std::vector<int64_t> hashtable_krows = benchmark::CreateRange(1, 4096, 8);
@@ -395,6 +425,16 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism)
BENCHMARK(BM_HashJoinBasic_NullPercentage)
->ArgNames({"Null Percentage"})
->DenseRange(0, 100, 10);
+
+// Argument grid for the Bloom filter benchmarks: selectivity in percent (0..100 in
+// steps of 10) crossed with the hash table sizes in hashtable_krows. Both variants
+// (filter on / filter off) run over the same grid so their results are comparable.
+std::vector<std::string> bloomfilter_argnames = {"Selectivity", "HashTable krows"};
+std::vector<std::vector<int64_t>> bloomfilter_args = {
+    benchmark::CreateDenseRange(0, 100, 10), hashtable_krows};
+// Pass bloomfilter_args (not the selectivity benchmark's grid) so that this
+// benchmark's argument list is the one declared right above.
+BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "Bloom Filter", true)
+    ->ArgNames(bloomfilter_argnames)
+    ->ArgsProduct(bloomfilter_args);
+BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "No Bloom Filter", false)
+    ->ArgNames(bloomfilter_argnames)
+    ->ArgsProduct(bloomfilter_args);
#else
BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})
diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py
index ff7ab63187..16b4c3e5fd 100755
--- a/cpp/src/arrow/compute/exec/hash_join_graphs.py
+++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py
@@ -148,7 +148,7 @@ def plot_3d(test, sorted_argnames):
num_cols = int(math.ceil(num_graphs / num_rows))
graphs = set(test.args[sorted_argnames[0]])
- for j, graph in enumerate(sorted(graphs, key=string_numeric_sort_key)):
+ for j, graph in enumerate(sorted(graphs, key=try_as_numeric)):
ax = plt.subplot(num_rows, num_cols, j + 1)
filtered_test = Test()
indices = range(len(test.times))
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 0282e387c4..d8d729dd1a 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -16,6 +16,7 @@
// under the License.
#include <unordered_set>
+#include <utility>
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/hash_join.h"
@@ -466,7 +467,8 @@ class HashJoinNode : public ExecNode {
key_cmp_(join_options.key_cmp),
filter_(std::move(filter)),
schema_mgr_(std::move(schema_mgr)),
- impl_(std::move(impl)) {
+ impl_(std::move(impl)),
+ disable_bloom_filter_(join_options.disable_bloom_filter) {
complete_.store(false);
}
@@ -571,24 +573,138 @@ class HashJoinNode : public ExecNode {
}
}
- Status StartProducing() override {
- START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
- {{"node.label", label()},
- {"node.detail", ToString()},
- {"node.kind", kind_name()}});
- END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
+ // The Bloom filter is built on the build side of some upstream join. For a join to
+ // evaluate the Bloom filter on its input columns, it has to rearrange its input columns
+ // to match the column order of the Bloom filter.
+ //
+ // The first part of the pair is the HashJoin to actually perform the pushdown into.
+ // The second part is a mapping such that column_map[i] is the index of key i in
+ // the first part's input.
+ // If we should disable the Bloom filter, returns nullptr and an empty vector, and sets
+ // the disable_bloom_filter_ flag.
+ std::pair<HashJoinImpl*, std::vector<int>> GetPushdownTarget() {
+#if !ARROW_LITTLE_ENDIAN
+ // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It probably just
+ // needs a few byte swaps in the proper spots.
+ disable_bloom_filter_ = true;
+ return {nullptr, {}};
+#else
+ // A build-side Bloom filter tells us if a row is definitely not in the build side.
+ // This allows us to early-eliminate rows or early-accept rows depending on the type
+ // of join. Left Outer Join and Full Outer Join output all rows, so a build-side Bloom
+ // filter would only allow us to early-output. Left Antijoin outputs only if there is
+ // no match, so again early output. We don't implement early output for now, so we
+ // must disallow these types of joins.
+ bool bloom_filter_does_not_apply_to_join = join_type_ == JoinType::LEFT_ANTI ||
+ join_type_ == JoinType::LEFT_OUTER ||
+ join_type_ == JoinType::FULL_OUTER;
+ disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join;
+
+ SchemaProjectionMap build_keys_to_input =
+ schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ // Bloom filter currently doesn't support dictionaries.
+ for (int i = 0; i < build_keys_to_input.num_cols; i++) {
+ int idx = build_keys_to_input.get(i);
+ bool is_dict =
+ inputs_[1]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
+ if (is_dict) {
+ disable_bloom_filter_ = true;
+ break;
+ }
+ }
+
+ bool all_comparisons_is = true;
+ for (JoinKeyCmp cmp : key_cmp_) all_comparisons_is &= (cmp == JoinKeyCmp::IS);  // stays true only if every key uses IS
+
+ if ((join_type_ == JoinType::RIGHT_OUTER || join_type_ == JoinType::FULL_OUTER) &&
+ all_comparisons_is)
+ disable_bloom_filter_ = true;
+
+ if (disable_bloom_filter_) return {nullptr, {}};
+
+ // We currently only push Bloom filters on the probe side, and only if that input is
+ // also a join.
+ SchemaProjectionMap probe_key_to_input =
+ schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ int num_keys = probe_key_to_input.num_cols;
+
+ // A mapping such that bloom_to_target[i] is the index of key i in the pushdown
+ // target's input
+ std::vector<int> bloom_to_target(num_keys);
+ HashJoinNode* pushdown_target = this;  // default target: this node, until an input join qualifies
+ for (int i = 0; i < num_keys; i++) bloom_to_target[i] = probe_key_to_input.get(i);
+
+ for (ExecNode* candidate = inputs()[0]; candidate->kind_name() == this->kind_name();
+ candidate = candidate->inputs()[0]) {  // follow input 0 while the node has this node's kind
+ auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
+ SchemaProjectionMap candidate_output_to_input =
+ candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
+ HashJoinProjection::INPUT);
+
+ // Check if any of the keys are missing, if they are, break
+ bool break_outer = false;
+ for (int i = 0; i < num_keys; i++) {
+ // Since all of the probe side columns are before the build side columns,
+ // if the index of an output is greater than or equal to the number of probe-side
+ // output columns of the candidate, it must have come from the candidate's build side.
+ if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
+ break_outer = true;
+ break;
+ }
+ int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]);
+ // The output column has to have come from somewhere...
+ ARROW_DCHECK_NE(candidate_input_idx, schema_mgr_->kMissingField());
+ }
+ if (break_outer) break;
+
+ // The Bloom filter will filter out nulls, which may cause a Right/Full Outer Join
+ // to incorrectly output some rows with nulls padding the probe-side rows. This may
+ // cause a row with all null keys to be emitted. This is normally not an issue
+ // with EQ, but if all comparisons are IS (i.e. all-null is accepted), this could
+ // produce incorrect rows.
+ bool can_produce_build_side_nulls =
+ candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
+ candidate_as_join->join_type_ == JoinType::FULL_OUTER;
+
+ if (all_comparisons_is || can_produce_build_side_nulls) break;
+
+ // All keys are present, we can update the mapping
+ for (int i = 0; i < num_keys; i++) {
+ int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]);
+ bloom_to_target[i] = candidate_input_idx;
+ }
+ pushdown_target = candidate_as_join;  // this candidate qualifies; keep walking its input 0
+ }
+ return std::make_pair(pushdown_target->impl_.get(), std::move(bloom_to_target));
+#endif // ARROW_LITTLE_ENDIAN
+ }
+ Status PrepareToProduce() override {
bool use_sync_execution = !(plan_->exec_context()->executor());
size_t num_threads = use_sync_execution ? 1 : thread_indexer_.Capacity();
- RETURN_NOT_OK(impl_->Init(
+ HashJoinImpl* pushdown_target = nullptr;
+ std::vector<int> column_map;
+ std::tie(pushdown_target, column_map) = GetPushdownTarget();
+
+ return impl_->Init(
plan_->exec_context(), join_type_, use_sync_execution, num_threads,
schema_mgr_.get(), key_cmp_, filter_,
[this](ExecBatch batch) { this->OutputBatchCallback(batch); },
[this](int64_t total_num_batches) { this->FinishedCallback(total_num_batches); },
[this](std::function<Status(size_t)> func) -> Status {
return this->ScheduleTaskCallback(std::move(func));
- }));
+ },
+ pushdown_target, std::move(column_map));
+ }
+
+ Status StartProducing() override {
+ START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
+ {{"node.label", label()},
+ {"node.detail", ToString()},
+ {"node.kind", kind_name()}});
+ END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
+
return Status::OK();
}
@@ -662,6 +778,7 @@ class HashJoinNode : public ExecNode {
std::unique_ptr<HashJoinSchema> schema_mgr_;
std::unique_ptr<HashJoinImpl> impl_;
util::AsyncTaskGroup task_group_;
+ bool disable_bloom_filter_;
};
namespace internal {
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index 99a8fc01e0..f8da71c7b5 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1000,11 +1000,14 @@ TEST(HashJoin, Random) {
Random64Bit rng(42);
#if defined(THREAD_SANITIZER) || defined(ARROW_VALGRIND)
const int num_tests = 15;
+#elif defined(ADDRESS_SANITIZER)
+ const int num_tests = 25;
#else
const int num_tests = 100;
#endif
for (int test_id = 0; test_id < num_tests; ++test_id) {
bool parallel = (rng.from_range(0, 1) == 1);
+ bool disable_bloom_filter = (rng.from_range(0, 1) == 1);
auto exec_ctx = arrow::internal::make_unique<ExecContext>(
default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
@@ -1110,7 +1113,8 @@ TEST(HashJoin, Random) {
}
ARROW_SCOPED_TRACE(join_type_name, " ", key_cmp_str,
- " parallel = ", (parallel ? "true" : "false"));
+ " parallel = ", (parallel ? "true" : "false"),
+ " bloom_filter = ", (disable_bloom_filter ? "false" : "true"));
// Run reference join implementation
std::vector<bool> null_in_key_vectors[2];
@@ -1130,7 +1134,7 @@ TEST(HashJoin, Random) {
// Turn the last key comparison into a residual filter expression
Expression filter = literal(true);
- if (key_cmp.size() > 1 && rng.from_range(0, 1) == 0) {
+ if (key_cmp.size() > 1 && rng.from_range(0, 4) == 0) {
for (size_t i = 0; i < key_cmp.size(); i++) {
FieldRef left = key_fields[0][i];
FieldRef right = key_fields[1][i];
@@ -1158,6 +1162,7 @@ TEST(HashJoin, Random) {
HashJoinNodeOptions join_options{
join_type, key_fields[0], key_fields[1], output_fields[0],
output_fields[1], key_cmp, filter};
+ join_options.disable_bloom_filter = disable_bloom_filter;
std::vector<std::shared_ptr<Field>> output_schema_fields;
for (int i = 0; i < 2; ++i) {
for (size_t col = 0; col < output_fields[i].size(); ++col) {
@@ -1168,6 +1173,7 @@ TEST(HashJoin, Random) {
}
std::shared_ptr<Schema> output_schema =
std::make_shared<Schema>(std::move(output_schema_fields));
+
ASSERT_OK_AND_ASSIGN(
auto batches, HashJoinWithExecPlan(
rng, parallel, join_options, output_schema,
@@ -1901,5 +1907,150 @@ TEST(HashJoin, TrivialResidualFilter) {
}
}
+// Builds a random HashJoinNodeOptions for inputs with the given numbers of columns:
+// a random join type, random output columns (at least one overall) and between 1 and
+// min(num_left_cols, num_right_cols) key pairs compared with IS or EQ.
+//
+// NOTE: the sequence of rng draws defines the generated options, so the draws below
+// are made unconditionally and in a fixed order.
+HashJoinNodeOptions GenerateHashJoinNodeOptions(Random64Bit& rng, int num_left_cols,
+                                                int num_right_cols) {
+  HashJoinNodeOptions opts;
+  opts.join_type = static_cast<JoinType>(rng.from_range(0, 7));
+
+  // Join types for which this generator emits columns from one side only.
+  bool left_only = false;
+  bool right_only = false;
+  switch (opts.join_type) {
+    case JoinType::LEFT_SEMI:
+    case JoinType::LEFT_ANTI:
+    case JoinType::LEFT_OUTER:
+      left_only = true;
+      break;
+    case JoinType::RIGHT_SEMI:
+    case JoinType::RIGHT_ANTI:
+    case JoinType::RIGHT_OUTER:
+      right_only = true;
+      break;
+    default:
+      break;
+  }
+
+  int num_keys = rng.from_range(1, std::min(num_left_cols, num_right_cols));
+
+  for (int col = 0; col < num_left_cols; ++col) {
+    // Draw first so the rng advances even when the column cannot be output.
+    bool selected = rng.from_range(0, 2) != 2;
+    if (selected && !right_only) {
+      opts.left_output.push_back(FieldRef(col));
+    }
+  }
+  for (int col = 0; col < num_right_cols; ++col) {
+    bool selected = rng.from_range(0, 2) == 2;
+    if (selected && !left_only) {
+      opts.right_output.push_back(FieldRef(col));
+    }
+  }
+
+  // A join needs at least one output column; pick one from an allowed side.
+  if (opts.left_output.empty() && opts.right_output.empty()) {
+    bool pick_left;
+    if (left_only) {
+      pick_left = true;
+    } else if (right_only) {
+      pick_left = false;
+    } else {
+      pick_left = rng.from_range(0, 1) == 0;
+    }
+
+    if (pick_left) {
+      int col = rng.from_range(0, num_left_cols - 1);
+      opts.left_output.push_back(FieldRef(col));
+    } else {
+      int col = rng.from_range(0, num_right_cols - 1);
+      opts.right_output.push_back(FieldRef(col));
+    }
+  }
+
+  for (int key = 0; key < num_keys; ++key) {
+    int left_col = rng.from_range(0, num_left_cols - 1);
+    int right_col = rng.from_range(0, num_right_cols - 1);
+    bool use_is = rng.from_range(0, 1) == 0;
+    opts.left_keys.push_back(FieldRef(left_col));
+    opts.right_keys.push_back(FieldRef(right_col));
+    opts.key_cmp.push_back(use_is ? JoinKeyCmp::IS : JoinKeyCmp::EQ);
+  }
+  return opts;
+}
+
+// Builds a random left-deep chain of 2 to 5 hash joins over int32 inputs and runs it
+// twice: first with Bloom filters disabled to obtain the reference result, then with
+// them enabled, asserting that both runs produce the same batches.
+//
+// NOTE: the order of the rng draws determines the generated plan; keep it unchanged.
+void TestSingleChainOfHashJoins(Random64Bit& rng) {
+  int num_joins = rng.from_range(2, 5);
+  std::vector<HashJoinNodeOptions> opts;
+  int num_left_cols = rng.from_range(1, 8);
+  int first_num_right_cols = rng.from_range(1, 8);
+  opts.push_back(GenerateHashJoinNodeOptions(rng, num_left_cols, first_num_right_cols));
+
+  // Field metadata read by MakeRandomBatches.
+  std::unordered_map<std::string, std::string> metadata_map;
+  metadata_map["min"] = "0";
+  metadata_map["max"] = "10";
+  auto metadata = key_value_metadata(metadata_map);
+
+  std::vector<std::shared_ptr<Field>> left_fields;
+  for (int col = 0; col < num_left_cols; col++) {
+    left_fields.push_back(
+        field(std::string("l") + std::to_string(col), int32(), metadata));
+  }
+  std::vector<std::shared_ptr<Field>> first_right_fields;
+  for (int col = 0; col < first_num_right_cols; col++) {
+    first_right_fields.push_back(
+        field(std::string("r_0_") + std::to_string(col), int32(), metadata));
+  }
+
+  BatchesWithSchema input_left = MakeRandomBatches(schema(std::move(left_fields)));
+  std::vector<BatchesWithSchema> input_right;
+  input_right.push_back(MakeRandomBatches(schema(std::move(first_right_fields))));
+
+  // Every further join takes the previous join's output as its left input.
+  for (int join = 1; join < num_joins; join++) {
+    int right_cols = rng.from_range(1, 8);
+    const HashJoinNodeOptions& prev = opts[join - 1];
+    int prev_num_outputs =
+        static_cast<int>(prev.left_output.size() + prev.right_output.size());
+    opts.push_back(GenerateHashJoinNodeOptions(rng, prev_num_outputs, right_cols));
+
+    std::vector<std::shared_ptr<Field>> right_fields;
+    for (int col = 0; col < right_cols; col++) {
+      right_fields.push_back(field(
+          std::string("r_") + std::to_string(join) + "_" + std::to_string(col), int32(),
+          metadata));
+    }
+    input_right.push_back(MakeRandomBatches(schema(std::move(right_fields))));
+  }
+
+  std::vector<ExecBatch> reference;
+  for (bool bloom_filters : {false, true}) {
+    const bool kParallel = true;
+    ARROW_SCOPED_TRACE(bloom_filters ? "bloom filtered" : "unfiltered");
+    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
+        default_memory_pool(), kParallel ? arrow::internal::GetCpuThreadPool() : nullptr);
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+
+    ExecNode* left_source;
+    ASSERT_OK_AND_ASSIGN(
+        left_source,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{input_left.schema,
+                                       input_left.gen(kParallel, /*slow=*/false)}));
+
+    std::vector<ExecNode*> joins(num_joins);
+    for (int join = 0; join < num_joins; join++) {
+      opts[join].disable_bloom_filter = !bloom_filters;
+
+      ExecNode* right_source;
+      ASSERT_OK_AND_ASSIGN(
+          right_source,
+          MakeExecNode("source", plan.get(), {},
+                       SourceNodeOptions{input_right[join].schema,
+                                         input_right[join].gen(kParallel, /*slow=*/false)}));
+
+      // The first join reads the left source; later joins read the previous join.
+      ExecNode* probe_input = (join == 0) ? left_source : joins[join - 1];
+      std::vector<ExecNode*> inputs = {probe_input, right_source};
+      ASSERT_OK_AND_ASSIGN(joins[join],
+                           MakeExecNode("hashjoin", plan.get(), inputs, opts[join]));
+    }
+
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(
+        MakeExecNode("sink", plan.get(), {joins.back()}, SinkNodeOptions{&sink_gen}));
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+
+    if (bloom_filters) {
+      AssertExecBatchesEqual(joins.back()->output_schema(), reference, result);
+    } else {
+      reference = std::move(result);
+    }
+  }
+}
+
+// Runs a fixed number of randomly generated join chains from one seeded generator,
+// so that every run of the test exercises the same sequence of plans.
+TEST(HashJoin, ChainedIntegerHashJoins) {
+  Random64Bit rng(42);
+  const int kNumTests = 30;
+  for (int test_id = 0; test_id < kNumTests; ++test_id) {
+    ARROW_SCOPED_TRACE("Test ", std::to_string(test_id));
+    TestSingleChainOfHashJoins(rng);
+  }
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
index f9de31d9c2..58d8fb233f 100644
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ b/cpp/src/arrow/compute/exec/key_encode.h
@@ -21,6 +21,8 @@
#include <memory>
#include <vector>
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/light_array.h"
#include "arrow/memory_pool.h"
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index 125fd3912e..e81ed64b6f 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -22,6 +22,7 @@
#include <algorithm>
#include <cstdint>
+#include "arrow/compute/exec/key_encode.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/ubsan.h"
@@ -456,6 +457,19 @@ void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
}
}
+// Computes 32-bit hashes for `length` rows of `key_batch` starting at `offset`,
+// writing them to `hashes`. Returns the error from ColumnArraysFromExecBatch if the
+// batch columns cannot be converted; otherwise Status::OK().
+Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
+                            int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                            int64_t offset, int64_t length) {
+  std::vector<KeyColumnArray> key_columns;
+  RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &key_columns));
+
+  // HashMultiColumn takes its hardware flags and temporary storage via the context.
+  KeyEncoder::KeyEncoderContext encoder_ctx;
+  encoder_ctx.stack = temp_stack;
+  encoder_ctx.hardware_flags = hardware_flags;
+
+  HashMultiColumn(key_columns, &encoder_ctx, hashes);
+  return Status::OK();
+}
+
inline uint64_t Hashing64::Avalanche(uint64_t acc) {
acc ^= (acc >> 33);
acc *= PRIME64_2;
@@ -875,5 +889,18 @@ void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
}
}
+// Computes 64-bit hashes for `length` rows of `key_batch` starting at `offset`,
+// writing them to `hashes`. Returns the error from ColumnArraysFromExecBatch if the
+// batch columns cannot be converted; otherwise Status::OK().
+Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
+                            int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                            int64_t offset, int64_t length) {
+  std::vector<KeyColumnArray> key_columns;
+  RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &key_columns));
+
+  // HashMultiColumn takes its hardware flags and temporary storage via the context.
+  KeyEncoder::KeyEncoderContext encoder_ctx;
+  encoder_ctx.stack = temp_stack;
+  encoder_ctx.hardware_flags = hardware_flags;
+
+  HashMultiColumn(key_columns, &encoder_ctx, hashes);
+  return Status::OK();
+}
+
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 719f3dfd46..05f39bb729 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -48,6 +48,10 @@ class ARROW_EXPORT Hashing32 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
+ static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
+ int64_t hardware_flags, util::TempVectorStack* temp_stack,
+ int64_t offset, int64_t length);
+
private:
static const uint32_t PRIME32_1 = 0x9E3779B1;
static const uint32_t PRIME32_2 = 0x85EBCA77;
@@ -156,6 +160,10 @@ class ARROW_EXPORT Hashing64 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
+ static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
+ int64_t hardware_flags, util::TempVectorStack* temp_stack,
+ int64_t offset, int64_t length);
+
private:
static const uint64_t PRIME64_1 = 0x9E3779B185EBCA87ULL;
static const uint64_t PRIME64_2 = 0xC2B2AE3D27D4EB4FULL;
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 3c901be0d2..48cbf9d371 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -281,14 +281,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
JoinType in_join_type, std::vector<FieldRef> in_left_keys,
std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
std::string output_suffix_for_left = default_output_suffix_for_left,
- std::string output_suffix_for_right = default_output_suffix_for_right)
+ std::string output_suffix_for_right = default_output_suffix_for_right,
+ bool disable_bloom_filter = false)
: join_type(in_join_type),
left_keys(std::move(in_left_keys)),
right_keys(std::move(in_right_keys)),
output_all(true),
output_suffix_for_left(std::move(output_suffix_for_left)),
output_suffix_for_right(std::move(output_suffix_for_right)),
- filter(std::move(filter)) {
+ filter(std::move(filter)),
+ disable_bloom_filter(disable_bloom_filter) {
this->key_cmp.resize(this->left_keys.size());
for (size_t i = 0; i < this->left_keys.size(); ++i) {
this->key_cmp[i] = JoinKeyCmp::EQ;
@@ -299,7 +301,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
std::vector<FieldRef> right_output, Expression filter = literal(true),
std::string output_suffix_for_left = default_output_suffix_for_left,
- std::string output_suffix_for_right = default_output_suffix_for_right)
+ std::string output_suffix_for_right = default_output_suffix_for_right,
+ bool disable_bloom_filter = false)
: join_type(join_type),
left_keys(std::move(left_keys)),
right_keys(std::move(right_keys)),
@@ -308,7 +311,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
right_output(std::move(right_output)),
output_suffix_for_left(std::move(output_suffix_for_left)),
output_suffix_for_right(std::move(output_suffix_for_right)),
- filter(std::move(filter)) {
+ filter(std::move(filter)),
+ disable_bloom_filter(disable_bloom_filter) {
this->key_cmp.resize(this->left_keys.size());
for (size_t i = 0; i < this->left_keys.size(); ++i) {
this->key_cmp[i] = JoinKeyCmp::EQ;
@@ -320,7 +324,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
Expression filter = literal(true),
std::string output_suffix_for_left = default_output_suffix_for_left,
- std::string output_suffix_for_right = default_output_suffix_for_right)
+ std::string output_suffix_for_right = default_output_suffix_for_right,
+ bool disable_bloom_filter = false)
: join_type(join_type),
left_keys(std::move(left_keys)),
right_keys(std::move(right_keys)),
@@ -330,17 +335,20 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
key_cmp(std::move(key_cmp)),
output_suffix_for_left(std::move(output_suffix_for_left)),
output_suffix_for_right(std::move(output_suffix_for_right)),
- filter(std::move(filter)) {}
+ filter(std::move(filter)),
+ disable_bloom_filter(disable_bloom_filter) {}
+
+ HashJoinNodeOptions() = default;
// type of join (inner, left, semi...)
- JoinType join_type;
+ JoinType join_type = JoinType::INNER;
// key fields from left input
std::vector<FieldRef> left_keys;
// key fields from right input
std::vector<FieldRef> right_keys;
// if set all valid fields from both left and right input will be output
// (and field ref vectors for output fields will be ignored)
- bool output_all;
+ bool output_all = false;
// output fields passed from left input
std::vector<FieldRef> left_output;
// output fields passed from right input
@@ -358,7 +366,9 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
// the filter are not included. The filter is applied against the
// concatenated input schema (left fields then right fields) and can reference
// fields that are not included in the output.
- Expression filter;
+ Expression filter = literal(true);
+ // whether or not to disable Bloom filters in this join
+ bool disable_bloom_filter = false;
};
/// \brief Make a node which select top_k/bottom_k rows passed through it
diff --git a/cpp/src/arrow/compute/exec/partition_util.cc b/cpp/src/arrow/compute/exec/partition_util.cc
index ed5e37edca..e99007c45a 100644
--- a/cpp/src/arrow/compute/exec/partition_util.cc
+++ b/cpp/src/arrow/compute/exec/partition_util.cc
@@ -21,24 +21,25 @@
namespace arrow {
namespace compute {
-PartitionLocks::PartitionLocks()
- : num_prtns_(0),
- locks_(nullptr),
- rand_seed_{0, 0, 0, 0, 0, 0, 0, 0},
- rand_engine_(rand_seed_) {}
+PartitionLocks::PartitionLocks() : num_prtns_(0), locks_(nullptr), rngs_(nullptr) {}
PartitionLocks::~PartitionLocks() { CleanUp(); }
-void PartitionLocks::Init(int num_prtns) {
+void PartitionLocks::Init(size_t num_threads, int num_prtns) {
num_prtns_ = num_prtns;
locks_.reset(new PartitionLock[num_prtns]);
+ rngs_.reset(new arrow::random::pcg32_fast[num_threads]);
for (int i = 0; i < num_prtns; ++i) {
locks_[i].lock.store(false);
}
+ arrow::random::pcg32_fast seed_gen(0);
+ std::uniform_int_distribution<uint32_t> seed_dist;
+ for (size_t i = 0; i < num_threads; i++) rngs_[i].seed(seed_dist(seed_gen));
}
void PartitionLocks::CleanUp() {
locks_.reset();
+ rngs_.reset();
num_prtns_ = 0;
}
@@ -48,23 +49,23 @@ std::atomic<bool>* PartitionLocks::lock_ptr(int prtn_id) {
return &(locks_[prtn_id].lock);
}
-int PartitionLocks::random_int(int num_values) {
- return rand_distribution_(rand_engine_) % num_values;
+int PartitionLocks::random_int(size_t thread_id, int num_values) {
+ return std::uniform_int_distribution<int>{0, num_values - 1}(rngs_[thread_id]);
}
-bool PartitionLocks::AcquirePartitionLock(int num_prtns_to_try,
+bool PartitionLocks::AcquirePartitionLock(size_t thread_id, int num_prtns_to_try,
const int* prtn_ids_to_try, bool limit_retries,
int max_retries, int* locked_prtn_id,
int* locked_prtn_id_pos) {
int trial = 0;
while (!limit_retries || trial <= max_retries) {
- int prtn_id_pos = random_int(num_prtns_to_try);
+ int prtn_id_pos = random_int(thread_id, num_prtns_to_try);
int prtn_id = prtn_ids_to_try[prtn_id_pos];
std::atomic<bool>* lock = lock_ptr(prtn_id);
bool expected = false;
- if (lock->compare_exchange_weak(expected, true)) {
+ if (lock->compare_exchange_weak(expected, true, std::memory_order_acquire)) {
*locked_prtn_id = prtn_id;
*locked_prtn_id_pos = prtn_id_pos;
return true;
@@ -81,7 +82,7 @@ bool PartitionLocks::AcquirePartitionLock(int num_prtns_to_try,
void PartitionLocks::ReleasePartitionLock(int prtn_id) {
ARROW_DCHECK(prtn_id >= 0 && prtn_id < num_prtns_);
std::atomic<bool>* lock = lock_ptr(prtn_id);
- lock->store(false);
+ lock->store(false, std::memory_order_release);
}
} // namespace compute
diff --git a/cpp/src/arrow/compute/exec/partition_util.h b/cpp/src/arrow/compute/exec/partition_util.h
index 6efda4aeeb..07fb91f2f1 100644
--- a/cpp/src/arrow/compute/exec/partition_util.h
+++ b/cpp/src/arrow/compute/exec/partition_util.h
@@ -24,6 +24,7 @@
#include <random>
#include "arrow/buffer.h"
#include "arrow/compute/exec/util.h"
+#include "arrow/util/pcg_random.h"
namespace arrow {
namespace compute {
@@ -59,14 +60,14 @@ class PartitionSort {
/// out_arr: [2, 5, 3, 5, 4, 7]
/// prtn_ranges: [0, 1, 5, 6]
template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>
- static void Eval(int num_rows, int num_prtns, uint16_t* prtn_ranges,
+ static void Eval(int64_t num_rows, int num_prtns, uint16_t* prtn_ranges,
INPUT_PRTN_ID_FN prtn_id_impl, OUTPUT_POS_FN output_pos_impl) {
ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15));
ARROW_DCHECK(num_prtns >= 1 && num_prtns <= (1 << 15));
memset(prtn_ranges, 0, (num_prtns + 1) * sizeof(uint16_t));
- for (int i = 0; i < num_rows; ++i) {
+ for (int64_t i = 0; i < num_rows; ++i) {
int prtn_id = static_cast<int>(prtn_id_impl(i));
++prtn_ranges[prtn_id + 1];
}
@@ -78,7 +79,7 @@ class PartitionSort {
sum = sum_next;
}
- for (int i = 0; i < num_rows; ++i) {
+ for (int64_t i = 0; i < num_rows; ++i) {
int prtn_id = static_cast<int>(prtn_id_impl(i));
int pos = prtn_ranges[prtn_id + 1]++;
output_pos_impl(i, pos);
@@ -93,12 +94,14 @@ class PartitionLocks {
~PartitionLocks();
/// \brief Initializes the control, must be called before use
///
+ /// \param num_threads Maximum number of threads that will access the partitions
/// \param num_prtns Number of partitions to synchronize
- void Init(int num_prtns);
+ void Init(size_t num_threads, int num_prtns);
/// \brief Cleans up the control, it should not be used after this call
void CleanUp();
/// \brief Acquire a partition to work on
///
+ /// \param thread_id The index of the thread trying to acquire the partition lock
/// \param num_prtns Length of prtns_to_try, must be <= num_prtns used in Init
/// \param prtns_to_try An array of partitions that still have remaining work
/// \param limit_retries If false, this method will spinwait forever until success
@@ -109,15 +112,15 @@ class PartitionLocks {
/// without successfully acquiring a lock
///
/// This method is thread safe
- bool AcquirePartitionLock(int num_prtns, const int* prtns_to_try, bool limit_retries,
- int max_retries, int* locked_prtn_id,
+ bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try,
+ bool limit_retries, int max_retries, int* locked_prtn_id,
int* locked_prtn_id_pos);
/// \brief Release a partition so that other threads can work on it
void ReleasePartitionLock(int prtn_id);
private:
std::atomic<bool>* lock_ptr(int prtn_id);
- int random_int(int num_values);
+ int random_int(size_t thread_id, int num_values);
struct PartitionLock {
static constexpr int kCacheLineBytes = 64;
@@ -126,10 +129,7 @@ class PartitionLocks {
};
int num_prtns_;
std::unique_ptr<PartitionLock[]> locks_;
-
- std::seed_seq rand_seed_;
- std::mt19937 rand_engine_;
- std::uniform_int_distribution<uint64_t> rand_distribution_;
+ std::unique_ptr<arrow::random::pcg32_fast[]> rngs_;
};
} // namespace compute
diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc
index f6ac70ad45..ae70cfcd46 100644
--- a/cpp/src/arrow/compute/exec/util.cc
+++ b/cpp/src/arrow/compute/exec/util.cc
@@ -29,6 +29,38 @@ using bit_util::CountTrailingZeros;
namespace util {
+inline uint64_t bit_util::SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) {
+ // This will not be correct on big-endian architectures.
+#if !ARROW_LITTLE_ENDIAN
+ ARROW_DCHECK(false);
+#endif
+ ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8);
+ if (num_bytes == 8) {
+ return util::SafeLoad(reinterpret_cast<const uint64_t*>(bytes));
+ } else {
+ uint64_t word = 0;
+ for (int i = 0; i < num_bytes; ++i) {
+ word |= static_cast<uint64_t>(bytes[i]) << (8 * i);
+ }
+ return word;
+ }
+}
+
+inline void bit_util::SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value) {
+ // This will not be correct on big-endian architectures.
+#if !ARROW_LITTLE_ENDIAN
+ ARROW_DCHECK(false);
+#endif
+ ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8);
+ if (num_bytes == 8) {
+ util::SafeStore(reinterpret_cast<uint64_t*>(bytes), value);
+ } else {
+ for (int i = 0; i < num_bytes; ++i) {
+ bytes[i] = static_cast<uint8_t>(value >> (8 * i));
+ }
+ }
+}
+
inline void bit_util::bits_to_indexes_helper(uint64_t word, uint16_t base_index,
int* num_indexes, uint16_t* indexes) {
int n = *num_indexes;
@@ -86,8 +118,8 @@ void bit_util::bits_to_indexes_internal(int64_t hardware_flags, const int num_bi
#endif
// Optionally process the last partial word, masking out bits outside the range
if (tail) {
- uint64_t word =
- util::SafeLoad(&reinterpret_cast<const uint64_t*>(bits)[num_bits / unroll]);
+ const uint8_t* bits_tail = bits + (num_bits - tail) / 8;
+ uint64_t word = SafeLoadUpTo8Bytes(bits_tail, (tail + 7) / 8);
if (bit_to_search == 0) {
word = ~word;
}
@@ -109,8 +141,7 @@ void bit_util::bits_to_indexes(int bit_to_search, int64_t hardware_flags, int nu
*num_indexes = 0;
uint16_t base_index = 0;
if (bit_offset != 0) {
- uint64_t bits_head =
- util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+ uint64_t bits_head = bits[0] >> bit_offset;
int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
bits_to_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
reinterpret_cast<const uint8_t*>(&bits_head), num_indexes, indexes);
@@ -143,8 +174,7 @@ void bit_util::bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
bit_offset %= 8;
if (bit_offset != 0) {
int num_indexes_head = 0;
- uint64_t bits_head =
- util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+ uint64_t bits_head = bits[0] >> bit_offset;
int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
bits_filter_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
reinterpret_cast<const uint8_t*>(&bits_head), input_indexes,
@@ -185,8 +215,7 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
bits += bit_offset / 8;
bit_offset %= 8;
if (bit_offset != 0) {
- uint64_t bits_head =
- util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+ uint64_t bits_head = bits[0] >> bit_offset;
int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
bits_to_bytes(hardware_flags, bits_in_first_byte,
reinterpret_cast<const uint8_t*>(&bits_head), bytes);
@@ -207,7 +236,7 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
#endif
// Processing 8 bits at a time
constexpr int unroll = 8;
- for (int i = num_processed / unroll; i < (num_bits + unroll - 1) / unroll; ++i) {
+ for (int i = num_processed / unroll; i < num_bits / unroll; ++i) {
uint8_t bits_next = bits[i];
// Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart
// from the previous.
@@ -219,6 +248,19 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
unpacked *= 255;
util::SafeStore(&reinterpret_cast<uint64_t*>(bytes)[i], unpacked);
}
+ int tail = num_bits % unroll;
+ if (tail) {
+ uint8_t bits_next = bits[(num_bits - tail) / unroll];
+ // Clear the lowest bit and then make 7 copies of the remaining 7 bits, each shifted
+ // 7 bits further than the previous.
+ uint64_t unpacked = static_cast<uint64_t>(bits_next & 0xfe) *
+ ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) |
+ (1ULL << 35) | (1ULL << 42) | (1ULL << 49));
+ unpacked |= (bits_next & 1);
+ unpacked &= 0x0101010101010101ULL;
+ unpacked *= 255;
+ SafeStoreUpTo8Bytes(bytes + num_bits - tail, tail, unpacked);
+ }
}
void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
@@ -250,7 +292,7 @@ void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
#endif
// Process 8 bits at a time
constexpr int unroll = 8;
- for (int i = num_processed / unroll; i < (num_bits + unroll - 1) / unroll; ++i) {
+ for (int i = num_processed / unroll; i < num_bits / unroll; ++i) {
uint64_t bytes_next = util::SafeLoad(&reinterpret_cast<const uint64_t*>(bytes)[i]);
bytes_next &= 0x0101010101010101ULL;
bytes_next |= (bytes_next >> 7); // Pairs of adjacent output bits in individual bytes
@@ -258,6 +300,15 @@ void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
bytes_next |= (bytes_next >> 28); // All 8 output bits in the lowest byte
bits[i] = static_cast<uint8_t>(bytes_next & 0xff);
}
+ int tail = num_bits % unroll;
+ if (tail) {
+ uint64_t bytes_next = SafeLoadUpTo8Bytes(bytes + num_bits - tail, tail);
+ bytes_next &= 0x0101010101010101ULL;
+ bytes_next |= (bytes_next >> 7); // Pairs of adjacent output bits in individual bytes
+ bytes_next |= (bytes_next >> 14); // 4 adjacent output bits in individual bytes
+ bytes_next |= (bytes_next >> 28); // All 8 output bits in the lowest byte
+ bits[num_bits / 8] = static_cast<uint8_t>(bytes_next & 0xff);
+ }
}
bool bit_util::are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 4e7550582a..839a8a7d29 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -92,7 +92,7 @@ class TempVectorStack {
Status Init(MemoryPool* pool, int64_t size) {
num_vectors_ = 0;
top_ = 0;
- buffer_size_ = size;
+ buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t);
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
// Ensure later operations don't accidentally read uninitialized memory.
std::memset(buffer->mutable_data(), 0xFF, size);
@@ -193,6 +193,8 @@ class bit_util {
uint32_t num_bytes);
private:
+ inline static uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes);
+ inline static void SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value);
inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index,
int* num_indexes, uint16_t* indexes);
inline static void bits_filter_indexes_helper(uint64_t word,
diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc
index 390dcc6cbf..4fae01eb6b 100644
--- a/cpp/src/arrow/compute/light_array.cc
+++ b/cpp/src/arrow/compute/light_array.cc
@@ -138,7 +138,7 @@ Result<KeyColumnMetadata> ColumnMetadataFromDataType(
}
Result<KeyColumnArray> ColumnArrayFromArrayData(
- const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows) {
+ const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows) {
ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
ColumnMetadataFromDataType(array_data->type));
KeyColumnArray column_array = KeyColumnArray(
@@ -165,7 +165,8 @@ Status ColumnMetadatasFromExecBatch(const ExecBatch& batch,
return Status::OK();
}
-Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
+ int64_t num_rows,
std::vector<KeyColumnArray>* column_arrays) {
int num_columns = static_cast<int>(batch.values.size());
column_arrays->resize(num_columns);
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
index 0856e3e8aa..dd13aa0647 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -171,7 +171,7 @@ ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
/// The caller should ensure this is only called on "key" columns.
/// \see ColumnMetadataFromDataType for details
ARROW_EXPORT Result<KeyColumnArray> ColumnArrayFromArrayData(
- const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows);
+ const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows);
/// \brief Create KeyColumnMetadata instances from an ExecBatch
///
@@ -188,8 +188,8 @@ ARROW_EXPORT Status ColumnMetadatasFromExecBatch(
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnArrayFromArrayData for more details
-ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row,
- int num_rows,
+ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
+ int64_t num_rows,
std::vector<KeyColumnArray>* column_arrays);
/// \brief Create KeyColumnArray instances from an ExecBatch