You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/18 04:02:00 UTC

[arrow] branch master updated: ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0742f78a27 ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins
0742f78a27 is described below

commit 0742f78a27fa9d7882c7a840c117824e03bee82d
Author: Sasha Krassovsky <kr...@gmail.com>
AuthorDate: Tue May 17 18:01:46 2022 -1000

    ARROW-15498: [C++][Compute] Implement Bloom filter pushdown between hash joins
    
    This adds Bloom filter pushdown between hash join nodes.
    
    Closes #12289 from save-buffer/sasha_bloom_pushdown
    
    Lead-authored-by: Sasha Krassovsky <kr...@gmail.com>
    Co-authored-by: michalursa <mi...@ursacomputing.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/CMakeLists.txt                      |   1 -
 cpp/src/arrow/compute/exec/bloom_filter.cc        |  37 ++-
 cpp/src/arrow/compute/exec/bloom_filter.h         |  22 +-
 cpp/src/arrow/compute/exec/bloom_filter_test.cc   | 116 +++++++--
 cpp/src/arrow/compute/exec/exec_plan.cc           |   3 +
 cpp/src/arrow/compute/exec/exec_plan.h            |  10 +
 cpp/src/arrow/compute/exec/hash_join.cc           | 302 +++++++++++++++++++---
 cpp/src/arrow/compute/exec/hash_join.h            |   8 +-
 cpp/src/arrow/compute/exec/hash_join_benchmark.cc |  44 +++-
 cpp/src/arrow/compute/exec/hash_join_graphs.py    |   2 +-
 cpp/src/arrow/compute/exec/hash_join_node.cc      | 135 +++++++++-
 cpp/src/arrow/compute/exec/hash_join_node_test.cc | 155 ++++++++++-
 cpp/src/arrow/compute/exec/key_encode.h           |   2 +
 cpp/src/arrow/compute/exec/key_hash.cc            |  27 ++
 cpp/src/arrow/compute/exec/key_hash.h             |   8 +
 cpp/src/arrow/compute/exec/options.h              |  28 +-
 cpp/src/arrow/compute/exec/partition_util.cc      |  25 +-
 cpp/src/arrow/compute/exec/partition_util.h       |  22 +-
 cpp/src/arrow/compute/exec/util.cc                |  71 ++++-
 cpp/src/arrow/compute/exec/util.h                 |   4 +-
 cpp/src/arrow/compute/light_array.cc              |   5 +-
 cpp/src/arrow/compute/light_array.h               |   6 +-
 22 files changed, 889 insertions(+), 144 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 77f9959fdb..ec6cada1cd 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -393,7 +393,6 @@ if(ARROW_COMPUTE)
        compute/exec/key_encode.cc
        compute/exec/key_hash.cc
        compute/exec/key_map.cc
-       compute/exec/options.cc
        compute/exec/order_by_impl.cc
        compute/exec/partition_util.cc
        compute/exec/options.cc
diff --git a/cpp/src/arrow/compute/exec/bloom_filter.cc b/cpp/src/arrow/compute/exec/bloom_filter.cc
index 6103172545..7b348ff687 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter.cc
+++ b/cpp/src/arrow/compute/exec/bloom_filter.cc
@@ -311,21 +311,22 @@ Status BloomFilterBuilder_SingleThreaded::Begin(size_t /*num_threads*/,
 }
 
 Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
-                                                        int num_rows,
+                                                        int64_t num_rows,
                                                         const uint32_t* hashes) {
   PushNextBatchImp(num_rows, hashes);
   return Status::OK();
 }
 
 Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
-                                                        int num_rows,
+                                                        int64_t num_rows,
                                                         const uint64_t* hashes) {
   PushNextBatchImp(num_rows, hashes);
   return Status::OK();
 }
 
 template <typename T>
-void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int num_rows, const T* hashes) {
+void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int64_t num_rows,
+                                                         const T* hashes) {
   build_target_->Insert(hardware_flags_, num_rows, hashes);
 }
 
@@ -340,29 +341,40 @@ Status BloomFilterBuilder_Parallel::Begin(size_t num_threads, int64_t hardware_f
   log_num_prtns_ = std::min(kMaxLogNumPrtns, bit_util::Log2(num_threads));
 
   thread_local_states_.resize(num_threads);
-  prtn_locks_.Init(1 << log_num_prtns_);
+  prtn_locks_.Init(num_threads, 1 << log_num_prtns_);
 
   RETURN_NOT_OK(build_target->CreateEmpty(num_rows, pool));
 
   return Status::OK();
 }
 
-Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
+Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
                                                   const uint32_t* hashes) {
   PushNextBatchImp(thread_id, num_rows, hashes);
   return Status::OK();
 }
 
-Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
+Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
                                                   const uint64_t* hashes) {
   PushNextBatchImp(thread_id, num_rows, hashes);
   return Status::OK();
 }
 
 template <typename T>
