You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by bk...@apache.org on 2020/11/23 16:45:21 UTC

[arrow] branch master updated: ARROW-10669: [C++][Compute] Support scalar arguments to Boolean compute functions

This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a8c89e3  ARROW-10669: [C++][Compute] Support scalar arguments to Boolean compute functions
a8c89e3 is described below

commit a8c89e3566c32f6bb21b7914d0f538c2218d8004
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Mon Nov 23 11:44:43 2020 -0500

    ARROW-10669: [C++][Compute] Support scalar arguments to Boolean compute functions
    
    - "and", "or", "xor", "and_kleene", and "or_kleene" gain support for scalar arguments.
    - Added new functions "and_not" and "and_not_kleene"
    - repaired and simplified some null propagation logic
    
    Closes #8728 from bkietz/10669-Support-Scalar-inputs-to-
    
    Authored-by: Benjamin Kietzman <be...@gmail.com>
    Signed-off-by: Benjamin Kietzman <be...@gmail.com>
---
 cpp/src/arrow/compute/api_scalar.cc                |   2 +
 cpp/src/arrow/compute/api_scalar.h                 |  47 +++-
 cpp/src/arrow/compute/exec.cc                      | 181 ++++++------
 cpp/src/arrow/compute/kernels/codegen_internal.h   |  26 +-
 cpp/src/arrow/compute/kernels/scalar_boolean.cc    | 309 +++++++++++++++++++--
 .../arrow/compute/kernels/scalar_boolean_test.cc   | 215 +++++---------
 cpp/src/arrow/compute/kernels/test_util.cc         |  21 +-
 cpp/src/arrow/util/bit_util_test.cc                |  27 ++
 cpp/src/arrow/util/bitmap.cc                       |  10 +
 cpp/src/arrow/util/bitmap.h                        |   7 +
 cpp/src/arrow/util/bitmap_ops.cc                   |  19 ++
 cpp/src/arrow/util/bitmap_ops.h                    |  19 ++
 cpp/src/arrow/util/vector.h                        |  19 +-
 docs/source/cpp/compute.rst                        |   4 +
 14 files changed, 625 insertions(+), 281 deletions(-)

diff --git a/cpp/src/arrow/compute/api_scalar.cc b/cpp/src/arrow/compute/api_scalar.cc
index 353151e..c62281b 100644
--- a/cpp/src/arrow/compute/api_scalar.cc
+++ b/cpp/src/arrow/compute/api_scalar.cc
@@ -92,6 +92,8 @@ SCALAR_EAGER_BINARY(KleeneAnd, "and_kleene")
 SCALAR_EAGER_BINARY(Or, "or")
 SCALAR_EAGER_BINARY(KleeneOr, "or_kleene")
 SCALAR_EAGER_BINARY(Xor, "xor")
+SCALAR_EAGER_BINARY(AndNot, "and_not")
+SCALAR_EAGER_BINARY(KleeneAndNot, "and_not_kleene")
 
 // ----------------------------------------------------------------------
 
diff --git a/cpp/src/arrow/compute/api_scalar.h b/cpp/src/arrow/compute/api_scalar.h
index 62d52d2..789ac90 100644
--- a/cpp/src/arrow/compute/api_scalar.h
+++ b/cpp/src/arrow/compute/api_scalar.h
@@ -192,8 +192,8 @@ Result<Datum> Invert(const Datum& value, ExecContext* ctx = NULLPTR);
 /// \brief Element-wise AND of two boolean datums which always propagates nulls
 /// (null and false is null).
 ///
-/// \param[in] left left operand (array)
-/// \param[in] right right operand (array)
+/// \param[in] left left operand
+/// \param[in] right right operand
 /// \param[in] ctx the function execution context, optional
 /// \return the resulting datum
 ///
