You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/25 20:53:35 UTC

[arrow] branch master updated: ARROW-3133: [C++] Remove allocation from Binary Boolean Kernels.

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4910fbf  ARROW-3133: [C++] Remove allocation from Binary Boolean Kernels.
4910fbf is described below

commit 4910fbf4fda05b864daaba820db08291e4afdcb6
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Mon Feb 25 14:53:15 2019 -0600

    ARROW-3133: [C++] Remove allocation from Binary Boolean Kernels.
    
    Also:
    - ARROW-3135: add a helper for validity bitmap propagation
    - ARROW-4572: remove memory zeroing in Allocator kernel
    - Fix regression in InvertKernel where nulls were not being handled (add a unit test to cover this case)
    - Add some unit tests for util-internal.h
    - lift out_type to OpKernel
    
    Author: Micah Kornfield <em...@gmail.com>
    
    Closes #3731 from emkornfield/bin_kern2 and squashes the following commits:
    
    68af8ab5 <Micah Kornfield> Fix valgrind
    63b1e8c2 <Micah Kornfield> ARROW-3133: Remove allocation from Binary Boolean Kernels.
---
 cpp/src/arrow/compute/kernel.h                     |   7 +-
 cpp/src/arrow/compute/kernels/CMakeLists.txt       |   1 +
 cpp/src/arrow/compute/kernels/boolean-test.cc      |  33 ++-
 cpp/src/arrow/compute/kernels/boolean.cc           |  57 +++--
 cpp/src/arrow/compute/kernels/boolean.h            |   6 +-
 cpp/src/arrow/compute/kernels/cast.cc              |   5 +-
 .../arrow/compute/kernels/util-internal-test.cc    | 232 +++++++++++++++++++++
 cpp/src/arrow/compute/kernels/util-internal.cc     | 144 ++++++++-----
 cpp/src/arrow/compute/kernels/util-internal.h      |  52 ++++-
 cpp/src/arrow/compute/test-util.h                  |   3 +
 10 files changed, 430 insertions(+), 110 deletions(-)

diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index b8033eb..58d7dc0 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -54,6 +54,9 @@ class FunctionContext;
 class ARROW_EXPORT OpKernel {
  public:
   virtual ~OpKernel() = default;
+  /// \brief EXPERIMENTAL The output data type of the kernel
+  /// \return the output type
+  virtual std::shared_ptr<DataType> out_type() const = 0;
 };
 
 /// \class Datum
@@ -188,10 +191,6 @@ class ARROW_EXPORT UnaryKernel : public OpKernel {
   /// there will be a more generic mechansim for understanding the necessary
   /// contracts.
   virtual Status Call(FunctionContext* ctx, const Datum& input, Datum* out) = 0;
-
-  /// \brief EXPERIMENTAL The output data type of the kernel
-  /// \return the output type
-  virtual std::shared_ptr<DataType> out_type() const = 0;
 };
 
 /// \class BinaryKernel
diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt
index 0fee74d..5d78747 100644
--- a/cpp/src/arrow/compute/kernels/CMakeLists.txt
+++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt
@@ -20,6 +20,7 @@ arrow_install_all_headers("arrow/compute/kernels")
 add_arrow_test(boolean-test PREFIX "arrow-compute")
 add_arrow_test(cast-test PREFIX "arrow-compute")
 add_arrow_test(hash-test PREFIX "arrow-compute")
+add_arrow_test(util-internal-test PREFIX "arrow-compute")
 
 # Aggregates
 add_arrow_test(aggregate-test PREFIX "arrow-compute")
