You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/12 19:57:48 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #10290: ARROW-12725: [C++][Compute] Column at a time hash and comparison in group by

bkietz commented on a change in pull request #10290:
URL: https://github.com/apache/arrow/pull/10290#discussion_r688021792



##########
File path: cpp/src/arrow/compute/exec/key_compare.cc
##########
@@ -17,250 +17,398 @@
 
 #include "arrow/compute/exec/key_compare.h"
 
+#include <immintrin.h>
+#include <memory.h>
+
 #include <algorithm>
 #include <cstdint>
 
 #include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
 #include "arrow/util/ubsan.h"
 
 namespace arrow {
 namespace compute {
 
-void KeyCompare::CompareRows(uint32_t num_rows_to_compare,
-                             const uint16_t* sel_left_maybe_null,
-                             const uint32_t* left_to_right_map,
-                             KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_num_rows,
-                             uint16_t* out_sel_left_maybe_same,
-                             const KeyEncoder::KeyRowArray& rows_left,
-                             const KeyEncoder::KeyRowArray& rows_right) {
-  ARROW_DCHECK(rows_left.metadata().is_compatible(rows_right.metadata()));
-
-  if (num_rows_to_compare == 0) {
-    *out_num_rows = 0;
+template <bool use_selection>
+void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare,
+                                       const uint16_t* sel_left_maybe_null,
+                                       const uint32_t* left_to_right_map,
+                                       KeyEncoder::KeyEncoderContext* ctx,
+                                       const KeyEncoder::KeyColumnArray& col,
+                                       const KeyEncoder::KeyRowArray& rows,
+                                       uint8_t* match_bytevector) {
+  if (!rows.has_any_nulls(ctx) && !col.data(0)) {
     return;
   }
-
-  // Allocate temporary byte and bit vectors
-  auto bytevector_holder =
-      util::TempVectorHolder<uint8_t>(ctx->stack, num_rows_to_compare);
-  auto bitvector_holder =
-      util::TempVectorHolder<uint8_t>(ctx->stack, num_rows_to_compare);
-
-  uint8_t* match_bytevector = bytevector_holder.mutable_data();
-  uint8_t* match_bitvector = bitvector_holder.mutable_data();
-
-  // All comparison functions called here will update match byte vector
-  // (AND it with comparison result) instead of overwriting it.
-  memset(match_bytevector, 0xff, num_rows_to_compare);
-
-  if (rows_left.metadata().is_fixed_length) {
-    CompareFixedLength(num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
-                       match_bytevector, ctx, rows_left.metadata().fixed_length,
-                       rows_left.data(1), rows_right.data(1));
-  } else {
-    CompareVaryingLength(num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
-                         match_bytevector, ctx, rows_left.data(2), rows_right.data(2),
-                         rows_left.offsets(), rows_right.offsets());
+  uint32_t num_processed = 0;
+#if defined(ARROW_HAVE_AVX2)
+  if (ctx->has_avx2()) {
+    num_processed = NullUpdateColumnToRow_avx2(use_selection, id_col, num_rows_to_compare,
+                                               sel_left_maybe_null, left_to_right_map,
+                                               ctx, col, rows, match_bytevector);
   }
+#endif
 
-  // CompareFixedLength can be used to compare nulls as well
-  bool nulls_present = rows_left.has_any_nulls(ctx) || rows_right.has_any_nulls(ctx);
-  if (nulls_present) {
-    CompareFixedLength(num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
-                       match_bytevector, ctx,
-                       rows_left.metadata().null_masks_bytes_per_row,
-                       rows_left.null_masks(), rows_right.null_masks());
+  if (!col.data(0)) {
+    // Remove rows from the result for which the column value is a null
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
+    for (uint32_t i = num_processed; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      int64_t bitid = irow_right * null_mask_num_bytes * 8 + id_col;
+      match_bytevector[i] &= (BitUtil::GetBit(null_masks, bitid) ? 0 : 0xff);
+    }
+  } else if (!rows.has_any_nulls(ctx)) {
+    // Remove rows from the result for which the column value on left side is null
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
+    for (uint32_t i = num_processed; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      match_bytevector[i] &=
+          BitUtil::GetBit(non_nulls, irow_left + col.bit_offset(0)) ? 0xff : 0;
+    }
+  } else {
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
+    for (uint32_t i = num_processed; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      int64_t bitid_right = irow_right * null_mask_num_bytes * 8 + id_col;
+      int right_null = BitUtil::GetBit(null_masks, bitid_right) ? 0xff : 0;
+      int left_null =
+          BitUtil::GetBit(non_nulls, irow_left + col.bit_offset(0)) ? 0 : 0xff;
+      match_bytevector[i] |= left_null & right_null;
+      match_bytevector[i] &= ~(left_null ^ right_null);
+    }
   }
+}
 
-  util::BitUtil::bytes_to_bits(ctx->hardware_flags, num_rows_to_compare, match_bytevector,
-                               match_bitvector);
-  if (sel_left_maybe_null) {
-    int out_num_rows_int;
-    util::BitUtil::bits_filter_indexes(0, ctx->hardware_flags, num_rows_to_compare,
-                                       match_bitvector, sel_left_maybe_null,
-                                       &out_num_rows_int, out_sel_left_maybe_same);
-    *out_num_rows = out_num_rows_int;
+template <bool use_selection, class COMPARE_FN>
+void KeyCompare::CompareBinaryColumnToRowHelper(
+    uint32_t offset_within_row, uint32_t first_row_to_compare,
+    uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    uint8_t* match_bytevector, COMPARE_FN compare_fn) {
+  bool is_fixed_length = rows.metadata().is_fixed_length;
+  if (is_fixed_length) {
+    uint32_t fixed_length = rows.metadata().fixed_length;
+    const uint8_t* rows_left = col.data(1);
+    const uint8_t* rows_right = rows.data(1);
+    for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      uint32_t offset_right = irow_right * fixed_length + offset_within_row;
+      match_bytevector[i] = compare_fn(rows_left, rows_right, irow_left, offset_right);
+    }
   } else {
-    int out_num_rows_int;
-    util::BitUtil::bits_to_indexes(0, ctx->hardware_flags, num_rows_to_compare,
-                                   match_bitvector, &out_num_rows_int,
-                                   out_sel_left_maybe_same);
-    *out_num_rows = out_num_rows_int;
+    const uint8_t* rows_left = col.data(1);
+    const uint32_t* offsets_right = rows.offsets();
+    const uint8_t* rows_right = rows.data(2);
+    for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      uint32_t offset_right = offsets_right[irow_right] + offset_within_row;
+      match_bytevector[i] = compare_fn(rows_left, rows_right, irow_left, offset_right);
+    }
   }
 }
 
-void KeyCompare::CompareFixedLength(uint32_t num_rows_to_compare,
-                                    const uint16_t* sel_left_maybe_null,
-                                    const uint32_t* left_to_right_map,
-                                    uint8_t* match_bytevector,
-                                    KeyEncoder::KeyEncoderContext* ctx,
-                                    uint32_t fixed_length, const uint8_t* rows_left,
-                                    const uint8_t* rows_right) {
-  bool use_selection = (sel_left_maybe_null != nullptr);
-
-  uint32_t num_rows_already_processed = 0;
-
+template <bool use_selection>
+void KeyCompare::CompareBinaryColumnToRow(
+    uint32_t offset_within_row, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+  uint32_t num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
-  if (ctx->has_avx2() && !use_selection) {
-    // Choose between up-to-8B length, up-to-16B length and any size versions
-    if (fixed_length <= 8) {
-      num_rows_already_processed = CompareFixedLength_UpTo8B_avx2(
-          num_rows_to_compare, left_to_right_map, match_bytevector, fixed_length,
-          rows_left, rows_right);
-    } else if (fixed_length <= 16) {
-      num_rows_already_processed = CompareFixedLength_UpTo16B_avx2(
-          num_rows_to_compare, left_to_right_map, match_bytevector, fixed_length,
-          rows_left, rows_right);
-    } else {
-      num_rows_already_processed =
-          CompareFixedLength_avx2(num_rows_to_compare, left_to_right_map,
-                                  match_bytevector, fixed_length, rows_left, rows_right);
-    }
+  if (ctx->has_avx2()) {
+    num_processed = CompareBinaryColumnToRow_avx2(
+        use_selection, offset_within_row, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector);
   }
 #endif
 
-  typedef void (*CompareFixedLengthImp_t)(uint32_t, uint32_t, const uint16_t*,
-                                          const uint32_t*, uint8_t*, uint32_t,
-                                          const uint8_t*, const uint8_t*);
-  static const CompareFixedLengthImp_t CompareFixedLengthImp_fn[] = {
-      CompareFixedLengthImp<false, 1>, CompareFixedLengthImp<false, 2>,
-      CompareFixedLengthImp<false, 0>, CompareFixedLengthImp<true, 1>,
-      CompareFixedLengthImp<true, 2>,  CompareFixedLengthImp<true, 0>};
-  int dispatch_const = (use_selection ? 3 : 0) +
-                       ((fixed_length <= 8) ? 0 : ((fixed_length <= 16) ? 1 : 2));
-  CompareFixedLengthImp_fn[dispatch_const](
-      num_rows_already_processed, num_rows_to_compare, sel_left_maybe_null,
-      left_to_right_map, match_bytevector, fixed_length, rows_left, rows_right);
-}
+  uint32_t col_width = col.metadata().fixed_length;
+  if (col_width == 0) {
+    int bit_offset = col.bit_offset(1);
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [bit_offset](const uint8_t* left_base, const uint8_t* right_base,
+                     uint32_t irow_left, uint32_t offset_right) {
+          uint8_t left = BitUtil::GetBit(left_base, irow_left + bit_offset) ? 0xff : 0x00;
+          uint8_t right = right_base[offset_right];
+          return left == right ? 0xff : 0;
+        });
+  } else if (col_width == 1) {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+           uint32_t offset_right) {
+          uint8_t left = left_base[irow_left];
+          uint8_t right = right_base[offset_right];
+          return left == right ? 0xff : 0;
+        });
+  } else if (col_width == 2) {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+           uint32_t offset_right) {
+          uint16_t left = reinterpret_cast<const uint16_t*>(left_base)[irow_left];

Review comment:
       Are these guaranteed to be aligned? If so, please add an assertion and a comment.

##########
File path: cpp/src/arrow/compute/exec/key_compare.cc
##########
@@ -17,250 +17,398 @@
 
 #include "arrow/compute/exec/key_compare.h"
 
+#include <immintrin.h>

Review comment:
       This include needs to be guarded by an `#ifdef`: `<immintrin.h>` is an x86-specific header, and including it unconditionally is causing the non-Intel CI jobs to fail:
   
   https://app.travis-ci.com/github/apache/arrow/jobs/530849513#L1115
   
   https://app.travis-ci.com/github/apache/arrow/jobs/530849512#L1023

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -991,6 +991,9 @@ TEST(GroupBy, SumOnlyStringAndDictKeys) {
                                            {
                                                {"hash_sum", nullptr},
                                            }));
+    if (key_type->Equals(utf8())) {

Review comment:
       Why does it only require sorting for string keys?

##########
File path: cpp/src/arrow/compute/exec/key_compare.cc
##########
@@ -17,250 +17,398 @@
 
 #include "arrow/compute/exec/key_compare.h"
 
+#include <immintrin.h>
+#include <memory.h>
+
 #include <algorithm>
 #include <cstdint>
 
 #include "arrow/compute/exec/util.h"
+#include "arrow/util/bit_util.h"
 #include "arrow/util/ubsan.h"
 
 namespace arrow {
 namespace compute {
 
-void KeyCompare::CompareRows(uint32_t num_rows_to_compare,
-                             const uint16_t* sel_left_maybe_null,
-                             const uint32_t* left_to_right_map,
-                             KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_num_rows,
-                             uint16_t* out_sel_left_maybe_same,
-                             const KeyEncoder::KeyRowArray& rows_left,
-                             const KeyEncoder::KeyRowArray& rows_right) {
-  ARROW_DCHECK(rows_left.metadata().is_compatible(rows_right.metadata()));
-
-  if (num_rows_to_compare == 0) {
-    *out_num_rows = 0;
+template <bool use_selection>
+void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare,
+                                       const uint16_t* sel_left_maybe_null,
+                                       const uint32_t* left_to_right_map,
+                                       KeyEncoder::KeyEncoderContext* ctx,
+                                       const KeyEncoder::KeyColumnArray& col,
+                                       const KeyEncoder::KeyRowArray& rows,
+                                       uint8_t* match_bytevector) {
+  if (!rows.has_any_nulls(ctx) && !col.data(0)) {
     return;
   }
-
-  // Allocate temporary byte and bit vectors
-  auto bytevector_holder =
-      util::TempVectorHolder<uint8_t>(ctx->stack, num_rows_to_compare);
-  auto bitvector_holder =
-      util::TempVectorHolder<uint8_t>(ctx->stack, num_rows_to_compare);
-
-  uint8_t* match_bytevector = bytevector_holder.mutable_data();
-  uint8_t* match_bitvector = bitvector_holder.mutable_data();
-
-  // All comparison functions called here will update match byte vector
-  // (AND it with comparison result) instead of overwriting it.
-  memset(match_bytevector, 0xff, num_rows_to_compare);
-
-  if (rows_left.metadata().is_fixed_length) {
-    CompareFixedLength(num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
-                       match_bytevector, ctx, rows_left.metadata().fixed_length,
-                       rows_left.data(1), rows_right.data(1));
-  } else {
-    CompareVaryingLength(num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
-                         match_bytevector, ctx, rows_left.data(2), rows_right.data(2),
-                         rows_left.offsets(), rows_right.offsets());
+  uint32_t num_processed = 0;
+#if defined(ARROW_HAVE_AVX2)
+  if (ctx->has_avx2()) {
+    num_processed = NullUpdateColumnToRow_avx2(use_selection, id_col, num_rows_to_compare,
+                                               sel_left_maybe_null, left_to_right_map,
+                                               ctx, col, rows, match_bytevector);
   }
+#endif
 
-  // CompareFixedLength can be used to compare nulls as well
-  bool nulls_present = rows_left.has_any_nulls(ctx) || rows_right.has_any_nulls(ctx);
-  if (nulls_present) {
-    CompareFixedLength(num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
-                       match_bytevector, ctx,
-                       rows_left.metadata().null_masks_bytes_per_row,
-                       rows_left.null_masks(), rows_right.null_masks());
+  if (!col.data(0)) {
+    // Remove rows from the result for which the column value is a null
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
+    for (uint32_t i = num_processed; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      int64_t bitid = irow_right * null_mask_num_bytes * 8 + id_col;
+      match_bytevector[i] &= (BitUtil::GetBit(null_masks, bitid) ? 0 : 0xff);
+    }
+  } else if (!rows.has_any_nulls(ctx)) {
+    // Remove rows from the result for which the column value on left side is null
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
+    for (uint32_t i = num_processed; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      match_bytevector[i] &=
+          BitUtil::GetBit(non_nulls, irow_left + col.bit_offset(0)) ? 0xff : 0;
+    }
+  } else {
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
+    for (uint32_t i = num_processed; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      int64_t bitid_right = irow_right * null_mask_num_bytes * 8 + id_col;
+      int right_null = BitUtil::GetBit(null_masks, bitid_right) ? 0xff : 0;
+      int left_null =
+          BitUtil::GetBit(non_nulls, irow_left + col.bit_offset(0)) ? 0 : 0xff;
+      match_bytevector[i] |= left_null & right_null;
+      match_bytevector[i] &= ~(left_null ^ right_null);
+    }
   }
+}
 
-  util::BitUtil::bytes_to_bits(ctx->hardware_flags, num_rows_to_compare, match_bytevector,
-                               match_bitvector);
-  if (sel_left_maybe_null) {
-    int out_num_rows_int;
-    util::BitUtil::bits_filter_indexes(0, ctx->hardware_flags, num_rows_to_compare,
-                                       match_bitvector, sel_left_maybe_null,
-                                       &out_num_rows_int, out_sel_left_maybe_same);
-    *out_num_rows = out_num_rows_int;
+template <bool use_selection, class COMPARE_FN>
+void KeyCompare::CompareBinaryColumnToRowHelper(
+    uint32_t offset_within_row, uint32_t first_row_to_compare,
+    uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    uint8_t* match_bytevector, COMPARE_FN compare_fn) {
+  bool is_fixed_length = rows.metadata().is_fixed_length;
+  if (is_fixed_length) {
+    uint32_t fixed_length = rows.metadata().fixed_length;
+    const uint8_t* rows_left = col.data(1);
+    const uint8_t* rows_right = rows.data(1);
+    for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      uint32_t offset_right = irow_right * fixed_length + offset_within_row;
+      match_bytevector[i] = compare_fn(rows_left, rows_right, irow_left, offset_right);
+    }
   } else {
-    int out_num_rows_int;
-    util::BitUtil::bits_to_indexes(0, ctx->hardware_flags, num_rows_to_compare,
-                                   match_bitvector, &out_num_rows_int,
-                                   out_sel_left_maybe_same);
-    *out_num_rows = out_num_rows_int;
+    const uint8_t* rows_left = col.data(1);
+    const uint32_t* offsets_right = rows.offsets();
+    const uint8_t* rows_right = rows.data(2);
+    for (uint32_t i = first_row_to_compare; i < num_rows_to_compare; ++i) {
+      uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
+      uint32_t irow_right = left_to_right_map[irow_left];
+      uint32_t offset_right = offsets_right[irow_right] + offset_within_row;
+      match_bytevector[i] = compare_fn(rows_left, rows_right, irow_left, offset_right);
+    }
   }
 }
 
-void KeyCompare::CompareFixedLength(uint32_t num_rows_to_compare,
-                                    const uint16_t* sel_left_maybe_null,
-                                    const uint32_t* left_to_right_map,
-                                    uint8_t* match_bytevector,
-                                    KeyEncoder::KeyEncoderContext* ctx,
-                                    uint32_t fixed_length, const uint8_t* rows_left,
-                                    const uint8_t* rows_right) {
-  bool use_selection = (sel_left_maybe_null != nullptr);
-
-  uint32_t num_rows_already_processed = 0;
-
+template <bool use_selection>
+void KeyCompare::CompareBinaryColumnToRow(
+    uint32_t offset_within_row, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+  uint32_t num_processed = 0;
 #if defined(ARROW_HAVE_AVX2)
-  if (ctx->has_avx2() && !use_selection) {
-    // Choose between up-to-8B length, up-to-16B length and any size versions
-    if (fixed_length <= 8) {
-      num_rows_already_processed = CompareFixedLength_UpTo8B_avx2(
-          num_rows_to_compare, left_to_right_map, match_bytevector, fixed_length,
-          rows_left, rows_right);
-    } else if (fixed_length <= 16) {
-      num_rows_already_processed = CompareFixedLength_UpTo16B_avx2(
-          num_rows_to_compare, left_to_right_map, match_bytevector, fixed_length,
-          rows_left, rows_right);
-    } else {
-      num_rows_already_processed =
-          CompareFixedLength_avx2(num_rows_to_compare, left_to_right_map,
-                                  match_bytevector, fixed_length, rows_left, rows_right);
-    }
+  if (ctx->has_avx2()) {
+    num_processed = CompareBinaryColumnToRow_avx2(
+        use_selection, offset_within_row, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector);
   }
 #endif
 
-  typedef void (*CompareFixedLengthImp_t)(uint32_t, uint32_t, const uint16_t*,
-                                          const uint32_t*, uint8_t*, uint32_t,
-                                          const uint8_t*, const uint8_t*);
-  static const CompareFixedLengthImp_t CompareFixedLengthImp_fn[] = {
-      CompareFixedLengthImp<false, 1>, CompareFixedLengthImp<false, 2>,
-      CompareFixedLengthImp<false, 0>, CompareFixedLengthImp<true, 1>,
-      CompareFixedLengthImp<true, 2>,  CompareFixedLengthImp<true, 0>};
-  int dispatch_const = (use_selection ? 3 : 0) +
-                       ((fixed_length <= 8) ? 0 : ((fixed_length <= 16) ? 1 : 2));
-  CompareFixedLengthImp_fn[dispatch_const](
-      num_rows_already_processed, num_rows_to_compare, sel_left_maybe_null,
-      left_to_right_map, match_bytevector, fixed_length, rows_left, rows_right);
-}
+  uint32_t col_width = col.metadata().fixed_length;
+  if (col_width == 0) {
+    int bit_offset = col.bit_offset(1);
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [bit_offset](const uint8_t* left_base, const uint8_t* right_base,
+                     uint32_t irow_left, uint32_t offset_right) {
+          uint8_t left = BitUtil::GetBit(left_base, irow_left + bit_offset) ? 0xff : 0x00;
+          uint8_t right = right_base[offset_right];
+          return left == right ? 0xff : 0;
+        });
+  } else if (col_width == 1) {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+           uint32_t offset_right) {
+          uint8_t left = left_base[irow_left];
+          uint8_t right = right_base[offset_right];
+          return left == right ? 0xff : 0;
+        });
+  } else if (col_width == 2) {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+           uint32_t offset_right) {
+          uint16_t left = reinterpret_cast<const uint16_t*>(left_base)[irow_left];
+          uint16_t right = *reinterpret_cast<const uint16_t*>(right_base + offset_right);
+          return left == right ? 0xff : 0;
+        });
+  } else if (col_width == 4) {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+           uint32_t offset_right) {
+          uint32_t left = reinterpret_cast<const uint32_t*>(left_base)[irow_left];
+          uint32_t right = *reinterpret_cast<const uint32_t*>(right_base + offset_right);
+          return left == right ? 0xff : 0;
+        });
+  } else if (col_width == 8) {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+           uint32_t offset_right) {
+          uint64_t left = reinterpret_cast<const uint64_t*>(left_base)[irow_left];
+          uint64_t right = *reinterpret_cast<const uint64_t*>(right_base + offset_right);
+          return left == right ? 0xff : 0;
+        });
+  } else {
+    CompareBinaryColumnToRowHelper<use_selection>(
+        offset_within_row, num_processed, num_rows_to_compare, sel_left_maybe_null,
+        left_to_right_map, ctx, col, rows, match_bytevector,
+        [&col](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left,
+               uint32_t offset_right) {
+          uint32_t length = col.metadata().fixed_length;
 
-template <bool use_selection, int num_64bit_words>
-void KeyCompare::CompareFixedLengthImp(uint32_t num_rows_already_processed,
-                                       uint32_t num_rows,
-                                       const uint16_t* sel_left_maybe_null,
-                                       const uint32_t* left_to_right_map,
-                                       uint8_t* match_bytevector, uint32_t length,
-                                       const uint8_t* rows_left,
-                                       const uint8_t* rows_right) {
-  // Key length (for encoded key) has to be non-zero
-  ARROW_DCHECK(length > 0);
+          // Non-zero length guarantees no underflow
+          int32_t num_loops_less_one = (static_cast<int32_t>(length) + 7) / 8 - 1;
+
+          uint64_t tail_mask = ~0ULL >> (64 - 8 * (length - num_loops_less_one * 8));
 
-  // Non-zero length guarantees no underflow
-  int32_t num_loops_less_one = (static_cast<int32_t>(length) + 7) / 8 - 1;
+          const uint64_t* key_left_ptr =
+              reinterpret_cast<const uint64_t*>(left_base + irow_left * length);
+          const uint64_t* key_right_ptr =
+              reinterpret_cast<const uint64_t*>(right_base + offset_right);
+          uint64_t result_or = 0;
+          int32_t i;
+          // length cannot be zero
+          for (i = 0; i < num_loops_less_one; ++i) {
+            uint64_t key_left = key_left_ptr[i];
+            uint64_t key_right = key_right_ptr[i];
+            result_or |= key_left ^ key_right;
+          }
+          uint64_t key_left = key_left_ptr[i];
+          uint64_t key_right = key_right_ptr[i];
+          result_or |= tail_mask & (key_left ^ key_right);
+          return result_or == 0 ? 0xff : 0;
+        });
+  }
+}
 
-  // Length remaining in last loop can only be zero for input length equal to zero
-  uint32_t length_remaining_last_loop = length - num_loops_less_one * 8;
-  uint64_t tail_mask = (~0ULL) >> (8 * (8 - length_remaining_last_loop));
+// Overwrites the match_bytevector instead of updating it
+template <bool use_selection, bool is_first_varbinary_col>
+void KeyCompare::CompareVarBinaryColumnToRow(
+    uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+#if defined(ARROW_HAVE_AVX2)
+  if (ctx->has_avx2()) {
+    CompareVarBinaryColumnToRow_avx2(
+        use_selection, is_first_varbinary_col, id_varbinary_col, num_rows_to_compare,
+        sel_left_maybe_null, left_to_right_map, ctx, col, rows, match_bytevector);
+    return;
+  }
+#endif
 
-  for (uint32_t id_input = num_rows_already_processed; id_input < num_rows; ++id_input) {
-    uint32_t irow_left = use_selection ? sel_left_maybe_null[id_input] : id_input;
+  const uint32_t* offsets_left = col.offsets();
+  const uint32_t* offsets_right = rows.offsets();
+  const uint8_t* rows_left = col.data(2);
+  const uint8_t* rows_right = rows.data(2);
+  for (uint32_t i = 0; i < num_rows_to_compare; ++i) {
+    uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
     uint32_t irow_right = left_to_right_map[irow_left];
-    uint32_t begin_left = length * irow_left;
-    uint32_t begin_right = length * irow_right;
+    uint32_t begin_left = offsets_left[irow_left];
+    uint32_t length_left = offsets_left[irow_left + 1] - begin_left;
+    uint32_t begin_right = offsets_right[irow_right];
+    uint32_t length_right;
+    uint32_t offset_within_row;
+    if (!is_first_varbinary_col) {
+      rows.metadata().nth_varbinary_offset_and_length(
+          rows_right + begin_right, id_varbinary_col, &offset_within_row, &length_right);
+    } else {
+      rows.metadata().first_varbinary_offset_and_length(
+          rows_right + begin_right, &offset_within_row, &length_right);
+    }
+    begin_right += offset_within_row;
+    uint32_t length = std::min(length_left, length_right);
     const uint64_t* key_left_ptr =
         reinterpret_cast<const uint64_t*>(rows_left + begin_left);
     const uint64_t* key_right_ptr =
         reinterpret_cast<const uint64_t*>(rows_right + begin_right);
-    uint64_t result_or = 0ULL;
-    int32_t istripe = 0;
-
-    // Specializations for keys up to 8 bytes and between 9 and 16 bytes to
-    // avoid internal loop over words in the value for short ones.
-    //
-    // Template argument 0 means arbitrarily many 64-bit words,
-    // 1 means up to 1 and 2 means up to 2.
-    //
-    if (num_64bit_words == 0) {
-      for (; istripe < num_loops_less_one; ++istripe) {
-        uint64_t key_left = util::SafeLoad(&key_left_ptr[istripe]);
-        uint64_t key_right = util::SafeLoad(&key_right_ptr[istripe]);
-        result_or |= (key_left ^ key_right);
+    uint64_t result_or = 0;
+    if (length > 0) {
+      int32_t j;
+      // length can be zero
+      for (j = 0; j < (static_cast<int32_t>(length) + 7) / 8 - 1; ++j) {

Review comment:
       Nit: please replace all of these with calls to descriptively named helpers, like:
   ```suggestion
         for (j = 0; j < static_cast<int32_t>(BitUtil::CeilDiv(length, 8)) - 1; ++j) {
   ```

##########
File path: cpp/src/arrow/compute/exec/key_compare_avx2.cc
##########
@@ -25,160 +25,545 @@ namespace compute {
 
 #if defined(ARROW_HAVE_AVX2)
 
-uint32_t KeyCompare::CompareFixedLength_UpTo8B_avx2(
-    uint32_t num_rows, const uint32_t* left_to_right_map, uint8_t* match_bytevector,
-    uint32_t length, const uint8_t* rows_left, const uint8_t* rows_right) {
-  ARROW_DCHECK(length <= 8);
-  __m256i offset_left = _mm256_setr_epi64x(0, length, length * 2, length * 3);
-  __m256i offset_left_incr = _mm256_set1_epi64x(length * 4);
-  __m256i mask = _mm256_set1_epi64x(~0ULL >> (8 * (8 - length)));
-
-  constexpr uint32_t unroll = 4;
-  for (uint32_t i = 0; i < num_rows / unroll; ++i) {
-    auto key_left = _mm256_i64gather_epi64(
-        reinterpret_cast<arrow::util::int64_for_gather_t*>(rows_left), offset_left, 1);
-    offset_left = _mm256_add_epi64(offset_left, offset_left_incr);
-    __m128i offset_right =
-        _mm_loadu_si128(reinterpret_cast<const __m128i*>(left_to_right_map) + i);
-    offset_right = _mm_mullo_epi32(offset_right, _mm_set1_epi32(length));
-
-    auto key_right = _mm256_i32gather_epi64(
-        reinterpret_cast<arrow::util::int64_for_gather_t*>(rows_right), offset_right, 1);
-    uint32_t cmp = _mm256_movemask_epi8(_mm256_cmpeq_epi64(
-        _mm256_and_si256(key_left, mask), _mm256_and_si256(key_right, mask)));
-    reinterpret_cast<uint32_t*>(match_bytevector)[i] &= cmp;
+template <bool use_selection>
+uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2(
+    uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    uint8_t* match_bytevector) {
+  if (!rows.has_any_nulls(ctx) && !col.data(0)) {
+    return num_rows_to_compare;
   }
+  if (!col.data(0)) {
+    // Remove rows from the result for which the column value is a null
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
 
-  uint32_t num_rows_processed = num_rows - (num_rows % unroll);
-  return num_rows_processed;
-}
+    uint32_t num_processed = 0;
+    constexpr uint32_t unroll = 8;
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      __m256i irow_right;
+      if (use_selection) {
+        __m256i irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+      }
+      __m256i bitid =
+          _mm256_mullo_epi32(irow_right, _mm256_set1_epi32(null_mask_num_bytes * 8));
+      bitid = _mm256_add_epi32(bitid, _mm256_set1_epi32(id_col));
+      __m256i right =
+          _mm256_i32gather_epi32((const int*)null_masks, _mm256_srli_epi32(bitid, 3), 1);
+      right = _mm256_and_si256(
+          _mm256_set1_epi32(1),
+          _mm256_srlv_epi32(right, _mm256_and_si256(bitid, _mm256_set1_epi32(7))));
+      __m256i cmp = _mm256_cmpeq_epi32(right, _mm256_setzero_si256());
+      uint32_t result_lo =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+      uint32_t result_hi =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] &=
+          result_lo | (static_cast<uint64_t>(result_hi) << 32);
+    }
+    num_processed = num_rows_to_compare / unroll * unroll;
+    return num_processed;
+  } else if (!rows.has_any_nulls(ctx)) {
+    // Remove rows from the result for which the column value on left side is null
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
+    uint32_t num_processed = 0;
+    constexpr uint32_t unroll = 8;
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      __m256i cmp;
+      if (use_selection) {
+        __m256i irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(col.bit_offset(0)));
+        __m256i left = _mm256_i32gather_epi32((const int*)non_nulls,
+                                              _mm256_srli_epi32(irow_left, 3), 1);
+        left = _mm256_and_si256(
+            _mm256_set1_epi32(1),
+            _mm256_srlv_epi32(left, _mm256_and_si256(irow_left, _mm256_set1_epi32(7))));
+        cmp = _mm256_cmpeq_epi32(left, _mm256_set1_epi32(1));
+      } else {
+        __m256i left = _mm256_cvtepu8_epi32(_mm_set1_epi8(static_cast<uint8_t>(
+            reinterpret_cast<const uint16_t*>(non_nulls + i)[0] >> col.bit_offset(0))));
+        __m256i bits = _mm256_setr_epi32(1, 2, 4, 8, 16, 32, 64, 128);
+        cmp = _mm256_cmpeq_epi32(_mm256_and_si256(left, bits), bits);
+      }
+      uint32_t result_lo =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+      uint32_t result_hi =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] &=
+          result_lo | (static_cast<uint64_t>(result_hi) << 32);
+      num_processed = num_rows_to_compare / unroll * unroll;
+    }
+    return num_processed;
+  } else {
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
 
-uint32_t KeyCompare::CompareFixedLength_UpTo16B_avx2(
-    uint32_t num_rows, const uint32_t* left_to_right_map, uint8_t* match_bytevector,
-    uint32_t length, const uint8_t* rows_left, const uint8_t* rows_right) {
-  ARROW_DCHECK(length <= 16);
-
-  constexpr uint64_t kByteSequence0To7 = 0x0706050403020100ULL;
-  constexpr uint64_t kByteSequence8To15 = 0x0f0e0d0c0b0a0908ULL;
-
-  __m256i mask =
-      _mm256_cmpgt_epi8(_mm256_set1_epi8(length),
-                        _mm256_setr_epi64x(kByteSequence0To7, kByteSequence8To15,
-                                           kByteSequence0To7, kByteSequence8To15));
-  const uint8_t* key_left_ptr = rows_left;
-
-  constexpr uint32_t unroll = 2;
-  for (uint32_t i = 0; i < num_rows / unroll; ++i) {
-    auto key_left = _mm256_inserti128_si256(
-        _mm256_castsi128_si256(
-            _mm_loadu_si128(reinterpret_cast<const __m128i*>(key_left_ptr))),
-        _mm_loadu_si128(reinterpret_cast<const __m128i*>(key_left_ptr + length)), 1);
-    key_left_ptr += length * 2;
-    auto key_right = _mm256_inserti128_si256(
-        _mm256_castsi128_si256(_mm_loadu_si128(reinterpret_cast<const __m128i*>(
-            rows_right + length * left_to_right_map[2 * i]))),
-        _mm_loadu_si128(reinterpret_cast<const __m128i*>(
-            rows_right + length * left_to_right_map[2 * i + 1])),
-        1);
-    __m256i cmp = _mm256_cmpeq_epi64(_mm256_and_si256(key_left, mask),
-                                     _mm256_and_si256(key_right, mask));
-    cmp = _mm256_and_si256(cmp, _mm256_shuffle_epi32(cmp, 0xee));  // 0b11101110
-    cmp = _mm256_permute4x64_epi64(cmp, 0x08);                     // 0b00001000
-    reinterpret_cast<uint16_t*>(match_bytevector)[i] &=
-        (_mm256_movemask_epi8(cmp) & 0xffff);
-  }
+    uint32_t num_processed = 0;
+    constexpr uint32_t unroll = 8;
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      __m256i left_null;
+      __m256i irow_right;
+      if (use_selection) {
+        __m256i irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(col.bit_offset(0)));
+        __m256i left = _mm256_i32gather_epi32((const int*)non_nulls,
+                                              _mm256_srli_epi32(irow_left, 3), 1);
+        left = _mm256_and_si256(
+            _mm256_set1_epi32(1),
+            _mm256_srlv_epi32(left, _mm256_and_si256(irow_left, _mm256_set1_epi32(7))));
+        left_null = _mm256_cmpeq_epi32(left, _mm256_setzero_si256());
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+        __m256i left = _mm256_cvtepu8_epi32(_mm_set1_epi8(static_cast<uint8_t>(
+            reinterpret_cast<const uint16_t*>(non_nulls + i)[0] >> col.bit_offset(0))));
+        __m256i bits = _mm256_setr_epi32(1, 2, 4, 8, 16, 32, 64, 128);
+        left_null =
+            _mm256_cmpeq_epi32(_mm256_and_si256(left, bits), _mm256_setzero_si256());
+      }
+      __m256i bitid =
+          _mm256_mullo_epi32(irow_right, _mm256_set1_epi32(null_mask_num_bytes * 8));
+      bitid = _mm256_add_epi32(bitid, _mm256_set1_epi32(id_col));
+      __m256i right =
+          _mm256_i32gather_epi32((const int*)null_masks, _mm256_srli_epi32(bitid, 3), 1);
+      right = _mm256_and_si256(
+          _mm256_set1_epi32(1),
+          _mm256_srlv_epi32(right, _mm256_and_si256(bitid, _mm256_set1_epi32(7))));
+      __m256i right_null = _mm256_cmpeq_epi32(right, _mm256_set1_epi32(1));
 
-  uint32_t num_rows_processed = num_rows - (num_rows % unroll);
-  return num_rows_processed;
-}
+      uint64_t left_null_64 =
+          static_cast<uint32_t>(_mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_castsi256_si128(left_null)))) |
+          (static_cast<uint64_t>(static_cast<uint32_t>(_mm256_movemask_epi8(
+               _mm256_cvtepi32_epi64(_mm256_extracti128_si256(left_null, 1)))))
+           << 32);
 
-uint32_t KeyCompare::CompareFixedLength_avx2(uint32_t num_rows,
-                                             const uint32_t* left_to_right_map,
-                                             uint8_t* match_bytevector, uint32_t length,
-                                             const uint8_t* rows_left,
-                                             const uint8_t* rows_right) {
-  ARROW_DCHECK(length > 0);
+      uint64_t right_null_64 =
+          static_cast<uint32_t>(_mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_castsi256_si128(right_null)))) |
+          (static_cast<uint64_t>(static_cast<uint32_t>(_mm256_movemask_epi8(
+               _mm256_cvtepi32_epi64(_mm256_extracti128_si256(right_null, 1)))))
+           << 32);
 
-  constexpr uint64_t kByteSequence0To7 = 0x0706050403020100ULL;
-  constexpr uint64_t kByteSequence8To15 = 0x0f0e0d0c0b0a0908ULL;
-  constexpr uint64_t kByteSequence16To23 = 0x1716151413121110ULL;
-  constexpr uint64_t kByteSequence24To31 = 0x1f1e1d1c1b1a1918ULL;
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] |= left_null_64 & right_null_64;
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] &= ~(left_null_64 ^ right_null_64);
+    }
+    num_processed = num_rows_to_compare / unroll * unroll;
+    return num_processed;
+  }
+}
 
