You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/25 20:53:35 UTC
[arrow] branch master updated: ARROW-3133: [C++] Remove allocation
from Binary Boolean Kernels.
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4910fbf ARROW-3133: [C++] Remove allocation from Binary Boolean Kernels.
4910fbf is described below
commit 4910fbf4fda05b864daaba820db08291e4afdcb6
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Mon Feb 25 14:53:15 2019 -0600
ARROW-3133: [C++] Remove allocation from Binary Boolean Kernels.
Also:
- ARROW-3135: add a helper for validity bitmap propagation
- ARROW-4572: remove memory zeroing in Allocator kernel
- Fix regression in InvertKernel where nulls were not being handled (add a unit test to cover this case)
- Add some unit tests for util-internal.h
- lift out_type to OpKernel
Author: Micah Kornfield <em...@gmail.com>
Closes #3731 from emkornfield/bin_kern2 and squashes the following commits:
68af8ab5 <Micah Kornfield> Fix valgrind
63b1e8c2 <Micah Kornfield> ARROW-3133: Remove allocation from Binary Boolean Kernels.
---
cpp/src/arrow/compute/kernel.h | 7 +-
cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 +
cpp/src/arrow/compute/kernels/boolean-test.cc | 33 ++-
cpp/src/arrow/compute/kernels/boolean.cc | 57 +++--
cpp/src/arrow/compute/kernels/boolean.h | 6 +-
cpp/src/arrow/compute/kernels/cast.cc | 5 +-
.../arrow/compute/kernels/util-internal-test.cc | 232 +++++++++++++++++++++
cpp/src/arrow/compute/kernels/util-internal.cc | 144 ++++++++-----
cpp/src/arrow/compute/kernels/util-internal.h | 52 ++++-
cpp/src/arrow/compute/test-util.h | 3 +
10 files changed, 430 insertions(+), 110 deletions(-)
diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h
index b8033eb..58d7dc0 100644
--- a/cpp/src/arrow/compute/kernel.h
+++ b/cpp/src/arrow/compute/kernel.h
@@ -54,6 +54,9 @@ class FunctionContext;
class ARROW_EXPORT OpKernel {
public:
virtual ~OpKernel() = default;
+ /// \brief EXPERIMENTAL The output data type of the kernel
+ /// \return the output type
+ virtual std::shared_ptr<DataType> out_type() const = 0;
};
/// \class Datum
@@ -188,10 +191,6 @@ class ARROW_EXPORT UnaryKernel : public OpKernel {
/// there will be a more generic mechansim for understanding the necessary
/// contracts.
virtual Status Call(FunctionContext* ctx, const Datum& input, Datum* out) = 0;
-
- /// \brief EXPERIMENTAL The output data type of the kernel
- /// \return the output type
- virtual std::shared_ptr<DataType> out_type() const = 0;
};
/// \class BinaryKernel
diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt
index 0fee74d..5d78747 100644
--- a/cpp/src/arrow/compute/kernels/CMakeLists.txt
+++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt
@@ -20,6 +20,7 @@ arrow_install_all_headers("arrow/compute/kernels")
add_arrow_test(boolean-test PREFIX "arrow-compute")
add_arrow_test(cast-test PREFIX "arrow-compute")
add_arrow_test(hash-test PREFIX "arrow-compute")
+add_arrow_test(util-internal-test PREFIX "arrow-compute")
# Aggregates
add_arrow_test(aggregate-test PREFIX "arrow-compute")
diff --git a/cpp/src/arrow/compute/kernels/boolean-test.cc b/cpp/src/arrow/compute/kernels/boolean-test.cc
index e6277c0..439e0db 100644
--- a/cpp/src/arrow/compute/kernels/boolean-test.cc
+++ b/cpp/src/arrow/compute/kernels/boolean-test.cc
@@ -49,7 +49,7 @@ class TestBooleanKernel : public ComputeFixture, public TestBase {
ASSERT_OK(kernel(&this->ctx_, left, right, &result));
ASSERT_EQ(Datum::ARRAY, result.kind());
std::shared_ptr<Array> result_array = result.make_array();
- ASSERT_TRUE(result_array->Equals(expected));
+ ASSERT_ARRAYS_EQUAL(*expected, *result_array);
}
void TestChunkedArrayBinary(const BinaryKernelFunc& kernel,
@@ -99,25 +99,23 @@ class TestBooleanKernel : public ComputeFixture, public TestBase {
};
TEST_F(TestBooleanKernel, Invert) {
- vector<bool> values1 = {true, false, true};
- vector<bool> values2 = {false, true, false};
+ vector<bool> values1 = {true, false, true, false};
+ vector<bool> values2 = {false, true, false, true};
auto type = boolean();
- auto a1 = _MakeArray<BooleanType, bool>(type, values1, {});
- auto a2 = _MakeArray<BooleanType, bool>(type, values2, {});
+ auto a1 = _MakeArray<BooleanType, bool>(type, values1, {true, true, true, false});
+ auto a2 = _MakeArray<BooleanType, bool>(type, values2, {true, true, true, false});
// Plain array
Datum result;
ASSERT_OK(Invert(&this->ctx_, a1, &result));
ASSERT_EQ(Datum::ARRAY, result.kind());
- std::shared_ptr<Array> result_array = result.make_array();
- ASSERT_TRUE(result_array->Equals(a2));
+ ASSERT_ARRAYS_EQUAL(*a2, *result.make_array());
// Array with offset
ASSERT_OK(Invert(&this->ctx_, a1->Slice(1), &result));
ASSERT_EQ(Datum::ARRAY, result.kind());
- result_array = result.make_array();
- ASSERT_TRUE(result_array->Equals(a2->Slice(1)));
+ ASSERT_ARRAYS_EQUAL(*a2->Slice(1), *result.make_array());
// ChunkedArray
std::vector<std::shared_ptr<Array>> ca1_arrs = {a1, a1->Slice(1)};
@@ -127,7 +125,7 @@ TEST_F(TestBooleanKernel, Invert) {
ASSERT_OK(Invert(&this->ctx_, ca1, &result));
ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
std::shared_ptr<ChunkedArray> result_ca = result.chunked_array();
- ASSERT_TRUE(result_ca->Equals(ca2));
+ ASSERT_ARRAYS_EQUAL(*ca2, *result_ca);
}
TEST_F(TestBooleanKernel, InvertEmptyArray) {
@@ -139,7 +137,20 @@ TEST_F(TestBooleanKernel, InvertEmptyArray) {
Datum result;
ASSERT_OK(Invert(&this->ctx_, input, &result));
- ASSERT_TRUE(result.make_array()->Equals(input.make_array()));
+ ASSERT_ARRAYS_EQUAL(*input.make_array(), *result.make_array());
+}
+
+TEST_F(TestBooleanKernel, BinaryOpOnEmptyArray) {
+ auto type = boolean();
+ std::vector<std::shared_ptr<Buffer>> data_buffers(2);
+ Datum input;
+ input.value = ArrayData::Make(boolean(), 0 /* length */, std::move(data_buffers),
+ 0 /* null_count */);
+
+ Datum result;
+ ASSERT_OK(And(&this->ctx_, input, input, &result));
+ // Result should be empty as well.
+ ASSERT_ARRAYS_EQUAL(*input.make_array(), *result.make_array());
}
TEST_F(TestBooleanKernel, And) {
diff --git a/cpp/src/arrow/compute/kernels/boolean.cc b/cpp/src/arrow/compute/kernels/boolean.cc
index 7d8b15a..2209dc9 100644
--- a/cpp/src/arrow/compute/kernels/boolean.cc
+++ b/cpp/src/arrow/compute/kernels/boolean.cc
@@ -55,6 +55,7 @@ class InvertKernel : public BooleanUnaryKernel {
// Handle output data buffer
if (in_data.length > 0) {
+ RETURN_NOT_OK(detail::PropagateNulls(ctx, in_data, result.get()));
const Buffer& data_buffer = *in_data.buffers[1];
DCHECK_LE(BitUtil::BytesForBits(in_data.length), data_buffer.size());
InvertBitmap(data_buffer.data(), in_data.offset, in_data.length,
@@ -65,8 +66,8 @@ class InvertKernel : public BooleanUnaryKernel {
};
Status Invert(FunctionContext* ctx, const Datum& value, Datum* out) {
- detail::PrimitiveAllocatingUnaryKernel kernel(
- std::unique_ptr<UnaryKernel>(new InvertKernel()), boolean());
+ InvertKernel invert;
+ detail::PrimitiveAllocatingUnaryKernel kernel(&invert);
std::vector<Datum> result;
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &kernel, value, &result));
@@ -86,73 +87,65 @@ class BinaryBooleanKernel : public BinaryKernel {
const ArrayData& left_data = *left.array();
const ArrayData& right_data = *right.array();
+ DCHECK_EQ(left_data.length, right_data.length);
ArrayData* result;
- out->value = ArrayData::Make(boolean(), right_data.length);
result = out->array().get();
-
- // If one of the arrays has a null value, the result will have a null.
- std::shared_ptr<Buffer> validity_bitmap;
- RETURN_NOT_OK(BitmapAnd(ctx->memory_pool(), left_data.buffers[0]->data(),
- left_data.offset, right_data.buffers[0]->data(),
- right_data.offset, right_data.length, 0, &validity_bitmap));
- result->buffers.push_back(validity_bitmap);
-
- result->null_count =
- result->length - CountSetBits(validity_bitmap->data(), 0, result->length);
-
+ RETURN_NOT_OK(detail::AssignNullIntersection(ctx, left_data, right_data, result));
return Compute(ctx, left_data, right_data, result);
}
+
+ std::shared_ptr<DataType> out_type() const override { return boolean(); }
};
class AndKernel : public BinaryBooleanKernel {
Status Compute(FunctionContext* ctx, const ArrayData& left, const ArrayData& right,
ArrayData* out) override {
- std::shared_ptr<Buffer> data_bitmap;
- RETURN_NOT_OK(BitmapAnd(ctx->memory_pool(), left.buffers[1]->data(), left.offset,
- right.buffers[1]->data(), right.offset, right.length, 0,
- &data_bitmap));
- out->buffers.push_back(data_bitmap);
+ if (right.length > 0) {
+ BitmapAnd(left.buffers[1]->data(), left.offset, right.buffers[1]->data(),
+ right.offset, right.length, 0, out->buffers[1]->mutable_data());
+ }
return Status::OK();
}
};
Status And(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) {
- AndKernel kernel;
+ AndKernel and_kernel;
+ detail::PrimitiveAllocatingBinaryKernel kernel(&and_kernel);
return detail::InvokeBinaryArrayKernel(ctx, &kernel, left, right, out);
}
class OrKernel : public BinaryBooleanKernel {
Status Compute(FunctionContext* ctx, const ArrayData& left, const ArrayData& right,
ArrayData* out) override {
- std::shared_ptr<Buffer> data_bitmap;
- RETURN_NOT_OK(BitmapOr(ctx->memory_pool(), left.buffers[1]->data(), left.offset,
- right.buffers[1]->data(), right.offset, right.length, 0,
- &data_bitmap));
- out->buffers.push_back(data_bitmap);
+ if (right.length > 0) {
+ BitmapOr(left.buffers[1]->data(), left.offset, right.buffers[1]->data(),
+ right.offset, right.length, 0, out->buffers[1]->mutable_data());
+ }
return Status::OK();
}
};
Status Or(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) {
- OrKernel kernel;
+ OrKernel or_kernel;
+ detail::PrimitiveAllocatingBinaryKernel kernel(&or_kernel);
return detail::InvokeBinaryArrayKernel(ctx, &kernel, left, right, out);
}
class XorKernel : public BinaryBooleanKernel {
Status Compute(FunctionContext* ctx, const ArrayData& left, const ArrayData& right,
ArrayData* out) override {
- std::shared_ptr<Buffer> data_bitmap;
- RETURN_NOT_OK(BitmapXor(ctx->memory_pool(), left.buffers[1]->data(), left.offset,
- right.buffers[1]->data(), right.offset, right.length, 0,
- &data_bitmap));
- out->buffers.push_back(data_bitmap);
+ if (right.length > 0) {
+ BitmapXor(left.buffers[1]->data(), left.offset, right.buffers[1]->data(),
+ right.offset, right.length, 0, out->buffers[1]->mutable_data());
+ }
return Status::OK();
}
};
Status Xor(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) {
- XorKernel kernel;
+ XorKernel xor_kernel;
+ detail::PrimitiveAllocatingBinaryKernel kernel(&xor_kernel);
return detail::InvokeBinaryArrayKernel(ctx, &kernel, left, right, out);
}
diff --git a/cpp/src/arrow/compute/kernels/boolean.h b/cpp/src/arrow/compute/kernels/boolean.h
index 88f5ad1..fb88659 100644
--- a/cpp/src/arrow/compute/kernels/boolean.h
+++ b/cpp/src/arrow/compute/kernels/boolean.h
@@ -37,7 +37,7 @@ class FunctionContext;
ARROW_EXPORT
Status Invert(FunctionContext* context, const Datum& value, Datum* out);
-/// \brief Element-wise AND of two boolean dates
+/// \brief Element-wise AND of two boolean datums
/// \param[in] context the FunctionContext
/// \param[in] left left operand (array)
/// \param[in] right right operand (array)
@@ -48,7 +48,7 @@ Status Invert(FunctionContext* context, const Datum& value, Datum* out);
ARROW_EXPORT
Status And(FunctionContext* context, const Datum& left, const Datum& right, Datum* out);
-/// \brief Element-wise OR of two boolean dates
+/// \brief Element-wise OR of two boolean datums
/// \param[in] context the FunctionContext
/// \param[in] left left operand (array)
/// \param[in] right right operand (array)
@@ -59,7 +59,7 @@ Status And(FunctionContext* context, const Datum& left, const Datum& right, Datu
ARROW_EXPORT
Status Or(FunctionContext* context, const Datum& left, const Datum& right, Datum* out);
-/// \brief Element-wise XOR of two boolean dates
+/// \brief Element-wise XOR of two boolean datums
/// \param[in] context the FunctionContext
/// \param[in] left left operand (array)
/// \param[in] right right operand (array)
diff --git a/cpp/src/arrow/compute/kernels/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc
index 97601b5..2ebc097 100644
--- a/cpp/src/arrow/compute/kernels/cast.cc
+++ b/cpp/src/arrow/compute/kernels/cast.cc
@@ -531,6 +531,9 @@ struct CastFunctor<Date64Type, TimestampType> {
ShiftTime<int64_t, int64_t>(ctx, options, conversion.first, conversion.second, input,
output);
+ if (!ctx->status().ok()) {
+ return;
+ }
// Ensure that intraday milliseconds have been zeroed out
auto out_data = output->GetMutableValues<int64_t>(1);
@@ -634,7 +637,7 @@ Status InvokeWithAllocation(FunctionContext* ctx, UnaryKernel* func, const Datum
std::vector<Datum> result;
if (NeedToPreallocate(*func->out_type())) {
// Create wrapper that allocates output memory for primitive types
- detail::PrimitiveAllocatingUnaryKernel wrapper(func, func->out_type());
+ detail::PrimitiveAllocatingUnaryKernel wrapper(func);
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &wrapper, input, &result));
} else {
RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, func, input, &result));
diff --git a/cpp/src/arrow/compute/kernels/util-internal-test.cc b/cpp/src/arrow/compute/kernels/util-internal-test.cc
new file mode 100644
index 0000000..9f16344
--- /dev/null
+++ b/cpp/src/arrow/compute/kernels/util-internal-test.cc
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <memory>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/gtest_util.h"
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/compute/kernels/util-internal.h"
+#include "arrow/compute/test-util.h"
+
+namespace arrow {
+namespace compute {
+namespace detail {
+
+using ::testing::_;
+using ::testing::AllOf;
+using ::testing::Each;
+using ::testing::ElementsAre;
+using ::testing::ElementsAreArray;
+using ::testing::Eq;
+using ::testing::Ge;
+using ::testing::IsNull;
+using ::testing::Ne;
+using ::testing::NotNull;
+using ::testing::Return;
+
+TEST(PropagateNulls, UnknownNullCountWithNullsZeroCopies) {
+ ArrayData input(boolean(), /*length=*/16, kUnknownNullCount);
+ constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+ std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+ input.buffers.push_back(nulls);
+ FunctionContext ctx(default_memory_pool());
+ ArrayData output;
+
+ ASSERT_OK(PropagateNulls(&ctx, input, &output));
+
+ ASSERT_THAT(output.buffers, ElementsAre(Eq(nulls)));
+ ASSERT_THAT(output.null_count, 9);
+}
+
+TEST(PropagateNulls, UnknownNullCountWithoutNullsLeavesNullptr) {
+ ArrayData input(boolean(), /*length=*/16, kUnknownNullCount);
+ constexpr uint8_t validity_bitmap[8] = {255, 255, 0, 0, 0, 0, 0, 0};
+ std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+ input.buffers.push_back(nulls);
+ FunctionContext ctx(default_memory_pool());
+ ArrayData output;
+
+ ASSERT_OK(PropagateNulls(&ctx, input, &output));
+
+ EXPECT_THAT(output.null_count, Eq(0));
+ EXPECT_THAT(output.buffers, ElementsAre(IsNull())) << output.buffers[0]->data()[0];
+}
+
+TEST(PropagateNulls, OffsetAndHasNulls) {
+ ArrayData input(boolean(), /*length=*/16, kUnknownNullCount, // slice off the first 7
+ /*offset=*/7);
+ constexpr uint8_t validity_bitmap[8] = {0, 1, 0, 0, 0, 0, 0, 0};
+ std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+ input.buffers.push_back(nulls);
+ FunctionContext ctx(default_memory_pool());
+ ArrayData output;
+
+ ASSERT_OK(PropagateNulls(&ctx, input, &output));
+
+ // Copy is made.
+ EXPECT_THAT(output.null_count, Eq(15));
+ ASSERT_THAT(output.buffers, ElementsAre(AllOf(Ne(nulls), NotNull())));
+ const auto& output_buffer = *output.buffers[0];
+ // the slice shifts the bit one over
+ ASSERT_THAT(std::vector<uint8_t>(output_buffer.data(),
+ output_buffer.data() + output_buffer.size()),
+ ElementsAreArray({2, 0}));
+ ASSERT_THAT(std::vector<uint8_t>(output_buffer.data() + output_buffer.size(),
+ output_buffer.data() + output_buffer.capacity()),
+ Each(0));
+}
+
+TEST(AssignNullIntersection, ZeroCopyWhenZeroNullsOnOneInput) {
+ ArrayData some_nulls(boolean(), /* length= */ 16, kUnknownNullCount);
+ constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+ std::shared_ptr<Buffer> nulls = std::make_shared<Buffer>(validity_bitmap, 8);
+ some_nulls.buffers.push_back(nulls);
+
+ ArrayData no_nulls(boolean(), /* length= */ 16, /*null_count=*/0);
+
+ FunctionContext ctx(default_memory_pool());
+ ArrayData output;
+ output.length = 16;
+
+ ASSERT_OK(AssignNullIntersection(&ctx, some_nulls, no_nulls, &output));
+ ASSERT_THAT(output.buffers, ElementsAre(Eq(nulls)));
+ ASSERT_THAT(output.null_count, 9);
+
+ output.buffers[0] = nullptr;
+ ASSERT_OK(AssignNullIntersection(&ctx, no_nulls, some_nulls, &output));
+ ASSERT_THAT(output.buffers, ElementsAre(Eq(nulls)));
+ ASSERT_THAT(output.null_count, 9);
+}
+
+TEST(AssignNullIntersection, IntersectsNullsWhenSomeOnBoth) {
+ ArrayData left(boolean(), /* length= */ 16, kUnknownNullCount);
+ constexpr uint8_t left_validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
+ std::shared_ptr<Buffer> left_nulls = std::make_shared<Buffer>(left_validity_bitmap, 8);
+ left.buffers.push_back(left_nulls);
+
+ ArrayData right(boolean(), /* length= */ 16, kUnknownNullCount);
+ constexpr uint8_t right_validity_bitmap[8] = {127, 0, 0, 0, 0, 0, 0, 0};
+ std::shared_ptr<Buffer> right_nulls =
+ std::make_shared<Buffer>(right_validity_bitmap, 8);
+ right.buffers.push_back(right_nulls);
+
+ FunctionContext ctx(default_memory_pool());
+ ArrayData output;
+ output.length = 16;
+
+ ASSERT_OK(AssignNullIntersection(&ctx, left, right, &output));
+
+ EXPECT_THAT(output.null_count, 10);
+ ASSERT_THAT(output.buffers, ElementsAre(NotNull()));
+ const auto& output_buffer = *output.buffers[0];
+ EXPECT_THAT(std::vector<uint8_t>(output_buffer.data(),
+ output_buffer.data() + output_buffer.size()),
+ ElementsAreArray({126, 0}));
+ EXPECT_THAT(std::vector<uint8_t>(output_buffer.data() + output_buffer.size(),
+ output_buffer.data() + output_buffer.capacity()),
+ Each(0));
+}
+
+TEST(PrimitiveAllocatingUnaryKernel, BooleanFunction) {
+ MockUnaryKernel mock;
+ EXPECT_CALL(mock, out_type).WillRepeatedly(Return(boolean()));
+ EXPECT_CALL(mock, Call(_, _, _)).WillOnce(Return(Status::OK()));
+ PrimitiveAllocatingUnaryKernel kernel(&mock);
+
+ auto input =
+ std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+ FunctionContext ctx(default_memory_pool());
+ Datum output;
+ output.value = ArrayData::Make(kernel.out_type(), input->length);
+ ASSERT_OK(kernel.Call(&ctx, input, &output));
+
+ ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+ auto value_buffer = output.array()->buffers[1];
+ EXPECT_THAT(value_buffer->size(), Eq(2));
+ EXPECT_THAT(value_buffer->capacity(), Ge(2));
+ // Booleans should have this always zeroed out.
+ EXPECT_THAT(*(value_buffer->data() + value_buffer->size() - 1), Eq(0));
+}
+
+TEST(PrimitiveAllocatingUnaryKernel, NonBoolean) {
+ MockUnaryKernel mock;
+ EXPECT_CALL(mock, out_type).WillRepeatedly(Return(int32()));
+ EXPECT_CALL(mock, Call(_, _, _)).WillOnce(Return(Status::OK()));
+ PrimitiveAllocatingUnaryKernel kernel(&mock);
+
+ auto input =
+ std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+ FunctionContext ctx(default_memory_pool());
+ Datum output;
+ output.value = ArrayData::Make(kernel.out_type(), input->length);
+ ASSERT_OK(kernel.Call(&ctx, input, &output));
+
+ ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+ auto value_buffer = output.array()->buffers[1];
+ EXPECT_THAT(value_buffer->size(), Eq(64));
+ EXPECT_THAT(value_buffer->capacity(), Ge(64));
+}
+
+TEST(PrimitiveAllocatingBinaryKernel, BooleanFunction) {
+ MockBinaryKernel mock;
+ EXPECT_CALL(mock, out_type).WillRepeatedly(Return(boolean()));
+ EXPECT_CALL(mock, Call(_, _, _, _)).WillOnce(Return(Status::OK()));
+ PrimitiveAllocatingBinaryKernel kernel(&mock);
+
+ auto input =
+ std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+ FunctionContext ctx(default_memory_pool());
+ Datum output;
+ output.value = ArrayData::Make(kernel.out_type(), input->length);
+ ASSERT_OK(kernel.Call(&ctx, input, input, &output));
+
+ ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+ auto value_buffer = output.array()->buffers[1];
+ EXPECT_THAT(value_buffer->size(), Eq(2));
+ EXPECT_THAT(value_buffer->capacity(), Ge(2));
+ // Booleans should have this always zeroed out.
+ EXPECT_THAT(*(value_buffer->data() + value_buffer->size() - 1), Eq(0));
+}
+
+TEST(PrimitiveAllocatingBinaryKernel, NonBoolean) {
+ MockBinaryKernel mock;
+ EXPECT_CALL(mock, out_type).WillRepeatedly(Return(int32()));
+ EXPECT_CALL(mock, Call(_, _, _, _)).WillOnce(Return(Status::OK()));
+ PrimitiveAllocatingBinaryKernel kernel(&mock);
+
+ auto input =
+ std::make_shared<ArrayData>(boolean(), /* length= */ 16, kUnknownNullCount);
+ FunctionContext ctx(default_memory_pool());
+ Datum output;
+ output.value = ArrayData::Make(kernel.out_type(), input->length);
+ ASSERT_OK(kernel.Call(&ctx, input, input, &output));
+
+ ASSERT_THAT(output.array()->buffers, ElementsAre(IsNull(), NotNull()));
+ auto value_buffer = output.array()->buffers[1];
+ EXPECT_THAT(value_buffer->size(), Eq(64));
+ EXPECT_THAT(value_buffer->capacity(), Ge(64));
+}
+
+} // namespace detail
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/util-internal.cc b/cpp/src/arrow/compute/kernels/util-internal.cc
index 60b668d..174b688 100644
--- a/cpp/src/arrow/compute/kernels/util-internal.cc
+++ b/cpp/src/arrow/compute/kernels/util-internal.cc
@@ -26,6 +26,7 @@
#include "arrow/array.h"
#include "arrow/status.h"
#include "arrow/table.h"
+#include "arrow/util/bit-util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
@@ -34,11 +35,47 @@
namespace arrow {
+using internal::BitmapAnd;
using internal::checked_cast;
namespace compute {
namespace detail {
+namespace {
+
+inline void ZeroLastByte(Buffer* buffer) {
+ *(buffer->mutable_data() + (buffer->size() - 1)) = 0;
+}
+
+Status AllocateValueBuffer(FunctionContext* ctx, const DataType& type, int64_t length,
+ std::shared_ptr<Buffer>* buffer) {
+ if (type.id() != Type::NA) {
+ const auto& fw_type = checked_cast<const FixedWidthType&>(type);
+
+ int bit_width = fw_type.bit_width();
+ int64_t buffer_size = 0;
+
+ if (bit_width == 1) {
+ buffer_size = BitUtil::BytesForBits(length);
+ } else {
+ DCHECK_EQ(bit_width % 8, 0)
+ << "Only bit widths with multiple of 8 are currently supported";
+ buffer_size = length * fw_type.bit_width() / 8;
+ }
+ RETURN_NOT_OK(ctx->Allocate(buffer_size, buffer));
+
+ if (bit_width == 1 && buffer_size > 0) {
+ // Some utility methods access the last byte before it might be
+ // initialized; this makes valgrind/asan unhappy, so we proactively
+ // zero it.
+ ZeroLastByte(buffer->get());
+ }
+ }
+ return Status::OK();
+}
+
+} // namespace
+
Status InvokeUnaryArrayKernel(FunctionContext* ctx, UnaryKernel* kernel,
const Datum& value, std::vector<Datum>* outputs) {
if (value.kind() == Datum::ARRAY) {
@@ -90,7 +127,6 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
if (right_length != left_length) {
return Status::Invalid("Right and left have different lengths");
}
-
// TODO: Remove duplication with ChunkedArray::Equals
int left_chunk_idx = 0;
int64_t left_start_idx = 0;
@@ -98,20 +134,20 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
int64_t right_start_idx = 0;
int64_t elements_compared = 0;
- while (elements_compared < left_length) {
+ do {
const std::shared_ptr<Array> left_array = left_arrays[left_chunk_idx];
const std::shared_ptr<Array> right_array = right_arrays[right_chunk_idx];
int64_t common_length = std::min(left_array->length() - left_start_idx,
right_array->length() - right_start_idx);
-
std::shared_ptr<Array> left_op = left_array->Slice(left_start_idx, common_length);
std::shared_ptr<Array> right_op = right_array->Slice(right_start_idx, common_length);
+
Datum output;
- RETURN_NOT_OK(kernel->Call(ctx, Datum(left_op), Datum(right_op), &output));
+ output.value = ArrayData::Make(kernel->out_type(), common_length);
+ RETURN_NOT_OK(kernel->Call(ctx, left_op, right_op, &output));
outputs->push_back(output);
elements_compared += common_length;
-
// If we have exhausted the current chunk, proceed to the next one individually.
if (left_start_idx + common_length == left_array->length()) {
left_chunk_idx++;
@@ -126,8 +162,7 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
} else {
right_start_idx += common_length;
}
- }
-
+ } while (elements_compared < left_length);
return Status::OK();
}
@@ -170,23 +205,11 @@ Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums) {
}
}
-PrimitiveAllocatingUnaryKernel::PrimitiveAllocatingUnaryKernel(
- UnaryKernel* delegate, const std::shared_ptr<DataType>& out_type)
- : delegate_(delegate), out_type_(out_type) {}
-
-PrimitiveAllocatingUnaryKernel::PrimitiveAllocatingUnaryKernel(
- std::unique_ptr<UnaryKernel> delegate, const std::shared_ptr<DataType>& out_type)
- : PrimitiveAllocatingUnaryKernel(delegate.get(), out_type) {
- owned_delegate_ = std::move(delegate);
-}
-
-inline void ZeroLastByte(Buffer* buffer) {
- *(buffer->mutable_data() + (buffer->size() - 1)) = 0;
-}
+PrimitiveAllocatingUnaryKernel::PrimitiveAllocatingUnaryKernel(UnaryKernel* delegate)
+ : delegate_(delegate) {}
Status PropagateNulls(FunctionContext* ctx, const ArrayData& input, ArrayData* output) {
const int64_t length = input.length;
-
if (output->buffers.size() == 0) {
// Ensure we can assign a buffer
output->buffers.resize(1);
@@ -207,12 +230,36 @@ Status PropagateNulls(FunctionContext* ctx, const ArrayData& input, ArrayData* o
internal::CopyBitmap(validity_bitmap.data(), input.offset, length,
buffer->mutable_data(), 0 /* destination offset */);
output->buffers[0] = std::move(buffer);
- } else {
+ } else if (output->null_count > 0) {
output->buffers[0] = input.buffers[0];
}
return Status::OK();
}
+Status AssignNullIntersection(FunctionContext* ctx, const ArrayData& left,
+ const ArrayData& right, ArrayData* output) {
+ if (output->buffers.size() == 0) {
+ // Ensure we can assign a buffer
+ output->buffers.resize(1);
+ }
+
+ if (left.GetNullCount() > 0 && right.GetNullCount() > 0) {
+ RETURN_NOT_OK(BitmapAnd(ctx->memory_pool(), left.buffers[0]->data(), left.offset,
+ right.buffers[0]->data(), right.offset, right.length, 0,
+ &(output->buffers[0])));
+ // Force computation of null count.
+ output->null_count = kUnknownNullCount;
+ output->GetNullCount();
+ return Status::OK();
+ } else if (left.null_count != 0) {
+ return PropagateNulls(ctx, left, output);
+ } else {
+ // right has a positive null_count or both are zero.
+ return PropagateNulls(ctx, right, output);
+ }
+ return Status::OK();
+}
+
Status PrimitiveAllocatingUnaryKernel::Call(FunctionContext* ctx, const Datum& input,
Datum* out) {
std::vector<std::shared_ptr<Buffer>> data_buffers;
@@ -225,39 +272,40 @@ Status PrimitiveAllocatingUnaryKernel::Call(FunctionContext* ctx, const Datum& i
result->buffers.resize(2);
const int64_t length = in_data.length;
-
// Allocate the value buffer
- std::shared_ptr<Buffer> buffer;
- if (out_type_->id() != Type::NA) {
- const auto& fw_type = checked_cast<const FixedWidthType&>(*out_type_);
+ RETURN_NOT_OK(AllocateValueBuffer(ctx, *out_type(), length, &(result->buffers[1])));
+ return delegate_->Call(ctx, input, out);
+}
- int bit_width = fw_type.bit_width();
- int64_t buffer_size = 0;
+std::shared_ptr<DataType> PrimitiveAllocatingUnaryKernel::out_type() const {
+ return delegate_->out_type();
+}
- if (bit_width == 1) {
- buffer_size = BitUtil::BytesForBits(length);
- } else {
- DCHECK_EQ(bit_width % 8, 0)
- << "Only bit widths with multiple of 8 are currently supported";
- buffer_size = length * fw_type.bit_width() / 8;
- }
- RETURN_NOT_OK(ctx->Allocate(buffer_size, &buffer));
- buffer->ZeroPadding();
+PrimitiveAllocatingBinaryKernel::PrimitiveAllocatingBinaryKernel(BinaryKernel* delegate)
+ : delegate_(delegate) {}
- if (bit_width == 1 && buffer_size > 0) {
- // Some utility methods access the last byte before it might be
- // initialized this makes valgrind/asan unhappy, so we proactively
- // zero it.
- ZeroLastByte(buffer.get());
- }
+Status PrimitiveAllocatingBinaryKernel::Call(FunctionContext* ctx, const Datum& left,
+ const Datum& right, Datum* out) {
+ std::vector<std::shared_ptr<Buffer>> data_buffers;
+ DCHECK_EQ(left.kind(), Datum::ARRAY);
+ DCHECK_EQ(right.kind(), Datum::ARRAY);
+ const ArrayData& left_data = *left.array();
+ DCHECK_EQ(left_data.length, right.array()->length);
- memset(buffer->mutable_data(), 0, buffer_size);
- result->buffers[1] = std::move(buffer);
- }
- return delegate_->Call(ctx, input, out);
+ DCHECK_EQ(out->kind(), Datum::ARRAY);
+
+ ArrayData* result = out->array().get();
+
+ result->buffers.resize(2);
+
+ const int64_t length = left_data.length;
+ RETURN_NOT_OK(AllocateValueBuffer(ctx, *out_type(), length, &(result->buffers[1])));
+
+ // The value buffer is allocated above; the delegate fills it in.
+ return delegate_->Call(ctx, left, right, out);
}
-std::shared_ptr<DataType> PrimitiveAllocatingUnaryKernel::out_type() const {
+std::shared_ptr<DataType> PrimitiveAllocatingBinaryKernel::out_type() const {
return delegate_->out_type();
}
diff --git a/cpp/src/arrow/compute/kernels/util-internal.h b/cpp/src/arrow/compute/kernels/util-internal.h
index bd27280..4cb7a24 100644
--- a/cpp/src/arrow/compute/kernels/util-internal.h
+++ b/cpp/src/arrow/compute/kernels/util-internal.h
@@ -64,14 +64,25 @@ Status InvokeBinaryArrayKernel(FunctionContext* ctx, BinaryKernel* kernel,
/// \brief Assign validity bitmap to output, copying bitmap if necessary, but
/// zero-copy otherwise, so that the same value slots are valid/not-null in the
-/// output
-/// (sliced arrays)
+/// output (sliced arrays).
+///
/// \param[in] ctx the kernel FunctionContext
/// \param[in] input the input array
-/// \param[out] output the output array
+/// \param[out] output the output array. Must have length set correctly.
ARROW_EXPORT
Status PropagateNulls(FunctionContext* ctx, const ArrayData& input, ArrayData* output);
+/// \brief Assign validity bitmap to output, taking the intersection of left and right
+/// null bitmaps if necessary, but zero-copy otherwise.
+///
+/// \param[in] ctx the kernel FunctionContext
+/// \param[in] left the left operand
+/// \param[in] right the right operand
+/// \param[out] output the output array. Must have length set correctly.
+ARROW_EXPORT
+Status AssignNullIntersection(FunctionContext* ctx, const ArrayData& left,
+ const ArrayData& right, ArrayData* output);
+
ARROW_EXPORT
Datum WrapArraysLike(const Datum& value,
const std::vector<std::shared_ptr<Array>>& arrays);
@@ -79,13 +90,14 @@ Datum WrapArraysLike(const Datum& value,
ARROW_EXPORT
Datum WrapDatumsLike(const Datum& value, const std::vector<Datum>& datums);
-/// \brief Kernel used to preallocate outputs for primitive types.
-class PrimitiveAllocatingUnaryKernel : public UnaryKernel {
+/// \brief Kernel used to preallocate outputs for primitive types. This
+/// does not include allocations for the validity bitmap (PropagateNulls
+/// should be used for that).
+class ARROW_EXPORT PrimitiveAllocatingUnaryKernel : public UnaryKernel {
public:
- PrimitiveAllocatingUnaryKernel(std::unique_ptr<UnaryKernel> delegate,
- const std::shared_ptr<DataType>& out_type);
- PrimitiveAllocatingUnaryKernel(UnaryKernel* delegate,
- const std::shared_ptr<DataType>& out_type);
+ // \brief Construct with a delegate that must live longer
+ // than this object.
+ explicit PrimitiveAllocatingUnaryKernel(UnaryKernel* delegate);
/// \brief Allocates ArrayData with the necessary data buffers allocated and
/// then written into by the delegate kernel
Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override;
@@ -94,8 +106,26 @@ class PrimitiveAllocatingUnaryKernel : public UnaryKernel {
private:
UnaryKernel* delegate_;
- std::shared_ptr<DataType> out_type_;
- std::unique_ptr<UnaryKernel> owned_delegate_;
+};
+
+/// \brief Kernel used to preallocate outputs for primitive types.
+class ARROW_EXPORT PrimitiveAllocatingBinaryKernel : public BinaryKernel {
+ public:
+ // \brief Construct with a kernel to delegate operations to.
+ //
+ // Ownership is not taken of the delegate kernel, it must outlive
+ // the life time of this object.
+ explicit PrimitiveAllocatingBinaryKernel(BinaryKernel* delegate);
+
+ /// \brief Sets out to be of type ArrayData with the necessary
+ /// data buffers prepopulated.
+ Status Call(FunctionContext* ctx, const Datum& left, const Datum& right,
+ Datum* out) override;
+
+ std::shared_ptr<DataType> out_type() const override;
+
+ private:
+ BinaryKernel* delegate_;
};
} // namespace detail
diff --git a/cpp/src/arrow/compute/test-util.h b/cpp/src/arrow/compute/test-util.h
index 1e7f872..e90a034 100644
--- a/cpp/src/arrow/compute/test-util.h
+++ b/cpp/src/arrow/compute/test-util.h
@@ -46,11 +46,14 @@ class ComputeFixture {
class MockUnaryKernel : public UnaryKernel {
public:
MOCK_METHOD3(Call, Status(FunctionContext* ctx, const Datum& input, Datum* out));
+ MOCK_CONST_METHOD0(out_type, std::shared_ptr<DataType>());
};
class MockBinaryKernel : public BinaryKernel {
+ public:
MOCK_METHOD4(Call, Status(FunctionContext* ctx, const Datum& left, const Datum& right,
Datum* out));
+ MOCK_CONST_METHOD0(out_type, std::shared_ptr<DataType>());
};
template <typename Type, typename T>