diff --git a/cpp/src/arrow/compute/kernels/boolean-test.cc b/cpp/src/arrow/compute/kernels/boolean-test.cc
index e6277c0..439e0db 100644
--- a/cpp/src/arrow/compute/kernels/boolean-test.cc
+++ b/cpp/src/arrow/compute/kernels/boolean-test.cc
@@ -49,7 +49,7 @@ class TestBooleanKernel : public ComputeFixture, public TestBase {
     ASSERT_OK(kernel(&this->ctx_, left, right, &result));
     ASSERT_EQ(Datum::ARRAY, result.kind());
     std::shared_ptr<Array> result_array = result.make_array();
-    ASSERT_TRUE(result_array->Equals(expected));
+    ASSERT_ARRAYS_EQUAL(*expected, *result_array);
   }
 
   void TestChunkedArrayBinary(const BinaryKernelFunc& kernel,
@@ -99,25 +99,23 @@ class TestBooleanKernel : public ComputeFixture, public TestBase {
 };
 
 TEST_F(TestBooleanKernel, Invert) {
-  vector<bool> values1 = {true, false, true};
-  vector<bool> values2 = {false, true, false};
+  vector<bool> values1 = {true, false, true, false};
+  vector<bool> values2 = {false, true, false, true};
 
   auto type = boolean();
-  auto a1 = _MakeArray<BooleanType, bool>(type, values1, {});
-  auto a2 = _MakeArray<BooleanType, bool>(type, values2, {});
+  auto a1 = _MakeArray<BooleanType, bool>(type, values1, {true, true, true, false});
+  auto a2 = _MakeArray<BooleanType, bool>(type, values2, {true, true, true, false});
 
   // Plain array
   Datum result;
   ASSERT_OK(Invert(&this->ctx_, a1, &result));
   ASSERT_EQ(Datum::ARRAY, result.kind());
-  std::shared_ptr<Array> result_array = result.make_array();
-  ASSERT_TRUE(result_array->Equals(a2));
+  ASSERT_ARRAYS_EQUAL(*a2, *result.make_array());
 
   // Array with offset
   ASSERT_OK(Invert(&this->ctx_, a1->Slice(1), &result));
   ASSERT_EQ(Datum::ARRAY, result.kind());
-  result_array = result.make_array();
-  ASSERT_TRUE(result_array->Equals(a2->Slice(1)));
+  ASSERT_ARRAYS_EQUAL(*a2->Slice(1), *result.make_array());
 
   // ChunkedArray
   std::vector<std::shared_ptr<Array>> ca1_arrs = {a1, a1->Slice(1)};
@@ -127,7 +125,7 @@ TEST_F(TestBooleanKernel, Invert) {
   ASSERT_OK(Invert(&this->ctx_, ca1, &result));
   ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
   std::shared_ptr<ChunkedArray> result_ca = result.chunked_array();
-  ASSERT_TRUE(result_ca->Equals(ca2));
+  ASSERT_ARRAYS_EQUAL(*ca2, *result_ca);
 }
 
 TEST_F(TestBooleanKernel, InvertEmptyArray) {
@@ -139,7 +137,20 @@ TEST_F(TestBooleanKernel, InvertEmptyArray) {
 
   Datum result;
   ASSERT_OK(Invert(&this->ctx_, input, &result));
-  ASSERT_TRUE(result.make_array()->Equals(input.make_array()));
+  ASSERT_ARRAYS_EQUAL(*input.make_array(), *result.make_array());
+}
+
+TEST_F(TestBooleanKernel, BinaryOpOnEmptyArray) {
+  auto type = boolean();
+  std::vector<std::shared_ptr<Buffer>> data_buffers(2);
+  Datum input;
+  input.value = ArrayData::Make(boolean(), 0 /* length */, std::move(data_buffers),
+                                0 /* null_count */);
+
+  Datum result;
+  ASSERT_OK(And(&this->ctx_, input, input, &result));
+  // Result should be empty as well.
+  ASSERT_ARRAYS_EQUAL(*input.make_array(), *result.make_array());
 }
 
 TEST_F(TestBooleanKernel, And) {
diff --git a/cpp/src/arrow/compute/kernels/boolean.cc b/cpp/src/arrow/compute/kernels/boolean.cc
index 7d8b15a..2209dc9 100644
--- a/cpp/src/arrow/compute/kernels/boolean.cc
+++ b/cpp/src/arrow/compute/kernels/boolean.cc
@@ -55,6 +55,7 @@ class InvertKernel : public BooleanUnaryKernel {
 
     // Handle output data buffer
     if (in_data.length > 0) {
+      RETURN_NOT_OK(detail::PropagateNulls(ctx, in_data, result.get()));
       const Buffer& data_buffer = *in_data.buffers[1];
       DCHECK_LE(BitUtil::BytesForBits(in_data.length), data_buffer.size());
       InvertBitmap(data_buffer.data(), in_data.offset, in_data.length,
@@ -65,8 +66,8 @@ class InvertKernel : public BooleanUnaryKernel {
 };
 
 Status Invert(FunctionContext* ctx, const Datum& value, Datum* out) {
-  detail::PrimitiveAllocatingUnaryKernel kernel(
-      std::unique_ptr<UnaryKernel>(new InvertKernel()), boolean());
+  InvertKernel invert;
+  detail::PrimitiveAllocatingUnaryKernel kernel(&invert);
 
   std::vector<Datum> result;
   RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &kernel, value, &result));
@@ -86,73 +87,65 @@ class BinaryBooleanKernel : public BinaryKernel {
 
     const ArrayData& left_data = *left.array();
     const ArrayData& right_data = *right.array();
+    DCHECK_EQ(left_data.length, right_data.length);
     ArrayData* result;
-    out->value = ArrayData::Make(boolean(), right_data.length);
 
     result = out->array().get();
-
-    // If one of the arrays has a null value, the result will have a null.
-    std::shared_ptr<Buffer> validity_bitmap;
-    RETURN_NOT_OK(BitmapAnd(ctx->memory_pool(), left_data.buffers[0]->data(),
-                            left_data.offset, right_data.buffers[0]->data(),
-                            right_data.offset, right_data.length, 0, &validity_bitmap));
-    result->buffers.push_back(validity_bitmap);
-
-    result->null_count =
-        result->length - CountSetBits(validity_bitmap->data(), 0, result->length);
-
+    RETURN_NOT_OK(detail::AssignNullIntersection(ctx, left_data, right_data, result));
     return Compute(ctx, left_data, right_data, result);
   }
+
+  std::shared_ptr<DataType> out_type() const override { return boolean(); }
 };
 
 class AndKernel : public BinaryBooleanKernel {
   Status Compute(FunctionContext* ctx, const ArrayData& left, const ArrayData& right,
                  ArrayData* out) override {
-    std::shared_ptr<Buffer> data_bitmap;
-    RETURN_NOT_OK(BitmapAnd(ctx->memory_pool(), left.buffers[1]->data(), left.offset,
-                            right.buffers[1]->data(), right.offset, right.length, 0,
-                            &data_bitmap));
-    out->buffers.push_back(data_bitmap);
+    if (right.length > 0) {
+      BitmapAnd(left.buffers[1]->data(), left.offset, right.buffers[1]->data(),
+                right.offset, right.length, 0, out->buffers[1]->mutable_data());
+    }
     return Status::OK();
   }
 };
 
 Status And(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) {
-  AndKernel kernel;
+  AndKernel and_kernel;
+  detail::PrimitiveAllocatingBinaryKernel kernel(&and_kernel);
   return detail::InvokeBinaryArrayKernel(ctx, &kernel, left, right, out);
 }
 
 class OrKernel : public BinaryBooleanKernel {
   Status Compute(FunctionContext* ctx, const ArrayData& left, const ArrayData& right,
                  ArrayData* out) override {
-    std::shared_ptr<Buffer> data_bitmap;
-    RETURN_NOT_OK(BitmapOr(ctx->memory_pool(), left.buffers[1]->data(), left.offset,
-                           right.buffers[1]->data(), right.offset, right.length, 0,
-                           &data_bitmap));
-    out->buffers.push_back(data_bitmap);
+    if (right.length > 0) {
+      BitmapOr(left.buffers[1]->data(), left.offset, right.buffers[1]->data(),
+               right.offset, right.length, 0, out->buffers[1]->mutable_data());
+    }
     return Status::OK();
   }
 };
 
 Status Or(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) {
-  OrKernel kernel;
+  OrKernel or_kernel;
+  detail::PrimitiveAllocatingBinaryKernel kernel(&or_kernel);
   return detail::InvokeBinaryArrayKernel(ctx, &kernel, left, right, out);
 }
 
 class XorKernel : public BinaryBooleanKernel {
   Status Compute(FunctionContext* ctx, const ArrayData& left, const ArrayData& right,
                  ArrayData* out) override {
-    std::shared_ptr<Buffer> data_bitmap;
-    RETURN_NOT_OK(BitmapXor(ctx->memory_pool(), left.buffers[1]->data(), left.offset,
-                            right.buffers[1]->data(), right.offset, right.length, 0,
-                            &data_bitmap));
-    out->buffers.push_back(data_bitmap);
+    if (right.length > 0) {
+      BitmapXor(left.buffers[1]->data(), left.offset, right.buffers[1]->data(),
+                right.offset, right.length, 0, out->buffers[1]->mutable_data());
+    }
     return Status::OK();
   }
 };
 
 Status Xor(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) {
-  XorKernel kernel;
+  XorKernel xor_kernel;
+  detail::PrimitiveAllocatingBinaryKernel kernel(&xor_kernel);
   return detail::InvokeBinaryArrayKernel(ctx, &kernel, left, right, out);
 }
 
diff --git a/cpp/src/arrow/compute/kernels/boolean.h b/cpp/src/arrow/compute/kernels/boolean.h
index 88f5ad1..fb88659 100644
--- a/cpp/src/arrow/compute/kernels/boolean.h
+++ b/cpp/src/arrow/compute/kernels/boolean.h
@@ -37,7 +37,7 @@ class FunctionContext;
 ARROW_EXPORT
 Status Invert(FunctionContext* context, const Datum& value, Datum* out);
 
-/// \brief Element-wise AND of two boolean dates
+/// \brief Element-wise AND of two boolean datums
 /// \param[in] context the FunctionContext
 /// \param[in] left left operand (array)
 /// \param[in] right right operand (array)
@@ -48,7 +48,7 @@ Status Invert(FunctionContext* context, const Datum& value, Datum* out);
 ARROW_EXPORT
 Status And(FunctionContext* context, const Datum& left, const Datum& right, Datum* out);
 
-/// \brief Element-wise OR of two boolean dates
+/// \brief Element-wise OR of two boolean datums
 /// \param[in] context the FunctionContext
 /// \param[in] left left operand (array)
 /// \param[in] right right operand (array)
@@ -59,7 +59,7 @@ Status And(FunctionContext* context, const Datum& left, const Datum& right, Datu
 ARROW_EXPORT
 Status Or(FunctionContext* context, const Datum& left, const Datum& right, Datum* out);
 
-/// \brief Element-wise XOR of two boolean dates
+/// \brief Element-wise XOR of two boolean datums
 /// \param[in] context the FunctionContext
 /// \param[in] left left operand (array)
 /// \param[in] right right operand (array)
diff --git a/cpp/src/arrow/compute/kernels/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc
index 97601b5..2ebc097 100644
--- a/cpp/src/arrow/compute/kernels/cast.cc
+++ b/cpp/src/arrow/compute/kernels/cast.cc
@@ -531,6 +531,9 @@ struct CastFunctor<Date64Type, TimestampType> {
 
     ShiftTime<int64_t, int64_t>(ctx, options, conversion.first, conversion.second, input,
                                 output);
+    if (!ctx->status().ok()) {
+      return;
+    }
 
     // Ensure that intraday milliseconds have been zeroed out
     auto out_data = output->GetMutableValues<int64_t>(1);
@@ -634,7 +637,7 @@ Status InvokeWithAllocation(FunctionContext* ctx, UnaryKernel* func, const Datum
   std::vector<Datum> result;
   if (NeedToPreallocate(*func->out_type())) {
     // Create wrapper that allocates output memory for primitive types
-    detail::PrimitiveAllocatingUnaryKernel wrapper(func, func->out_type());
+    detail::PrimitiveAllocatingUnaryKernel wrapper(func);
     RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &wrapper, input, &result));
   } else {
     RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, func, input, &result));
diff --git a/cpp/src/arrow/compute/kernels/util-internal-test.cc b/cpp/src/arrow/compute/kernels/util-internal-test.cc
new file mode 100644
index 0000000..9f16344
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/util-internal-test.cc
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <memory>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/compute/kernels/util-internal.h"
+#include "arrow/compute/test-util.h"
+
+namespace arrow {
+namespace compute {
+namespace detail {
+
+using ::testing::_;
+using ::testing::AllOf;
+using ::testing::Each;
+using ::testing::ElementsAre;
+using ::testing::ElementsAreArray;
+using ::testing::Eq;
+using ::testing::Ge;
+using ::testing::IsNull;
+using ::testing::Ne;
+using ::testing::NotNull;
+using ::testing::Return;
+
+TEST(PropagateNulls, UnknownNullCountWithNullsZeroCopies) {
+  ArrayData input(boolean(), /*length=*/16, kUnknownNullCount);
+  constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+  std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+  input.buffers.push_back(nulls);
+  FunctionContext ctx(default_memory_pool());
+  ArrayData output;
+
+  ASSERT_OK(PropagateNulls(&ctx, input, &output));
+
+  ASSERT_THAT(output.buffers, ElementsAre(Eq(nulls)));
+  ASSERT_THAT(output.null_count, 9);
+}
+
+TEST(PropagateNulls, UnknownNullCountWithoutNullsLeavesNullptr) {
+  ArrayData input(boolean(), /*length=*/16, kUnknownNullCount);
+  constexpr uint8_t validity_bitmap[8] = {255, 255, 0, 0, 0, 0, 0, 0};
+  std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+  input.buffers.push_back(nulls);
+  FunctionContext ctx(default_memory_pool());
+  ArrayData output;
+
+  ASSERT_OK(PropagateNulls(&ctx, input, &output));
+
+  EXPECT_THAT(output.null_count, Eq(0));
+  EXPECT_THAT(output.buffers, ElementsAre(IsNull())) << output.buffers[0]->data()[0];
+}
+
+TEST(PropagateNulls, OffsetAndHasNulls) {
+  ArrayData input(boolean(), /*length=*/16, kUnknownNullCount,  // slice off the first 8
+                  /*offset=*/7);
+  constexpr uint8_t validity_bitmap[8] = {0, 1, 0, 0, 0, 0, 0, 0};
+  std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+  input.buffers.push_back(nulls);
+  FunctionContext ctx(default_memory_pool());
+  ArrayData output;
+
+  ASSERT_OK(PropagateNulls(&ctx, input, &output));
+
+  // Copy is made.
+  EXPECT_THAT(output.null_count, Eq(15));
+  ASSERT_THAT(output.buffers, ElementsAre(AllOf(Ne(nulls), NotNull())));
+  const auto& output_buffer = *output.buffers[0];
+  // the slice shifts the bit one over
+  ASSERT_THAT(std::vector<uint8_t>(output_buffer.data(),
+                                   output_buffer.data() + output_buffer.size()),
+              ElementsAreArray({2, 0}));
+  ASSERT_THAT(std::vector<uint8_t>(output_buffer.data() + output_buffer.size(),
+                                   output_buffer.data() + output_buffer.capacity()),
+              Each(0));
+}
+
+TEST(AssignNullIntersection, ZeroCopyWhenZeroNullsOnOneInput) {
+  ArrayData some_nulls(boolean(), /* length= */ 16, kUnknownNullCount);
+  constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+  std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+  some_nulls.buffers.push_back(nulls);
+
+  ArrayData no_nulls(boolean(), /* length= */ 16, /*null_count=*/0);
+
+  FunctionContext ctx(default_memory_pool());
+  ArrayData output;
+  output.length = 16;
+
+  ASSERT_OK(AssignNullIntersection(&ctx, some_nulls, no_nulls, &output));
+  ASSERT_THAT(output.buffers, ElementsAre(Eq(nulls)));
+  ASSERT_THAT(output.null_count, 9);
+
+  output.buffers[0] = nullptr;
+  ASSERT_OK(AssignNullIntersection(&ctx, no_nulls, some_nulls, &output));
+  ASSERT_THAT(output.buffers, ElementsAre(Eq(nulls)));
+  ASSERT_THAT(output.null_count, 9);
+}
+
+TEST(AssignNullIntersection, IntersectsNullsWhenSomeOnBoth) {
+  ArrayData left(boolean(), /* length= */ 16, kUnknownNullCount);
+  constexpr uint8_t left_validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+  std::shared_ptr<Buffer> left_nulls = std::make_shared<Buffer>(left_validity_bitmap, 8);
+  left.buffers.push_back(left_nulls);
+
+  ArrayData right(boolean(), /* length= */ 16, kUnknownNullCount);
+  constexpr uint8_t right_validity_bitmap[8] = {127, 0, 0, 0, 0, 0, 0, 0};
+  std::shared_ptr<Buffer> right_nulls =
+      std::make_shared<Buffer>(right_validity_bitmap, 8);
+  right.buffers.push_back(right_nulls);
+
+  FunctionContext ctx(default_memory_pool());
+  ArrayData output;
+  output.length = 16;
+
+  ASSERT_OK(AssignNullIntersection(&ctx, left, right, &output));
+
+  EXPECT_THAT(output.null_count, 10);
+  ASSERT_THAT(output.buffers, ElementsAre(NotNull()));
+  const auto& output_buffer = *output.buffers[0];
+  EXPECT_THAT(std::vector<uint8_t>(output_buffer.data(),
+                                   output_buffer.data() + output_buffer.size()),
+              ElementsAreArray({126, 0}));
+  EXPECT_THAT(std::vector<uint8_t>(output_buffer.data() + output_buffer.size(),
+                                   output_buffer.data() + output_buffer.capacity()),
+              Each(0));
+}
+
+TEST(PrimitiveAllocatingUnaryKernel, BooleanFunction) {
+  MockUnaryKernel mock;
+  EXPECT_CALL(mock, out_type).WillRepeatedly(Return(boolean()));
+  EXPECT_CALL(mock, Call(_, _, _)).WillOnce(Return(Status::OK()));
+  PrimitiveAllocatingUnaryKernel kernel(&mock);
+
+  auto input =
+      std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+  FunctionContext ctx(default_memory_pool());
+  Datum output;
+  output.value = ArrayData::Make(kernel.out_type(), input->length);
+  ASSERT_OK(kernel.Call(&ctx, input, &output));
+
+  ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+  auto value_buffer = output.array()->buffers[1];
+  EXPECT_THAT(value_buffer->size(), Eq(2));
+  EXPECT_THAT(value_buffer->capacity(), Ge(2));
+  // Booleans should have this always zeroed out.
+  EXPECT_THAT(*(value_buffer->data() + value_buffer->size() - 1), Eq(0));
+}
+
+TEST(PrimitiveAllocatingUnaryKernel, NonBoolean) {
+  MockUnaryKernel mock;
+  EXPECT_CALL(mock, out_type).WillRepeatedly(Return(int32()));
+  EXPECT_CALL(mock, Call(_, _, _)).WillOnce(Return(Status::OK()));
+  PrimitiveAllocatingUnaryKernel kernel(&mock);
+
+  auto input =
+      std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+  FunctionContext ctx(default_memory_pool());
+  Datum output;
+  output.value = ArrayData::Make(kernel.out_type(), input->length);
+  ASSERT_OK(kernel.Call(&ctx, input, &output));
+
+  ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+  auto value_buffer = output.array()->buffers[1];
+  EXPECT_THAT(value_buffer->size(), Eq(64));
+  EXPECT_THAT(value_buffer->capacity(), Ge(64));
+}
+
+TEST(PrimitiveAllocatingBinaryKernel, BooleanFunction) {
+  MockBinaryKernel mock;
+  EXPECT_CALL(mock, out_type).WillRepeatedly(Return(boolean()));
+  EXPECT_CALL(mock, Call(_, _, _, _)).WillOnce(Return(Status::OK()));
+  PrimitiveAllocatingBinaryKernel kernel(&mock);
+
+  auto input =
+      std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+  FunctionContext ctx(default_memory_pool());
+  Datum output;
+  output.value = ArrayData::Make(kernel.out_type(), input->length);
+  ASSERT_OK(kernel.Call(&ctx, input, input, &output));
+
+  ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+  auto value_buffer = output.array()->buffers[1];
+  EXPECT_THAT(value_buffer->size(), Eq(2));
+  EXPECT_THAT(value_buffer->capacity(), Ge(2));
+  // Booleans should have this always zeroed out.
+  EXPECT_THAT(*(value_buffer->data() + value_buffer->size() - 1), Eq(0));
+}
+
+TEST(PrimitiveAllocatingBinaryKernel, NonBoolean) {
+  MockBinaryKernel mock;
+  EXPECT_CALL(mock, out_type).WillRepeatedly(Return(int32()));
+  EXPECT_CALL(mock, Call(_, _, _, _)).WillOnce(Return(Status::OK()));
+  PrimitiveAllocatingBinaryKernel kernel(&mock);
+
+  auto input =
+      std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+  FunctionContext ctx(default_memory_pool());
+  Datum output;
+  output.value = ArrayData::Make(kernel.out_type(), input->length);
+  ASSERT_OK(kernel.Call(&ctx, input, input, &output));
+
+  ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+  auto value_buffer = output.array()->buffers[1];
+  EXPECT_THAT(value_buffer->size(), Eq(64));
+  EXPECT_THAT(value_buffer->capacity(), Ge(64));
+}
+
+}  // namespace detail
+}  // namespace compute
+}  // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/util-internal.cc b/cpp/src/arrow/compute/kernels/util-internal.cc
index 60b668d..174b688 100644
--- a/cpp/src/arrow/compute/kernels/util-internal.cc
+++ b/cpp/src/arrow/compute/kernels/util-internal.cc
@@ -26,6 +26,7 @@
 #include "arrow/array.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/util/bit-util.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 
@@ -34,11 +35,47 @@
 
 namespace arrow {
 
+using internal::BitmapAnd;
 using internal::checked_cast;
 
 namespace compute {
 namespace detail {
 
+namespace {
+
+inline void ZeroLastByte(Buffer* buffer) {
+  *(buffer->mutable_data() + (buffer->size() - 1)) = 0;
+}
+
+Status AllocateValueBuffer(FunctionContext* ctx, const DataType& type, int64_t length,
+                           std::shared_ptr<Buffer>* buffer) {
+  if (type.id() != Type::NA) {
+    const auto& fw_type = checked_cast<const FixedWidthType&>(type);
+
+    int bit_width = fw_type.bit_width();
+    int64_t buffer_size = 0;
+
+    if (bit_width == 1) {
+      buffer_size = BitUtil::BytesForBits(length);
+    } else {
+      DCHECK_EQ(bit_width % 8, 0)
+          << "Only bit widths with multiple of 8 are currently supported";
+      buffer_size = length * fw_type.bit_width() / 8;
+    }
+    RETURN_NOT_OK(ctx->Allocate(buffer_size, buffer));
+
+    if (bit_width == 1 && buffer_size > 0) {
+      // Some utility methods access the last byte before it might be
+      // initialized this makes valgrind/asan unhappy, so we proactively
+      // zero it.
+      ZeroLastByte(buffer->get());
+    }
+  }
+  return Status::OK();
+}
+
+}  // namespace
+
 Status InvokeUnaryArrayKernel(FunctionContext* ctx, UnaryKernel* kernel,
                               const Datum& value, std::vector<Datum>* outputs) {
   if (value.kind() == Datum::ARRAY) {
@@ -90,7 +127,6 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
   if (right_length != left_length) {
     return Status::Invalid("Right and left have different lengths");
   }
-
   // TODO: Remove duplication with ChunkedArray::Equals
   int left_chunk_idx = 0;
   int64_t left_start_idx = 0;
@@ -98,20 +134,20 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
   int64_t right_start_idx = 0;
 
   int64_t elements_compared = 0;
-  while (elements_compared < left_length) {
+  do {
     const std::shared_ptr<Array> left_array = left_arrays[left_chunk_idx];
     const std::shared_ptr<Array> right_array = right_arrays[right_chunk_idx];
     int64_t common_length = std::min(left_array->length() - left_start_idx,
                                      right_array->length() - right_start_idx);
-
     std::shared_ptr<Array> left_op = left_array->Slice(left_start_idx, common_length);
     std::shared_ptr<Array> right_op = right_array->Slice(right_start_idx, common_length);
+
     Datum output;
-    RETURN_NOT_OK(kernel->Call(ctx, Datum(left_op), Datum(right_op), &output));
+    output.value = ArrayData::Make(kernel->out_type(), common_length);
+    RETURN_NOT_OK(kernel->Call(ctx, left_op, right_op, &output));
     outputs->push_back(output);
 
     elements_compared += common_length;
-
     // If we have exhausted the current chunk, proceed to the next one individually.
     if (left_start_idx + common_length == left_array->length()) {
       left_chunk_idx++;
@@ -126,8 +162,7 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
     } else {
       right_start_idx += common_length;
     }
-  }
-
+  } while (elements_compared < left_length);
   return Status::OK();
 }
 
@@ -170,23 +205,11 @@ Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums) {
   }
 }
 
-PrimitiveAllocatingUnaryKernel::PrimitiveAllocatingUnaryKernel(
-    UnaryKernel* delegate, const std::shared_ptr<DataType>& out_type)
-    : delegate_(delegate), out_type_(out_type) {}
-
-PrimitiveAllocatingUnaryKernel::PrimitiveAllocatingUnaryKernel(
-    std::unique_ptr<UnaryKernel> delegate, const std::shared_ptr<DataType>& out_type)
-    : PrimitiveAllocatingUnaryKernel(delegate.get(), out_type) {
-  owned_delegate_ = std::move(delegate);
-}
-
-inline void ZeroLastByte(Buffer* buffer) {
-  *(buffer->mutable_data() + (buffer->size() - 1)) = 0;
-}
+PrimitiveAllocatingUnaryKernel::PrimitiveAllocatingUnaryKernel(UnaryKernel* delegate)
+    : delegate_(delegate) {}
 
 Status PropagateNulls(FunctionContext* ctx, const ArrayData& input, ArrayData* output) {
   const int64_t length = input.length;
-
   if (output->buffers.size() == 0) {
     // Ensure we can assign a buffer
     output->buffers.resize(1);
@@ -207,12 +230,36 @@ Status PropagateNulls(FunctionContext* ctx, const ArrayData& input, ArrayData* o
     internal::CopyBitmap(validity_bitmap.data(), input.offset, length,
                          buffer->mutable_data(), 0 /* destination offset */);
     output->buffers[0] = std::move(buffer);
-  } else {
+  } else if (output->null_count > 0) {
     output->buffers[0] = input.buffers[0];
   }
   return Status::OK();
 }
 
+Status AssignNullIntersection(FunctionContext* ctx, const ArrayData& left,
+                              const ArrayData& right, ArrayData* output) {
+  if (output->buffers.size() == 0) {
+    // Ensure we can assign a buffer
+    output->buffers.resize(1);
+  }
+
+  if (left.GetNullCount() > 0 && right.GetNullCount() > 0) {
+    RETURN_NOT_OK(BitmapAnd(ctx->memory_pool(), left.buffers[0]->data(), left.offset,
+                            right.buffers[0]->data(), right.offset, right.length, 0,
+                            &(output->buffers[0])));
+    // Force computation of null count.
+    output->null_count = kUnknownNullCount;
+    output->GetNullCount();
+    return Status::OK();
+  } else if (left.null_count != 0) {
+    return PropagateNulls(ctx, left, output);
+  } else {
+    // right has a positive null_count or both are zero.
+    return PropagateNulls(ctx, right, output);
+  }
+  return Status::OK();
+}
+
 Status PrimitiveAllocatingUnaryKernel::Call(FunctionContext* ctx, const Datum& input,
                                             Datum* out) {
   std::vector<std::shared_ptr<Buffer>> data_buffers;
@@ -225,39 +272,40 @@ Status PrimitiveAllocatingUnaryKernel::Call(FunctionContext* ctx, const Datum& i
   result->buffers.resize(2);
 
   const int64_t length = in_data.length;
-
   // Allocate the value buffer
-  std::shared_ptr<Buffer> buffer;
-  if (out_type_->id() != Type::NA) {
-    const auto& fw_type = checked_cast<const FixedWidthType&>(*out_type_);
+  RETURN_NOT_OK(AllocateValueBuffer(ctx, *out_type(), length, &(result->buffers[1])));
+  return delegate_->Call(ctx, input, out);
+}
 
-    int bit_width = fw_type.bit_width();
-    int64_t buffer_size = 0;
+std::shared_ptr<DataType> PrimitiveAllocatingUnaryKernel::out_type() const {
+  return delegate_->out_type();
+}
 
-    if (bit_width == 1) {
-      buffer_size = BitUtil::BytesForBits(length);
-    } else {
-      DCHECK_EQ(bit_width % 8, 0)
-          << "Only bit widths with multiple of 8 are currently supported";
-      buffer_size = length * fw_type.bit_width() / 8;
-    }
-    RETURN_NOT_OK(ctx->Allocate(buffer_size, &buffer));
-    buffer->ZeroPadding();
+PrimitiveAllocatingBinaryKernel::PrimitiveAllocatingBinaryKernel(BinaryKernel* delegate)
+    : delegate_(delegate) {}
 
-    if (bit_width == 1 && buffer_size > 0) {
-      // Some utility methods access the last byte before it might be
-      // initialized this makes valgrind/asan unhappy, so we proactively
-      // zero it.
-      ZeroLastByte(buffer.get());
-    }
+Status PrimitiveAllocatingBinaryKernel::Call(FunctionContext* ctx, const Datum& left,
+                                             const Datum& right, Datum* out) {
+  std::vector<std::shared_ptr<Buffer>> data_buffers;
+  DCHECK_EQ(left.kind(), Datum::ARRAY);
+  DCHECK_EQ(right.kind(), Datum::ARRAY);
+  const ArrayData& left_data = *left.array();
+  DCHECK_EQ(left_data.length, right.array()->length);
 
-    memset(buffer->mutable_data(), 0, buffer_size);
-    result->buffers[1] = std::move(buffer);
-  }
-  return delegate_->Call(ctx, input, out);
+  DCHECK_EQ(out->kind(), Datum::ARRAY);
+
+  ArrayData* result = out->array().get();
+
+  result->buffers.resize(2);
+
+  const int64_t length = left_data.length;
+  RETURN_NOT_OK(AllocateValueBuffer(ctx, *out_type(), length, &(result->buffers[1])));
+
+  // Allocate the value buffer
+  return delegate_->Call(ctx, left, right, out);
 }
 
-std::shared_ptr<DataType> PrimitiveAllocatingUnaryKernel::out_type() const {
+std::shared_ptr<DataType> PrimitiveAllocatingBinaryKernel::out_type() const {
   return delegate_->out_type();
 }
 
diff --git a/cpp/src/arrow/compute/kernels/util-internal.h b/cpp/src/arrow/compute/kernels/util-internal.h
index bd27280..4cb7a24 100644
--- a/cpp/src/arrow/compute/kernels/util-internal.h
+++ b/cpp/src/arrow/compute/kernels/util-internal.h
@@ -64,14 +64,25 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
 
 /// \brief Assign validity bitmap to output, copying bitmap if necessary, but
 /// zero-copy otherwise, so that the same value slots are valid/not-null in the
-/// output
-/// (sliced arrays)
+/// output (sliced arrays).
+///
 /// \param[in] ctx the kernel FunctionContext
 /// \param[in] input the input array
-/// \param[out] output the output array
+/// \param[out] output the output array.  Must have length set correctly.
 ARROW_EXPORT
 Status PropagateNulls(FunctionContext* ctx, const ArrayData& input, ArrayData* output);
 
+/// \brief Assign validity bitmap to output, taking the intersection of left and right
+/// null bitmaps if necessary, but zero-copy otherwise.
+///
+/// \param[in] ctx the kernel FunctionContext
+/// \param[in] left the left operand
+/// \param[in] right the right operand
+/// \param[out] output the output array. Must have length set correctly.
+ARROW_EXPORT
+Status AssignNullIntersection(FunctionContext* ctx, const ArrayData& left,
+                              const ArrayData& right, ArrayData* output);
+
 ARROW_EXPORT
 Datum WrapArraysLike(const Datum& value,
                      const std::vector<std::shared_ptr<Array>>& arrays);
@@ -79,13 +90,14 @@ Datum WrapArraysLike(const Datum& value,
 ARROW_EXPORT
 Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums);
 
-/// \brief Kernel used to preallocate outputs for primitive types.
-class PrimitiveAllocatingUnaryKernel : public UnaryKernel {
+/// \brief Kernel used to preallocate outputs for primitive types. This
+/// does not include allocations for the validity bitmap (PropagateNulls
+/// should be used for that).
+class ARROW_EXPORT PrimitiveAllocatingUnaryKernel : public UnaryKernel {
  public:
-  PrimitiveAllocatingUnaryKernel(std::unique_ptr<UnaryKernel> delegate,
-                                 const std::shared_ptr<DataType>& out_type);
-  PrimitiveAllocatingUnaryKernel(UnaryKernel* delegate,
-                                 const std::shared_ptr<DataType>& out_type);
+  // \brief Construct with a delegate that must live longer
+  // then this object.
+  explicit PrimitiveAllocatingUnaryKernel(UnaryKernel* delegate);
   /// \brief Allocates ArrayData with the necessary data buffers allocated and
   /// then written into by the delegate kernel
   Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override;
@@ -94,8 +106,26 @@ class PrimitiveAllocatingUnaryKernel : public UnaryKernel {
 
  private:
   UnaryKernel* delegate_;
-  std::shared_ptr<DataType> out_type_;
-  std::unique_ptr<UnaryKernel> owned_delegate_;
+};
+
+/// \brief Kernel used to preallocate outputs for primitive types.
+class ARROW_EXPORT PrimitiveAllocatingBinaryKernel : public BinaryKernel {
+ public:
+  // \brief Construct with a kernel to delegate operatoions to.
+  //
+  // Ownership is not taken of the delegate kernel, it must outlive
+  // the life time of this object.
+  explicit PrimitiveAllocatingBinaryKernel(BinaryKernel* delegate);
+
+  /// \brief Sets out to be of type ArrayData with the necessary
+  /// data buffers prepopulated.
+  Status Call(FunctionContext* ctx, const Datum& left, const Datum& right,
+              Datum* out) override;
+
+  std::shared_ptr<DataType> out_type() const override;
+
+ private:
+  BinaryKernel* delegate_;
 };
 
 }  // namespace detail
diff --git a/cpp/src/arrow/compute/test-util.h b/cpp/src/arrow/compute/test-util.h
index 1e7f872..e90a034 100644
--- a/cpp/src/arrow/compute/test-util.h
+++ b/cpp/src/arrow/compute/test-util.h
@@ -46,11 +46,14 @@ class ComputeFixture {
 class MockUnaryKernel : public UnaryKernel {
  public:
   MOCK_METHOD3(Call, Status(FunctionContext* ctx, const Datum& input, Datum* out));
+  MOCK_CONST_METHOD0(out_type, std::shared_ptr<DataType>());
 };
 
 class MockBinaryKernel : public BinaryKernel {
+ public:
   MOCK_METHOD4(Call, Status(FunctionContext* ctx, const Datum& left, const Datum& right,
                             Datum* out));
+  MOCK_CONST_METHOD0(out_type, std::shared_ptr<DataType>());
 };
 
 template <typename Type, typename T>