-  // Non-zero length guarantees no underflow
-  int32_t num_loops_less_one = (static_cast<int32_t>(length) + 31) / 32 - 1;
+template <bool use_selection, class COMPARE8_FN>
+uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
+    uint32_t offset_within_row, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
+    COMPARE8_FN compare8_fn) {
+  bool is_fixed_length = rows.metadata().is_fixed_length;
+  if (is_fixed_length) {
+    uint32_t fixed_length = rows.metadata().fixed_length;
+    const uint8_t* rows_left = col.data(1);
+    const uint8_t* rows_right = rows.data(1);
+    constexpr uint32_t unroll = 8;
+    __m256i irow_left = _mm256_setr_epi32(0, 1, 2, 3, 4, 5, 6, 7);
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      if (use_selection) {
+        irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+      }
+      __m256i irow_right;
+      if (use_selection) {
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+      }
 
-  __m256i tail_mask =
-      _mm256_cmpgt_epi8(_mm256_set1_epi8(length - num_loops_less_one * 32),
-                        _mm256_setr_epi64x(kByteSequence0To7, kByteSequence8To15,
-                                           kByteSequence16To23, kByteSequence24To31));
+      __m256i offset_right =
+          _mm256_mullo_epi32(irow_right, _mm256_set1_epi32(fixed_length));
+      offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
 
-  for (uint32_t irow_left = 0; irow_left < num_rows; ++irow_left) {
-    uint32_t irow_right = left_to_right_map[irow_left];
-    uint32_t begin_left = length * irow_left;
-    uint32_t begin_right = length * irow_right;
-    const __m256i* key_left_ptr =
-        reinterpret_cast<const __m256i*>(rows_left + begin_left);
-    const __m256i* key_right_ptr =
-        reinterpret_cast<const __m256i*>(rows_right + begin_right);
-    __m256i result_or = _mm256_setzero_si256();
-    int32_t i;
-    // length cannot be zero
-    for (i = 0; i < num_loops_less_one; ++i) {
-      __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
-      __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
-      result_or = _mm256_or_si256(result_or, _mm256_xor_si256(key_left, key_right));
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] =
+          compare8_fn(rows_left, rows_right, i * unroll, irow_left, offset_right);
+
+      if (!use_selection) {
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(8));
+      }
     }
+    return num_rows_to_compare - (num_rows_to_compare % unroll);
+  } else {
+    const uint8_t* rows_left = col.data(1);
+    const uint32_t* offsets_right = rows.offsets();
+    const uint8_t* rows_right = rows.data(2);
+    constexpr uint32_t unroll = 8;
+    __m256i irow_left = _mm256_setr_epi32(0, 1, 2, 3, 4, 5, 6, 7);
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      if (use_selection) {
+        irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+      }
+      __m256i irow_right;
+      if (use_selection) {
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+      }
+      __m256i offset_right =
+          _mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
+      offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
 
-    __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
-    __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
-    result_or = _mm256_or_si256(
-        result_or, _mm256_and_si256(tail_mask, _mm256_xor_si256(key_left, key_right)));
-    int result = _mm256_testz_si256(result_or, result_or) * 0xff;
-    match_bytevector[irow_left] &= result;
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] =
+          compare8_fn(rows_left, rows_right, i * unroll, irow_left, offset_right);
+
+      if (!use_selection) {
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(8));
+      }
+    }
+    return num_rows_to_compare - (num_rows_to_compare % unroll);
   }