-void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_rows,
+void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int64_t num_rows,
                                                    const T* hashes) {
-  int num_prtns = 1 << log_num_prtns_;
+  // Partition IDs are calculated using the higher bits of the block ID.  This
+  // ensures that each block is contained entirely within a partition and prevents
+  // concurrent access to a block.
+  constexpr int kLogBlocksKeptTogether = 7;
+  constexpr int kPrtnIdBitOffset =
+      BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
+
+  const int log_num_prtns_max =
+      std::max(0, build_target_->log_num_blocks() - kLogBlocksKeptTogether);
+  const int log_num_prtns_mod = std::min(log_num_prtns_, log_num_prtns_max);
+  int num_prtns = 1 << log_num_prtns_mod;
+
   ThreadLocalState& local_state = thread_local_states_[thread_id];
   local_state.partition_ranges.resize(num_prtns + 1);
   local_state.partitioned_hashes_64.resize(num_rows);
@@ -373,13 +385,10 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
 
   PartitionSort::Eval(
       num_rows, num_prtns, partition_ranges,
-      [hashes, num_prtns](int row_id) {
-        constexpr int kLogBlocksKeptTogether = 7;
-        constexpr int kPrtnIdBitOffset =
-            BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
+      [=](int64_t row_id) {
         return (hashes[row_id] >> (kPrtnIdBitOffset)) & (num_prtns - 1);
       },
-      [hashes, partitioned_hashes](int row_id, int output_pos) {
+      [=](int64_t row_id, int output_pos) {
         partitioned_hashes[output_pos] = hashes[row_id];
       });
 
@@ -393,7 +402,7 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
   while (num_unprocessed_partitions > 0) {
     int locked_prtn_id;
     int locked_prtn_id_pos;
-    prtn_locks_.AcquirePartitionLock(num_unprocessed_partitions,
+    prtn_locks_.AcquirePartitionLock(thread_id, num_unprocessed_partitions,
                                      unprocessed_partition_ids,
                                      /*limit_retries=*/false, /*max_retries=*/-1,
                                      &locked_prtn_id, &locked_prtn_id_pos);
diff --git a/cpp/src/arrow/compute/exec/bloom_filter.h b/cpp/src/arrow/compute/exec/bloom_filter.h
index b89343373a..06920c6c14 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter.h
+++ b/cpp/src/arrow/compute/exec/bloom_filter.h
@@ -261,49 +261,51 @@ class ARROW_EXPORT BloomFilterBuilder {
                        int64_t num_rows, int64_t num_batches,
                        BlockedBloomFilter* build_target) = 0;
   virtual int64_t num_tasks() const { return 0; }
-  virtual Status PushNextBatch(size_t thread_index, int num_rows,
+  virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
                                const uint32_t* hashes) = 0;
-  virtual Status PushNextBatch(size_t thread_index, int num_rows,
+  virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
                                const uint64_t* hashes) = 0;
   virtual void CleanUp() {}
   static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
 };
 
-class BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
+class ARROW_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
  public:
   Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
                int64_t num_rows, int64_t num_batches,
                BlockedBloomFilter* build_target) override;
 
-  Status PushNextBatch(size_t /*thread_index*/, int num_rows,
+  Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
                        const uint32_t* hashes) override;
 
-  Status PushNextBatch(size_t /*thread_index*/, int num_rows,
+  Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
                        const uint64_t* hashes) override;
 
  private:
   template <typename T>
-  void PushNextBatchImp(int num_rows, const T* hashes);
+  void PushNextBatchImp(int64_t num_rows, const T* hashes);
 
   int64_t hardware_flags_;
   BlockedBloomFilter* build_target_;
 };
 
-class BloomFilterBuilder_Parallel : public BloomFilterBuilder {
+class ARROW_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
  public:
   Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
                int64_t num_rows, int64_t num_batches,
                BlockedBloomFilter* build_target) override;
 
-  Status PushNextBatch(size_t thread_id, int num_rows, const uint32_t* hashes) override;
+  Status PushNextBatch(size_t thread_id, int64_t num_rows,
+                       const uint32_t* hashes) override;
 
-  Status PushNextBatch(size_t thread_id, int num_rows, const uint64_t* hashes) override;
+  Status PushNextBatch(size_t thread_id, int64_t num_rows,
+                       const uint64_t* hashes) override;
 
   void CleanUp() override;
 
  private:
   template <typename T>
-  void PushNextBatchImp(size_t thread_id, int num_rows, const T* hashes);
+  void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);
 
   int64_t hardware_flags_;
   BlockedBloomFilter* build_target_;
diff --git a/cpp/src/arrow/compute/exec/bloom_filter_test.cc b/cpp/src/arrow/compute/exec/bloom_filter_test.cc
index a3b5cded15..5e1dd0218c 100644
--- a/cpp/src/arrow/compute/exec/bloom_filter_test.cc
+++ b/cpp/src/arrow/compute/exec/bloom_filter_test.cc
@@ -24,6 +24,7 @@
 #include <unordered_set>
 #include "arrow/compute/exec/bloom_filter.h"
 #include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/task_util.h"
 #include "arrow/compute/exec/test_util.h"
 #include "arrow/compute/exec/util.h"
 #include "arrow/util/bitmap_ops.h"
@@ -32,39 +33,106 @@
 namespace arrow {
 namespace compute {
 
-Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
-                        MemoryPool* pool, int64_t num_rows,
-                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
-                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
-                        BlockedBloomFilter* target) {
-  constexpr int batch_size_max = 32 * 1024;
-  int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);
-
-  auto builder = BloomFilterBuilder::Make(strategy);
-
-  std::vector<uint32_t> thread_local_hashes32;
-  std::vector<uint64_t> thread_local_hashes64;
-  thread_local_hashes32.resize(batch_size_max);
-  thread_local_hashes64.resize(batch_size_max);
-
-  RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
-                               bit_util::CeilDiv(num_rows, batch_size_max), target));
-
-  for (int64_t i = 0; i < num_batches; ++i) {
+constexpr int kBatchSizeMax = 32 * 1024;
+Status BuildBloomFilter_Serial(
+    std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
+    std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  std::vector<uint32_t> hashes32(kBatchSizeMax);
+  std::vector<uint64_t> hashes64(kBatchSizeMax);
+  for (int64_t i = 0; i < num_batches; i++) {
     size_t thread_index = 0;
     int batch_size = static_cast<int>(
-        std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
+        std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
     if (target->NumHashBitsUsed() > 32) {
-      uint64_t* hashes = thread_local_hashes64.data();
-      get_hash64_impl(i * batch_size_max, batch_size, hashes);
+      uint64_t* hashes = hashes64.data();
+      get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     } else {
-      uint32_t* hashes = thread_local_hashes32.data();
-      get_hash32_impl(i * batch_size_max, batch_size, hashes);
+      uint32_t* hashes = hashes32.data();
+      get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
       RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
     }
   }
+  return Status::OK();
+}
+
+Status BuildBloomFilter_Parallel(
+    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
+    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+    BlockedBloomFilter* target) {
+  ThreadIndexer thread_indexer;
+  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();
+  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
+  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
+  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
+  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);
+
+  std::condition_variable cv;
+  std::mutex mutex;
+  auto group = scheduler->RegisterTaskGroup(
+      [&](size_t thread_index, int64_t task_id) -> Status {
+        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
+                                                   static_cast<int64_t>(kBatchSizeMax)));
+        if (target->NumHashBitsUsed() > 32) {
+          uint64_t* hashes = thread_local_hashes64[thread_index].data();
+          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        } else {
+          uint32_t* hashes = thread_local_hashes32[thread_index].data();
+          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
+          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
+        }
+        return Status::OK();
+      },
+      [&](size_t thread_index) -> Status {
+        std::unique_lock<std::mutex> lk(mutex);
+        cv.notify_all();
+        return Status::OK();
+      });
+  scheduler->RegisterEnd();
+  auto tp = arrow::internal::GetCpuThreadPool();
+  RETURN_NOT_OK(scheduler->StartScheduling(
+      0,
+      [&](std::function<Status(size_t)> func) -> Status {
+        return tp->Spawn([&, func]() {
+          size_t tid = thread_indexer();
+          ARROW_DCHECK_OK(func(tid));
+        });
+      },
+      static_cast<int>(num_threads), false));
+  {
+    std::unique_lock<std::mutex> lk(mutex);
+    RETURN_NOT_OK(scheduler->StartTaskGroup(0, group, num_batches));
+    cv.wait(lk);
+  }
+  return Status::OK();
+}
+
+Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
+                        MemoryPool* pool, int64_t num_rows,
+                        std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
+                        std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
+                        BlockedBloomFilter* target) {
+  int64_t num_batches = bit_util::CeilDiv(num_rows, kBatchSizeMax);
+
+  bool is_serial = strategy == BloomFilterBuildStrategy::SINGLE_THREADED;
+  auto builder = BloomFilterBuilder::Make(strategy);
 
+  size_t num_threads = is_serial ? 1 : arrow::GetCpuThreadPoolCapacity();
+  RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
+                               bit_util::CeilDiv(num_rows, kBatchSizeMax), target));
+
+  if (is_serial)
+    RETURN_NOT_OK(BuildBloomFilter_Serial(builder, num_rows, num_batches,
+                                          std::move(get_hash32_impl),
+                                          std::move(get_hash64_impl), target));
+  else
+    RETURN_NOT_OK(BuildBloomFilter_Parallel(builder, num_threads, num_rows, num_batches,
+                                            std::move(get_hash32_impl),
+                                            std::move(get_hash64_impl), target));
   builder->CleanUp();
 
   return Status::OK();
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index b7a9c7e1bb..bb197f8db8 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -98,6 +98,9 @@ struct ExecPlanImpl : public ExecPlan {
 
     // producers precede consumers
     sorted_nodes_ = TopoSort();
+    for (ExecNode* node : sorted_nodes_) {
+      RETURN_NOT_OK(node->PrepareToProduce());
+    }
 
     std::vector<Future<>> futures;
 
diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h
index be2f23ad24..dcf271bd36 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -212,6 +212,16 @@ class ARROW_EXPORT ExecNode {
   // A node with multiple outputs will also need to ensure it is applying backpressure if
   // any of its outputs is asking to pause
 
+  /// \brief Perform any needed initialization
+  ///
+  /// This hook performs any actions in between creation of ExecPlan and the call to
+  /// StartProducing. An example could be Bloom filter pushdown. The order of ExecNodes
+  /// that executes this method is undefined, but the calls are made synchronously.
+  ///
+  /// At this point a node can rely on all inputs & outputs (and the input schemas)
+  /// being well defined.
+  virtual Status PrepareToProduce() { return Status::OK(); }
+
   /// \brief Start producing
   ///
   /// This must only be called once.  If this fails, then other lifecycle
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 3207bb9698..15a006c81d 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "arrow/compute/exec/hash_join_dict.h"
+#include "arrow/compute/exec/key_hash.h"
 #include "arrow/compute/exec/task_util.h"
 #include "arrow/compute/kernels/row_encoder.h"
 
@@ -39,13 +40,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
   struct ThreadLocalState;
 
  public:
+  HashJoinBasicImpl() : num_expected_bloom_filters_(0) {}
+
   Status InputReceived(size_t thread_index, int side, ExecBatch batch) override {
     if (cancelled_) {
       return Status::Cancelled("Hash join cancelled");
     }
     EVENT(span_, "InputReceived");
 
-    if (QueueBatchIfNeeded(side, batch)) {
+    ARROW_ASSIGN_OR_RAISE(bool queued, QueueBatchIfNeeded(thread_index, side, batch));
+    if (queued) {
       return Status::OK();
     } else {
       ARROW_DCHECK(side == 0);
@@ -62,7 +66,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
       bool proceed;
       {
         std::lock_guard<std::mutex> lock(finished_mutex_);
-        proceed = !left_side_finished_ && left_queue_finished_;
+        proceed = !left_side_finished_ && left_queue_probe_finished_;
         left_side_finished_ = true;
       }
       if (proceed) {
@@ -83,50 +87,70 @@ class HashJoinBasicImpl : public HashJoinImpl {
   }
 
   Status Init(ExecContext* ctx, JoinType join_type, bool use_sync_execution,
-              size_t num_threads, HashJoinSchema* schema_mgr,
+              size_t /*num_threads*/, HashJoinSchema* schema_mgr,
               std::vector<JoinKeyCmp> key_cmp, Expression filter,
               OutputBatchCallback output_batch_callback,
               FinishedCallback finished_callback,
-              TaskScheduler::ScheduleImpl schedule_task_callback) override {
-    num_threads = std::max(num_threads, static_cast<size_t>(1));
+              TaskScheduler::ScheduleImpl schedule_task_callback,
+              HashJoinImpl* pushdown_target, std::vector<int> column_map) override {
+    // TODO(ARROW-15732)
+    // Each side of join might have an IO thread being called from.
+    // As of right now, we ignore the `num_threads` argument, so later we will have to
+    // re-add `num_threads_ = num_threads;`
+    num_threads_ = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1;
 
     START_COMPUTE_SPAN(span_, "HashJoinBasicImpl",
                        {{"detail", filter.ToString()},
                         {"join.kind", ToString(join_type)},
-                        {"join.threads", static_cast<uint32_t>(num_threads)}});
+                        {"join.threads", static_cast<uint32_t>(num_threads_)}});
 
     ctx_ = ctx;
     join_type_ = join_type;
-    num_threads_ = num_threads;
     schema_mgr_ = schema_mgr;
     key_cmp_ = std::move(key_cmp);
     filter_ = std::move(filter);
     output_batch_callback_ = std::move(output_batch_callback);
     finished_callback_ = std::move(finished_callback);
-    // TODO(ARROW-15732)
-    // Each side of join might have an IO thread being called from.
-    local_states_.resize(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);
+    local_states_.resize(num_threads_);
     for (size_t i = 0; i < local_states_.size(); ++i) {
       local_states_[i].is_initialized = false;
       local_states_[i].is_has_match_initialized = false;
     }
-    dict_probe_.Init(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);
 
+    dict_probe_.Init(num_threads_);
+
+    pushdown_target_ = pushdown_target;
+    column_map_ = std::move(column_map);
+    if (pushdown_target_) pushdown_target_->ExpectBloomFilter();
+
+    right_input_row_count_ = 0;
     has_hash_table_ = false;
     num_batches_produced_.store(0);
     cancelled_ = false;
     right_side_finished_ = false;
     left_side_finished_ = false;
-    left_queue_finished_ = false;
+    bloom_filters_ready_ = false;
+    left_queue_bloom_finished_ = false;
+    left_queue_probe_finished_ = false;
 
     scheduler_ = TaskScheduler::Make();
+    if (pushdown_target_) {
+      bloom_filter_ = arrow::internal::make_unique<BlockedBloomFilter>();
+      bloom_filter_builder_ = BloomFilterBuilder::Make(
+          use_sync_execution ? BloomFilterBuildStrategy::SINGLE_THREADED
+                             : BloomFilterBuildStrategy::PARALLEL);
+    }
+
+    RegisterBuildBloomFilter();
     RegisterBuildHashTable();
+    RegisterBloomFilterQueuedBatches();
     RegisterProbeQueuedBatches();
     RegisterScanHashTable();
     scheduler_->RegisterEnd();
+
     RETURN_NOT_OK(scheduler_->StartScheduling(
         0 /*thread index*/, std::move(schedule_task_callback),
-        static_cast<int>(2 * num_threads) /*concurrent tasks*/, use_sync_execution));
+        static_cast<int>(2 * num_threads_) /*concurrent tasks*/, use_sync_execution));
 
     return Status::OK();
   }
@@ -138,6 +162,32 @@ class HashJoinBasicImpl : public HashJoinImpl {
     scheduler_->Abort(std::move(pos_abort_callback));
   }
 
+  // Called by a downstream node after they have constructed a bloom filter
+  // that this node can use to filter inputs.
+  Status PushBloomFilter(size_t thread_index, std::unique_ptr<BlockedBloomFilter> filter,
+                         std::vector<int> column_map) override {
+    bool proceed;
+    {
+      std::lock_guard<std::mutex> lock_bloom(bloom_filters_mutex_);
+      pushed_bloom_filters_.emplace_back(std::move(filter));
+      bloom_filter_column_maps_.emplace_back(std::move(column_map));
+      proceed = pushed_bloom_filters_.size() == num_expected_bloom_filters_;
+      ARROW_DCHECK(pushed_bloom_filters_.size() <= num_expected_bloom_filters_);
+    }
+    if (proceed) {
+      size_t num_batches;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        num_batches = left_batches_.size();
+        bloom_filters_ready_ = true;
+      }
+      RETURN_NOT_OK(BloomFilterQueuedBatches(thread_index, num_batches));
+    }
+    return Status::OK();
+  }
+
+  void ExpectBloomFilter() override { num_expected_bloom_filters_ += 1; }
+
  private:
   void InitEncoder(int side, HashJoinProjection projection_handle, RowEncoder* encoder) {
     std::vector<ValueDescr> data_types;
@@ -152,7 +202,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     encoder->Clear();
   }
 
-  void InitLocalStateIfNeeded(size_t thread_index) {
+  Status InitLocalStateIfNeeded(size_t thread_index) {
     DCHECK_LT(thread_index, local_states_.size());
     ThreadLocalState& local_state = local_states_[thread_index];
     if (!local_state.is_initialized) {
@@ -162,9 +212,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
       if (has_payload) {
         InitEncoder(0, HashJoinProjection::PAYLOAD, &local_state.exec_batch_payloads);
       }
-
+      RETURN_NOT_OK(local_state.temp_stack.Init(
+          ctx_->memory_pool(), 4 * util::MiniBatch::kMiniBatchLength * sizeof(uint32_t)));
       local_state.is_initialized = true;
     }
+    return Status::OK();
   }
 
   Status EncodeBatch(int side, HashJoinProjection projection_handle, RowEncoder* encoder,
@@ -432,7 +484,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
         (schema_mgr_->proj_maps[1].num_cols(HashJoinProjection::PAYLOAD) > 0);
 
     ThreadLocalState& local_state = local_states_[thread_index];
-    InitLocalStateIfNeeded(thread_index);
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
     ExecBatch left_key;
     ExecBatch left_payload;
@@ -544,7 +596,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
 
   Status ProbeBatch(size_t thread_index, const ExecBatch& batch) {
     ThreadLocalState& local_state = local_states_[thread_index];
-    InitLocalStateIfNeeded(thread_index);
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
     local_state.exec_batch_keys.Clear();
 
@@ -604,8 +656,84 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
+  Status ApplyBloomFiltersToBatch(size_t thread_index, ExecBatch& batch) {
+    if (batch.length == 0) return Status::OK();
+    int64_t bit_vector_bytes = bit_util::BytesForBits(batch.length);
+    std::vector<uint8_t> selected(bit_vector_bytes);
+    std::vector<uint32_t> hashes(batch.length);
+    std::vector<uint8_t> bv(bit_vector_bytes);
+
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+    // Start with full selection for the current batch
+    memset(selected.data(), 0xff, bit_vector_bytes);
+    for (size_t ifilter = 0; ifilter < num_expected_bloom_filters_; ifilter++) {
+      std::vector<Datum> keys(bloom_filter_column_maps_[ifilter].size());
+      for (size_t i = 0; i < keys.size(); i++) {
+        int input_idx = bloom_filter_column_maps_[ifilter][i];
+        keys[i] = batch[input_idx];
+      }
+      ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys)));
+      RETURN_NOT_OK(Hashing32::HashBatch(
+          key_batch, hashes.data(), ctx_->cpu_info()->hardware_flags(),
+          &local_states_[thread_index].temp_stack, 0, key_batch.length));
+
+      pushed_bloom_filters_[ifilter]->Find(ctx_->cpu_info()->hardware_flags(),
+                                           key_batch.length, hashes.data(), bv.data());
+      arrow::internal::BitmapAnd(bv.data(), 0, selected.data(), 0, key_batch.length, 0,
+                                 selected.data());
+    }
+    auto selected_buffer =
+        arrow::internal::make_unique<Buffer>(selected.data(), bit_vector_bytes);
+    ArrayData selected_arraydata(boolean(), batch.length,
+                                 {nullptr, std::move(selected_buffer)});
+    Datum selected_datum(selected_arraydata);
+    FilterOptions options;
+    size_t first_nonscalar = batch.values.size();
+    for (size_t i = 0; i < batch.values.size(); i++) {
+      if (!batch.values[i].is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(batch.values[i],
+                              Filter(batch.values[i], selected_datum, options, ctx_));
+        first_nonscalar = std::min(first_nonscalar, i);
+        ARROW_DCHECK_EQ(batch.values[i].length(), batch.values[first_nonscalar].length());
+      }
+    }
+    // If they're all Scalar, then the length of the batch is the number of set bits
+    if (first_nonscalar == batch.values.size())
+      batch.length = arrow::internal::CountSetBits(selected.data(), 0, batch.length);
+    else
+      batch.length = batch.values[first_nonscalar].length();
+    return Status::OK();
+  }
+
   int64_t BuildHashTable_num_tasks() { return 1; }
 
+  Status BuildBloomFilter_exec_task(size_t thread_index, int64_t task_id) {
+    const ExecBatch& input_batch = right_batches_[task_id];
+    SchemaProjectionMap key_to_in =
+        schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    std::vector<Datum> key_columns(key_to_in.num_cols);
+    for (size_t i = 0; i < key_columns.size(); i++) {
+      int input_idx = key_to_in.get(static_cast<int>(i));
+      key_columns[i] = input_batch[input_idx];
+    }
+    ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));
+
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
+    ThreadLocalState& tls = local_states_[thread_index];
+    util::TempVectorHolder<uint32_t> hash_holder(&tls.temp_stack,
+                                                 util::MiniBatch::kMiniBatchLength);
+    uint32_t* hashes = hash_holder.mutable_data();
+    for (int64_t i = 0; i < key_batch.length; i += util::MiniBatch::kMiniBatchLength) {
+      int64_t length = std::min(static_cast<int64_t>(key_batch.length - i),
+                                static_cast<int64_t>(util::MiniBatch::kMiniBatchLength));
+      RETURN_NOT_OK(Hashing32::HashBatch(key_batch, hashes,
+                                         ctx_->cpu_info()->hardware_flags(),
+                                         &tls.temp_stack, i, length));
+      RETURN_NOT_OK(bloom_filter_builder_->PushNextBatch(thread_index, length, hashes));
+    }
+    return Status::OK();
+  }
+
   Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
     const std::vector<ExecBatch>& batches = right_batches_;
     if (batches.empty()) {
@@ -651,23 +779,45 @@ class HashJoinBasicImpl : public HashJoinImpl {
     return Status::OK();
   }
 
+  Status BuildBloomFilter_on_finished(size_t thread_index) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    ARROW_DCHECK(pushdown_target_);
+    RETURN_NOT_OK(pushdown_target_->PushBloomFilter(
+        thread_index, std::move(bloom_filter_), std::move(column_map_)));
+    return BuildHashTable(thread_index);
+  }
+
   Status BuildHashTable_on_finished(size_t thread_index) {
     if (cancelled_) {
       return Status::Cancelled("Hash join cancelled");
     }
 
+    right_batches_.clear();
+
+    bool proceed;
     {
       std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      std::lock_guard<std::mutex> lock_finish(finished_mutex_);
+      left_queue_bloom_finished_ =
+          left_queue_bloom_finished_ || num_expected_bloom_filters_ == 0;
+      proceed = !has_hash_table_ && left_queue_bloom_finished_;
       has_hash_table_ = true;
     }
-
-    right_batches_.clear();
-
-    RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
+    if (proceed) RETURN_NOT_OK(ProbeQueuedBatches(thread_index));
 
     return Status::OK();
   }
 