@@ -205,8 +205,8 @@ Result<Datum> And(const Datum& left, const Datum& right, ExecContext* ctx = NULL
 /// \brief Element-wise AND of two boolean datums with a Kleene truth table
 /// (null and false is false).
 ///
-/// \param[in] left left operand (array)
-/// \param[in] right right operand (array)
+/// \param[in] left left operand
+/// \param[in] right right operand
 /// \param[in] ctx the function execution context, optional
 /// \return the resulting datum
 ///
@@ -219,8 +219,8 @@ Result<Datum> KleeneAnd(const Datum& left, const Datum& right,
 /// \brief Element-wise OR of two boolean datums which always propagates nulls
 /// (null and true is null).
 ///
-/// \param[in] left left operand (array)
-/// \param[in] right right operand (array)
+/// \param[in] left left operand
+/// \param[in] right right operand
 /// \param[in] ctx the function execution context, optional
 /// \return the resulting datum
 ///
@@ -232,8 +232,8 @@ Result<Datum> Or(const Datum& left, const Datum& right, ExecContext* ctx = NULLP
 /// \brief Element-wise OR of two boolean datums with a Kleene truth table
 /// (null or true is true).
 ///
-/// \param[in] left left operand (array)
-/// \param[in] right right operand (array)
+/// \param[in] left left operand
+/// \param[in] right right operand
 /// \param[in] ctx the function execution context, optional
 /// \return the resulting datum
 ///
@@ -243,8 +243,8 @@ ARROW_EXPORT
 Result<Datum> KleeneOr(const Datum& left, const Datum& right, ExecContext* ctx = NULLPTR);
 
 /// \brief Element-wise XOR of two boolean datums
-/// \param[in] left left operand (array)
-/// \param[in] right right operand (array)
+/// \param[in] left left operand
+/// \param[in] right right operand
 /// \param[in] ctx the function execution context, optional
 /// \return the resulting datum
 ///
@@ -253,6 +253,33 @@ Result<Datum> KleeneOr(const Datum& left, const Datum& right, ExecContext* ctx =
 ARROW_EXPORT
 Result<Datum> Xor(const Datum& left, const Datum& right, ExecContext* ctx = NULLPTR);
 
+/// \brief Element-wise AND NOT of two boolean datums which always propagates nulls
+/// (null and not true is null).
+///
+/// \param[in] left left operand
+/// \param[in] right right operand
+/// \param[in] ctx the function execution context, optional
+/// \return the resulting datum
+///
+/// \since 3.0.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Result<Datum> AndNot(const Datum& left, const Datum& right, ExecContext* ctx = NULLPTR);
+
+/// \brief Element-wise AND NOT of two boolean datums with a Kleene truth table
+/// (false and not null is false, null and not true is false).
+///
+/// \param[in] left left operand
+/// \param[in] right right operand
+/// \param[in] ctx the function execution context, optional
+/// \return the resulting datum
+///
+/// \since 3.0.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Result<Datum> KleeneAndNot(const Datum& left, const Datum& right,
+                           ExecContext* ctx = NULLPTR);
+
 /// \brief IsIn returns true for each element of `values` that is contained in
 /// `value_set`
 ///
diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index dd97119..790ae9b 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -46,6 +46,7 @@
 #include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/make_unique.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 
@@ -213,20 +214,33 @@ bool ExecBatchIterator::Next(ExecBatch* batch) {
 
 namespace {
 
-bool ArrayHasNulls(const ArrayData& data) {
-  // As discovered in ARROW-8863 (and not only for that reason)
-  // ArrayData::null_count can -1 even when buffers[0] is nullptr. So we check
-  // for both cases (nullptr means no nulls, or null_count already computed)
-  if (data.type->id() == Type::NA) {
-    return true;
-  } else if (data.buffers[0] == nullptr) {
-    return false;
-  } else {
+struct NullGeneralization {
+  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
+
+  static type Get(const Datum& datum) {
+    if (datum.type()->id() == Type::NA) {
+      return ALL_NULL;
+    }
+
+    if (datum.is_scalar()) {
+      return datum.scalar()->is_valid ? ALL_VALID : ALL_NULL;
+    }
+
+    const auto& arr = *datum.array();
+
     // Do not count the bits if they haven't been counted already
-    const int64_t known_null_count = data.null_count.load();
-    return known_null_count == kUnknownNullCount || known_null_count > 0;
+    const int64_t known_null_count = arr.null_count.load();
+    if (known_null_count == 0) {
+      return ALL_VALID;
+    }
+
+    if (known_null_count == arr.length) {
+      return ALL_NULL;
+    }
+
+    return PERHAPS_NULL;
   }
-}
+};
 
 // Null propagation implementation that deals both with preallocated bitmaps
 // and maybe-to-be allocated bitmaps
@@ -243,15 +257,16 @@ class NullPropagator {
  public:
   NullPropagator(KernelContext* ctx, const ExecBatch& batch, ArrayData* output)
       : ctx_(ctx), batch_(batch), output_(output) {
-    // At this point, the values in batch_.values must have been validated to
-    // all be value-like
-    for (const Datum& val : batch_.values) {
-      if (val.kind() == Datum::ARRAY) {
-        if (ArrayHasNulls(*val.array())) {
-          values_with_nulls_.push_back(&val);
-        }
-      } else if (!val.scalar()->is_valid) {
-        values_with_nulls_.push_back(&val);
+    for (const Datum& datum : batch_.values) {
+      auto null_generalization = NullGeneralization::Get(datum);
+
+      if (null_generalization == NullGeneralization::ALL_NULL) {
+        is_all_null_ = true;
+      }
+
+      if (null_generalization != NullGeneralization::ALL_VALID &&
+          datum.kind() == Datum::ARRAY) {
+        arrays_with_nulls_.push_back(datum.array().get());
       }
     }
 
@@ -272,52 +287,33 @@ class NullPropagator {
     return Status::OK();
   }
 
-  Result<bool> ShortCircuitIfAllNull() {
-    // An all-null value (scalar null or all-null array) gives us a short
-    // circuit opportunity
-    bool is_all_null = false;
-    std::shared_ptr<Buffer> all_null_bitmap;
+  Status AllNullShortCircuit() {
+    // OK, the output should be all null
+    output_->null_count = output_->length;
+
+    if (bitmap_preallocated_) {
+      BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
+      return Status::OK();
+    }
 
     // Walk all the values with nulls instead of breaking on the first in case
     // we find a bitmap that can be reused in the non-preallocated case
-    for (const Datum* value : values_with_nulls_) {
-      if (value->type()->id() == Type::NA) {
-        // No bitmap
-        is_all_null = true;
-      } else if (value->kind() == Datum::ARRAY) {
-        const ArrayData& arr = *value->array();
-        if (arr.null_count.load() == arr.length) {
-          // Pluck the all null bitmap so we can set it in the output if it was
-          // not pre-allocated
-          all_null_bitmap = arr.buffers[0];
-          is_all_null = true;
-        }
-      } else {
-        // Scalar
-        is_all_null = !value->scalar()->is_valid;
+    for (const ArrayData* arr : arrays_with_nulls_) {
+      if (arr->null_count.load() == arr->length && arr->buffers[0] != nullptr) {
+        // Reuse this all null bitmap
+        output_->buffers[0] = arr->buffers[0];
+        return Status::OK();
       }
     }
-    if (!is_all_null) {
-      return false;
-    }
 
-    // OK, the output should be all null
-    output_->null_count = output_->length;
-
-    if (!bitmap_preallocated_ && all_null_bitmap) {
-      // If we did not pre-allocate memory, and we observed an all-null bitmap,
-      // then we can zero-copy it into the output
-      output_->buffers[0] = std::move(all_null_bitmap);
-    } else {
-      RETURN_NOT_OK(EnsureAllocated());
-      BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
-    }
-    return true;
+    RETURN_NOT_OK(EnsureAllocated());
+    BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
+    return Status::OK();
   }
 
   Status PropagateSingle() {
     // One array
-    const ArrayData& arr = *values_with_nulls_[0]->array();
+    const ArrayData& arr = *arrays_with_nulls_[0];
     const std::shared_ptr<Buffer>& arr_bitmap = arr.buffers[0];
 
     // Reuse the null count if it's known
@@ -325,26 +321,27 @@ class NullPropagator {
 
     if (bitmap_preallocated_) {
       CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_, output_->offset);
+      return Status::OK();
+    }
+
+    // Three cases when memory was not pre-allocated:
+    //
+    // * Offset is zero: we reuse the bitmap as is
+    // * Offset is nonzero but a multiple of 8: we can slice the bitmap
+    // * Offset is not a multiple of 8: we must allocate and use CopyBitmap
+    //
+    // Keep in mind that output_->offset is not permitted to be nonzero when
+    // the bitmap is not preallocated, and that precondition is asserted
+    // higher in the call stack.
+    if (arr.offset == 0) {
+      output_->buffers[0] = arr_bitmap;
+    } else if (arr.offset % 8 == 0) {
+      output_->buffers[0] =
+          SliceBuffer(arr_bitmap, arr.offset / 8, BitUtil::BytesForBits(arr.length));
     } else {
-      // Two cases when memory was not pre-allocated:
-      //
-      // * Offset is zero: we reuse the bitmap as is
-      // * Offset is nonzero but a multiple of 8: we can slice the bitmap
-      // * Offset is not a multiple of 8: we must allocate and use CopyBitmap
-      //
-      // Keep in mind that output_->offset is not permitted to be nonzero when
-      // the bitmap is not preallocated, and that precondition is asserted
-      // higher in the call stack.
-      if (arr.offset == 0) {
-        output_->buffers[0] = arr_bitmap;
-      } else if (arr.offset % 8 == 0) {
-        output_->buffers[0] =
-            SliceBuffer(arr_bitmap, arr.offset / 8, BitUtil::BytesForBits(arr.length));
-      } else {
-        RETURN_NOT_OK(EnsureAllocated());
-        CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_,
-                   /*dst_offset=*/0);
-      }
+      RETURN_NOT_OK(EnsureAllocated());
+      CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_,
+                 /*dst_offset=*/0);
     }
     return Status::OK();
   }
@@ -356,7 +353,6 @@ class NullPropagator {
     RETURN_NOT_OK(EnsureAllocated());
 
     auto Accumulate = [&](const ArrayData& left, const ArrayData& right) {
-      // This is a precondition of reaching this code path
       DCHECK(left.buffers[0]);
       DCHECK(right.buffers[0]);
       BitmapAnd(left.buffers[0]->data(), left.offset, right.buffers[0]->data(),
@@ -364,27 +360,27 @@ class NullPropagator {
                 output_->buffers[0]->mutable_data());
     };
 
-    DCHECK_GT(values_with_nulls_.size(), 1);
+    DCHECK_GT(arrays_with_nulls_.size(), 1);
 
     // Seed the output bitmap with the & of the first two bitmaps
-    Accumulate(*values_with_nulls_[0]->array(), *values_with_nulls_[1]->array());
+    Accumulate(*arrays_with_nulls_[0], *arrays_with_nulls_[1]);
 
     // Accumulate the rest
-    for (size_t i = 2; i < values_with_nulls_.size(); ++i) {
-      Accumulate(*output_, *values_with_nulls_[i]->array());
+    for (size_t i = 2; i < arrays_with_nulls_.size(); ++i) {
+      Accumulate(*output_, *arrays_with_nulls_[i]);
     }
     return Status::OK();
   }
 
   Status Execute() {
-    bool finished = false;
-    ARROW_ASSIGN_OR_RAISE(finished, ShortCircuitIfAllNull());
-    if (finished) {
-      return Status::OK();
+    if (is_all_null_) {
+      // An all-null value (scalar null or all-null array) gives us a short
+      // circuit opportunity
+      return AllNullShortCircuit();
     }
 
     // At this point, by construction we know that all of the values in
-    // values_with_nulls_ are arrays that are not all null. So there are a
+    // arrays_with_nulls_ are arrays that are not all null. So there are a
     // few cases:
     //
     // * No arrays. This is a no-op w/o preallocation but when the bitmap is
@@ -399,24 +395,27 @@ class NullPropagator {
 
     output_->null_count = kUnknownNullCount;
 
-    if (values_with_nulls_.size() == 0) {
+    if (arrays_with_nulls_.empty()) {
       // No arrays with nulls case
       output_->null_count = 0;
       if (bitmap_preallocated_) {
         BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, true);
       }
       return Status::OK();
-    } else if (values_with_nulls_.size() == 1) {
+    }
+
+    if (arrays_with_nulls_.size() == 1) {
       return PropagateSingle();
-    } else {
-      return PropagateMultiple();
     }
+
+    return PropagateMultiple();
   }
 
  private:
   KernelContext* ctx_;
   const ExecBatch& batch_;
-  std::vector<const Datum*> values_with_nulls_;
+  std::vector<const ArrayData*> arrays_with_nulls_;
+  bool is_all_null_ = false;
   ArrayData* output_;
   uint8_t* bitmap_;
   bool bitmap_preallocated_ = false;
diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h
index 9051576..2f00a23 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.h
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -427,12 +427,28 @@ void SimpleUnary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
 //
 // static void Call(KernelContext*, const ArrayData& arg0, const ArrayData& arg1,
 //                  ArrayData* out)
+// static void Call(KernelContext*, const ArrayData& arg0, const Scalar& arg1,
+//                  ArrayData* out)
+// static void Call(KernelContext*, const Scalar& arg0, const ArrayData& arg1,
+//                  ArrayData* out)
+// static void Call(KernelContext*, const Scalar& arg0, const Scalar& arg1,
+//                  Scalar* out)
 template <typename Operator>
 void SimpleBinary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-  if (batch[0].kind() == Datum::SCALAR || batch[1].kind() == Datum::SCALAR) {
-    ctx->SetStatus(Status::NotImplemented("NYI"));
-  } else if (batch.length > 0) {
-    Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+  if (batch.length == 0) return;
+
+  if (batch[0].kind() == Datum::ARRAY) {
+    if (batch[1].kind() == Datum::ARRAY) {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].array(), out->mutable_array());
+    } else {
+      Operator::Call(ctx, *batch[0].array(), *batch[1].scalar(), out->mutable_array());
+    }
+  } else {
+    if (batch[1].kind() == Datum::ARRAY) {
+      Operator::Call(ctx, *batch[0].scalar(), *batch[1].array(), out->mutable_array());
+    } else {
+      Operator::Call(ctx, *batch[0].scalar(), *batch[1].scalar(), out->scalar().get());
+    }
   }
 }
 
@@ -631,8 +647,6 @@ struct ScalarUnaryNotNullStateful {
       Arg0Value arg0_val = UnboxScalar<Arg0Type>::Unbox(arg0);
       BoxScalar<OutType>::Box(this->op.template Call<OutValue, Arg0Value>(ctx, arg0_val),
                               out->scalar().get());
-    } else {
-      out->value = MakeNullScalar(arg0.type);
     }
   }
 
diff --git a/cpp/src/arrow/compute/kernels/scalar_boolean.cc b/cpp/src/arrow/compute/kernels/scalar_boolean.cc
index 48acf31..009b968 100644
--- a/cpp/src/arrow/compute/kernels/scalar_boolean.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_boolean.cc
@@ -35,7 +35,8 @@ enum BitmapIndex { LEFT_VALID, LEFT_DATA, RIGHT_VALID, RIGHT_DATA };
 template <typename ComputeWord>
 void ComputeKleene(ComputeWord&& compute_word, KernelContext* ctx, const ArrayData& left,
                    const ArrayData& right, ArrayData* out) {
-  DCHECK(left.null_count != 0 || right.null_count != 0);
+  DCHECK(left.null_count != 0 || right.null_count != 0)
+      << "ComputeKleene is unnecessarily expensive for the non-null case";
 
   Bitmap bitmaps[4];
   bitmaps[LEFT_VALID] = {left.buffers[0], left.offset, left.length};
@@ -61,40 +62,78 @@ void ComputeKleene(ComputeWord&& compute_word, KernelContext* ctx, const ArrayDa
     ++i;
   };
 
-  if (right.null_count == 0 || left.null_count == 0) {
-    if (left.null_count == 0) {
-      // ensure only bitmaps[RIGHT_VALID].buffer might be null
-      std::swap(bitmaps[LEFT_VALID], bitmaps[RIGHT_VALID]);
-      std::swap(bitmaps[LEFT_DATA], bitmaps[RIGHT_DATA]);
-    }
-    // override bitmaps[RIGHT_VALID] to make it safe for Visit()
+  if (right.null_count == 0) {
+    // bitmaps[RIGHT_VALID] might be null; override to make it safe for Visit()
     bitmaps[RIGHT_VALID] = bitmaps[RIGHT_DATA];
-
     Bitmap::VisitWords(bitmaps, [&](std::array<uint64_t, 4> words) {
       apply(words[LEFT_VALID], words[LEFT_DATA], ~uint64_t(0), words[RIGHT_DATA]);
     });
-  } else {
+    return;
+  }
+
+  if (left.null_count == 0) {
+    // bitmaps[LEFT_VALID] might be null; override to make it safe for Visit()
+    bitmaps[LEFT_VALID] = bitmaps[LEFT_DATA];
     Bitmap::VisitWords(bitmaps, [&](std::array<uint64_t, 4> words) {
-      apply(words[LEFT_VALID], words[LEFT_DATA], words[RIGHT_VALID], words[RIGHT_DATA]);
+      apply(~uint64_t(0), words[LEFT_DATA], words[RIGHT_VALID], words[RIGHT_DATA]);
     });
+    return;
   }
+
+  DCHECK(left.null_count != 0 && right.null_count != 0);
+  Bitmap::VisitWords(bitmaps, [&](std::array<uint64_t, 4> words) {
+    apply(words[LEFT_VALID], words[LEFT_DATA], words[RIGHT_VALID], words[RIGHT_DATA]);
+  });
+}
+
+inline BooleanScalar InvertScalar(const Scalar& in) {
+  return in.is_valid ? BooleanScalar(!checked_cast<const BooleanScalar&>(in).value)
+                     : BooleanScalar();
+}
+
+inline Bitmap GetBitmap(const ArrayData& arr, int index) {
+  return Bitmap{arr.buffers[index], arr.offset, arr.length};
 }
 
 struct Invert {
   static void Call(KernelContext* ctx, const Scalar& in, Scalar* out) {
-    if (in.is_valid) {
-      checked_cast<BooleanScalar*>(out)->value =
-          !checked_cast<const BooleanScalar&>(in).value;
-    }
+    *checked_cast<BooleanScalar*>(out) = InvertScalar(in);
   }
 
   static void Call(KernelContext* ctx, const ArrayData& in, ArrayData* out) {
-    ::arrow::internal::InvertBitmap(in.buffers[1]->data(), in.offset, in.length,
-                                    out->buffers[1]->mutable_data(), out->offset);
+    GetBitmap(*out, 1).CopyFromInverted(GetBitmap(in, 1));
+  }
+};
+
+template <typename Op>
+struct Commutative {
+  static void Call(KernelContext* ctx, const Scalar& left, const ArrayData& right,
+                   ArrayData* out) {
+    Op::Call(ctx, right, left, out);
   }
 };
 
-struct And {
+struct And : Commutative<And> {
+  using Commutative<And>::Call;
+
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    if (left.is_valid && right.is_valid) {
+      checked_cast<BooleanScalar*>(out)->value =
+          checked_cast<const BooleanScalar&>(left).value &&
+          checked_cast<const BooleanScalar&>(right).value;
+    }
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    if (!right.is_valid) return;  // all null case
+
+    return checked_cast<const BooleanScalar&>(right).value
+               ? GetBitmap(*out, 1).CopyFrom(GetBitmap(left, 1))
+               : GetBitmap(*out, 1).SetBitsTo(false);
+  }
+
   static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
                    ArrayData* out) {
     ::arrow::internal::BitmapAnd(left.buffers[1]->data(), left.offset,
@@ -103,11 +142,48 @@ struct And {
   }
 };
 
-struct KleeneAnd {
+struct KleeneAnd : Commutative<KleeneAnd> {
+  using Commutative<KleeneAnd>::Call;
+
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    bool left_true = left.is_valid && checked_cast<const BooleanScalar&>(left).value;
+    bool left_false = left.is_valid && !checked_cast<const BooleanScalar&>(left).value;
+
+    bool right_true = right.is_valid && checked_cast<const BooleanScalar&>(right).value;
+    bool right_false = right.is_valid && !checked_cast<const BooleanScalar&>(right).value;
+
+    checked_cast<BooleanScalar*>(out)->value = left_true && right_true;
+    out->is_valid = left_false || right_false || (left_true && right_true);
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    bool right_true = right.is_valid && checked_cast<const BooleanScalar&>(right).value;
+    bool right_false = right.is_valid && !checked_cast<const BooleanScalar&>(right).value;
+
+    if (right_false) {
+      return GetBitmap(*out, 0).SetBitsTo(true),
+             GetBitmap(*out, 1).SetBitsTo(false);  // all false case
+    }
+
+    if (right_true) {
+      return GetBitmap(*out, 0).CopyFrom(GetBitmap(left, 0)),
+             GetBitmap(*out, 1).CopyFrom(GetBitmap(left, 1));
+    }
+
+    // scalar was null: out[i] is valid iff left[i] was false
+    ::arrow::internal::BitmapAndNot(left.buffers[0]->data(), left.offset,
+                                    left.buffers[1]->data(), left.offset, left.length,
+                                    out->offset, out->buffers[0]->mutable_data());
+    ::arrow::internal::CopyBitmap(left.buffers[1]->data(), left.offset, left.length,
+                                  out->buffers[1]->mutable_data(), out->offset);
+  }
+
   static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
                    ArrayData* out) {
     if (left.GetNullCount() == 0 && right.GetNullCount() == 0) {
-      BitUtil::SetBitsTo(out->buffers[0]->mutable_data(), out->offset, out->length, true);
+      GetBitmap(*out, 0).SetBitsTo(true);
       return And::Call(ctx, left, right, out);
     }
     auto compute_word = [](uint64_t left_true, uint64_t left_false, uint64_t right_true,
@@ -120,7 +196,27 @@ struct KleeneAnd {
   }
 };
 
-struct Or {
+struct Or : Commutative<Or> {
+  using Commutative<Or>::Call;
+
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    if (left.is_valid && right.is_valid) {
+      checked_cast<BooleanScalar*>(out)->value =
+          checked_cast<const BooleanScalar&>(left).value ||
+          checked_cast<const BooleanScalar&>(right).value;
+    }
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    if (!right.is_valid) return;  // all null case
+
+    return checked_cast<const BooleanScalar&>(right).value
+               ? GetBitmap(*out, 1).SetBitsTo(true)
+               : GetBitmap(*out, 1).CopyFrom(GetBitmap(left, 1));
+  }
+
   static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
                    ArrayData* out) {
     ::arrow::internal::BitmapOr(left.buffers[1]->data(), left.offset,
@@ -129,13 +225,51 @@ struct Or {
   }
 };
 
-struct KleeneOr {
+struct KleeneOr : Commutative<KleeneOr> {
+  using Commutative<KleeneOr>::Call;
+
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    bool left_true = left.is_valid && checked_cast<const BooleanScalar&>(left).value;
+    bool left_false = left.is_valid && !checked_cast<const BooleanScalar&>(left).value;
+
+    bool right_true = right.is_valid && checked_cast<const BooleanScalar&>(right).value;
+    bool right_false = right.is_valid && !checked_cast<const BooleanScalar&>(right).value;
+
+    checked_cast<BooleanScalar*>(out)->value = left_true || right_true;
+    out->is_valid = left_true || right_true || (left_false && right_false);
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    bool right_true = right.is_valid && checked_cast<const BooleanScalar&>(right).value;
+    bool right_false = right.is_valid && !checked_cast<const BooleanScalar&>(right).value;
+
+    if (right_true) {
+      return GetBitmap(*out, 0).SetBitsTo(true),
+             GetBitmap(*out, 1).SetBitsTo(true);  // all true case
+    }
+
+    if (right_false) {
+      return GetBitmap(*out, 0).CopyFrom(GetBitmap(left, 0)),
+             GetBitmap(*out, 1).CopyFrom(GetBitmap(left, 1));
+    }
+
+    // scalar was null: out[i] is valid iff left[i] was true
+    ::arrow::internal::BitmapAnd(left.buffers[0]->data(), left.offset,
+                                 left.buffers[1]->data(), left.offset, left.length,
+                                 out->offset, out->buffers[0]->mutable_data());
+    ::arrow::internal::CopyBitmap(left.buffers[1]->data(), left.offset, left.length,
+                                  out->buffers[1]->mutable_data(), out->offset);
+  }
+
   static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
                    ArrayData* out) {
     if (left.GetNullCount() == 0 && right.GetNullCount() == 0) {
-      BitUtil::SetBitsTo(out->buffers[0]->mutable_data(), out->offset, out->length, true);
+      GetBitmap(*out, 0).SetBitsTo(true);
       return Or::Call(ctx, left, right, out);
     }
+
     static auto compute_word = [](uint64_t left_true, uint64_t left_false,
                                   uint64_t right_true, uint64_t right_false,
                                   uint64_t* out_valid, uint64_t* out_data) {
@@ -147,7 +281,27 @@ struct KleeneOr {
   }
 };
 
-struct Xor {
+struct Xor : Commutative<Xor> {
+  using Commutative<Xor>::Call;
+
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    if (left.is_valid && right.is_valid) {
+      checked_cast<BooleanScalar*>(out)->value =
+          checked_cast<const BooleanScalar&>(left).value ^
+          checked_cast<const BooleanScalar&>(right).value;
+    }
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    if (!right.is_valid) return;  // all null case
+
+    return checked_cast<const BooleanScalar&>(right).value
+               ? GetBitmap(*out, 1).CopyFromInverted(GetBitmap(left, 1))
+               : GetBitmap(*out, 1).CopyFrom(GetBitmap(left, 1));
+  }
+
   static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
                    ArrayData* out) {
     ::arrow::internal::BitmapXor(left.buffers[1]->data(), left.offset,
@@ -156,6 +310,86 @@ struct Xor {
   }
 };
 
+struct AndNot {
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    And::Call(ctx, left, InvertScalar(right), out);
+  }
+
+  static void Call(KernelContext* ctx, const Scalar& left, const ArrayData& right,
+                   ArrayData* out) {
+    if (!left.is_valid) return;  // all null case
+
+    return checked_cast<const BooleanScalar&>(left).value
+               ? GetBitmap(*out, 1).CopyFromInverted(GetBitmap(right, 1))
+               : GetBitmap(*out, 1).SetBitsTo(false);
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    And::Call(ctx, left, InvertScalar(right), out);
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
+                   ArrayData* out) {
+    ::arrow::internal::BitmapAndNot(left.buffers[1]->data(), left.offset,
+                                    right.buffers[1]->data(), right.offset, right.length,
+                                    out->offset, out->buffers[1]->mutable_data());
+  }
+};
+
+struct KleeneAndNot {
+  static void Call(KernelContext* ctx, const Scalar& left, const Scalar& right,
+                   Scalar* out) {
+    KleeneAnd::Call(ctx, left, InvertScalar(right), out);
+  }
+
+  static void Call(KernelContext* ctx, const Scalar& left, const ArrayData& right,
+                   ArrayData* out) {
+    bool left_true = left.is_valid && checked_cast<const BooleanScalar&>(left).value;
+    bool left_false = left.is_valid && !checked_cast<const BooleanScalar&>(left).value;
+
+    if (left_false) {
+      return GetBitmap(*out, 0).SetBitsTo(true),
+             GetBitmap(*out, 1).SetBitsTo(false);  // all false case
+    }
+
+    if (left_true) {
+      return GetBitmap(*out, 0).CopyFrom(GetBitmap(right, 0)),
+             GetBitmap(*out, 1).CopyFromInverted(GetBitmap(right, 1));
+    }
+
+    // scalar was null: out[i] is valid iff right[i] was true
+    ::arrow::internal::BitmapAnd(right.buffers[0]->data(), right.offset,
+                                 right.buffers[1]->data(), right.offset, right.length,
+                                 out->offset, out->buffers[0]->mutable_data());
+    ::arrow::internal::InvertBitmap(right.buffers[1]->data(), right.offset, right.length,
+                                    out->buffers[1]->mutable_data(), out->offset);
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const Scalar& right,
+                   ArrayData* out) {
+    KleeneAnd::Call(ctx, left, InvertScalar(right), out);
+  }
+
+  static void Call(KernelContext* ctx, const ArrayData& left, const ArrayData& right,
+                   ArrayData* out) {
+    if (left.GetNullCount() == 0 && right.GetNullCount() == 0) {
+      GetBitmap(*out, 0).SetBitsTo(true);
+      return AndNot::Call(ctx, left, right, out);
+    }
+
+    static auto compute_word = [](uint64_t left_true, uint64_t left_false,
+                                  uint64_t right_true, uint64_t right_false,
+                                  uint64_t* out_valid, uint64_t* out_data) {
+      *out_data = left_true & right_false;
+      *out_valid = left_false | right_true | (left_true & right_false);
+    };
+
+    return ComputeKleene(compute_word, ctx, left, right, out);
+  }
+};
+
 void MakeFunction(std::string name, int arity, ArrayKernelExec exec,
                   const FunctionDoc* doc, FunctionRegistry* registry,
                   bool can_write_into_slices = true,
@@ -163,7 +397,7 @@ void MakeFunction(std::string name, int arity, ArrayKernelExec exec,
   auto func = std::make_shared<ScalarFunction>(name, Arity(arity), doc);
 
   // Scalar arguments not yet supported
-  std::vector<InputType> in_types(arity, InputType::Array(boolean()));
+  std::vector<InputType> in_types(arity, InputType(boolean()));
   ScalarKernel kernel(std::move(in_types), boolean(), exec);
   kernel.null_handling = null_handling;
   kernel.can_write_into_slices = can_write_into_slices;
@@ -180,6 +414,12 @@ const FunctionDoc and_doc{
      "For a different null behavior, see function \"and_kleene\"."),
     {"x", "y"}};
 
+const FunctionDoc and_not_doc{
+    "Logical 'and not' boolean values",
+    ("When a null is encountered in either input, a null is output.\n"
+     "For a different null behavior, see function \"and_not_kleene\"."),
+    {"x", "y"}};
+
 const FunctionDoc or_doc{
     "Logical 'or' boolean values",
     ("When a null is encountered in either input, a null is output.\n"
@@ -205,6 +445,21 @@ const FunctionDoc and_kleene_doc{
      "For a different null behavior, see function \"and\"."),
     {"x", "y"}};
 
+const FunctionDoc and_not_kleene_doc{
+    "Logical 'and not' boolean values (Kleene logic)",
+    ("This function behaves as follows with nulls:\n\n"
+     "- true and not null = null\n"
+     "- null and not false = null\n"
+     "- false and not null = false\n"
+     "- null and not true = false\n"
+     "- null and not null = null\n"
+     "\n"
+     "In other words, in this context a null value really means \"unknown\",\n"
+     "and an unknown value 'and not' true is always false, as is false\n"
+     "'and not' an unknown value.\n"
+     "For a different null behavior, see function \"and_not\"."),
+    {"x", "y"}};
+
 const FunctionDoc or_kleene_doc{
     "Logical 'or' boolean values (Kleene logic)",
     ("This function behaves as follows with nulls:\n\n"
@@ -227,6 +482,7 @@ void RegisterScalarBoolean(FunctionRegistry* registry) {
   // These functions can write into sliced output bitmaps
   MakeFunction("invert", 1, applicator::SimpleUnary<Invert>, &invert_doc, registry);
   MakeFunction("and", 2, applicator::SimpleBinary<And>, &and_doc, registry);
+  MakeFunction("and_not", 2, applicator::SimpleBinary<AndNot>, &and_not_doc, registry);
   MakeFunction("or", 2, applicator::SimpleBinary<Or>, &or_doc, registry);
   MakeFunction("xor", 2, applicator::SimpleBinary<Xor>, &xor_doc, registry);
 
@@ -234,6 +490,9 @@ void RegisterScalarBoolean(FunctionRegistry* registry) {
   MakeFunction("and_kleene", 2, applicator::SimpleBinary<KleeneAnd>, &and_kleene_doc,
                registry,
                /*can_write_into_slices=*/false, NullHandling::COMPUTED_PREALLOCATE);
+  MakeFunction("and_not_kleene", 2, applicator::SimpleBinary<KleeneAndNot>,
+               &and_not_kleene_doc, registry,
+               /*can_write_into_slices=*/false, NullHandling::COMPUTED_PREALLOCATE);
   MakeFunction("or_kleene", 2, applicator::SimpleBinary<KleeneOr>, &or_kleene_doc,
                registry,
                /*can_write_into_slices=*/false, NullHandling::COMPUTED_PREALLOCATE);
diff --git a/cpp/src/arrow/compute/kernels/scalar_boolean_test.cc b/cpp/src/arrow/compute/kernels/scalar_boolean_test.cc
index 1609e5d..7d3f68e 100644
--- a/cpp/src/arrow/compute/kernels/scalar_boolean_test.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_boolean_test.cc
@@ -27,185 +27,114 @@
 #include "arrow/compute/kernels/test_util.h"
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/util/checked_cast.h"
 
 namespace arrow {
+
+using internal::checked_pointer_cast;
+
 namespace compute {
 
-using BinaryKernelFunc =
-    std::function<Result<Datum>(const Datum&, const Datum&, ExecContext*)>;
-
-class TestBooleanKernel : public TestBase {
- public:
-  void TestArrayBinary(const BinaryKernelFunc& kernel, const std::shared_ptr<Array>& left,
-                       const std::shared_ptr<Array>& right,
-                       const std::shared_ptr<Array>& expected) {
-    ASSERT_OK_AND_ASSIGN(Datum result, kernel(left, right, &ctx_));
-    ASSERT_EQ(Datum::ARRAY, result.kind());
-    std::shared_ptr<Array> result_array = result.make_array();
-    ASSERT_OK(result_array->ValidateFull());
-    AssertArraysEqual(*expected, *result_array, /*verbose=*/true);
-
-    ASSERT_OK_AND_ASSIGN(result, kernel(right, left, &ctx_));
-    ASSERT_EQ(Datum::ARRAY, result.kind());
-    result_array = result.make_array();
-    ASSERT_OK(result_array->ValidateFull());
-    AssertArraysEqual(*expected, *result_array, /*verbose=*/true);
-  }
+void CheckBooleanScalarArrayBinary(std::string func_name, Datum array) {
+  for (std::shared_ptr<Scalar> scalar :
+       {std::make_shared<BooleanScalar>(), std::make_shared<BooleanScalar>(true),
+        std::make_shared<BooleanScalar>(false)}) {
+    ASSERT_OK_AND_ASSIGN(Datum actual, CallFunction(func_name, {Datum(scalar), array}));
 
-  void TestChunkedArrayBinary(const BinaryKernelFunc& kernel,
-                              const std::shared_ptr<ChunkedArray>& left,
-                              const std::shared_ptr<ChunkedArray>& right,
-                              const std::shared_ptr<ChunkedArray>& expected) {
-    ASSERT_OK_AND_ASSIGN(Datum result, kernel(left, right, &ctx_));
-    ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
-    std::shared_ptr<ChunkedArray> result_ca = result.chunked_array();
-    AssertChunkedEquivalent(*expected, *result_ca);
-
-    ASSERT_OK_AND_ASSIGN(result, kernel(right, left, &ctx_));
-    ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
-    result_ca = result.chunked_array();
-    AssertChunkedEquivalent(*expected, *result_ca);
-  }
+    ASSERT_OK_AND_ASSIGN(auto constant_array,
+                         MakeArrayFromScalar(*scalar, array.length()));
 
-  void TestBinaryKernel(const BinaryKernelFunc& kernel,
-                        const std::shared_ptr<Array>& left,
-                        const std::shared_ptr<Array>& right,
-                        const std::shared_ptr<Array>& expected) {
-    TestArrayBinary(kernel, left, right, expected);
-    TestArrayBinary(kernel, left->Slice(1), right->Slice(1), expected->Slice(1));
-
-    // ChunkedArray
-    auto cleft = std::make_shared<ChunkedArray>(ArrayVector{left, left->Slice(1)});
-    auto cright = std::make_shared<ChunkedArray>(ArrayVector{right, right->Slice(1)});
-    auto cexpected =
-        std::make_shared<ChunkedArray>(ArrayVector{expected, expected->Slice(1)});
-    TestChunkedArrayBinary(kernel, cleft, cright, cexpected);
-
-    // ChunkedArray with different chunks
-    cleft = std::make_shared<ChunkedArray>(ArrayVector{
-        left->Slice(0, 1), left->Slice(1), left->Slice(1, 1), left->Slice(2)});
-    TestChunkedArrayBinary(kernel, cleft, cright, cexpected);
-  }
+    ASSERT_OK_AND_ASSIGN(Datum expected,
+                         CallFunction(func_name, {Datum(constant_array), array}));
+    AssertDatumsEqual(expected, actual);
 
-  void TestBinaryKernelPropagate(const BinaryKernelFunc& kernel,
-                                 const std::vector<bool>& left,
-                                 const std::vector<bool>& right,
-                                 const std::vector<bool>& expected,
-                                 const std::vector<bool>& expected_nulls) {
-    auto type = boolean();
-    TestBinaryKernel(kernel, _MakeArray<BooleanType, bool>(type, left, {}),
-                     _MakeArray<BooleanType, bool>(type, right, {}),
-                     _MakeArray<BooleanType, bool>(type, expected, {}));
-
-    TestBinaryKernel(kernel, _MakeArray<BooleanType, bool>(type, left, left),
-                     _MakeArray<BooleanType, bool>(type, right, right),
-                     _MakeArray<BooleanType, bool>(type, expected, expected_nulls));
+    ASSERT_OK_AND_ASSIGN(actual, CallFunction(func_name, {array, Datum(scalar)}));
+    ASSERT_OK_AND_ASSIGN(expected,
+                         CallFunction(func_name, {array, Datum(constant_array)}));
+    AssertDatumsEqual(expected, actual);
   }
-
- protected:
-  ExecContext ctx_;
-};
-
-TEST_F(TestBooleanKernel, Invert) {
-  std::vector<bool> values1 = {true, false, true, false};
-  std::vector<bool> values2 = {false, true, false, true};
-
-  auto type = boolean();
-  auto a1 = _MakeArray<BooleanType, bool>(type, values1, {true, true, true, false});
-  auto a2 = _MakeArray<BooleanType, bool>(type, values2, {true, true, true, false});
-
-  // Plain array
-  ASSERT_OK_AND_ASSIGN(Datum result, Invert(a1));
-  ASSERT_EQ(Datum::ARRAY, result.kind());
-  ASSERT_ARRAYS_EQUAL(*a2, *result.make_array());
-
-  // Array with offset
-  ASSERT_OK_AND_ASSIGN(result, Invert(a1->Slice(1)));
-  ASSERT_EQ(Datum::ARRAY, result.kind());
-  ASSERT_ARRAYS_EQUAL(*a2->Slice(1), *result.make_array());
-
-  // ChunkedArray
-  std::vector<std::shared_ptr<Array>> ca1_arrs = {a1, a1->Slice(1)};
-  auto ca1 = std::make_shared<ChunkedArray>(ca1_arrs);
-  std::vector<std::shared_ptr<Array>> ca2_arrs = {a2, a2->Slice(1)};
-  auto ca2 = std::make_shared<ChunkedArray>(ca2_arrs);
-  ASSERT_OK_AND_ASSIGN(result, Invert(ca1));
-  ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
-  std::shared_ptr<ChunkedArray> result_ca = result.chunked_array();
-
-  // Contiguous preallocation, so a single output chunk even though there were
-  // two input chunks
-  ASSERT_EQ(1, result_ca->num_chunks());
-  AssertChunkedEquivalent(*ca2, *result_ca);
 }
 
-TEST_F(TestBooleanKernel, InvertEmptyArray) {
-  std::vector<std::shared_ptr<Buffer>> data_buffers(2);
-  Datum input;
-  input.value = ArrayData::Make(boolean(), 0 /* length */, std::move(data_buffers),
-                                0 /* null_count */);
-
-  ASSERT_OK_AND_ASSIGN(Datum result, Invert(input));
-  ASSERT_ARRAYS_EQUAL(*input.make_array(), *result.make_array());
+TEST(TestBooleanKernel, Invert) {
+  auto arr =
+      ArrayFromJSON(boolean(), "[true, false, true, null, false, true, false, null]");
+  auto expected =
+      ArrayFromJSON(boolean(), "[false, true, false, null, true, false, true, null]");
+  CheckScalarUnary("invert", arr, expected);
 }
 
-TEST_F(TestBooleanKernel, BinaryOpOnEmptyArray) {
-  auto type = boolean();
-  std::vector<std::shared_ptr<Buffer>> data_buffers(2);
-  Datum input;
-  input.value = ArrayData::Make(boolean(), 0 /* length */, std::move(data_buffers),
-                                0 /* null_count */);
-
-  ASSERT_OK_AND_ASSIGN(Datum result, And(input, input));
-  // Result should be empty as well.
-  ASSERT_ARRAYS_EQUAL(*input.make_array(), *result.make_array());
+TEST(TestBooleanKernel, And) {
+  auto left = ArrayFromJSON(boolean(), "    [true, true,  true, false, false, null]");
+  auto right = ArrayFromJSON(boolean(), "   [true, false, null, false, null,  null]");
+  auto expected = ArrayFromJSON(boolean(), "[true, false, null, false, null,  null]");
+  CheckScalarBinary("and", left, right, expected);
+  CheckBooleanScalarArrayBinary("and", left);
 }
 
-TEST_F(TestBooleanKernel, And) {
-  std::vector<bool> values1 = {true, false, true, false, true, true};
-  std::vector<bool> values2 = {true, true, false, false, true, false};
-  std::vector<bool> values3 = {true, false, false, false, true, false};
-  TestBinaryKernelPropagate(And, values1, values2, values3, values3);
+TEST(TestBooleanKernel, Or) {
+  auto left = ArrayFromJSON(boolean(), "    [true, true,  true, false, false, null]");
+  auto right = ArrayFromJSON(boolean(), "   [true, false, null, false, null,  null]");
+  auto expected = ArrayFromJSON(boolean(), "[true, true,  null, false, null,  null]");
+  CheckScalarBinary("or", left, right, expected);
+  CheckBooleanScalarArrayBinary("or", left);
 }
 
-TEST_F(TestBooleanKernel, Or) {
-  std::vector<bool> values1 = {true, false, true, false, true, true};
-  std::vector<bool> values2 = {true, true, false, false, true, false};
-  std::vector<bool> values3 = {true, true, true, false, true, true};
-  std::vector<bool> values3_nulls = {true, false, false, false, true, false};
-  TestBinaryKernelPropagate(Or, values1, values2, values3, values3_nulls);
+TEST(TestBooleanKernel, Xor) {
+  auto left = ArrayFromJSON(boolean(), "    [true,  true,  true, false, false, null]");
+  auto right = ArrayFromJSON(boolean(), "   [true,  false, null, false, null,  null]");
+  auto expected = ArrayFromJSON(boolean(), "[false, true,  null, false, null,  null]");
+  CheckScalarBinary("xor", left, right, expected);
+  CheckBooleanScalarArrayBinary("xor", left);
 }
 
-TEST_F(TestBooleanKernel, Xor) {
-  std::vector<bool> values1 = {true, false, true, false, true, true};
-  std::vector<bool> values2 = {true, true, false, false, true, false};
-  std::vector<bool> values3 = {false, true, true, false, false, true};
-  std::vector<bool> values3_nulls = {true, false, false, false, true, false};
-  TestBinaryKernelPropagate(Xor, values1, values2, values3, values3_nulls);
+TEST(TestBooleanKernel, AndNot) {
+  auto left = ArrayFromJSON(
+      boolean(), "[true,  true,  true, false, false, false, null, null,  null]");
+  auto right = ArrayFromJSON(
+      boolean(), "[true,  false, null, true,  false, null,  true, false, null]");
+  auto expected = ArrayFromJSON(
+      boolean(), "[false, true,  null, false, false, null,  null, null,  null]");
+  CheckScalarBinary("and_not", left, right, expected);
+  CheckBooleanScalarArrayBinary("and_not", left);
 }
 
-TEST_F(TestBooleanKernel, KleeneAnd) {
+TEST(TestBooleanKernel, KleeneAnd) {
   auto left = ArrayFromJSON(boolean(), "    [true, true,  true, false, false, null]");
   auto right = ArrayFromJSON(boolean(), "   [true, false, null, false, null,  null]");
   auto expected = ArrayFromJSON(boolean(), "[true, false, null, false, false, null]");
-  TestBinaryKernel(KleeneAnd, left, right, expected);
+  CheckScalarBinary("and_kleene", left, right, expected);
+  CheckBooleanScalarArrayBinary("and_kleene", left);
 
   left = ArrayFromJSON(boolean(), "    [true, true,  false, null, null]");
   right = ArrayFromJSON(boolean(), "   [true, false, false, true, false]");
   expected = ArrayFromJSON(boolean(), "[true, false, false, null, false]");
-  TestBinaryKernel(KleeneAnd, left, right, expected);
+  CheckScalarBinary("and_kleene", left, right, expected);
+  CheckBooleanScalarArrayBinary("and_kleene", left);
+}
+
+TEST(TestBooleanKernel, KleeneAndNot) {
+  auto left = ArrayFromJSON(
+      boolean(), "[true,  true,  true, false, false, false, null, null,  null]");
+  auto right = ArrayFromJSON(
+      boolean(), "[true,  false, null, true,  false, null,  true, false, null]");
+  auto expected = ArrayFromJSON(
+      boolean(), "[false, true,  null, false, false, false, false, null, null]");
+  CheckScalarBinary("and_not_kleene", left, right, expected);
+  CheckBooleanScalarArrayBinary("and_not_kleene", left);
 }
 
-TEST_F(TestBooleanKernel, KleeneOr) {
+TEST(TestBooleanKernel, KleeneOr) {
   auto left = ArrayFromJSON(boolean(), "    [true, true,  true, false, false, null]");
   auto right = ArrayFromJSON(boolean(), "   [true, false, null, false, null,  null]");
   auto expected = ArrayFromJSON(boolean(), "[true, true,  true, false, null,  null]");
-  TestBinaryKernel(KleeneOr, left, right, expected);
+  CheckScalarBinary("or_kleene", left, right, expected);
+  CheckBooleanScalarArrayBinary("or_kleene", left);
 
   left = ArrayFromJSON(boolean(), "    [true, true,  false, null, null]");
   right = ArrayFromJSON(boolean(), "   [true, false, false, true, false]");
   expected = ArrayFromJSON(boolean(), "[true, true,  false, true, null]");
-  TestBinaryKernel(KleeneOr, left, right, expected);
+  CheckScalarBinary("or_kleene", left, right, expected);
+  CheckBooleanScalarArrayBinary("or_kleene", left);
 }
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/kernels/test_util.cc b/cpp/src/arrow/compute/kernels/test_util.cc
index f8cc1c0..5d54a8c 100644
--- a/cpp/src/arrow/compute/kernels/test_util.cc
+++ b/cpp/src/arrow/compute/kernels/test_util.cc
@@ -71,7 +71,22 @@ ScalarVector GetScalars(const ArrayVector& inputs, int64_t index) {
 void CheckScalar(std::string func_name, const ScalarVector& inputs,
                  std::shared_ptr<Scalar> expected, const FunctionOptions* options) {
   ASSERT_OK_AND_ASSIGN(Datum out, CallFunction(func_name, GetDatums(inputs), options));
-  AssertScalarsEqual(*expected, *out.scalar(), /*verbose=*/true);
+  if (!out.scalar()->Equals(expected)) {
+    std::string summary = func_name + "(";
+    for (const auto& input : inputs) {
+      summary += input->ToString() + ",";
+    }
+    summary.back() = ')';
+
+    summary += " = " + out.scalar()->ToString() + " != " + expected->ToString();
+
+    if (!out.type()->Equals(expected->type)) {
+      summary += " (types differed: " + out.type()->ToString() + " vs " +
+                 expected->type->ToString() + ")";
+    }
+
+    FAIL() << summary;
+  }
 }
 
 void CheckScalar(std::string func_name, const ArrayVector& inputs,
@@ -97,6 +112,10 @@ void CheckScalar(std::string func_name, const ArrayVector& inputs,
                             expected->Slice(2 * slice_length), options);
   }
 
+  // should also work with an empty slice
+  CheckScalarNonRecursive(func_name, SliceAll(inputs, 0, 0), expected->Slice(0, 0),
+                          options);
+
   // Ditto with ChunkedArray inputs
   if (slice_length > 0) {
     std::vector<std::shared_ptr<ChunkedArray>> chunked_inputs;
diff --git a/cpp/src/arrow/util/bit_util_test.cc b/cpp/src/arrow/util/bit_util_test.cc
index d9d775c..baf2af8 100644
--- a/cpp/src/arrow/util/bit_util_test.cc
+++ b/cpp/src/arrow/util/bit_util_test.cc
@@ -56,6 +56,7 @@
 namespace arrow {
 
 using internal::BitmapAnd;
+using internal::BitmapAndNot;
 using internal::BitmapOr;
 using internal::BitmapXor;
 using internal::BitsetStack;
@@ -1104,6 +1105,22 @@ struct BitmapXorOp : public BitmapOperation {
   }
 };
 
+struct BitmapAndNotOp : public BitmapOperation {
+  Result<std::shared_ptr<Buffer>> Call(MemoryPool* pool, const uint8_t* left,
+                                       int64_t left_offset, const uint8_t* right,
+                                       int64_t right_offset, int64_t length,
+                                       int64_t out_offset) const override {
+    return BitmapAndNot(pool, left, left_offset, right, right_offset, length, out_offset);
+  }
+
+  Status Call(const uint8_t* left, int64_t left_offset, const uint8_t* right,
+              int64_t right_offset, int64_t length, int64_t out_offset,
+              uint8_t* out_buffer) const override {
+    BitmapAndNot(left, left_offset, right, right_offset, length, out_offset, out_buffer);
+    return Status::OK();
+  }
+};
+
 class BitmapOp : public TestBase {
  public:
   void TestAligned(const BitmapOperation& op, const std::vector<int>& left_bits,
@@ -1196,6 +1213,16 @@ TEST_F(BitmapOp, Xor) {
   TestUnaligned(op, left, right, result);
 }
 
+TEST_F(BitmapOp, AndNot) {
+  BitmapAndNotOp op;
+  std::vector<int> left = {0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1};
+  std::vector<int> right = {0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0};
+  std::vector<int> result = {0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1};
+
+  TestAligned(op, left, right, result);
+  TestUnaligned(op, left, right, result);
+}
+
 TEST_F(BitmapOp, RandomXor) {
   const int kBitCount = 1000;
   uint8_t buffer[kBitCount * 2] = {0};
diff --git a/cpp/src/arrow/util/bitmap.cc b/cpp/src/arrow/util/bitmap.cc
index 0cd364d..33d1dee 100644
--- a/cpp/src/arrow/util/bitmap.cc
+++ b/cpp/src/arrow/util/bitmap.cc
@@ -46,6 +46,16 @@ std::string Bitmap::Diff(const Bitmap& other) const {
   return ToArray()->Diff(*other.ToArray());
 }
 
+void Bitmap::CopyFrom(const Bitmap& other) {
+  ::arrow::internal::CopyBitmap(other.buffer_->data(), other.offset_, other.length_,
+                                buffer_->mutable_data(), offset_);
+}
+
+void Bitmap::CopyFromInverted(const Bitmap& other) {
+  ::arrow::internal::InvertBitmap(other.buffer_->data(), other.offset_, other.length_,
+                                  buffer_->mutable_data(), offset_);
+}
+
 bool Bitmap::Equals(const Bitmap& other) const {
   if (length_ != other.length_) {
     return false;
diff --git a/cpp/src/arrow/util/bitmap.h b/cpp/src/arrow/util/bitmap.h
index a6b948c..43d59f0 100644
--- a/cpp/src/arrow/util/bitmap.h
+++ b/cpp/src/arrow/util/bitmap.h
@@ -86,6 +86,13 @@ class ARROW_EXPORT Bitmap : public util::ToStringOstreamable<Bitmap>,
     BitUtil::SetBitTo(buffer_->mutable_data(), i + offset_, v);
   }
 
+  void SetBitsTo(bool v) {
+    BitUtil::SetBitsTo(buffer_->mutable_data(), offset_, length_, v);
+  }
+
+  void CopyFrom(const Bitmap& other);
+  void CopyFromInverted(const Bitmap& other);
+
   /// \brief Visit bits from each bitmap as bitset<N>
   ///
   /// All bitmaps must have identical length.
diff --git a/cpp/src/arrow/util/bitmap_ops.cc b/cpp/src/arrow/util/bitmap_ops.cc
index 87f6a06..acc5bd9 100644
--- a/cpp/src/arrow/util/bitmap_ops.cc
+++ b/cpp/src/arrow/util/bitmap_ops.cc
@@ -560,5 +560,24 @@ void BitmapXor(const uint8_t* left, int64_t left_offset, const uint8_t* right,
   BitmapOp<std::bit_xor>(left, left_offset, right, right_offset, length, out_offset, out);
 }
 
+template <typename T>
+struct AndNotOp {
+  constexpr T operator()(const T& l, const T& r) const { return l & ~r; }
+};
+
+Result<std::shared_ptr<Buffer>> BitmapAndNot(MemoryPool* pool, const uint8_t* left,
+                                             int64_t left_offset, const uint8_t* right,
+                                             int64_t right_offset, int64_t length,
+                                             int64_t out_offset) {
+  return BitmapOp<AndNotOp>(pool, left, left_offset, right, right_offset, length,
+                            out_offset);
+}
+
+void BitmapAndNot(const uint8_t* left, int64_t left_offset, const uint8_t* right,
+                  int64_t right_offset, int64_t length, int64_t out_offset,
+                  uint8_t* out) {
+  BitmapOp<AndNotOp>(left, left_offset, right, right_offset, length, out_offset, out);
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/bitmap_ops.h b/cpp/src/arrow/util/bitmap_ops.h
index 535dd30..554e1d7 100644
--- a/cpp/src/arrow/util/bitmap_ops.h
+++ b/cpp/src/arrow/util/bitmap_ops.h
@@ -164,5 +164,24 @@ ARROW_EXPORT
 void BitmapXor(const uint8_t* left, int64_t left_offset, const uint8_t* right,
                int64_t right_offset, int64_t length, int64_t out_offset, uint8_t* out);
 
+/// \brief Do a "bitmap and not" (left & ~right) on left and right buffers
+/// starting at their respective bit-offsets for the given bit-length and put
+/// the results in out_buffer starting at the given bit-offset.
+///
+/// out_buffer will be allocated and initialized to zeros using pool before
+/// the operation.
+ARROW_EXPORT
+Result<std::shared_ptr<Buffer>> BitmapAndNot(MemoryPool* pool, const uint8_t* left,
+                                             int64_t left_offset, const uint8_t* right,
+                                             int64_t right_offset, int64_t length,
+                                             int64_t out_offset);
+
+/// \brief Do a "bitmap and not" (left & ~right) on left and right buffers
+/// starting at their respective bit-offsets for the given bit-length and put
+/// the results in out starting at the given bit-offset.
+ARROW_EXPORT
+void BitmapAndNot(const uint8_t* left, int64_t left_offset, const uint8_t* right,
+                  int64_t right_offset, int64_t length, int64_t out_offset, uint8_t* out);
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/vector.h b/cpp/src/arrow/util/vector.h
index 81195e8..cbd874d 100644
--- a/cpp/src/arrow/util/vector.h
+++ b/cpp/src/arrow/util/vector.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <algorithm>
 #include <utility>
 #include <vector>
 
@@ -26,7 +27,7 @@ namespace arrow {
 namespace internal {
 
 template <typename T>
-inline std::vector<T> DeleteVectorElement(const std::vector<T>& values, size_t index) {
+std::vector<T> DeleteVectorElement(const std::vector<T>& values, size_t index) {
   DCHECK(!values.empty());
   DCHECK_LT(index, values.size());
   std::vector<T> out;
@@ -41,8 +42,8 @@ inline std::vector<T> DeleteVectorElement(const std::vector<T>& values, size_t i
 }
 
 template <typename T>
-inline std::vector<T> AddVectorElement(const std::vector<T>& values, size_t index,
-                                       T new_element) {
+std::vector<T> AddVectorElement(const std::vector<T>& values, size_t index,
+                                T new_element) {
   DCHECK_LE(index, values.size());
   std::vector<T> out;
   out.reserve(values.size() + 1);
@@ -57,8 +58,8 @@ inline std::vector<T> AddVectorElement(const std::vector<T>& values, size_t inde
 }
 
 template <typename T>
-inline std::vector<T> ReplaceVectorElement(const std::vector<T>& values, size_t index,
-                                           T new_element) {
+std::vector<T> ReplaceVectorElement(const std::vector<T>& values, size_t index,
+                                    T new_element) {
   DCHECK_LE(index, values.size());
   std::vector<T> out;
   out.reserve(values.size());
@@ -72,5 +73,13 @@ inline std::vector<T> ReplaceVectorElement(const std::vector<T>& values, size_t
   return out;
 }
 
+template <typename T, typename Predicate>
+std::vector<T> FilterVector(std::vector<T> values, Predicate&& predicate) {
+  auto new_end =
+      std::remove_if(values.begin(), values.end(), std::forward<Predicate>(predicate));
+  values.erase(new_end, values.end());
+  return values;
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst
index 081461d..c1d3ac7 100644
--- a/docs/source/cpp/compute.rst
+++ b/docs/source/cpp/compute.rst
@@ -253,8 +253,12 @@ For the Kleene logic variants, therefore:
 +==========================+============+====================+=====================+
 | and                      | Binary     | Boolean            | Boolean             |
 +--------------------------+------------+--------------------+---------------------+
+| and_not                  | Binary     | Boolean            | Boolean             |
++--------------------------+------------+--------------------+---------------------+
 | and_kleene               | Binary     | Boolean            | Boolean             |
 +--------------------------+------------+--------------------+---------------------+
+| and_not_kleene           | Binary     | Boolean            | Boolean             |
++--------------------------+------------+--------------------+---------------------+
 | invert                   | Unary      | Boolean            | Boolean             |
 +--------------------------+------------+--------------------+---------------------+
 | or                       | Binary     | Boolean            | Boolean             |