+}
 
-  uint32_t num_rows_processed = num_rows;
-  return num_rows_processed;
+template <bool use_selection>
+uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2(
+    uint32_t offset_within_row, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+  uint32_t col_width = col.metadata().fixed_length;
+  if (col_width == 0) {
+    int bit_offset = col.bit_offset(1);
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [bit_offset](const uint8_t* left_base, const uint8_t* right_base,
+                     uint32_t irow_left_base, __m256i irow_left, __m256i offset_right) {
+          __m256i left;
+          if (use_selection) {
+            irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(bit_offset));
+            left = _mm256_i32gather_epi32((const int*)left_base,
+                                          _mm256_srli_epi32(irow_left, 3), 1);
+            left = _mm256_and_si256(
+                _mm256_set1_epi32(1),
+                _mm256_srlv_epi32(left,
+                                  _mm256_and_si256(irow_left, _mm256_set1_epi32(7))));
+            left = _mm256_mullo_epi32(left, _mm256_set1_epi32(0xff));
+          } else {
+            __m256i bits = _mm256_setr_epi32(1, 2, 4, 8, 16, 32, 64, 128);
+            uint32_t start_bit_index = irow_left_base + bit_offset;
+            uint8_t left_bits_8 =
+                (reinterpret_cast<const uint16_t*>(left_base + start_bit_index / 8)[0] >>
+                 (start_bit_index % 8)) &
+                0xff;
+            left = _mm256_cmpeq_epi32(
+                _mm256_and_si256(bits, _mm256_set1_epi8(left_bits_8)), bits);
+            left = _mm256_and_si256(left, _mm256_set1_epi32(0xff));
+          }
+          __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+          right = _mm256_and_si256(right, _mm256_set1_epi32(0xff));
+          __m256i cmp = _mm256_cmpeq_epi32(left, right);
+          uint32_t result_lo =
+              _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+          uint32_t result_hi = _mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+          return result_lo | (static_cast<uint64_t>(result_hi) << 32);
+        });
+  } else if (col_width == 1) {

Review comment:
       These blocks are very repetitive, which makes it difficult to see how they differ from one another. Unfortunately, as written they also differ in enough small details that extracting a shared generic helper would be difficult.

##########
File path: cpp/src/arrow/compute/exec/key_compare_avx2.cc
##########
@@ -25,160 +25,545 @@ namespace compute {
 
 #if defined(ARROW_HAVE_AVX2)
 
-uint32_t KeyCompare::CompareFixedLength_UpTo8B_avx2(
-    uint32_t num_rows, const uint32_t* left_to_right_map, uint8_t* match_bytevector,
-    uint32_t length, const uint8_t* rows_left, const uint8_t* rows_right) {
-  ARROW_DCHECK(length <= 8);
-  __m256i offset_left = _mm256_setr_epi64x(0, length, length * 2, length * 3);
-  __m256i offset_left_incr = _mm256_set1_epi64x(length * 4);
-  __m256i mask = _mm256_set1_epi64x(~0ULL >> (8 * (8 - length)));
-
-  constexpr uint32_t unroll = 4;
-  for (uint32_t i = 0; i < num_rows / unroll; ++i) {
-    auto key_left = _mm256_i64gather_epi64(
-        reinterpret_cast<arrow::util::int64_for_gather_t*>(rows_left), offset_left, 1);
-    offset_left = _mm256_add_epi64(offset_left, offset_left_incr);
-    __m128i offset_right =
-        _mm_loadu_si128(reinterpret_cast<const __m128i*>(left_to_right_map) + i);
-    offset_right = _mm_mullo_epi32(offset_right, _mm_set1_epi32(length));
-
-    auto key_right = _mm256_i32gather_epi64(
-        reinterpret_cast<arrow::util::int64_for_gather_t*>(rows_right), offset_right, 1);
-    uint32_t cmp = _mm256_movemask_epi8(_mm256_cmpeq_epi64(
-        _mm256_and_si256(key_left, mask), _mm256_and_si256(key_right, mask)));
-    reinterpret_cast<uint32_t*>(match_bytevector)[i] &= cmp;
+template <bool use_selection>
+uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2(
+    uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+    const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+    const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+    uint8_t* match_bytevector) {
+  if (!rows.has_any_nulls(ctx) && !col.data(0)) {
+    return num_rows_to_compare;
   }
+  if (!col.data(0)) {
+    // Remove rows from the result for which the column value is a null
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
 
-  uint32_t num_rows_processed = num_rows - (num_rows % unroll);
-  return num_rows_processed;
-}
+    uint32_t num_processed = 0;
+    constexpr uint32_t unroll = 8;
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      __m256i irow_right;
+      if (use_selection) {
+        __m256i irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+      }
+      __m256i bitid =
+          _mm256_mullo_epi32(irow_right, _mm256_set1_epi32(null_mask_num_bytes * 8));
+      bitid = _mm256_add_epi32(bitid, _mm256_set1_epi32(id_col));
+      __m256i right =
+          _mm256_i32gather_epi32((const int*)null_masks, _mm256_srli_epi32(bitid, 3), 1);
+      right = _mm256_and_si256(
+          _mm256_set1_epi32(1),
+          _mm256_srlv_epi32(right, _mm256_and_si256(bitid, _mm256_set1_epi32(7))));
+      __m256i cmp = _mm256_cmpeq_epi32(right, _mm256_setzero_si256());
+      uint32_t result_lo =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+      uint32_t result_hi =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] &=
+          result_lo | (static_cast<uint64_t>(result_hi) << 32);
+    }
+    num_processed = num_rows_to_compare / unroll * unroll;
+    return num_processed;
+  } else if (!rows.has_any_nulls(ctx)) {
+    // Remove rows from the result for which the column value on left side is null
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
+    uint32_t num_processed = 0;
+    constexpr uint32_t unroll = 8;
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      __m256i cmp;
+      if (use_selection) {
+        __m256i irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(col.bit_offset(0)));
+        __m256i left = _mm256_i32gather_epi32((const int*)non_nulls,
+                                              _mm256_srli_epi32(irow_left, 3), 1);
+        left = _mm256_and_si256(
+            _mm256_set1_epi32(1),
+            _mm256_srlv_epi32(left, _mm256_and_si256(irow_left, _mm256_set1_epi32(7))));
+        cmp = _mm256_cmpeq_epi32(left, _mm256_set1_epi32(1));
+      } else {
+        __m256i left = _mm256_cvtepu8_epi32(_mm_set1_epi8(static_cast<uint8_t>(
+            reinterpret_cast<const uint16_t*>(non_nulls + i)[0] >> col.bit_offset(0))));
+        __m256i bits = _mm256_setr_epi32(1, 2, 4, 8, 16, 32, 64, 128);
+        cmp = _mm256_cmpeq_epi32(_mm256_and_si256(left, bits), bits);
+      }
+      uint32_t result_lo =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+      uint32_t result_hi =
+          _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] &=
+          result_lo | (static_cast<uint64_t>(result_hi) << 32);
+      num_processed = num_rows_to_compare / unroll * unroll;
+    }
+    return num_processed;
+  } else {
+    const uint8_t* null_masks = rows.null_masks();
+    uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row;
+    const uint8_t* non_nulls = col.data(0);
+    ARROW_DCHECK(non_nulls);
 
-uint32_t KeyCompare::CompareFixedLength_UpTo16B_avx2(
-    uint32_t num_rows, const uint32_t* left_to_right_map, uint8_t* match_bytevector,
-    uint32_t length, const uint8_t* rows_left, const uint8_t* rows_right) {
-  ARROW_DCHECK(length <= 16);
-
-  constexpr uint64_t kByteSequence0To7 = 0x0706050403020100ULL;
-  constexpr uint64_t kByteSequence8To15 = 0x0f0e0d0c0b0a0908ULL;
-
-  __m256i mask =
-      _mm256_cmpgt_epi8(_mm256_set1_epi8(length),
-                        _mm256_setr_epi64x(kByteSequence0To7, kByteSequence8To15,
-                                           kByteSequence0To7, kByteSequence8To15));
-  const uint8_t* key_left_ptr = rows_left;
-
-  constexpr uint32_t unroll = 2;
-  for (uint32_t i = 0; i < num_rows / unroll; ++i) {
-    auto key_left = _mm256_inserti128_si256(
-        _mm256_castsi128_si256(
-            _mm_loadu_si128(reinterpret_cast<const __m128i*>(key_left_ptr))),
-        _mm_loadu_si128(reinterpret_cast<const __m128i*>(key_left_ptr + length)), 1);
-    key_left_ptr += length * 2;
-    auto key_right = _mm256_inserti128_si256(
-        _mm256_castsi128_si256(_mm_loadu_si128(reinterpret_cast<const __m128i*>(
-            rows_right + length * left_to_right_map[2 * i]))),
-        _mm_loadu_si128(reinterpret_cast<const __m128i*>(
-            rows_right + length * left_to_right_map[2 * i + 1])),
-        1);
-    __m256i cmp = _mm256_cmpeq_epi64(_mm256_and_si256(key_left, mask),
-                                     _mm256_and_si256(key_right, mask));
-    cmp = _mm256_and_si256(cmp, _mm256_shuffle_epi32(cmp, 0xee));  // 0b11101110
-    cmp = _mm256_permute4x64_epi64(cmp, 0x08);                     // 0b00001000
-    reinterpret_cast<uint16_t*>(match_bytevector)[i] &=
-        (_mm256_movemask_epi8(cmp) & 0xffff);
-  }
+    uint32_t num_processed = 0;
+    constexpr uint32_t unroll = 8;
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      __m256i left_null;
+      __m256i irow_right;
+      if (use_selection) {
+        __m256i irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(col.bit_offset(0)));
+        __m256i left = _mm256_i32gather_epi32((const int*)non_nulls,
+                                              _mm256_srli_epi32(irow_left, 3), 1);
+        left = _mm256_and_si256(
+            _mm256_set1_epi32(1),
+            _mm256_srlv_epi32(left, _mm256_and_si256(irow_left, _mm256_set1_epi32(7))));
+        left_null = _mm256_cmpeq_epi32(left, _mm256_setzero_si256());
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+        __m256i left = _mm256_cvtepu8_epi32(_mm_set1_epi8(static_cast<uint8_t>(
+            reinterpret_cast<const uint16_t*>(non_nulls + i)[0] >> col.bit_offset(0))));
+        __m256i bits = _mm256_setr_epi32(1, 2, 4, 8, 16, 32, 64, 128);
+        left_null =
+            _mm256_cmpeq_epi32(_mm256_and_si256(left, bits), _mm256_setzero_si256());
+      }
+      __m256i bitid =
+          _mm256_mullo_epi32(irow_right, _mm256_set1_epi32(null_mask_num_bytes * 8));
+      bitid = _mm256_add_epi32(bitid, _mm256_set1_epi32(id_col));
+      __m256i right =
+          _mm256_i32gather_epi32((const int*)null_masks, _mm256_srli_epi32(bitid, 3), 1);
+      right = _mm256_and_si256(
+          _mm256_set1_epi32(1),
+          _mm256_srlv_epi32(right, _mm256_and_si256(bitid, _mm256_set1_epi32(7))));
+      __m256i right_null = _mm256_cmpeq_epi32(right, _mm256_set1_epi32(1));
 
-  uint32_t num_rows_processed = num_rows - (num_rows % unroll);
-  return num_rows_processed;
-}
+      uint64_t left_null_64 =
+          static_cast<uint32_t>(_mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_castsi256_si128(left_null)))) |
+          (static_cast<uint64_t>(static_cast<uint32_t>(_mm256_movemask_epi8(
+               _mm256_cvtepi32_epi64(_mm256_extracti128_si256(left_null, 1)))))
+           << 32);
 