+  void RegisterBuildBloomFilter() {
+    task_group_bloom_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return BuildBloomFilter_exec_task(thread_index, task_id);
+        },
+        [this](size_t thread_index) -> Status {
+          return BuildBloomFilter_on_finished(thread_index);
+        });
+  }
+
   void RegisterBuildHashTable() {
     task_group_build_ = scheduler_->RegisterTaskGroup(
         [this](size_t thread_index, int64_t task_id) -> Status {
@@ -678,11 +828,63 @@ class HashJoinBasicImpl : public HashJoinImpl {
         });
   }
 
+  Status BuildBloomFilter(size_t thread_index) {
+    RETURN_NOT_OK(bloom_filter_builder_->Begin(
+        num_threads_, ctx_->cpu_info()->hardware_flags(), ctx_->memory_pool(),
+        right_input_row_count_, right_batches_.size(), bloom_filter_.get()));
+
+    return scheduler_->StartTaskGroup(thread_index, task_group_bloom_,
+                                      right_batches_.size());
+  }
+
   Status BuildHashTable(size_t thread_index) {
     return scheduler_->StartTaskGroup(thread_index, task_group_build_,
                                       BuildHashTable_num_tasks());
   }
 
+  Status BloomFilterQueuedBatches_exec_task(size_t thread_index, int64_t task_id) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    ExecBatch batch;
+    {
+      std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      batch = std::move(left_batches_[task_id]);
+      ARROW_DCHECK(!batch.values.empty());
+    }
+    RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
+    {
+      std::lock_guard<std::mutex> lock(left_batches_mutex_);
+      left_batches_[task_id] = std::move(batch);
+    }
+    return Status::OK();
+  }
+
+  Status BloomFilterQueuedBatches_on_finished(size_t thread_index) {
+    if (cancelled_) return Status::Cancelled("Hash join cancelled");
+    bool proceed;
+    {
+      std::lock_guard<std::mutex> lock(finished_mutex_);
+      proceed = !left_queue_bloom_finished_ && has_hash_table_;
+      left_queue_bloom_finished_ = true;
+    }
+    if (proceed) return ProbeQueuedBatches(thread_index);
+    return Status::OK();
+  }
+
+  void RegisterBloomFilterQueuedBatches() {
+    task_group_bloom_filter_queued_ = scheduler_->RegisterTaskGroup(
+        [this](size_t thread_index, int64_t task_id) -> Status {
+          return BloomFilterQueuedBatches_exec_task(thread_index, task_id);
+        },
+        [this](size_t thread_index) -> Status {
+          return BloomFilterQueuedBatches_on_finished(thread_index);
+        });
+  }
+
+  Status BloomFilterQueuedBatches(size_t thread_index, size_t num_batches) {
+    return scheduler_->StartTaskGroup(thread_index, task_group_bloom_filter_queued_,
+                                      num_batches);
+  }
+
   int64_t ProbeQueuedBatches_num_tasks() {
     return static_cast<int64_t>(left_batches_.size());
   }
