Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/07 01:22:07 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #13493: ARROW-14182: [C++][Compute] Hash Join performance improvement v2

westonpace commented on code in PR #13493:
URL: https://github.com/apache/arrow/pull/13493#discussion_r915355867


##########
cpp/src/arrow/compute/exec/swiss_join.h:
##########
@@ -0,0 +1,758 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/key_map.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/partition_util.h"
+#include "arrow/compute/exec/schema_util.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/encode_internal.h"
+
+namespace arrow {
+namespace compute {
+
+class RowArrayAccessor {
+ public:
+  // Find the index of this varbinary column within the sequence of all
+  // varbinary columns encoded in rows.
+  //
+  static int VarbinaryColumnId(const RowTableMetadata& row_metadata, int column_id);
+
+  // Calculate how many rows to skip from the tail of the
+  // sequence of selected rows, such that the total size of the skipped rows is
+  // at least equal to the size specified by the caller. Skipping the tail rows
+  // lets the caller process the remaining rows faster, without checking buffer
+  // bounds (useful with SIMD or fixed-size memory loads and stores).
+  //
+  static int NumRowsToSkip(const RowTableImpl& rows, int column_id, int num_rows,
+                           const uint32_t* row_ids, int num_tail_bytes_to_skip);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - pointer to the value,
+  // - byte length of the value.
+  //
+  // The information about nulls (validity bitmap) is not used in this call and
+  // has to be processed separately.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void Visit(const RowTableImpl& rows, int column_id, int num_rows,
+                    const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - byte 0xFF if the value is null for the row, or 0x00 otherwise.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void VisitNulls(const RowTableImpl& rows, int column_id, int num_rows,
+                         const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+ private:
+#if defined(ARROW_HAVE_AVX2)
+  // This is equivalent to the Visit method, but processes 8 rows at a time
+  // in a loop.
+  // Returns the number of processed rows, which may be less than requested (up
+  // to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int Visit_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                        const uint32_t* row_ids, PROCESS_8_VALUES_FN process_8_values_fn);
+
+  // This is equivalent to the VisitNulls method, but processes 8 rows at a
+  // time in a loop. Returns the number of processed rows, which may be less
+  // than requested (up to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int VisitNulls_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                             const uint32_t* row_ids,
+                             PROCESS_8_VALUES_FN process_8_values_fn);
+#endif
+};
+
+// Write operations (appending batch rows) must not be called by more than one
+// thread at the same time.
+//
+// Read operations (row comparison, column decoding)
+// can be called by multiple threads concurrently.
+//
+struct RowArray {
+  RowArray() : is_initialized_(false) {}
+
+  Status InitIfNeeded(MemoryPool* pool, const ExecBatch& batch);
+  Status InitIfNeeded(MemoryPool* pool, const RowTableMetadata& row_metadata);
+
+  Status AppendBatchSelection(MemoryPool* pool, const ExecBatch& batch, int begin_row_id,
+                              int end_row_id, int num_row_ids, const uint16_t* row_ids,
+                              std::vector<KeyColumnArray>& temp_column_arrays);
+
+  // This can only be called for a minibatch.
+  //
+  void Compare(const ExecBatch& batch, int begin_row_id, int end_row_id, int num_selected,
+               const uint16_t* batch_selection_maybe_null, const uint32_t* array_row_ids,
+               uint32_t* out_num_not_equal, uint16_t* out_not_equal_selection,
+               int64_t hardware_flags, util::TempVectorStack* temp_stack,
+               std::vector<KeyColumnArray>& temp_column_arrays,
+               uint8_t* out_match_bitvector_maybe_null = NULLPTR);
+
+  // TODO: add AVX2 version
+  //
+  Status DecodeSelected(ResizableArrayData* target, int column_id, int num_rows_to_append,
+                        const uint32_t* row_ids, MemoryPool* pool) const;
+
+  void DebugPrintToFile(const char* filename, bool print_sorted) const;
+
+  int64_t num_rows() const { return is_initialized_ ? rows_.length() : 0; }
+
+  bool is_initialized_;
+  RowTableEncoder encoder_;
+  RowTableImpl rows_;
+  RowTableImpl rows_temp_;
+};
+
+// Implements concatenating multiple row arrays into a single one, using
+// potentially multiple threads, each processing a single input row array.
+//
+class RowArrayMerge {
+ public:
+  // Calculate total number of rows and size in bytes for merged sequence of
+  // rows and allocate memory for it.
+  //
+  // If the rows are of varying length, initialize in the offset array the first
+  // entry for the write area for each input row array. Leave all other
+  // offsets and buffers uninitialized.
+  //
+  // All input sources must be initialized, but they can contain zero rows.
+  //
+  // Output in a vector the first target row id for each source (the exclusive
+  // cumulative sum of the number of rows in the sources).
+  //
+  static Status PrepareForMerge(RowArray* target, const std::vector<RowArray*>& sources,
+                                std::vector<int64_t>* first_target_row_id,
+                                MemoryPool* pool);
+
+  // Copy rows from source array to target array.
+  // Both arrays must have the same row metadata.
+  // Target array must already have the memory reserved in all internal buffers
+  // for the copy of the rows.
+  //
+  // The copied rows will occupy the same amount of space in the target array
+  // buffers as in the source array, but we pick the row position and offset at
+  // which writing starts in the target array.
+  //
+  // Optionally, the rows may be reordered during copy according to the
+  // provided permutation, which represents some sorting order of source rows.
+  // Nth element of the permutation array is the source row index for the Nth
+  // row written into target array. If permutation is missing (null), then the
+  // order of source rows will remain unchanged.
+  //
+  // In the case of varying length rows, we purposefully skip outputting the
+  // N+1st (one-past-last) offset, to allow concurrent copies of rows into
+  // adjacent ranges of the target array. This offset should already contain
+  // the right value after calling the method that prepares the target array
+  // for merge (which initializes the boundary offsets of the target row
+  // ranges for each source).
+  //
+  static void MergeSingle(RowArray* target, const RowArray& source,
+                          int64_t first_target_row_id,
+                          const int64_t* source_rows_permutation);
+
+ private:
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for fixed length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyFixedLength(RowTableImpl* target, const RowTableImpl& source,
+                              int64_t first_target_row_id,
+                              const int64_t* source_rows_permutation);
+
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for varying length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyVaryingLength(RowTableImpl* target, const RowTableImpl& source,
+                                int64_t first_target_row_id,
+                                int64_t first_target_row_offset,
+                                const int64_t* source_rows_permutation);
+
+  // Copy null information from rows from source array to a region of the target
+  // array.
+  //
+  static void CopyNulls(RowTableImpl* target, const RowTableImpl& source,
+                        int64_t first_target_row_id,
+                        const int64_t* source_rows_permutation);
+};
+
+// Implements merging of multiple SwissTables into a single one, using
+// potentially multiple threads, each processing a single input source.
+//
+// Each source should correspond to a range of original hashes.
+// The index of the source that a row belongs to is determined by the K
+// highest bits of its original hash, which means that the number of sources
+// must be a power of 2.
+//
+// We assume that the hash values used and stored inside the source tables
+// have the K highest bits removed from the original hash, in order to avoid
+// the huge number of hash collisions that would occur otherwise.
+// These bits will be reinserted (the original hashes will be used) when
+// merging into the target.
+//
+class SwissTableMerge {
+ public:
+  // Calculate total number of blocks for merged table.
+  // Allocate buffers sized accordingly and initialize empty target table.
+  //
+  // All input sources must be initialized, but they can be empty.
+  //
+  // Output in a vector the first target group id for each source (the
+  // exclusive cumulative sum of the number of groups in the sources).
+  //
+  static Status PrepareForMerge(SwissTable* target,
+                                const std::vector<SwissTable*>& sources,
+                                std::vector<uint32_t>* first_target_group_id,
+                                MemoryPool* pool);
+
+  // Copy all entries from source to a range of blocks (partition) of target.
+  //
+  // During copy, adjust group ids from source by adding provided base id.
+  //
+  // Skip entries from source that would cross partition boundaries (range of
+  // blocks) when inserted into target. Save their data in output vector for
+  // processing later. We postpone inserting these overflow entries in order to
+  // allow concurrent processing of all partitions. Overflow entries will be
+  // handled by a single thread afterwards.
+  //
+  static void MergePartition(SwissTable* target, const SwissTable* source,
+                             uint32_t partition_id, int num_partition_bits,
+                             uint32_t base_group_id,
+                             std::vector<uint32_t>* overflow_group_ids,
+                             std::vector<uint32_t>* overflow_hashes);
+
+  // Single-threaded processing of the remaining groups that could not be
+  // inserted in the partition merge phase (because full blocks caused entries
+  // from one partition to spill over into the next).
+  //
+  static void InsertNewGroups(SwissTable* target, const std::vector<uint32_t>& group_ids,
+                              const std::vector<uint32_t>& hashes);
+
+ private:
+  // Insert a new group id.
+  //
+  // Assumes that there are enough slots in the target
+  // and there is no need to resize it.
+  //
+  // A max block id can be provided, in which case the search for an empty slot
+  // to insert the new entry into will stop after visiting that block.
+  //
+  // A max block id greater than or equal to the number of blocks guarantees
+  // that the search will not be cut short.
+  //
+  static inline bool InsertNewGroup(SwissTable* target, uint64_t group_id, uint32_t hash,
+                                    int64_t max_block_id);
+};
+
+struct SwissTableWithKeys {
+  struct Input {
+    Input(const ExecBatch* in_batch, int in_batch_start_row, int in_batch_end_row,
+          util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays);
+
+    Input(const ExecBatch* in_batch, util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays);
+
+    Input(const ExecBatch* in_batch, int in_num_selected, const uint16_t* in_selection,
+          util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays,
+          std::vector<uint32_t>* in_temp_group_ids);
+
+    Input(const Input& base, int num_rows_to_skip, int num_rows_to_include);
+
+    const ExecBatch* batch;
+    // Window of the batch to operate on.
+    // The window information is only used if row selection is null.
+    //
+    int batch_start_row;
+    int batch_end_row;
+    // Optional selection.
+    // Used instead of the batch window when not null.
+    //
+    int num_selected;
+    const uint16_t* selection_maybe_null;
+    // Thread specific scratch buffers for storing temporary data.
+    //
+    util::TempVectorStack* temp_stack;
+    std::vector<KeyColumnArray>* temp_column_arrays;
+    std::vector<uint32_t>* temp_group_ids;
+  };
+
+  Status Init(int64_t hardware_flags, MemoryPool* pool);
+
+  void InitCallbacks();
+
+  static void Hash(Input* input, uint32_t* hashes, int64_t hardware_flags);

Review Comment:
   It seems odd that I have to supply `hardware_flags` for this method but not for the `MapXyz` variants.  I think the Map methods are using the hardware flags from `swiss_table_`.  Can we do that here too?
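
For concreteness, a minimal sketch of the asymmetry this comment describes, written against the declarations quoted above (the `ProbeBatch` wrapper and the way the caller obtains its flags are hypothetical):

    #include <cstdint>
    #include "arrow/compute/exec/swiss_join.h"

    using arrow::compute::SwissTableWithKeys;

    void ProbeBatch(SwissTableWithKeys* table, SwissTableWithKeys::Input* input,
                    uint32_t* hashes, uint8_t* match_bitvector, uint32_t* key_ids,
                    int64_t hardware_flags) {
      // The caller must thread hardware_flags through explicitly here...
      SwissTableWithKeys::Hash(input, hashes, hardware_flags);
      // ...but not here: MapReadOnly presumably reuses the flags stored in the
      // underlying swiss_table_ at Init() time.
      table->MapReadOnly(input, hashes, match_bitvector, key_ids);
    }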



##########
cpp/src/arrow/compute/exec/swiss_join.h:
##########
@@ -0,0 +1,758 @@
+  Status Init(int64_t hardware_flags, MemoryPool* pool);
+
+  void InitCallbacks();
+
+  static void Hash(Input* input, uint32_t* hashes, int64_t hardware_flags);

Review Comment:
   So it took me a while to realize that this method is actually static (and thus doesn't affect the table state).  Does this really belong here?  How is this different than something like `Hashing32::HashBatch`?
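
A sketch of the observation: because `Hash` is static, it can be invoked with no `SwissTableWithKeys` instance at all, so it cannot read or modify table state (the `HashOnly` wrapper is hypothetical):

    #include <cstdint>
    #include "arrow/compute/exec/swiss_join.h"

    void HashOnly(arrow::compute::SwissTableWithKeys::Input* input,
                  uint32_t* hashes, int64_t hardware_flags) {
      // No instance is involved; this behaves like a free hashing utility,
      // which is why it reads so much like Hashing32::HashBatch.
      arrow::compute::SwissTableWithKeys::Hash(input, hashes, hardware_flags);
    }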



##########
cpp/src/arrow/compute/exec/swiss_join.h:
##########
@@ -0,0 +1,758 @@
+  Status Init(int64_t hardware_flags, MemoryPool* pool);
+
+  void InitCallbacks();
+
+  static void Hash(Input* input, uint32_t* hashes, int64_t hardware_flags);
+
+  // If the input uses a selection, then the hashes array must have one element
+  // for every row in the whole (unfiltered and unspliced) input exec batch.
+  // Otherwise, there must be one element in the hashes array for every value
+  // in the window of the exec batch specified by the input.
+  //
+  // Output arrays will contain one element for every selected batch row in the
+  // input (selected either by the selection vector, if provided, or by the
+  // input window otherwise).
+  //
+  void MapReadOnly(Input* input, const uint32_t* hashes, uint8_t* match_bitvector,
+                   uint32_t* key_ids);
+  Status MapWithInserts(Input* input, const uint32_t* hashes, uint32_t* key_ids);

Review Comment:
   It's not obvious at a first glance what the difference is between the `Hash` call and the `MapXyz` call.  It's also not obvious what a key id is.
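
A hedged sketch of how these calls appear to fit together in a build/probe flow, reading "key id" as the dense index of a distinct key stored in the table (an interpretation; the header does not define the term). The `BuildThenProbe` wrapper is hypothetical:

    #include <cstdint>
    #include "arrow/compute/exec/swiss_join.h"
    #include "arrow/status.h"

    using arrow::compute::SwissTableWithKeys;
    using arrow::Status;

    Status BuildThenProbe(SwissTableWithKeys* table,
                          SwissTableWithKeys::Input* build_input,
                          SwissTableWithKeys::Input* probe_input,
                          uint32_t* hashes, uint32_t* key_ids,
                          uint8_t* match_bitvector, int64_t hardware_flags) {
      // Hash only computes hash values; it does not consult the table.
      SwissTableWithKeys::Hash(build_input, hashes, hardware_flags);
      // MapWithInserts looks the hashes up, inserting keys that are not
      // present yet, and reports the key id assigned to each row.
      ARROW_RETURN_NOT_OK(table->MapWithInserts(build_input, hashes, key_ids));
      // MapReadOnly looks up without inserting; rows whose key is absent are
      // flagged in match_bitvector instead of getting a new key id.
      SwissTableWithKeys::Hash(probe_input, hashes, hardware_flags);
      table->MapReadOnly(probe_input, hashes, match_bitvector, key_ids);
      return Status::OK();
    }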



##########
cpp/src/arrow/compute/exec/swiss_join.h:
##########
@@ -0,0 +1,758 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/key_map.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/partition_util.h"
+#include "arrow/compute/exec/schema_util.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/encode_internal.h"
+
+namespace arrow {
+namespace compute {
+
+class RowArrayAccessor {
+ public:
+  // Find the index of this varbinary column within the sequence of all
+  // varbinary columns encoded in rows.
+  //
+  static int VarbinaryColumnId(const RowTableMetadata& row_metadata, int column_id);
+
+  // Calculate how many rows to skip from the tail of the
+  // sequence of selected rows, such that the total size of skipped rows is at
+  // least equal to the size specified by the caller. Skipping of the tail rows
+  // is used to allow for faster processing by the caller of remaining rows
+  // without checking buffer bounds (useful with SIMD or fixed size memory loads
+  // and stores).
+  //
+  static int NumRowsToSkip(const RowTableImpl& rows, int column_id, int num_rows,
+                           const uint32_t* row_ids, int num_tail_bytes_to_skip);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - pointer to the value,
+  // - byte length of the value.
+  //
+  // The information about nulls (validity bitmap) is not used in this call and
+  // has to be processed separately.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void Visit(const RowTableImpl& rows, int column_id, int num_rows,
+                    const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - byte 0xFF if the null is set for the row or 0x00 otherwise.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void VisitNulls(const RowTableImpl& rows, int column_id, int num_rows,
+                         const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+ private:
+#if defined(ARROW_HAVE_AVX2)
+  // This is equivalent to Visit method, but processing 8 rows at a time in a
+  // loop.
+  // Returns the number of processed rows, which may be less than requested (up
+  // to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int Visit_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                        const uint32_t* row_ids, PROCESS_8_VALUES_FN process_8_values_fn);
+
+  // This is equivalent to VisitNulls method, but processing 8 rows at a time in
+  // a loop. Returns the number of processed rows, which may be less than
+  // requested (up to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int VisitNulls_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                             const uint32_t* row_ids,
+                             PROCESS_8_VALUES_FN process_8_values_fn);
+#endif
+};
+
+// Write operations (appending batch rows) must not be called by more than one
+// thread at the same time.
+//
+// Read operations (row comparison, column decoding)
+// can be called by multiple threads concurrently.
+//
+struct RowArray {
+  RowArray() : is_initialized_(false) {}
+
+  Status InitIfNeeded(MemoryPool* pool, const ExecBatch& batch);
+  Status InitIfNeeded(MemoryPool* pool, const RowTableMetadata& row_metadata);
+
+  Status AppendBatchSelection(MemoryPool* pool, const ExecBatch& batch, int begin_row_id,
+                              int end_row_id, int num_row_ids, const uint16_t* row_ids,
+                              std::vector<KeyColumnArray>& temp_column_arrays);
+
+  // This can only be called for a minibatch.
+  //
+  void Compare(const ExecBatch& batch, int begin_row_id, int end_row_id, int num_selected,
+               const uint16_t* batch_selection_maybe_null, const uint32_t* array_row_ids,
+               uint32_t* out_num_not_equal, uint16_t* out_not_equal_selection,
+               int64_t hardware_flags, util::TempVectorStack* temp_stack,
+               std::vector<KeyColumnArray>& temp_column_arrays,
+               uint8_t* out_match_bitvector_maybe_null = NULLPTR);
+
+  // TODO: add AVX2 version
+  //
+  Status DecodeSelected(ResizableArrayData* target, int column_id, int num_rows_to_append,
+                        const uint32_t* row_ids, MemoryPool* pool) const;
+
+  void DebugPrintToFile(const char* filename, bool print_sorted) const;
+
+  int64_t num_rows() const { return is_initialized_ ? rows_.length() : 0; }
+
+  bool is_initialized_;
+  RowTableEncoder encoder_;
+  RowTableImpl rows_;
+  RowTableImpl rows_temp_;
+};
+
+// Implements concatenating multiple row arrays into a single one, using
+// potentially multiple threads, each processing a single input row array.
+//
+class RowArrayMerge {
+ public:
+  // Calculate total number of rows and size in bytes for merged sequence of
+  // rows and allocate memory for it.
+  //
+  // If the rows are of varying length, initialize in the offset array the first
+  // entry for the write area for each input row array. Leave all other
+  // offsets and buffers uninitialized.
+  //
+  // All input sources must be initialized, but they can contain zero rows.
+  //
+  // Output in vector the first target row id for each source (exclusive
+  // cummulative sum of number of rows in sources).
+  //
+  static Status PrepareForMerge(RowArray* target, const std::vector<RowArray*>& sources,
+                                std::vector<int64_t>* first_target_row_id,
+                                MemoryPool* pool);
+
+  // Copy rows from source array to target array.
+  // Both arrays must have the same row metadata.
+  // Target array must already have the memory reserved in all internal buffers
+  // for the copy of the rows.
+  //
+  // Copy of the rows will occupy the same amount of space in the target array
+  // buffers as in the source array, but in the target array we pick at what row
+  // position and offset we start writing.
+  //
+  // Optionally, the rows may be reordered during copy according to the
+  // provided permutation, which represents some sorting order of source rows.
+  // Nth element of the permutation array is the source row index for the Nth
+  // row written into target array. If permutation is missing (null), then the
+  // order of source rows will remain unchanged.
+  //
+  // In case of varying length rows, we purposefully skip outputting of N+1 (one
+  // after last) offset, to allow concurrent copies of rows done to adjacent
+  // ranges in the target array. This offset should already contain the right
+  // value after calling the method preparing target array for merge (which
+  // initializes boundary offsets for target row ranges for each source).
+  //
+  static void MergeSingle(RowArray* target, const RowArray& source,
+                          int64_t first_target_row_id,
+                          const int64_t* source_rows_permutation);
+
+ private:
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for fixed length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyFixedLength(RowTableImpl* target, const RowTableImpl& source,
+                              int64_t first_target_row_id,
+                              const int64_t* source_rows_permutation);
+
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for varying length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyVaryingLength(RowTableImpl* target, const RowTableImpl& source,
+                                int64_t first_target_row_id,
+                                int64_t first_target_row_offset,
+                                const int64_t* source_rows_permutation);
+
+  // Copy null information from rows from source array to a region of the target
+  // array.
+  //
+  static void CopyNulls(RowTableImpl* target, const RowTableImpl& source,
+                        int64_t first_target_row_id,
+                        const int64_t* source_rows_permutation);
+};
+
+// Implements merging of multiple SwissTables into a single one, using
+// potentially multiple threads, each processing a single input source.
+//
+// Each source should correspond to a range of original hashes.
+// A row belongs to a source with index determined by K highest bits of
+// original hash. That means that the number of sources must be a power of 2.
+//
+// We assume that the hash values used and stored inside source tables
+// have K highest bits removed from the original hash in order to avoid huge
+// number of hash collisions that would occur otherwise.
+// These bits will be reinserted back (original hashes will be used) when
+// merging into target.
+//
+class SwissTableMerge {
+ public:
+  // Calculate total number of blocks for merged table.
+  // Allocate buffers sized accordingly and initialize empty target table.
+  //
+  // All input sources must be initialized, but they can be empty.
+  //
+  // Output in a vector the first target group id for each source (exclusive
+  // cummulative sum of number of groups in sources).
+  //
+  static Status PrepareForMerge(SwissTable* target,
+                                const std::vector<SwissTable*>& sources,
+                                std::vector<uint32_t>* first_target_group_id,
+                                MemoryPool* pool);
+
+  // Copy all entries from source to a range of blocks (partition) of target.
+  //
+  // During copy, adjust group ids from source by adding provided base id.
+  //
+  // Skip entries from source that would cross partition boundaries (range of
+  // blocks) when inserted into target. Save their data in output vector for
+  // processing later. We postpone inserting these overflow entries in order to
+  // allow concurrent processing of all partitions. Overflow entries will be
+  // handled by a single-thread afterwards.
+  //
+  static void MergePartition(SwissTable* target, const SwissTable* source,
+                             uint32_t partition_id, int num_partition_bits,
+                             uint32_t base_group_id,
+                             std::vector<uint32_t>* overflow_group_ids,
+                             std::vector<uint32_t>* overflow_hashes);
+
+  // Single-threaded processing of remaining groups, that could not be
+  // inserted in partition merge phase
+  // (due to entries from one partition spilling over due to full blocks into
+  // the next partition).
+  //
+  static void InsertNewGroups(SwissTable* target, const std::vector<uint32_t>& group_ids,
+                              const std::vector<uint32_t>& hashes);
+
+ private:
+  // Insert a new group id.
+  //
+  // Assumes that there are enough slots in the target
+  // and there is no need to resize it.
+  //
+  // Max block id can be provided, in which case the search for an empty slot to
+  // insert new entry to will stop after visiting that block.
+  //
+  // Max block id value greater or equal to the number of blocks guarantees that
+  // the search will not be stopped.
+  //
+  static inline bool InsertNewGroup(SwissTable* target, uint64_t group_id, uint32_t hash,
+                                    int64_t max_block_id);
+};
+
+struct SwissTableWithKeys {
+  struct Input {
+    Input(const ExecBatch* in_batch, int in_batch_start_row, int in_batch_end_row,
+          util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays);
+
+    Input(const ExecBatch* in_batch, util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays);
+
+    Input(const ExecBatch* in_batch, int in_num_selected, const uint16_t* in_selection,
+          util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays,
+          std::vector<uint32_t>* in_temp_group_ids);
+
+    Input(const Input& base, int num_rows_to_skip, int num_rows_to_include);
+
+    const ExecBatch* batch;
+    // Window of the batch to operate on.
+    // The window information is only used if the row selection is null.
+    //
+    int batch_start_row;
+    int batch_end_row;
+    // Optional selection.
+    // Used instead of the batch window if not null.
+    //
+    int num_selected;
+    const uint16_t* selection_maybe_null;
+    // Thread-specific scratch buffers for storing temporary data.
+    //
+    util::TempVectorStack* temp_stack;
+    std::vector<KeyColumnArray>* temp_column_arrays;
+    std::vector<uint32_t>* temp_group_ids;
+  };
+
+  Status Init(int64_t hardware_flags, MemoryPool* pool);
+
+  void InitCallbacks();

Review Comment:
   Why does this need to be called separately from `Init`?  Can `Init` call `InitCallbacks`?
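
   For illustration, a minimal standalone sketch of the pattern being suggested
   (not the PR's code; the class and member names here are hypothetical):
   folding the callback setup into `Init` so that callers cannot forget the
   second call.

   ```cpp
   #include <functional>
   #include <iostream>

   class Table {
    public:
     void Init() {
       // Primary setup that the callbacks may depend on.
       initialized_ = true;
       // Fold the secondary stage in here instead of making every caller
       // remember to invoke it separately.
       InitCallbacks();
     }
     void Lookup() const { on_lookup_(); }

    private:
     void InitCallbacks() {
       on_lookup_ = [] { std::cout << "lookup\n"; };
     }
     bool initialized_ = false;
     std::function<void()> on_lookup_;
   };

   int main() {
     Table t;
     t.Init();  // a single call now performs both initialization stages
     t.Lookup();
     return 0;
   }
   ```

   This assumes `InitCallbacks` does not depend on any state that is only
   established between the two calls today; if it does, the two-step interface
   may be intentional.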



##########
cpp/src/arrow/compute/exec/swiss_join.h:
##########
@@ -0,0 +1,758 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/key_map.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/partition_util.h"
+#include "arrow/compute/exec/schema_util.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/encode_internal.h"
+
+namespace arrow {
+namespace compute {
+
+class RowArrayAccessor {
+ public:
+  // Find the index of this varbinary column within the sequence of all
+  // varbinary columns encoded in rows.
+  //
+  static int VarbinaryColumnId(const RowTableMetadata& row_metadata, int column_id);
+
+  // Calculate how many rows to skip from the tail of the
+  // sequence of selected rows, such that the total size of skipped rows is at
+  // least equal to the size specified by the caller. Skipping of the tail rows
+  // is used to allow for faster processing by the caller of remaining rows
+  // without checking buffer bounds (useful with SIMD or fixed size memory loads
+  // and stores).
+  //
+  static int NumRowsToSkip(const RowTableImpl& rows, int column_id, int num_rows,
+                           const uint32_t* row_ids, int num_tail_bytes_to_skip);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - pointer to the value,
+  // - byte length of the value.
+  //
+  // The information about nulls (validity bitmap) is not used in this call and
+  // has to be processed separately.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void Visit(const RowTableImpl& rows, int column_id, int num_rows,
+                    const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - byte 0xFF if the null is set for the row or 0x00 otherwise.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void VisitNulls(const RowTableImpl& rows, int column_id, int num_rows,
+                         const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+ private:
+#if defined(ARROW_HAVE_AVX2)
+  // This is equivalent to the Visit method, but processes 8 rows at a time
+  // in a loop.
+  // Returns the number of processed rows, which may be less than requested (up
+  // to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int Visit_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                        const uint32_t* row_ids, PROCESS_8_VALUES_FN process_8_values_fn);
+
+  // This is equivalent to the VisitNulls method, but processes 8 rows at a
+  // time in a loop. Returns the number of processed rows, which may be less
+  // than requested (up to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int VisitNulls_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                             const uint32_t* row_ids,
+                             PROCESS_8_VALUES_FN process_8_values_fn);
+#endif
+};
+
+// Write operations (appending batch rows) must not be called by more than one
+// thread at the same time.
+//
+// Read operations (row comparison, column decoding)
+// can be called by multiple threads concurrently.
+//
+struct RowArray {
+  RowArray() : is_initialized_(false) {}
+
+  Status InitIfNeeded(MemoryPool* pool, const ExecBatch& batch);
+  Status InitIfNeeded(MemoryPool* pool, const RowTableMetadata& row_metadata);
+
+  Status AppendBatchSelection(MemoryPool* pool, const ExecBatch& batch, int begin_row_id,
+                              int end_row_id, int num_row_ids, const uint16_t* row_ids,
+                              std::vector<KeyColumnArray>& temp_column_arrays);
+
+  // This can only be called for a minibatch.
+  //
+  void Compare(const ExecBatch& batch, int begin_row_id, int end_row_id, int num_selected,
+               const uint16_t* batch_selection_maybe_null, const uint32_t* array_row_ids,
+               uint32_t* out_num_not_equal, uint16_t* out_not_equal_selection,
+               int64_t hardware_flags, util::TempVectorStack* temp_stack,
+               std::vector<KeyColumnArray>& temp_column_arrays,
+               uint8_t* out_match_bitvector_maybe_null = NULLPTR);
+
+  // TODO: add AVX2 version
+  //
+  Status DecodeSelected(ResizableArrayData* target, int column_id, int num_rows_to_append,
+                        const uint32_t* row_ids, MemoryPool* pool) const;
+
+  void DebugPrintToFile(const char* filename, bool print_sorted) const;
+
+  int64_t num_rows() const { return is_initialized_ ? rows_.length() : 0; }
+
+  bool is_initialized_;
+  RowTableEncoder encoder_;
+  RowTableImpl rows_;
+  RowTableImpl rows_temp_;
+};
+
+// Implements concatenating multiple row arrays into a single one, using
+// potentially multiple threads, each processing a single input row array.
+//
+class RowArrayMerge {
+ public:
+  // Calculate the total number of rows and the size in bytes of the merged
+  // sequence of rows, and allocate memory for it.
+  //
+  // If the rows are of varying length, initialize in the offset array the
+  // first entry of the write area for each input row array. Leave all other
+  // offsets and buffers uninitialized.
+  //
+  // All input sources must be initialized, but they can contain zero rows.
+  //
+  // Output, in a vector, the first target row id for each source (the
+  // exclusive cumulative sum of the number of rows in the sources).
+  //
+  static Status PrepareForMerge(RowArray* target, const std::vector<RowArray*>& sources,
+                                std::vector<int64_t>* first_target_row_id,
+                                MemoryPool* pool);
+
+  // Copy rows from source array to target array.
+  // Both arrays must have the same row metadata.
+  // Target array must already have the memory reserved in all internal buffers
+  // for the copy of the rows.
+  //
+  // The copied rows will occupy the same amount of space in the target array
+  // buffers as in the source array, but in the target array we choose the row
+  // position and offset at which writing starts.
+  //
+  // Optionally, the rows may be reordered during the copy according to the
+  // provided permutation, which represents some sorting order of the source
+  // rows. The Nth element of the permutation array is the source row index for
+  // the Nth row written into the target array. If the permutation is missing
+  // (null), then the order of source rows remains unchanged.
+  //
+  // In the case of varying-length rows, we purposely skip writing the (N+1)th
+  // (one-past-last) offset, to allow concurrent copies of rows into adjacent
+  // ranges of the target array. That offset should already contain the right
+  // value after calling the method that prepares the target array for merge
+  // (which initializes the boundary offsets of the target row ranges for each
+  // source).
+  //
+  static void MergeSingle(RowArray* target, const RowArray& source,
+                          int64_t first_target_row_id,
+                          const int64_t* source_rows_permutation);
+
+ private:
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for fixed length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyFixedLength(RowTableImpl* target, const RowTableImpl& source,
+                              int64_t first_target_row_id,
+                              const int64_t* source_rows_permutation);
+
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for varying length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyVaryingLength(RowTableImpl* target, const RowTableImpl& source,
+                                int64_t first_target_row_id,
+                                int64_t first_target_row_offset,
+                                const int64_t* source_rows_permutation);
+
+  // Copy null information from rows of the source array to a region of the
+  // target array.
+  //
+  static void CopyNulls(RowTableImpl* target, const RowTableImpl& source,
+                        int64_t first_target_row_id,
+                        const int64_t* source_rows_permutation);
+};
+
+// Implements merging of multiple SwissTables into a single one, using
+// potentially multiple threads, each processing a single input source.
+//
+// Each source should correspond to a range of original hashes.
+// A row belongs to the source whose index is given by the K highest bits of
+// the original hash. That means that the number of sources must be a power
+// of 2.
+//
+// We assume that the hash values used and stored inside the source tables
+// have the K highest bits removed from the original hash, in order to avoid
+// the huge number of hash collisions that would occur otherwise.
+// These bits will be reinserted (i.e. the original hashes will be used) when
+// merging into the target.
+//
+class SwissTableMerge {
+ public:
+  // Calculate total number of blocks for merged table.
+  // Allocate buffers sized accordingly and initialize empty target table.
+  //
+  // All input sources must be initialized, but they can be empty.
+  //
+  // Output, in a vector, the first target group id for each source (the
+  // exclusive cumulative sum of the number of groups in the sources).
+  //
+  static Status PrepareForMerge(SwissTable* target,
+                                const std::vector<SwissTable*>& sources,
+                                std::vector<uint32_t>* first_target_group_id,
+                                MemoryPool* pool);
+
+  // Copy all entries from source to a range of blocks (partition) of target.
+  //
+  // During copy, adjust group ids from source by adding provided base id.
+  //
+  // Skip entries from the source that would cross partition boundaries (the
+  // range of blocks) when inserted into the target. Save their data in the
+  // output vectors for later processing. We postpone inserting these overflow
+  // entries in order to allow concurrent processing of all partitions.
+  // Overflow entries will be handled by a single thread afterwards.
+  //
+  static void MergePartition(SwissTable* target, const SwissTable* source,
+                             uint32_t partition_id, int num_partition_bits,
+                             uint32_t base_group_id,
+                             std::vector<uint32_t>* overflow_group_ids,
+                             std::vector<uint32_t>* overflow_hashes);
+
+  // Single-threaded processing of the remaining groups that could not be
+  // inserted during the partition merge phase (due to entries from one
+  // partition spilling over into the next partition because of full blocks).
+  //
+  static void InsertNewGroups(SwissTable* target, const std::vector<uint32_t>& group_ids,
+                              const std::vector<uint32_t>& hashes);
+
+ private:
+  // Insert a new group id.
+  //
+  // Assumes that there are enough slots in the target
+  // and there is no need to resize it.
+  //
+  // A max block id can be provided, in which case the search for an empty slot
+  // to insert the new entry into will stop after visiting that block.
+  //
+  // A max block id greater than or equal to the number of blocks guarantees
+  // that the search will not be cut short.
+  //
+  static inline bool InsertNewGroup(SwissTable* target, uint64_t group_id, uint32_t hash,
+                                    int64_t max_block_id);
+};
+
+struct SwissTableWithKeys {

Review Comment:
   Can you add a note about the thread safety of the hash/map methods?
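
   As an aside on the `SwissTableMerge` class comment restated above: the hash
   bookkeeping it describes (the K highest bits select the source table, are
   dropped from the hashes stored inside it, and are reinserted when merging)
   can be illustrated with a standalone sketch. This assumes 32-bit hashes and
   one plausible bit layout; it is not the PR's actual code.

   ```cpp
   #include <cstdint>
   #include <iostream>

   constexpr int kHashBits = 32;  // assumed hash width for this sketch

   // Which source table a row belongs to: the K highest bits of the hash.
   uint32_t SourceId(uint32_t hash, int k) { return hash >> (kHashBits - k); }

   // Hash as stored inside that source: the K highest bits are shifted out,
   // which avoids every hash in one source sharing the same top bits.
   uint32_t StoredHash(uint32_t hash, int k) { return hash << k; }

   // Original hash reconstructed when merging source `source_id` into the
   // target: shift the stored bits back down and reinsert the source id as
   // the K highest bits.
   uint32_t MergedHash(uint32_t stored, uint32_t source_id, int k) {
     return (stored >> k) | (source_id << (kHashBits - k));
   }

   int main() {
     const int k = 3;  // 2^3 = 8 sources; the source count is a power of 2
     const uint32_t hash = 0xDEADBEEFu;
     const uint32_t id = SourceId(hash, k);
     const uint32_t stored = StoredHash(hash, k);
     // Round-trips back to the original hash:
     std::cout << std::hex << MergedHash(stored, id, k) << "\n";  // deadbeef
     return 0;
   }
   ```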



##########
cpp/src/arrow/compute/exec/swiss_join.h:
##########
@@ -0,0 +1,758 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include "arrow/compute/exec/key_map.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/partition_util.h"
+#include "arrow/compute/exec/schema_util.h"
+#include "arrow/compute/exec/task_util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/encode_internal.h"
+
+namespace arrow {
+namespace compute {
+
+class RowArrayAccessor {
+ public:
+  // Find the index of this varbinary column within the sequence of all
+  // varbinary columns encoded in rows.
+  //
+  static int VarbinaryColumnId(const RowTableMetadata& row_metadata, int column_id);
+
+  // Calculate how many rows to skip from the tail of the
+  // sequence of selected rows, such that the total size of skipped rows is at
+  // least equal to the size specified by the caller. Skipping of the tail rows
+  // is used to allow for faster processing by the caller of remaining rows
+  // without checking buffer bounds (useful with SIMD or fixed size memory loads
+  // and stores).
+  //
+  static int NumRowsToSkip(const RowTableImpl& rows, int column_id, int num_rows,
+                           const uint32_t* row_ids, int num_tail_bytes_to_skip);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - pointer to the value,
+  // - byte length of the value.
+  //
+  // The information about nulls (validity bitmap) is not used in this call and
+  // has to be processed separately.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void Visit(const RowTableImpl& rows, int column_id, int num_rows,
+                    const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+  // The supplied lambda will be called for each row in the given list of rows.
+  // The arguments given to it will be:
+  // - index of a row (within the set of selected rows),
+  // - byte 0xFF if the null is set for the row or 0x00 otherwise.
+  //
+  template <class PROCESS_VALUE_FN>
+  static void VisitNulls(const RowTableImpl& rows, int column_id, int num_rows,
+                         const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn);
+
+ private:
+#if defined(ARROW_HAVE_AVX2)
+  // This is equivalent to the Visit method, but processes 8 rows at a time
+  // in a loop.
+  // Returns the number of processed rows, which may be less than requested (up
+  // to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int Visit_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                        const uint32_t* row_ids, PROCESS_8_VALUES_FN process_8_values_fn);
+
+  // This is equivalent to the VisitNulls method, but processes 8 rows at a
+  // time in a loop. Returns the number of processed rows, which may be less
+  // than requested (up to 7 rows at the end may be skipped).
+  //
+  template <class PROCESS_8_VALUES_FN>
+  static int VisitNulls_avx2(const RowTableImpl& rows, int column_id, int num_rows,
+                             const uint32_t* row_ids,
+                             PROCESS_8_VALUES_FN process_8_values_fn);
+#endif
+};
+
+// Write operations (appending batch rows) must not be called by more than one
+// thread at the same time.
+//
+// Read operations (row comparison, column decoding)
+// can be called by multiple threads concurrently.
+//
+struct RowArray {
+  RowArray() : is_initialized_(false) {}
+
+  Status InitIfNeeded(MemoryPool* pool, const ExecBatch& batch);
+  Status InitIfNeeded(MemoryPool* pool, const RowTableMetadata& row_metadata);
+
+  Status AppendBatchSelection(MemoryPool* pool, const ExecBatch& batch, int begin_row_id,
+                              int end_row_id, int num_row_ids, const uint16_t* row_ids,
+                              std::vector<KeyColumnArray>& temp_column_arrays);
+
+  // This can only be called for a minibatch.
+  //
+  void Compare(const ExecBatch& batch, int begin_row_id, int end_row_id, int num_selected,
+               const uint16_t* batch_selection_maybe_null, const uint32_t* array_row_ids,
+               uint32_t* out_num_not_equal, uint16_t* out_not_equal_selection,
+               int64_t hardware_flags, util::TempVectorStack* temp_stack,
+               std::vector<KeyColumnArray>& temp_column_arrays,
+               uint8_t* out_match_bitvector_maybe_null = NULLPTR);
+
+  // TODO: add AVX2 version
+  //
+  Status DecodeSelected(ResizableArrayData* target, int column_id, int num_rows_to_append,
+                        const uint32_t* row_ids, MemoryPool* pool) const;
+
+  void DebugPrintToFile(const char* filename, bool print_sorted) const;
+
+  int64_t num_rows() const { return is_initialized_ ? rows_.length() : 0; }
+
+  bool is_initialized_;
+  RowTableEncoder encoder_;
+  RowTableImpl rows_;
+  RowTableImpl rows_temp_;
+};
+
+// Implements concatenating multiple row arrays into a single one, using
+// potentially multiple threads, each processing a single input row array.
+//
+class RowArrayMerge {
+ public:
+  // Calculate the total number of rows and the size in bytes of the merged
+  // sequence of rows, and allocate memory for it.
+  //
+  // If the rows are of varying length, initialize in the offset array the
+  // first entry of the write area for each input row array. Leave all other
+  // offsets and buffers uninitialized.
+  //
+  // All input sources must be initialized, but they can contain zero rows.
+  //
+  // Output, in a vector, the first target row id for each source (the
+  // exclusive cumulative sum of the number of rows in the sources).
+  //
+  static Status PrepareForMerge(RowArray* target, const std::vector<RowArray*>& sources,
+                                std::vector<int64_t>* first_target_row_id,
+                                MemoryPool* pool);
+
+  // Copy rows from source array to target array.
+  // Both arrays must have the same row metadata.
+  // Target array must already have the memory reserved in all internal buffers
+  // for the copy of the rows.
+  //
+  // The copied rows will occupy the same amount of space in the target array
+  // buffers as in the source array, but in the target array we choose the row
+  // position and offset at which writing starts.
+  //
+  // Optionally, the rows may be reordered during the copy according to the
+  // provided permutation, which represents some sorting order of the source
+  // rows. The Nth element of the permutation array is the source row index for
+  // the Nth row written into the target array. If the permutation is missing
+  // (null), then the order of source rows remains unchanged.
+  //
+  // In the case of varying-length rows, we purposely skip writing the (N+1)th
+  // (one-past-last) offset, to allow concurrent copies of rows into adjacent
+  // ranges of the target array. That offset should already contain the right
+  // value after calling the method that prepares the target array for merge
+  // (which initializes the boundary offsets of the target row ranges for each
+  // source).
+  //
+  static void MergeSingle(RowArray* target, const RowArray& source,
+                          int64_t first_target_row_id,
+                          const int64_t* source_rows_permutation);
+
+ private:
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for fixed length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyFixedLength(RowTableImpl* target, const RowTableImpl& source,
+                              int64_t first_target_row_id,
+                              const int64_t* source_rows_permutation);
+
+  // Copy rows from source array to a region of the target array.
+  // This implementation is for varying length rows.
+  // Null information needs to be handled separately.
+  //
+  static void CopyVaryingLength(RowTableImpl* target, const RowTableImpl& source,
+                                int64_t first_target_row_id,
+                                int64_t first_target_row_offset,
+                                const int64_t* source_rows_permutation);
+
+  // Copy null information from rows of the source array to a region of the
+  // target array.
+  //
+  static void CopyNulls(RowTableImpl* target, const RowTableImpl& source,
+                        int64_t first_target_row_id,
+                        const int64_t* source_rows_permutation);
+};
+
+// Implements merging of multiple SwissTables into a single one, using
+// potentially multiple threads, each processing a single input source.
+//
+// Each source should correspond to a range of original hashes.
+// A row belongs to the source whose index is given by the K highest bits of
+// the original hash. That means that the number of sources must be a power
+// of 2.
+//
+// We assume that the hash values used and stored inside the source tables
+// have the K highest bits removed from the original hash, in order to avoid
+// the huge number of hash collisions that would occur otherwise.
+// These bits will be reinserted (i.e. the original hashes will be used) when
+// merging into the target.
+//
+class SwissTableMerge {
+ public:
+  // Calculate total number of blocks for merged table.
+  // Allocate buffers sized accordingly and initialize empty target table.
+  //
+  // All input sources must be initialized, but they can be empty.
+  //
+  // Output, in a vector, the first target group id for each source (the
+  // exclusive cumulative sum of the number of groups in the sources).
+  //
+  static Status PrepareForMerge(SwissTable* target,
+                                const std::vector<SwissTable*>& sources,
+                                std::vector<uint32_t>* first_target_group_id,
+                                MemoryPool* pool);
+
+  // Copy all entries from source to a range of blocks (partition) of target.
+  //
+  // During copy, adjust group ids from source by adding provided base id.
+  //
+  // Skip entries from the source that would cross partition boundaries (the
+  // range of blocks) when inserted into the target. Save their data in the
+  // output vectors for later processing. We postpone inserting these overflow
+  // entries in order to allow concurrent processing of all partitions.
+  // Overflow entries will be handled by a single thread afterwards.
+  //
+  static void MergePartition(SwissTable* target, const SwissTable* source,
+                             uint32_t partition_id, int num_partition_bits,
+                             uint32_t base_group_id,
+                             std::vector<uint32_t>* overflow_group_ids,
+                             std::vector<uint32_t>* overflow_hashes);
+
+  // Single-threaded processing of the remaining groups that could not be
+  // inserted during the partition merge phase (due to entries from one
+  // partition spilling over into the next partition because of full blocks).
+  //
+  static void InsertNewGroups(SwissTable* target, const std::vector<uint32_t>& group_ids,
+                              const std::vector<uint32_t>& hashes);
+
+ private:
+  // Insert a new group id.
+  //
+  // Assumes that there are enough slots in the target
+  // and there is no need to resize it.
+  //
+  // A max block id can be provided, in which case the search for an empty slot
+  // to insert the new entry into will stop after visiting that block.
+  //
+  // A max block id greater than or equal to the number of blocks guarantees
+  // that the search will not be cut short.
+  //
+  static inline bool InsertNewGroup(SwissTable* target, uint64_t group_id, uint32_t hash,
+                                    int64_t max_block_id);
+};
+
+struct SwissTableWithKeys {
+  struct Input {
+    Input(const ExecBatch* in_batch, int in_batch_start_row, int in_batch_end_row,
+          util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays);
+
+    Input(const ExecBatch* in_batch, util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays);
+
+    Input(const ExecBatch* in_batch, int in_num_selected, const uint16_t* in_selection,
+          util::TempVectorStack* in_temp_stack,
+          std::vector<KeyColumnArray>* in_temp_column_arrays,
+          std::vector<uint32_t>* in_temp_group_ids);
+
+    Input(const Input& base, int num_rows_to_skip, int num_rows_to_include);
+
+    const ExecBatch* batch;
+    // Window of the batch to operate on.
+    // The window information is only used if the row selection is null.
+    //
+    int batch_start_row;
+    int batch_end_row;
+    // Optional selection.
+    // Used instead of the batch window if not null.
+    //
+    int num_selected;
+    const uint16_t* selection_maybe_null;
+    // Thread-specific scratch buffers for storing temporary data.
+    //
+    util::TempVectorStack* temp_stack;
+    std::vector<KeyColumnArray>* temp_column_arrays;
+    std::vector<uint32_t>* temp_group_ids;

Review Comment:
   What's the rationale for these belonging to the input instead of belonging to the table?  Is it because these need to belong to the calling thread and the table is shared across all threads?
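
   For illustration, a standalone sketch (not Arrow's code; all names here are
   hypothetical) of the pattern the question is getting at: the table is shared
   and read concurrently, so any mutable scratch space has to travel with the
   per-thread input rather than live inside the shared table.

   ```cpp
   #include <cstdint>
   #include <functional>
   #include <iostream>
   #include <thread>
   #include <vector>

   struct SharedTable {  // shared across threads, read-only while probing
     std::vector<uint32_t> data;
   };

   struct ThreadScratch {  // owned by one thread, reused across batches
     std::vector<uint32_t> group_ids;
   };

   void Probe(const SharedTable& table, ThreadScratch* scratch, int n) {
     scratch->group_ids.clear();  // safe: no other thread touches this scratch
     for (int i = 0; i < n; ++i) {
       scratch->group_ids.push_back(table.data[i % table.data.size()]);
     }
   }

   int main() {
     SharedTable table{{1, 2, 3, 4}};
     ThreadScratch s0, s1;  // one scratch per thread, like Input's temp_* fields
     std::thread t0(Probe, std::cref(table), &s0, 8);
     std::thread t1(Probe, std::cref(table), &s1, 8);
     t0.join();
     t1.join();
     std::cout << s0.group_ids.size() + s1.group_ids.size() << "\n";  // 16
     return 0;
   }
   ```

   If the scratch lived in the shared table instead, concurrent probes would
   race on it or require locking, which matches the reading in the question.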