-uint32_t KeyCompare::CompareFixedLength_avx2(uint32_t num_rows,
-                                             const uint32_t* left_to_right_map,
-                                             uint8_t* match_bytevector, uint32_t length,
-                                             const uint8_t* rows_left,
-                                             const uint8_t* rows_right) {
-  ARROW_DCHECK(length > 0);
+      uint64_t right_null_64 =
+          static_cast<uint32_t>(_mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_castsi256_si128(right_null)))) |
+          (static_cast<uint64_t>(static_cast<uint32_t>(_mm256_movemask_epi8(
+               _mm256_cvtepi32_epi64(_mm256_extracti128_si256(right_null, 1)))))
+           << 32);
 
-  constexpr uint64_t kByteSequence0To7 = 0x0706050403020100ULL;
-  constexpr uint64_t kByteSequence8To15 = 0x0f0e0d0c0b0a0908ULL;
-  constexpr uint64_t kByteSequence16To23 = 0x1716151413121110ULL;
-  constexpr uint64_t kByteSequence24To31 = 0x1f1e1d1c1b1a1918ULL;
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] |= left_null_64 & right_null_64;
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] &= ~(left_null_64 ^ right_null_64);
+    }
+    num_processed = num_rows_to_compare / unroll * unroll;
+    return num_processed;
+  }
+}
 