@@ -704,8 +906,9 @@ class HashJoinBasicImpl : public HashJoinImpl {
     bool proceed;
     {
       std::lock_guard<std::mutex> lock(finished_mutex_);
-      proceed = left_side_finished_ && !left_queue_finished_;
-      left_queue_finished_ = true;
+      ARROW_DCHECK(left_queue_bloom_finished_);
+      proceed = left_side_finished_ && !left_queue_probe_finished_;
+      left_queue_probe_finished_ = true;
     }
     if (proceed) {
       RETURN_NOT_OK(OnLeftSideAndQueueFinished(thread_index));
@@ -751,7 +954,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
                                       hash_table_scan_unit_ * (task_id + 1)));
 
     ThreadLocalState& local_state = local_states_[thread_index];
-    InitLocalStateIfNeeded(thread_index);
+    RETURN_NOT_OK(InitLocalStateIfNeeded(thread_index));
 
     std::vector<int32_t>& id_left = local_state.no_match;
     std::vector<int32_t>& id_right = local_state.match;
@@ -809,22 +1012,40 @@ class HashJoinBasicImpl : public HashJoinImpl {
                                       ScanHashTable_num_tasks());
   }
 
-  bool QueueBatchIfNeeded(int side, ExecBatch batch) {
+  Result<bool> QueueBatchIfNeeded(size_t thread_index, int side, ExecBatch& batch) {
     if (side == 0) {
-      std::lock_guard<std::mutex> lock(left_batches_mutex_);
-      if (has_hash_table_) {
-        return false;
+      // We don't want to do the filtering while holding the lock, since that can get
+      // expensive.
+      bool needs_filtering;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        bloom_filters_ready_ = bloom_filters_ready_ || num_expected_bloom_filters_ == 0;
+        needs_filtering = bloom_filters_ready_ && num_expected_bloom_filters_ != 0;
       }
-      left_batches_.emplace_back(std::move(batch));
-      return true;
+      if (needs_filtering) RETURN_NOT_OK(ApplyBloomFiltersToBatch(thread_index, batch));
+
+      bool queued;
+      {
+        std::lock_guard<std::mutex> lock(left_batches_mutex_);
+        queued = !bloom_filters_ready_ || !has_hash_table_;
+        if (queued) left_batches_.emplace_back(std::move(batch));
+      }
+      return queued;
     } else {
       std::lock_guard<std::mutex> lock(right_batches_mutex_);
+      right_input_row_count_ += batch.length;
       right_batches_.emplace_back(std::move(batch));
       return true;
     }
   }
 
-  Status OnRightSideFinished(size_t thread_index) { return BuildHashTable(thread_index); }
+  Status OnRightSideFinished(size_t thread_index) {
+    if (pushdown_target_ == nullptr) {
+      return BuildHashTable(thread_index);
+    } else {
+      return BuildBloomFilter(thread_index);
+    }
+  }
 
   Status OnLeftSideAndQueueFinished(size_t thread_index) {
     return ScanHashTable(thread_index);
@@ -875,7 +1096,9 @@ class HashJoinBasicImpl : public HashJoinImpl {
   std::vector<JoinKeyCmp> key_cmp_;
   Expression filter_;
   std::unique_ptr<TaskScheduler> scheduler_;
+  int task_group_bloom_;
   int task_group_build_;
+  int task_group_bloom_filter_queued_;
   int task_group_queued_;
   int task_group_scan_;
 
@@ -896,6 +1119,7 @@ class HashJoinBasicImpl : public HashJoinImpl {
     std::vector<int32_t> match_right;
     bool is_has_match_initialized;
     std::vector<uint8_t> has_match;
+    util::TempVectorStack temp_stack;
   };
   std::vector<ThreadLocalState> local_states_;
 
@@ -916,15 +1140,29 @@ class HashJoinBasicImpl : public HashJoinImpl {
   bool has_hash_table_;
   std::mutex left_batches_mutex_;
 
+  size_t right_input_row_count_;  // Sum of the lengths of ExecBatches in right_batches_
   std::vector<ExecBatch> right_batches_;
   std::mutex right_batches_mutex_;
 
+  // Bloom filter stuff
+  //
+  std::unique_ptr<BloomFilterBuilder> bloom_filter_builder_;
+  std::unique_ptr<BlockedBloomFilter> bloom_filter_;
+  std::vector<int> column_map_;
+  std::vector<std::unique_ptr<BlockedBloomFilter>> pushed_bloom_filters_;
+  std::vector<std::vector<int>> bloom_filter_column_maps_;
+  std::mutex bloom_filters_mutex_;
+  size_t num_expected_bloom_filters_;
+  HashJoinImpl* pushdown_target_;
+
   std::atomic<int64_t> num_batches_produced_;
   bool cancelled_;
 
+  bool bloom_filters_ready_;
   bool right_side_finished_;
   bool left_side_finished_;
-  bool left_queue_finished_;
+  bool left_queue_bloom_finished_;
+  bool left_queue_probe_finished_;
   std::mutex finished_mutex_;
 };
 
diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h
index 12455f0c6d..9739cbc643 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/compute/exec/bloom_filter.h"
 #include "arrow/compute/exec/options.h"
 #include "arrow/compute/exec/schema_util.h"
 #include "arrow/compute/exec/task_util.h"
@@ -107,7 +108,12 @@ class HashJoinImpl {
                       std::vector<JoinKeyCmp> key_cmp, Expression filter,
                       OutputBatchCallback output_batch_callback,
                       FinishedCallback finished_callback,
-                      TaskScheduler::ScheduleImpl schedule_task_callback) = 0;
+                      TaskScheduler::ScheduleImpl schedule_task_callback,
+                      HashJoinImpl* pushdown_target, std::vector<int> column_map) = 0;
+  virtual void ExpectBloomFilter() = 0;
+  virtual Status PushBloomFilter(size_t thread_index,
+                                 std::unique_ptr<BlockedBloomFilter> filter,
+                                 std::vector<int> column_map) = 0;
   virtual Status InputReceived(size_t thread_index, int side, ExecBatch batch) = 0;
   virtual Status InputFinished(size_t thread_index, int side) = 0;
   virtual void Abort(TaskScheduler::AbortContinuationImpl pos_abort_callback) = 0;
diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
index 3d4271b6cb..8d8be7f904 100644
--- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc
@@ -36,6 +36,7 @@
 namespace arrow {
 namespace compute {
 struct BenchmarkSettings {
+  bool bloom_filter = false;
   int num_threads = 1;
   JoinType join_type = JoinType::INNER;
   int batch_size = 1024;
@@ -57,6 +58,7 @@ class JoinBenchmark {
 
     SchemaBuilder l_schema_builder, r_schema_builder;
     std::vector<FieldRef> left_keys, right_keys;
+    std::vector<JoinKeyCmp> key_cmp;
     for (size_t i = 0; i < settings.key_types.size(); i++) {
       std::string l_name = "lk" + std::to_string(i);
       std::string r_name = "rk" + std::to_string(i);
@@ -93,6 +95,7 @@ class JoinBenchmark {
 
       left_keys.push_back(FieldRef(l_name));
       right_keys.push_back(FieldRef(r_name));
+      key_cmp.push_back(JoinKeyCmp::EQ);
     }
 
     for (size_t i = 0; i < settings.build_payload_types.size(); i++) {
@@ -126,6 +129,22 @@ class JoinBenchmark {
 
     join_ = *HashJoinImpl::MakeBasic();
 
+    HashJoinImpl* bloom_filter_pushdown_target = nullptr;
+    std::vector<int> key_input_map;
+
+    bool bloom_filter_does_not_apply_to_join =
+        settings.join_type == JoinType::LEFT_ANTI ||
+        settings.join_type == JoinType::LEFT_OUTER ||
+        settings.join_type == JoinType::FULL_OUTER;
+    if (settings.bloom_filter && !bloom_filter_does_not_apply_to_join) {
+      bloom_filter_pushdown_target = join_.get();
+      SchemaProjectionMap probe_key_to_input = schema_mgr_->proj_maps[0].map(
+          HashJoinProjection::KEY, HashJoinProjection::INPUT);
+      int num_keys = probe_key_to_input.num_cols;
+      for (int i = 0; i < num_keys; i++)
+        key_input_map.push_back(probe_key_to_input.get(i));
+    }
+
     omp_set_num_threads(settings.num_threads);
     auto schedule_callback = [](std::function<Status(size_t)> func) -> Status {
 #pragma omp task
@@ -135,8 +154,9 @@ class JoinBenchmark {
 
     DCHECK_OK(join_->Init(
         ctx_.get(), settings.join_type, !is_parallel, settings.num_threads,
-        schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), [](ExecBatch) {},
-        [](int64_t x) {}, schedule_callback));
+        schema_mgr_.get(), std::move(key_cmp), std::move(filter), [](ExecBatch) {},
+        [](int64_t x) {}, schedule_callback, bloom_filter_pushdown_target,
+        std::move(key_input_map)));
   }
 
   void RunJoin() {
@@ -268,6 +288,16 @@ static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) {
 
   HashJoinBasicBenchmarkImpl(st, settings);
 }
+
+static void BM_HashJoinBasic_BloomFilter(benchmark::State& st, bool bloom_filter) {
+  BenchmarkSettings settings;
+  settings.bloom_filter = bloom_filter;
+  settings.selectivity = static_cast<double>(st.range(0)) / 100.0;
+  settings.num_build_batches = static_cast<int>(st.range(1));
+  settings.num_probe_batches = settings.num_build_batches;
+
+  HashJoinBasicBenchmarkImpl(st, settings);
+}
 #endif
 
 std::vector<int64_t> hashtable_krows = benchmark::CreateRange(1, 4096, 8);
@@ -395,6 +425,16 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism)
 BENCHMARK(BM_HashJoinBasic_NullPercentage)
     ->ArgNames({"Null Percentage"})
     ->DenseRange(0, 100, 10);
+
+std::vector<std::string> bloomfilter_argnames = {"Selectivity", "HashTable krows"};
+std::vector<std::vector<int64_t>> bloomfilter_args = {
+    benchmark::CreateDenseRange(0, 100, 10), hashtable_krows};
+BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "Bloom Filter", true)
+    ->ArgNames(bloomfilter_argnames)
+    ->ArgsProduct(selectivity_args);
+BENCHMARK_CAPTURE(BM_HashJoinBasic_BloomFilter, "No Bloom Filter", false)
+    ->ArgNames(bloomfilter_argnames)
+    ->ArgsProduct(selectivity_args);
 #else
 
 BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})
diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py
index ff7ab63187..16b4c3e5fd 100755
--- a/cpp/src/arrow/compute/exec/hash_join_graphs.py
+++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py
@@ -148,7 +148,7 @@ def plot_3d(test, sorted_argnames):
     num_cols = int(math.ceil(num_graphs / num_rows))
     graphs = set(test.args[sorted_argnames[0]])
 
-    for j, graph in enumerate(sorted(graphs, key=string_numeric_sort_key)):
+    for j, graph in enumerate(sorted(graphs, key=try_as_numeric)):
         ax = plt.subplot(num_rows, num_cols, j + 1)
         filtered_test = Test()
         indices = range(len(test.times))
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 0282e387c4..d8d729dd1a 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <unordered_set>
+#include <utility>
 
 #include "arrow/compute/exec/exec_plan.h"
 #include "arrow/compute/exec/hash_join.h"
@@ -466,7 +467,8 @@ class HashJoinNode : public ExecNode {
         key_cmp_(join_options.key_cmp),
         filter_(std::move(filter)),
         schema_mgr_(std::move(schema_mgr)),
-        impl_(std::move(impl)) {
+        impl_(std::move(impl)),
+        disable_bloom_filter_(join_options.disable_bloom_filter) {
     complete_.store(false);
   }
 
@@ -571,24 +573,138 @@ class HashJoinNode : public ExecNode {
     }
   }
 
-  Status StartProducing() override {
-    START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
-                       {{"node.label", label()},
-                        {"node.detail", ToString()},
-                        {"node.kind", kind_name()}});
-    END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
+  // The Bloom filter is built on the build side of some upstream join. For a join to
+  // evaluate the Bloom filter on its input columns, it has to rearrange its input columns
+  // to match the column order of the Bloom filter.
+  //
+  // The first part of the pair is the HashJoin to actually perform the pushdown into.
+  // The second part is a mapping such that column_map[i] is the index of key i in
+  // the first part's input.
+  // If we should disable Bloom filter, returns nullptr and an empty vector, and sets
+  // the disable_bloom_filter_ flag.
+  std::pair<HashJoinImpl*, std::vector<int>> GetPushdownTarget() {
+#if !ARROW_LITTLE_ENDIAN
+    // TODO (ARROW-16591): Debug bloom_filter.cc to enable on Big endian. It probably just
+    // needs a few byte swaps in the proper spots.
+    disable_bloom_filter_ = true;
+    return {nullptr, {}};
+#else
+    // A build-side Bloom filter tells us if a row is definitely not in the build side.
+    // This allows us to early-eliminate rows or early-accept rows depending on the type
+    // of join. Left Outer Join and Full Outer Join output all rows, so a build-side Bloom
+    // filter would only allow us to early-output. Left Antijoin outputs only if there is
+    // no match, so again early output. We don't implement early output for now, so we
+    // must disallow these types of joins.
+    bool bloom_filter_does_not_apply_to_join = join_type_ == JoinType::LEFT_ANTI ||
+                                               join_type_ == JoinType::LEFT_OUTER ||
+                                               join_type_ == JoinType::FULL_OUTER;
+    disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join;
+
+    SchemaProjectionMap build_keys_to_input =
+        schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    // Bloom filter currently doesn't support dictionaries.
+    for (int i = 0; i < build_keys_to_input.num_cols; i++) {
+      int idx = build_keys_to_input.get(i);
+      bool is_dict =
+          inputs_[1]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
+      if (is_dict) {
+        disable_bloom_filter_ = true;
+        break;
+      }
+    }
+
+    bool all_comparisons_is = true;
+    for (JoinKeyCmp cmp : key_cmp_) all_comparisons_is &= (cmp == JoinKeyCmp::IS);
+
+    if ((join_type_ == JoinType::RIGHT_OUTER || join_type_ == JoinType::FULL_OUTER) &&
+        all_comparisons_is)
+      disable_bloom_filter_ = true;
+
+    if (disable_bloom_filter_) return {nullptr, {}};
+
+    // We currently only push Bloom filters on the probe side, and only if that input is
+    // also a join.
+    SchemaProjectionMap probe_key_to_input =
+        schema_mgr_->proj_maps[0].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
+    int num_keys = probe_key_to_input.num_cols;
+
+    // A mapping such that bloom_to_target[i] is the index of key i in the pushdown
+    // target's input
+    std::vector<int> bloom_to_target(num_keys);
+    HashJoinNode* pushdown_target = this;
+    for (int i = 0; i < num_keys; i++) bloom_to_target[i] = probe_key_to_input.get(i);
+
+    for (ExecNode* candidate = inputs()[0]; candidate->kind_name() == this->kind_name();
+         candidate = candidate->inputs()[0]) {
+      auto* candidate_as_join = checked_cast<HashJoinNode*>(candidate);
+      SchemaProjectionMap candidate_output_to_input =
+          candidate_as_join->schema_mgr_->proj_maps[0].map(HashJoinProjection::OUTPUT,
+                                                           HashJoinProjection::INPUT);
+
+      // Check if any of the keys are missing, if they are, break
+      bool break_outer = false;
+      for (int i = 0; i < num_keys; i++) {
+        // Since all of the probe side columns are before the build side columns,
+        // if the index of an output is greater than the number of probe-side input
+        // columns, it must have come from the candidate's build side.
+        if (bloom_to_target[i] >= candidate_output_to_input.num_cols) {
+          break_outer = true;
+          break;
+        }
+        int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]);
+        // The output column has to have come from somewhere...
+        ARROW_DCHECK_NE(candidate_input_idx, schema_mgr_->kMissingField());
+      }
+      if (break_outer) break;
+
+      // The Bloom filter will filter out nulls, which may cause a Right/Full Outer Join
+      // to incorrectly output some rows with nulls padding the probe-side rows. This may
+      // cause a row with all null keys to be emitted. This is normally not an issue
+      // with EQ, but if all comparisons are IS (i.e. all-null is accepted), this could
+      // produce incorrect rows.
+      bool can_produce_build_side_nulls =
+          candidate_as_join->join_type_ == JoinType::RIGHT_OUTER ||
+          candidate_as_join->join_type_ == JoinType::FULL_OUTER;
+
+      if (all_comparisons_is || can_produce_build_side_nulls) break;
+
+      // All keys are present, we can update the mapping
+      for (int i = 0; i < num_keys; i++) {
+        int candidate_input_idx = candidate_output_to_input.get(bloom_to_target[i]);
+        bloom_to_target[i] = candidate_input_idx;
+      }
+      pushdown_target = candidate_as_join;
+    }
+    return std::make_pair(pushdown_target->impl_.get(), std::move(bloom_to_target));
+#endif  // ARROW_LITTLE_ENDIAN
+  }
 
+  Status PrepareToProduce() override {
     bool use_sync_execution = !(plan_->exec_context()->executor());
     size_t num_threads = use_sync_execution ? 1 : thread_indexer_.Capacity();
 
-    RETURN_NOT_OK(impl_->Init(
+    HashJoinImpl* pushdown_target = nullptr;
+    std::vector<int> column_map;
+    std::tie(pushdown_target, column_map) = GetPushdownTarget();
+
+    return impl_->Init(
         plan_->exec_context(), join_type_, use_sync_execution, num_threads,
         schema_mgr_.get(), key_cmp_, filter_,
         [this](ExecBatch batch) { this->OutputBatchCallback(batch); },
         [this](int64_t total_num_batches) { this->FinishedCallback(total_num_batches); },
         [this](std::function<Status(size_t)> func) -> Status {
           return this->ScheduleTaskCallback(std::move(func));
-        }));
+        },
+        pushdown_target, std::move(column_map));
+  }
+
+  Status StartProducing() override {
+    START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
+                       {{"node.label", label()},
+                        {"node.detail", ToString()},
+                        {"node.kind", kind_name()}});
+    END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this);
+
     return Status::OK();
   }
 
@@ -662,6 +778,7 @@ class HashJoinNode : public ExecNode {
   std::unique_ptr<HashJoinSchema> schema_mgr_;
   std::unique_ptr<HashJoinImpl> impl_;
   util::AsyncTaskGroup task_group_;
+  bool disable_bloom_filter_;
 };
 
 namespace internal {
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index 99a8fc01e0..f8da71c7b5 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1000,11 +1000,14 @@ TEST(HashJoin, Random) {
   Random64Bit rng(42);
 #if defined(THREAD_SANITIZER) || defined(ARROW_VALGRIND)
   const int num_tests = 15;
+#elif defined(ADDRESS_SANITIZER)
+  const int num_tests = 25;
 #else
   const int num_tests = 100;
 #endif
   for (int test_id = 0; test_id < num_tests; ++test_id) {
     bool parallel = (rng.from_range(0, 1) == 1);
+    bool disable_bloom_filter = (rng.from_range(0, 1) == 1);
     auto exec_ctx = arrow::internal::make_unique<ExecContext>(
         default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
 
@@ -1110,7 +1113,8 @@ TEST(HashJoin, Random) {
     }
 
     ARROW_SCOPED_TRACE(join_type_name, " ", key_cmp_str,
-                       " parallel = ", (parallel ? "true" : "false"));
+                       " parallel = ", (parallel ? "true" : "false"),
+                       " bloom_filter = ", (disable_bloom_filter ? "false" : "true"));
 
     // Run reference join implementation
     std::vector<bool> null_in_key_vectors[2];
@@ -1130,7 +1134,7 @@ TEST(HashJoin, Random) {
 
     // Turn the last key comparison into a residual filter expression
     Expression filter = literal(true);
-    if (key_cmp.size() > 1 && rng.from_range(0, 1) == 0) {
+    if (key_cmp.size() > 1 && rng.from_range(0, 4) == 0) {
       for (size_t i = 0; i < key_cmp.size(); i++) {
         FieldRef left = key_fields[0][i];
         FieldRef right = key_fields[1][i];
@@ -1158,6 +1162,7 @@ TEST(HashJoin, Random) {
     HashJoinNodeOptions join_options{
         join_type,        key_fields[0], key_fields[1], output_fields[0],
         output_fields[1], key_cmp,       filter};
+    join_options.disable_bloom_filter = disable_bloom_filter;
     std::vector<std::shared_ptr<Field>> output_schema_fields;
     for (int i = 0; i < 2; ++i) {
       for (size_t col = 0; col < output_fields[i].size(); ++col) {
@@ -1168,6 +1173,7 @@ TEST(HashJoin, Random) {
     }
     std::shared_ptr<Schema> output_schema =
         std::make_shared<Schema>(std::move(output_schema_fields));
+
     ASSERT_OK_AND_ASSIGN(
         auto batches, HashJoinWithExecPlan(
                           rng, parallel, join_options, output_schema,
@@ -1901,5 +1907,150 @@ TEST(HashJoin, TrivialResidualFilter) {
   }
 }
 
+HashJoinNodeOptions GenerateHashJoinNodeOptions(Random64Bit& rng, int num_left_cols,
+                                                int num_right_cols) {
+  HashJoinNodeOptions opts;
+  opts.join_type = static_cast<JoinType>(rng.from_range(0, 7));
+  bool is_left_join = opts.join_type == JoinType::LEFT_SEMI ||
+                      opts.join_type == JoinType::LEFT_ANTI ||
+                      opts.join_type == JoinType::LEFT_OUTER;
+  bool is_right_join = opts.join_type == JoinType::RIGHT_SEMI ||
+                       opts.join_type == JoinType::RIGHT_ANTI ||
+                       opts.join_type == JoinType::RIGHT_OUTER;
+
+  int num_keys = rng.from_range(1, std::min(num_left_cols, num_right_cols));
+  for (int i = 0; i < num_left_cols; i++) {
+    bool is_out = rng.from_range(0, 2) != 2;
+    if (is_out && !is_right_join) opts.left_output.push_back(FieldRef(i));
+  }
+  for (int i = 0; i < num_right_cols; i++) {
+    bool is_out = rng.from_range(0, 2) == 2;
+    if (is_out && !is_left_join) opts.right_output.push_back(FieldRef(i));
+  }
+  // We need at least one output
+  if (opts.right_output.empty() && opts.left_output.empty()) {
+    if (is_left_join) {
+      int col = rng.from_range(0, num_left_cols - 1);
+      opts.left_output.push_back(FieldRef(col));
+    } else if (is_right_join) {
+      int col = rng.from_range(0, num_right_cols - 1);
+      opts.right_output.push_back(FieldRef(col));
+    } else {
+      if (rng.from_range(0, 1) == 0) {
+        int col = rng.from_range(0, num_left_cols - 1);
+        opts.left_output.push_back(FieldRef(col));
+      } else {
+        int col = rng.from_range(0, num_right_cols - 1);
+        opts.right_output.push_back(FieldRef(col));
+      }
+    }
+  }
+
+  for (int i = 0; i < num_keys; i++) {
+    int left = rng.from_range(0, num_left_cols - 1);
+    int right = rng.from_range(0, num_right_cols - 1);
+    bool is_or_eq = rng.from_range(0, 1) == 0;
+    opts.left_keys.push_back(FieldRef(left));
+    opts.right_keys.push_back(FieldRef(right));
+    opts.key_cmp.push_back(is_or_eq ? JoinKeyCmp::IS : JoinKeyCmp::EQ);
+  }
+  return opts;
+}
+
+void TestSingleChainOfHashJoins(Random64Bit& rng) {
+  int num_joins = rng.from_range(2, 5);
+  std::vector<HashJoinNodeOptions> opts;
+  int num_left_cols = rng.from_range(1, 8);
+  int num_right_cols = rng.from_range(1, 8);
+  HashJoinNodeOptions first_opt =
+      GenerateHashJoinNodeOptions(rng, num_left_cols, num_right_cols);
+  opts.push_back(std::move(first_opt));
+
+  std::unordered_map<std::string, std::string> metadata_map;
+  metadata_map["min"] = "0";
+  metadata_map["max"] = "10";
+  auto metadata = key_value_metadata(metadata_map);
+  std::vector<std::shared_ptr<Field>> left_fields;
+  for (int i = 0; i < num_left_cols; i++)
+    left_fields.push_back(field(std::string("l") + std::to_string(i), int32(), metadata));
+  std::vector<std::shared_ptr<Field>> first_right_fields;
+  for (int i = 0; i < num_right_cols; i++)
+    first_right_fields.push_back(
+        field(std::string("r_0_") + std::to_string(i), int32(), metadata));
+
+  BatchesWithSchema input_left = MakeRandomBatches(schema(std::move(left_fields)));
+  std::vector<BatchesWithSchema> input_right;
+  input_right.push_back(MakeRandomBatches(schema(std::move(first_right_fields))));
+
+  for (int i = 1; i < num_joins; i++) {
+    int num_right_cols = rng.from_range(1, 8);
+    HashJoinNodeOptions opt =
+        GenerateHashJoinNodeOptions(rng,
+                                    static_cast<int>(opts[i - 1].left_output.size() +
+                                                     opts[i - 1].right_output.size()),
+                                    num_right_cols);
+    opts.push_back(std::move(opt));
+
+    std::vector<std::shared_ptr<Field>> right_fields;
+    for (int j = 0; j < num_right_cols; j++)
+      right_fields.push_back(
+          field(std::string("r_") + std::to_string(i) + "_" + std::to_string(j), int32(),
+                metadata));
+    BatchesWithSchema input = MakeRandomBatches(schema(std::move(right_fields)));
+    input_right.push_back(std::move(input));
+  }
+
+  std::vector<ExecBatch> reference;
+  for (bool bloom_filters : {false, true}) {
+    bool kParallel = true;
+    ARROW_SCOPED_TRACE(bloom_filters ? "bloom filtered" : "unfiltered");
+    auto exec_ctx = arrow::internal::make_unique<ExecContext>(
+        default_memory_pool(), kParallel ? arrow::internal::GetCpuThreadPool() : nullptr);
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
+
+    ExecNode* left_source;
+    ASSERT_OK_AND_ASSIGN(
+        left_source,
+        MakeExecNode("source", plan.get(), {},
+                     SourceNodeOptions{input_left.schema,
+                                       input_left.gen(kParallel, /*slow=*/false)}));
+    std::vector<ExecNode*> joins(num_joins);
+    for (int i = 0; i < num_joins; i++) {
+      opts[i].disable_bloom_filter = !bloom_filters;
+      ExecNode* right_source;
+      ASSERT_OK_AND_ASSIGN(
+          right_source,
+          MakeExecNode("source", plan.get(), {},
+                       SourceNodeOptions{input_right[i].schema,
+                                         input_right[i].gen(kParallel, /*slow=*/false)}));
+
+      std::vector<ExecNode*> inputs;
+      if (i == 0)
+        inputs = {left_source, right_source};
+      else
+        inputs = {joins[i - 1], right_source};
+      ASSERT_OK_AND_ASSIGN(joins[i],
+                           MakeExecNode("hashjoin", plan.get(), inputs, opts[i]));
+    }
+    AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+    ASSERT_OK(
+        MakeExecNode("sink", plan.get(), {joins.back()}, SinkNodeOptions{&sink_gen}));
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+    if (!bloom_filters)
+      reference = std::move(result);
+    else
+      AssertExecBatchesEqual(joins.back()->output_schema(), reference, result);
+  }
+}
+
+TEST(HashJoin, ChainedIntegerHashJoins) {
+  Random64Bit rng(42);
+  int num_tests = 30;
+  for (int i = 0; i < num_tests; i++) {
+    ARROW_SCOPED_TRACE("Test ", std::to_string(i));
+    TestSingleChainOfHashJoins(rng);
+  }
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
index f9de31d9c2..58d8fb233f 100644
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ b/cpp/src/arrow/compute/exec/key_encode.h
@@ -21,6 +21,8 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
 #include "arrow/compute/exec/util.h"
 #include "arrow/compute/light_array.h"
 #include "arrow/memory_pool.h"
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index 125fd3912e..e81ed64b6f 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <cstdint>
 
+#include "arrow/compute/exec/key_encode.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/ubsan.h"
 
@@ -456,6 +457,19 @@ void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
   }
 }
 
+Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
+                            int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                            int64_t offset, int64_t length) {
+  std::vector<KeyColumnArray> column_arrays;
+  RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
+
+  KeyEncoder::KeyEncoderContext ctx;
+  ctx.hardware_flags = hardware_flags;
+  ctx.stack = temp_stack;
+  HashMultiColumn(column_arrays, &ctx, hashes);
+  return Status::OK();
+}
+
 inline uint64_t Hashing64::Avalanche(uint64_t acc) {
   acc ^= (acc >> 33);
   acc *= PRIME64_2;
@@ -875,5 +889,18 @@ void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
   }
 }
 
+Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
+                            int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                            int64_t offset, int64_t length) {
+  std::vector<KeyColumnArray> column_arrays;
+  RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
+
+  KeyEncoder::KeyEncoderContext ctx;
+  ctx.hardware_flags = hardware_flags;
+  ctx.stack = temp_stack;
+  HashMultiColumn(column_arrays, &ctx, hashes);
+  return Status::OK();
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 719f3dfd46..05f39bb729 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -48,6 +48,10 @@ class ARROW_EXPORT Hashing32 {
   static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                               KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
 
+  static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
+                          int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                          int64_t offset, int64_t length);
+
  private:
   static const uint32_t PRIME32_1 = 0x9E3779B1;
   static const uint32_t PRIME32_2 = 0x85EBCA77;
@@ -156,6 +160,10 @@ class ARROW_EXPORT Hashing64 {
   static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
                               KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
 
+  static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
+                          int64_t hardware_flags, util::TempVectorStack* temp_stack,
+                          int64_t offset, int64_t length);
+
  private:
   static const uint64_t PRIME64_1 = 0x9E3779B185EBCA87ULL;
   static const uint64_t PRIME64_2 = 0xC2B2AE3D27D4EB4FULL;
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 3c901be0d2..48cbf9d371 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -281,14 +281,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       JoinType in_join_type, std::vector<FieldRef> in_left_keys,
       std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
       std::string output_suffix_for_left = default_output_suffix_for_left,
-      std::string output_suffix_for_right = default_output_suffix_for_right)
+      std::string output_suffix_for_right = default_output_suffix_for_right,
+      bool disable_bloom_filter = false)
       : join_type(in_join_type),
         left_keys(std::move(in_left_keys)),
         right_keys(std::move(in_right_keys)),
         output_all(true),
         output_suffix_for_left(std::move(output_suffix_for_left)),
         output_suffix_for_right(std::move(output_suffix_for_right)),
-        filter(std::move(filter)) {
+        filter(std::move(filter)),
+        disable_bloom_filter(disable_bloom_filter) {
     this->key_cmp.resize(this->left_keys.size());
     for (size_t i = 0; i < this->left_keys.size(); ++i) {
       this->key_cmp[i] = JoinKeyCmp::EQ;
@@ -299,7 +301,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
       std::vector<FieldRef> right_output, Expression filter = literal(true),
       std::string output_suffix_for_left = default_output_suffix_for_left,
-      std::string output_suffix_for_right = default_output_suffix_for_right)
+      std::string output_suffix_for_right = default_output_suffix_for_right,
+      bool disable_bloom_filter = false)
       : join_type(join_type),
         left_keys(std::move(left_keys)),
         right_keys(std::move(right_keys)),
@@ -308,7 +311,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
         right_output(std::move(right_output)),
         output_suffix_for_left(std::move(output_suffix_for_left)),
         output_suffix_for_right(std::move(output_suffix_for_right)),
-        filter(std::move(filter)) {
+        filter(std::move(filter)),
+        disable_bloom_filter(disable_bloom_filter) {
     this->key_cmp.resize(this->left_keys.size());
     for (size_t i = 0; i < this->left_keys.size(); ++i) {
       this->key_cmp[i] = JoinKeyCmp::EQ;
@@ -320,7 +324,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
       Expression filter = literal(true),
       std::string output_suffix_for_left = default_output_suffix_for_left,
-      std::string output_suffix_for_right = default_output_suffix_for_right)
+      std::string output_suffix_for_right = default_output_suffix_for_right,
+      bool disable_bloom_filter = false)
       : join_type(join_type),
         left_keys(std::move(left_keys)),
         right_keys(std::move(right_keys)),
@@ -330,17 +335,20 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
         key_cmp(std::move(key_cmp)),
         output_suffix_for_left(std::move(output_suffix_for_left)),
         output_suffix_for_right(std::move(output_suffix_for_right)),
-        filter(std::move(filter)) {}
+        filter(std::move(filter)),
+        disable_bloom_filter(disable_bloom_filter) {}
+
+  HashJoinNodeOptions() = default;
 
   // type of join (inner, left, semi...)
-  JoinType join_type;
+  JoinType join_type = JoinType::INNER;
   // key fields from left input
   std::vector<FieldRef> left_keys;
   // key fields from right input
   std::vector<FieldRef> right_keys;
   // if set all valid fields from both left and right input will be output
   // (and field ref vectors for output fields will be ignored)
-  bool output_all;
+  bool output_all = false;
   // output fields passed from left input
   std::vector<FieldRef> left_output;
   // output fields passed from right input
@@ -358,7 +366,9 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
   // the filter are not included.  The filter is applied against the
   // concatenated input schema (left fields then right fields) and can reference
   // fields that are not included in the output.
-  Expression filter;
+  Expression filter = literal(true);
+  // whether or not to disable Bloom filters in this join
+  bool disable_bloom_filter = false;
 };
 
 /// \brief Make a node which select top_k/bottom_k rows passed through it
diff --git a/cpp/src/arrow/compute/exec/partition_util.cc b/cpp/src/arrow/compute/exec/partition_util.cc
index ed5e37edca..e99007c45a 100644
--- a/cpp/src/arrow/compute/exec/partition_util.cc
+++ b/cpp/src/arrow/compute/exec/partition_util.cc
@@ -21,24 +21,25 @@
 namespace arrow {
 namespace compute {
 
-PartitionLocks::PartitionLocks()
-    : num_prtns_(0),
-      locks_(nullptr),
-      rand_seed_{0, 0, 0, 0, 0, 0, 0, 0},
-      rand_engine_(rand_seed_) {}
+PartitionLocks::PartitionLocks() : num_prtns_(0), locks_(nullptr), rngs_(nullptr) {}
 
 PartitionLocks::~PartitionLocks() { CleanUp(); }
 
-void PartitionLocks::Init(int num_prtns) {
+void PartitionLocks::Init(size_t num_threads, int num_prtns) {
   num_prtns_ = num_prtns;
   locks_.reset(new PartitionLock[num_prtns]);
+  rngs_.reset(new arrow::random::pcg32_fast[num_threads]);
   for (int i = 0; i < num_prtns; ++i) {
     locks_[i].lock.store(false);
   }
+  arrow::random::pcg32_fast seed_gen(0);
+  std::uniform_int_distribution<uint32_t> seed_dist;
+  for (size_t i = 0; i < num_threads; i++) rngs_[i].seed(seed_dist(seed_gen));
 }
 
 void PartitionLocks::CleanUp() {
   locks_.reset();
+  rngs_.reset();
   num_prtns_ = 0;
 }
 
@@ -48,23 +49,23 @@ std::atomic<bool>* PartitionLocks::lock_ptr(int prtn_id) {
   return &(locks_[prtn_id].lock);
 }
 
-int PartitionLocks::random_int(int num_values) {
-  return rand_distribution_(rand_engine_) % num_values;
+int PartitionLocks::random_int(size_t thread_id, int num_values) {
+  return std::uniform_int_distribution<int>{0, num_values - 1}(rngs_[thread_id]);
 }
 
-bool PartitionLocks::AcquirePartitionLock(int num_prtns_to_try,
+bool PartitionLocks::AcquirePartitionLock(size_t thread_id, int num_prtns_to_try,
                                           const int* prtn_ids_to_try, bool limit_retries,
                                           int max_retries, int* locked_prtn_id,
                                           int* locked_prtn_id_pos) {
   int trial = 0;
   while (!limit_retries || trial <= max_retries) {
-    int prtn_id_pos = random_int(num_prtns_to_try);
+    int prtn_id_pos = random_int(thread_id, num_prtns_to_try);
     int prtn_id = prtn_ids_to_try[prtn_id_pos];
 
     std::atomic<bool>* lock = lock_ptr(prtn_id);
 
     bool expected = false;
-    if (lock->compare_exchange_weak(expected, true)) {
+    if (lock->compare_exchange_weak(expected, true, std::memory_order_acquire)) {
       *locked_prtn_id = prtn_id;
       *locked_prtn_id_pos = prtn_id_pos;
       return true;
@@ -81,7 +82,7 @@ bool PartitionLocks::AcquirePartitionLock(int num_prtns_to_try,
 void PartitionLocks::ReleasePartitionLock(int prtn_id) {
   ARROW_DCHECK(prtn_id >= 0 && prtn_id < num_prtns_);
   std::atomic<bool>* lock = lock_ptr(prtn_id);
-  lock->store(false);
+  lock->store(false, std::memory_order_release);
 }
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/exec/partition_util.h b/cpp/src/arrow/compute/exec/partition_util.h
index 6efda4aeeb..07fb91f2f1 100644
--- a/cpp/src/arrow/compute/exec/partition_util.h
+++ b/cpp/src/arrow/compute/exec/partition_util.h
@@ -24,6 +24,7 @@
 #include <random>
 #include "arrow/buffer.h"
 #include "arrow/compute/exec/util.h"
+#include "arrow/util/pcg_random.h"
 
 namespace arrow {
 namespace compute {
@@ -59,14 +60,14 @@ class PartitionSort {
   /// out_arr: [2, 5, 3, 5, 4, 7]
   /// prtn_ranges: [0, 1, 5, 6]
   template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>
-  static void Eval(int num_rows, int num_prtns, uint16_t* prtn_ranges,
+  static void Eval(int64_t num_rows, int num_prtns, uint16_t* prtn_ranges,
                    INPUT_PRTN_ID_FN prtn_id_impl, OUTPUT_POS_FN output_pos_impl) {
     ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15));
     ARROW_DCHECK(num_prtns >= 1 && num_prtns <= (1 << 15));
 
     memset(prtn_ranges, 0, (num_prtns + 1) * sizeof(uint16_t));
 
-    for (int i = 0; i < num_rows; ++i) {
+    for (int64_t i = 0; i < num_rows; ++i) {
       int prtn_id = static_cast<int>(prtn_id_impl(i));
       ++prtn_ranges[prtn_id + 1];
     }
@@ -78,7 +79,7 @@ class PartitionSort {
       sum = sum_next;
     }
 
-    for (int i = 0; i < num_rows; ++i) {
+    for (int64_t i = 0; i < num_rows; ++i) {
       int prtn_id = static_cast<int>(prtn_id_impl(i));
       int pos = prtn_ranges[prtn_id + 1]++;
       output_pos_impl(i, pos);
@@ -93,12 +94,14 @@ class PartitionLocks {
   ~PartitionLocks();
   /// \brief Initializes the control, must be called before use
   ///
+  /// \param num_threads Maximum number of threads that will access the partitions
   /// \param num_prtns Number of partitions to synchronize
-  void Init(int num_prtns);
+  void Init(size_t num_threads, int num_prtns);
   /// \brief Cleans up the control, it should not be used after this call
   void CleanUp();
   /// \brief Acquire a partition to work on one
   ///
+  /// \param thread_id The index of the thread trying to acquire the partition lock
   /// \param num_prtns Length of prtns_to_try, must be <= num_prtns used in Init
   /// \param prtns_to_try An array of partitions that still have remaining work
   /// \param limit_retries If false, this method will spinwait forever until success
@@ -109,15 +112,15 @@ class PartitionLocks {
   ///         without successfully acquiring a lock
   ///
   /// This method is thread safe
-  bool AcquirePartitionLock(int num_prtns, const int* prtns_to_try, bool limit_retries,
-                            int max_retries, int* locked_prtn_id,
+  bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try,
+                            bool limit_retries, int max_retries, int* locked_prtn_id,
                             int* locked_prtn_id_pos);
   /// \brief Release a partition so that other threads can work on it
   void ReleasePartitionLock(int prtn_id);
 
  private:
   std::atomic<bool>* lock_ptr(int prtn_id);
-  int random_int(int num_values);
+  int random_int(size_t thread_id, int num_values);
 
   struct PartitionLock {
     static constexpr int kCacheLineBytes = 64;
@@ -126,10 +129,7 @@ class PartitionLocks {
   };
   int num_prtns_;
   std::unique_ptr<PartitionLock[]> locks_;
-
-  std::seed_seq rand_seed_;
-  std::mt19937 rand_engine_;
-  std::uniform_int_distribution<uint64_t> rand_distribution_;
+  std::unique_ptr<arrow::random::pcg32_fast[]> rngs_;
 };
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc
index f6ac70ad45..ae70cfcd46 100644
--- a/cpp/src/arrow/compute/exec/util.cc
+++ b/cpp/src/arrow/compute/exec/util.cc
@@ -29,6 +29,38 @@ using bit_util::CountTrailingZeros;
 
 namespace util {
 
+inline uint64_t bit_util::SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) {
+  // This will not be correct on big-endian architectures.
+#if !ARROW_LITTLE_ENDIAN
+  ARROW_DCHECK(false);
+#endif
+  ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8);
+  if (num_bytes == 8) {
+    return util::SafeLoad(reinterpret_cast<const uint64_t*>(bytes));
+  } else {
+    uint64_t word = 0;
+    for (int i = 0; i < num_bytes; ++i) {
+      word |= static_cast<uint64_t>(bytes[i]) << (8 * i);
+    }
+    return word;
+  }
+}
+
+inline void bit_util::SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value) {
+  // This will not be correct on big-endian architectures.
+#if !ARROW_LITTLE_ENDIAN
+  ARROW_DCHECK(false);
+#endif
+  ARROW_DCHECK(num_bytes >= 0 && num_bytes <= 8);
+  if (num_bytes == 8) {
+    util::SafeStore(reinterpret_cast<uint64_t*>(bytes), value);
+  } else {
+    for (int i = 0; i < num_bytes; ++i) {
+      bytes[i] = static_cast<uint8_t>(value >> (8 * i));
+    }
+  }
+}
+
 inline void bit_util::bits_to_indexes_helper(uint64_t word, uint16_t base_index,
                                              int* num_indexes, uint16_t* indexes) {
   int n = *num_indexes;
@@ -86,8 +118,8 @@ void bit_util::bits_to_indexes_internal(int64_t hardware_flags, const int num_bi
 #endif
   // Optionally process the last partial word with masking out bits outside range
   if (tail) {
-    uint64_t word =
-        util::SafeLoad(&reinterpret_cast<const uint64_t*>(bits)[num_bits / unroll]);
+    const uint8_t* bits_tail = bits + (num_bits - tail) / 8;
+    uint64_t word = SafeLoadUpTo8Bytes(bits_tail, (tail + 7) / 8);
     if (bit_to_search == 0) {
       word = ~word;
     }
@@ -109,8 +141,7 @@ void bit_util::bits_to_indexes(int bit_to_search, int64_t hardware_flags, int nu
   *num_indexes = 0;
   uint16_t base_index = 0;
   if (bit_offset != 0) {
-    uint64_t bits_head =
-        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    uint64_t bits_head = bits[0] >> bit_offset;
     int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
     bits_to_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
                     reinterpret_cast<const uint8_t*>(&bits_head), num_indexes, indexes);
@@ -143,8 +174,7 @@ void bit_util::bits_filter_indexes(int bit_to_search, int64_t hardware_flags,
   bit_offset %= 8;
   if (bit_offset != 0) {
     int num_indexes_head = 0;
-    uint64_t bits_head =
-        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    uint64_t bits_head = bits[0] >> bit_offset;
     int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
     bits_filter_indexes(bit_to_search, hardware_flags, bits_in_first_byte,
                         reinterpret_cast<const uint8_t*>(&bits_head), input_indexes,
@@ -185,8 +215,7 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
   bits += bit_offset / 8;
   bit_offset %= 8;
   if (bit_offset != 0) {
-    uint64_t bits_head =
-        util::SafeLoad(reinterpret_cast<const uint64_t*>(bits)) >> bit_offset;
+    uint64_t bits_head = bits[0] >> bit_offset;
     int bits_in_first_byte = std::min(num_bits, 8 - bit_offset);
     bits_to_bytes(hardware_flags, bits_in_first_byte,
                   reinterpret_cast<const uint8_t*>(&bits_head), bytes);
@@ -207,7 +236,7 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
 #endif
   // Processing 8 bits at a time
   constexpr int unroll = 8;
-  for (int i = num_processed / unroll; i < (num_bits + unroll - 1) / unroll; ++i) {
+  for (int i = num_processed / unroll; i < num_bits / unroll; ++i) {
     uint8_t bits_next = bits[i];
     // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart
     // from the previous.
@@ -219,6 +248,19 @@ void bit_util::bits_to_bytes(int64_t hardware_flags, const int num_bits,
     unpacked *= 255;
     util::SafeStore(&reinterpret_cast<uint64_t*>(bytes)[i], unpacked);
   }
+  int tail = num_bits % unroll;
+  if (tail) {
+    uint8_t bits_next = bits[(num_bits - tail) / unroll];
+    // Clear the lowest bit and then make 8 copies of remaining 7 bits, each 7 bits apart
+    // from the previous.
+    uint64_t unpacked = static_cast<uint64_t>(bits_next & 0xfe) *
+                        ((1ULL << 7) | (1ULL << 14) | (1ULL << 21) | (1ULL << 28) |
+                         (1ULL << 35) | (1ULL << 42) | (1ULL << 49));
+    unpacked |= (bits_next & 1);
+    unpacked &= 0x0101010101010101ULL;
+    unpacked *= 255;
+    SafeStoreUpTo8Bytes(bytes + num_bits - tail, tail, unpacked);
+  }
 }
 
 void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
@@ -250,7 +292,7 @@ void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
 #endif
   // Process 8 bits at a time
   constexpr int unroll = 8;
-  for (int i = num_processed / unroll; i < (num_bits + unroll - 1) / unroll; ++i) {
+  for (int i = num_processed / unroll; i < num_bits / unroll; ++i) {
     uint64_t bytes_next = util::SafeLoad(&reinterpret_cast<const uint64_t*>(bytes)[i]);
     bytes_next &= 0x0101010101010101ULL;
     bytes_next |= (bytes_next >> 7);  // Pairs of adjacent output bits in individual bytes
@@ -258,6 +300,15 @@ void bit_util::bytes_to_bits(int64_t hardware_flags, const int num_bits,
     bytes_next |= (bytes_next >> 28);  // All 8 output bits in the lowest byte
     bits[i] = static_cast<uint8_t>(bytes_next & 0xff);
   }
+  int tail = num_bits % unroll;
+  if (tail) {
+    uint64_t bytes_next = SafeLoadUpTo8Bytes(bytes + num_bits - tail, tail);
+    bytes_next &= 0x0101010101010101ULL;
+    bytes_next |= (bytes_next >> 7);  // Pairs of adjacent output bits in individual bytes
+    bytes_next |= (bytes_next >> 14);  // 4 adjacent output bits in individual bytes
+    bytes_next |= (bytes_next >> 28);  // All 8 output bits in the lowest byte
+    bits[num_bits / 8] = static_cast<uint8_t>(bytes_next & 0xff);
+  }
 }
 
 bool bit_util::are_all_bytes_zero(int64_t hardware_flags, const uint8_t* bytes,
diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index 4e7550582a..839a8a7d29 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -92,7 +92,7 @@ class TempVectorStack {
   Status Init(MemoryPool* pool, int64_t size) {
     num_vectors_ = 0;
     top_ = 0;
-    buffer_size_ = size;
+    buffer_size_ = PaddedAllocationSize(size) + kPadding + 2 * sizeof(uint64_t);
     ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
     // Ensure later operations don't accidentally read uninitialized memory.
     std::memset(buffer->mutable_data(), 0xFF, size);
@@ -193,6 +193,8 @@ class bit_util {
                                  uint32_t num_bytes);
 
  private:
+  inline static uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes);
+  inline static void SafeStoreUpTo8Bytes(uint8_t* bytes, int num_bytes, uint64_t value);
   inline static void bits_to_indexes_helper(uint64_t word, uint16_t base_index,
                                             int* num_indexes, uint16_t* indexes);
   inline static void bits_filter_indexes_helper(uint64_t word,
diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc
index 390dcc6cbf..4fae01eb6b 100644
--- a/cpp/src/arrow/compute/light_array.cc
+++ b/cpp/src/arrow/compute/light_array.cc
@@ -138,7 +138,7 @@ Result<KeyColumnMetadata> ColumnMetadataFromDataType(
 }
 
 Result<KeyColumnArray> ColumnArrayFromArrayData(
-    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows) {
+    const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows) {
   ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
                         ColumnMetadataFromDataType(array_data->type));
   KeyColumnArray column_array = KeyColumnArray(
@@ -165,7 +165,8 @@ Status ColumnMetadatasFromExecBatch(const ExecBatch& batch,
   return Status::OK();
 }
 
-Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
+                                 int64_t num_rows,
                                  std::vector<KeyColumnArray>* column_arrays) {
   int num_columns = static_cast<int>(batch.values.size());
   column_arrays->resize(num_columns);
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
index 0856e3e8aa..dd13aa0647 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -171,7 +171,7 @@ ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
 /// The caller should ensure this is only called on "key" columns.
 /// \see ColumnMetadataFromDataType for details
 ARROW_EXPORT Result<KeyColumnArray> ColumnArrayFromArrayData(
-    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows);
+    const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows);
 
 /// \brief Create KeyColumnMetadata instances from an ExecBatch
 ///
@@ -188,8 +188,8 @@ ARROW_EXPORT Status ColumnMetadatasFromExecBatch(
 ///
 /// All columns in `batch` must be eligible "key" columns and have an array shape
 /// \see ColumnArrayFromArrayData for more details
-ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row,
-                                              int num_rows,
+ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int64_t start_row,
+                                              int64_t num_rows,
                                               std::vector<KeyColumnArray>* column_arrays);
 
 /// \brief Create KeyColumnArray instances from an ExecBatch