-  // Non-zero length guarantees no underflow
-  int32_t num_loops_less_one = (static_cast<int32_t>(length) + 31) / 32 - 1;
+template <bool use_selection, class COMPARE8_FN>
+uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
+    uint32_t offset_within_row, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
+    COMPARE8_FN compare8_fn) {
+  bool is_fixed_length = rows.metadata().is_fixed_length;
+  if (is_fixed_length) {
+    uint32_t fixed_length = rows.metadata().fixed_length;
+    const uint8_t* rows_left = col.data(1);
+    const uint8_t* rows_right = rows.data(1);
+    constexpr uint32_t unroll = 8;
+    __m256i irow_left = _mm256_setr_epi32(0, 1, 2, 3, 4, 5, 6, 7);
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      if (use_selection) {
+        irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+      }
+      __m256i irow_right;
+      if (use_selection) {
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+      }
 
-  __m256i tail_mask =
-      _mm256_cmpgt_epi8(_mm256_set1_epi8(length - num_loops_less_one * 32),
-                        _mm256_setr_epi64x(kByteSequence0To7, kByteSequence8To15,
-                                           kByteSequence16To23, kByteSequence24To31));
+      __m256i offset_right =
+          _mm256_mullo_epi32(irow_right, _mm256_set1_epi32(fixed_length));
+      offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
 
-  for (uint32_t irow_left = 0; irow_left < num_rows; ++irow_left) {
-    uint32_t irow_right = left_to_right_map[irow_left];
-    uint32_t begin_left = length * irow_left;
-    uint32_t begin_right = length * irow_right;
-    const __m256i* key_left_ptr =
-        reinterpret_cast<const __m256i*>(rows_left + begin_left);
-    const __m256i* key_right_ptr =
-        reinterpret_cast<const __m256i*>(rows_right + begin_right);
-    __m256i result_or = _mm256_setzero_si256();
-    int32_t i;
-    // length cannot be zero
-    for (i = 0; i < num_loops_less_one; ++i) {
-      __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
-      __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
-      result_or = _mm256_or_si256(result_or, _mm256_xor_si256(key_left, key_right));
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] =
+          compare8_fn(rows_left, rows_right, i * unroll, irow_left, offset_right);
+
+      if (!use_selection) {
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(8));
+      }
     }
+    return num_rows_to_compare - (num_rows_to_compare % unroll);
+  } else {
+    const uint8_t* rows_left = col.data(1);
+    const uint32_t* offsets_right = rows.offsets();
+    const uint8_t* rows_right = rows.data(2);
+    constexpr uint32_t unroll = 8;
+    __m256i irow_left = _mm256_setr_epi32(0, 1, 2, 3, 4, 5, 6, 7);
+    for (uint32_t i = 0; i < num_rows_to_compare / unroll; ++i) {
+      if (use_selection) {
+        irow_left = _mm256_cvtepu16_epi32(
+            _mm_loadu_si128(reinterpret_cast<const __m128i*>(sel_left_maybe_null) + i));
+      }
+      __m256i irow_right;
+      if (use_selection) {
+        irow_right = _mm256_i32gather_epi32((const int*)left_to_right_map, irow_left, 4);
+      } else {
+        irow_right =
+            _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_to_right_map) + i);
+      }
+      __m256i offset_right =
+          _mm256_i32gather_epi32((const int*)offsets_right, irow_right, 4);
+      offset_right = _mm256_add_epi32(offset_right, _mm256_set1_epi32(offset_within_row));
 
-    __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
-    __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
-    result_or = _mm256_or_si256(
-        result_or, _mm256_and_si256(tail_mask, _mm256_xor_si256(key_left, key_right)));
-    int result = _mm256_testz_si256(result_or, result_or) * 0xff;
-    match_bytevector[irow_left] &= result;
+      reinterpret_cast<uint64_t*>(match_bytevector)[i] =
+          compare8_fn(rows_left, rows_right, i * unroll, irow_left, offset_right);
+
+      if (!use_selection) {
+        irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(8));
+      }
+    }
+    return num_rows_to_compare - (num_rows_to_compare % unroll);
   }
+}
 
-  uint32_t num_rows_processed = num_rows;
-  return num_rows_processed;
+template <bool use_selection>
+uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2(
+    uint32_t offset_within_row, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+  uint32_t col_width = col.metadata().fixed_length;
+  if (col_width == 0) {
+    int bit_offset = col.bit_offset(1);
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [bit_offset](const uint8_t* left_base, const uint8_t* right_base,
+                     uint32_t irow_left_base, __m256i irow_left, __m256i offset_right) {
+          __m256i left;
+          if (use_selection) {
+            irow_left = _mm256_add_epi32(irow_left, _mm256_set1_epi32(bit_offset));
+            left = _mm256_i32gather_epi32((const int*)left_base,
+                                          _mm256_srli_epi32(irow_left, 3), 1);
+            left = _mm256_and_si256(
+                _mm256_set1_epi32(1),
+                _mm256_srlv_epi32(left,
+                                  _mm256_and_si256(irow_left, _mm256_set1_epi32(7))));
+            left = _mm256_mullo_epi32(left, _mm256_set1_epi32(0xff));
+          } else {
+            __m256i bits = _mm256_setr_epi32(1, 2, 4, 8, 16, 32, 64, 128);
+            uint32_t start_bit_index = irow_left_base + bit_offset;
+            uint8_t left_bits_8 =
+                (reinterpret_cast<const uint16_t*>(left_base + start_bit_index / 8)[0] >>
+                 (start_bit_index % 8)) &
+                0xff;
+            left = _mm256_cmpeq_epi32(
+                _mm256_and_si256(bits, _mm256_set1_epi8(left_bits_8)), bits);
+            left = _mm256_and_si256(left, _mm256_set1_epi32(0xff));
+          }
+          __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+          right = _mm256_and_si256(right, _mm256_set1_epi32(0xff));
+          __m256i cmp = _mm256_cmpeq_epi32(left, right);
+          uint32_t result_lo =
+              _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+          uint32_t result_hi = _mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+          return result_lo | (static_cast<uint64_t>(result_hi) << 32);
+        });
+  } else if (col_width == 1) {
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left_base,
+           __m256i irow_left, __m256i offset_right) {
+          __m256i left;
+          if (use_selection) {
+            left = _mm256_i32gather_epi32((const int*)left_base, irow_left, 1);
+            left = _mm256_and_si256(left, _mm256_set1_epi32(0xff));
+          } else {
+            left = _mm256_cvtepu8_epi32(_mm_set1_epi64x(
+                reinterpret_cast<const uint64_t*>(left_base)[irow_left_base / 8]));
+          }
+          __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+          right = _mm256_and_si256(right, _mm256_set1_epi32(0xff));
+          __m256i cmp = _mm256_cmpeq_epi32(left, right);
+          uint32_t result_lo =
+              _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+          uint32_t result_hi = _mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+          return result_lo | (static_cast<uint64_t>(result_hi) << 32);
+        });
+  } else if (col_width == 2) {
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left_base,
+           __m256i irow_left, __m256i offset_right) {
+          __m256i left;
+          if (use_selection) {
+            left = _mm256_i32gather_epi32((const int*)left_base, irow_left, 2);
+            left = _mm256_and_si256(left, _mm256_set1_epi32(0xffff));
+          } else {
+            left = _mm256_cvtepu16_epi32(_mm_loadu_si128(
+                reinterpret_cast<const __m128i*>(left_base) + irow_left_base / 8));
+          }
+          __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+          right = _mm256_and_si256(right, _mm256_set1_epi32(0xffff));
+          __m256i cmp = _mm256_cmpeq_epi32(left, right);
+          uint32_t result_lo =
+              _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+          uint32_t result_hi = _mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+          return result_lo | (static_cast<uint64_t>(result_hi) << 32);
+        });
+  } else if (col_width == 4) {
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left_base,
+           __m256i irow_left, __m256i offset_right) {
+          __m256i left;
+          if (use_selection) {
+            left = _mm256_i32gather_epi32((const int*)left_base, irow_left, 4);
+          } else {
+            left = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_base) +
+                                      irow_left_base / 8);
+          }
+          __m256i right = _mm256_i32gather_epi32((const int*)right_base, offset_right, 1);
+          __m256i cmp = _mm256_cmpeq_epi32(left, right);
+          uint32_t result_lo =
+              _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(cmp)));
+          uint32_t result_hi = _mm256_movemask_epi8(
+              _mm256_cvtepi32_epi64(_mm256_extracti128_si256(cmp, 1)));
+          return result_lo | (static_cast<uint64_t>(result_hi) << 32);
+        });
+  } else if (col_width == 8) {
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [](const uint8_t* left_base, const uint8_t* right_base, uint32_t irow_left_base,
+           __m256i irow_left, __m256i offset_right) {
+          auto left_base_i64 =
+              reinterpret_cast<const arrow::util::int64_for_gather_t*>(left_base);
+          __m256i left_lo =
+              _mm256_i32gather_epi64(left_base_i64, _mm256_castsi256_si128(irow_left), 8);
+          __m256i left_hi = _mm256_i32gather_epi64(
+              left_base_i64, _mm256_extracti128_si256(irow_left, 1), 8);
+          if (use_selection) {
+            left_lo = _mm256_i32gather_epi64(left_base_i64,
+                                             _mm256_castsi256_si128(irow_left), 8);
+            left_hi = _mm256_i32gather_epi64(left_base_i64,
+                                             _mm256_extracti128_si256(irow_left, 1), 8);
+          } else {
+            left_lo = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_base) +
+                                         irow_left_base / 4);
+            left_hi = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(left_base) +
+                                         irow_left_base / 4 + 1);
+          }
+          auto right_base_i64 =
+              reinterpret_cast<const arrow::util::int64_for_gather_t*>(right_base);
+          __m256i right_lo = _mm256_i32gather_epi64(
+              right_base_i64, _mm256_castsi256_si128(offset_right), 1);
+          __m256i right_hi = _mm256_i32gather_epi64(
+              right_base_i64, _mm256_extracti128_si256(offset_right, 1), 1);
+          uint32_t result_lo =
+              _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_lo, right_lo));
+          uint32_t result_hi =
+              _mm256_movemask_epi8(_mm256_cmpeq_epi64(left_hi, right_hi));
+          return result_lo | (static_cast<uint64_t>(result_hi) << 32);
+        });
+  } else {
+    return CompareBinaryColumnToRowHelper_avx2<use_selection>(
+        offset_within_row, num_rows_to_compare, sel_left_maybe_null, left_to_right_map,
+        ctx, col, rows, match_bytevector,
+        [&col](const uint8_t* left_base, const uint8_t* right_base,
+               uint32_t irow_left_base, __m256i irow_left, __m256i offset_right) {
+          uint32_t irow_left_array[8];
+          uint32_t offset_right_array[8];
+          if (use_selection) {
+            _mm256_storeu_si256(reinterpret_cast<__m256i*>(irow_left_array), irow_left);
+          }
+          _mm256_storeu_si256(reinterpret_cast<__m256i*>(offset_right_array),
+                              offset_right);
+          uint32_t length = col.metadata().fixed_length;
+
+          // Non-zero length guarantees no underflow
+          int32_t num_loops_less_one = (static_cast<int32_t>(length) + 31) / 32 - 1;
+
+          __m256i tail_mask = _mm256_cmpgt_epi8(
+              _mm256_set1_epi8(length - num_loops_less_one * 32),
+              _mm256_setr_epi64x(0x0706050403020100ULL, 0x0f0e0d0c0b0a0908ULL,
+                                 0x1716151413121110ULL, 0x1f1e1d1c1b1a1918ULL));
+
+          uint64_t result = 0;
+          for (uint32_t irow = 0; irow < 8; ++irow) {
+            const __m256i* key_left_ptr = reinterpret_cast<const __m256i*>(
+                left_base +
+                (use_selection ? irow_left_array[irow] : irow_left_base + irow) * length);
+            const __m256i* key_right_ptr =
+                reinterpret_cast<const __m256i*>(right_base + offset_right_array[irow]);
+            __m256i result_or = _mm256_setzero_si256();
+            int32_t i;
+            // length cannot be zero
+            for (i = 0; i < num_loops_less_one; ++i) {
+              __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
+              __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
+              result_or =
+                  _mm256_or_si256(result_or, _mm256_xor_si256(key_left, key_right));
+            }
+            __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
+            __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
+            result_or = _mm256_or_si256(
+                result_or,
+                _mm256_and_si256(tail_mask, _mm256_xor_si256(key_left, key_right)));
+            uint64_t result_single = _mm256_testz_si256(result_or, result_or) * 0xff;
+            result |= result_single << (8 * irow);
+          }
+          return result;
+        });
+  }
 }
 
-void KeyCompare::CompareVaryingLength_avx2(
-    uint32_t num_rows, const uint32_t* left_to_right_map, uint8_t* match_bytevector,
-    const uint8_t* rows_left, const uint8_t* rows_right, const uint32_t* offsets_left,
-    const uint32_t* offsets_right) {
-  for (uint32_t irow_left = 0; irow_left < num_rows; ++irow_left) {
+// Overwrites the match_bytevector instead of updating it
+template <bool use_selection, bool is_first_varbinary_col>
+void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
+    uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
+    const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
+    KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+    const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+  const uint32_t* offsets_left = col.offsets();
+  const uint32_t* offsets_right = rows.offsets();
+  const uint8_t* rows_left = col.data(2);
+  const uint8_t* rows_right = rows.data(2);
+  for (uint32_t i = 0; i < num_rows_to_compare; ++i) {
+    uint32_t irow_left = use_selection ? sel_left_maybe_null[i] : i;
     uint32_t irow_right = left_to_right_map[irow_left];
     uint32_t begin_left = offsets_left[irow_left];
-    uint32_t begin_right = offsets_right[irow_right];
     uint32_t length_left = offsets_left[irow_left + 1] - begin_left;
-    uint32_t length_right = offsets_right[irow_right + 1] - begin_right;
-    uint32_t length = std::min(length_left, length_right);
-    auto key_left_ptr = reinterpret_cast<const __m256i*>(rows_left + begin_left);
-    auto key_right_ptr = reinterpret_cast<const __m256i*>(rows_right + begin_right);
-    __m256i result_or = _mm256_setzero_si256();
-    int32_t i;
-    // length can be zero
-    for (i = 0; i < (static_cast<int32_t>(length) + 31) / 32 - 1; ++i) {
-      __m256i key_left = _mm256_loadu_si256(key_left_ptr + i);
-      __m256i key_right = _mm256_loadu_si256(key_right_ptr + i);
-      result_or = _mm256_or_si256(result_or, _mm256_xor_si256(key_left, key_right));
+    uint32_t begin_right = offsets_right[irow_right];
+    uint32_t length_right;
+    uint32_t offset_within_row;
+    if (!is_first_varbinary_col) {
+      rows.metadata().nth_varbinary_offset_and_length(
+          rows_right + begin_right, id_varbinary_col, &offset_within_row, &length_right);
+    } else {
+      rows.metadata().first_varbinary_offset_and_length(
+          rows_right + begin_right, &offset_within_row, &length_right);
     }
+    begin_right += offset_within_row;
 
-    constexpr uint64_t kByteSequence0To7 = 0x0706050403020100ULL;
-    constexpr uint64_t kByteSequence8To15 = 0x0f0e0d0c0b0a0908ULL;
-    constexpr uint64_t kByteSequence16To23 = 0x1716151413121110ULL;
-    constexpr uint64_t kByteSequence24To31 = 0x1f1e1d1c1b1a1918ULL;
+    __m256i result_or = _mm256_setzero_si256();
+    uint32_t length = std::min(length_left, length_right);
+    if (length > 0) {
+      const __m256i* key_left_ptr =
+          reinterpret_cast<const __m256i*>(rows_left + begin_left);
+      const __m256i* key_right_ptr =
+          reinterpret_cast<const __m256i*>(rows_right + begin_right);
+      int32_t j;
+      // length can be zero
+      for (j = 0; j < (static_cast<int32_t>(length) + 31) / 32 - 1; ++j) {
+        __m256i key_left = _mm256_loadu_si256(key_left_ptr + j);
+        __m256i key_right = _mm256_loadu_si256(key_right_ptr + j);
+        result_or = _mm256_or_si256(result_or, _mm256_xor_si256(key_left, key_right));
+      }
 
-    __m256i tail_mask =
-        _mm256_cmpgt_epi8(_mm256_set1_epi8(length - i * 32),
-                          _mm256_setr_epi64x(kByteSequence0To7, kByteSequence8To15,
-                                             kByteSequence16To23, kByteSequence24To31));
+      __m256i tail_mask = _mm256_cmpgt_epi8(
+          _mm256_set1_epi8(length - j * 32),
+          _mm256_setr_epi64x(0x0706050403020100ULL, 0x0f0e0d0c0b0a0908ULL,

Review comment:
       Why remove these named constants (`kByteSequence0To7` … `kByteSequence24To31`) and inline the hex literals instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org