You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/06/22 21:32:45 UTC
arrow git commit: ARROW-1073: C++: Adapative integer builder
Repository: arrow
Updated Branches:
refs/heads/master 222628c9d -> 608b89e16
ARROW-1073: C++: Adapative integer builder
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #723 from xhochy/ARROW-1073 and squashes the following commits:
5bab9c2f [Uwe L. Korn] ARROW-1073: C++: Adapative integer builder
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/608b89e1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/608b89e1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/608b89e1
Branch: refs/heads/master
Commit: 608b89e1648ef8a116c3139606b0440122d2b75a
Parents: 222628c
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Thu Jun 22 17:32:41 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jun 22 17:32:41 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/array-test.cc | 220 +++++++++++++++++++++
cpp/src/arrow/builder-benchmark.cc | 66 +++++++
cpp/src/arrow/builder.cc | 331 ++++++++++++++++++++++++++++++++
cpp/src/arrow/builder.h | 190 +++++++++++++++++-
4 files changed, 806 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 636d97f..beffa1b 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -1258,6 +1258,226 @@ TEST_F(TestFWBinaryArray, Slice) {
}
// ----------------------------------------------------------------------
+// AdaptiveInt tests
+
+class TestAdaptiveIntBuilder : public TestBuilder {
+ public:
+ void SetUp() {
+ TestBuilder::SetUp();
+ builder_ = std::make_shared<AdaptiveIntBuilder>(pool_);
+ }
+
+ void Done() { EXPECT_OK(builder_->Finish(&result_)); }
+
+ protected:
+ std::shared_ptr<AdaptiveIntBuilder> builder_;
+
+ std::shared_ptr<Array> expected_;
+ std::shared_ptr<Array> result_;
+};
+
+TEST_F(TestAdaptiveIntBuilder, TestInt8) {
+ builder_->Append(0);
+ builder_->Append(127);
+ builder_->Append(-128);
+
+ Done();
+
+ std::vector<int8_t> expected_values({0, 127, -128});
+ ArrayFromVector<Int8Type, int8_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestInt16) {
+ builder_->Append(0);
+ builder_->Append(128);
+ Done();
+
+ std::vector<int16_t> expected_values({0, 128});
+ ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(-129);
+ expected_values = {-129};
+ Done();
+
+ ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(std::numeric_limits<int16_t>::max());
+ builder_->Append(std::numeric_limits<int16_t>::min());
+ expected_values = {
+ std::numeric_limits<int16_t>::max(), std::numeric_limits<int16_t>::min()};
+ Done();
+
+ ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestInt32) {
+ builder_->Append(0);
+ builder_->Append(static_cast<int64_t>(std::numeric_limits<int16_t>::max()) + 1);
+ Done();
+
+ std::vector<int32_t> expected_values(
+ {0, static_cast<int32_t>(std::numeric_limits<int16_t>::max()) + 1});
+ ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(static_cast<int64_t>(std::numeric_limits<int16_t>::min()) - 1);
+ expected_values = {static_cast<int32_t>(std::numeric_limits<int16_t>::min()) - 1};
+ Done();
+
+ ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(std::numeric_limits<int32_t>::max());
+ builder_->Append(std::numeric_limits<int32_t>::min());
+ expected_values = {
+ std::numeric_limits<int32_t>::max(), std::numeric_limits<int32_t>::min()};
+ Done();
+
+ ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestInt64) {
+ builder_->Append(0);
+ builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1);
+ Done();
+
+ std::vector<int64_t> expected_values(
+ {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
+ ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1);
+ expected_values = {static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1};
+ Done();
+
+ ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(std::numeric_limits<int64_t>::max());
+ builder_->Append(std::numeric_limits<int64_t>::min());
+ expected_values = {
+ std::numeric_limits<int64_t>::max(), std::numeric_limits<int64_t>::min()};
+ Done();
+
+ ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestAppendVector) {
+ std::vector<int64_t> expected_values(
+ {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
+ builder_->Append(expected_values.data(), expected_values.size());
+ Done();
+
+ ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+class TestAdaptiveUIntBuilder : public TestBuilder {
+ public:
+ void SetUp() {
+ TestBuilder::SetUp();
+ builder_ = std::make_shared<AdaptiveUIntBuilder>(pool_);
+ }
+
+ void Done() { EXPECT_OK(builder_->Finish(&result_)); }
+
+ protected:
+ std::shared_ptr<AdaptiveUIntBuilder> builder_;
+
+ std::shared_ptr<Array> expected_;
+ std::shared_ptr<Array> result_;
+};
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt8) {
+ builder_->Append(0);
+ builder_->Append(255);
+
+ Done();
+
+ std::vector<uint8_t> expected_values({0, 255});
+ ArrayFromVector<UInt8Type, uint8_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt16) {
+ builder_->Append(0);
+ builder_->Append(256);
+ Done();
+
+ std::vector<uint16_t> expected_values({0, 256});
+ ArrayFromVector<UInt16Type, uint16_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(std::numeric_limits<uint16_t>::max());
+ expected_values = {std::numeric_limits<uint16_t>::max()};
+ Done();
+
+ ArrayFromVector<UInt16Type, uint16_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt32) {
+ builder_->Append(0);
+ builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint16_t>::max()) + 1);
+ Done();
+
+ std::vector<uint32_t> expected_values(
+ {0, static_cast<uint32_t>(std::numeric_limits<uint16_t>::max()) + 1});
+ ArrayFromVector<UInt32Type, uint32_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(std::numeric_limits<uint32_t>::max());
+ expected_values = {std::numeric_limits<uint32_t>::max()};
+ Done();
+
+ ArrayFromVector<UInt32Type, uint32_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt64) {
+ builder_->Append(0);
+ builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ Done();
+
+ std::vector<uint64_t> expected_values(
+ {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1});
+ ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+
+ SetUp();
+ builder_->Append(std::numeric_limits<uint64_t>::max());
+ expected_values = {std::numeric_limits<uint64_t>::max()};
+ Done();
+
+ ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestAppendVector) {
+ std::vector<uint64_t> expected_values(
+ {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1});
+ builder_->Append(expected_values.data(), expected_values.size());
+ Done();
+
+ ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_);
+ ASSERT_TRUE(expected_->Equals(result_));
+}
+
+// ----------------------------------------------------------------------
// List tests
class TestListBuilder : public TestBuilder {
http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index b0c3cd1..62f2fd6 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -61,4 +61,70 @@ static void BM_BuildVectorNoNulls(
BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
+static void BM_BuildAdaptiveIntNoNulls(
+ benchmark::State& state) { // NOLINT non-const reference
+ int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
+ int64_t chunk_size = size / 8;
+ std::vector<int64_t> data;
+ for (int64_t i = 0; i < size; i++) {
+ data.push_back(i);
+ }
+ while (state.KeepRunning()) {
+ AdaptiveIntBuilder builder(default_memory_pool());
+ for (int64_t i = 0; i < size; i += chunk_size) {
+ // Append the input in 8 equal chunks of chunk_size (= size / 8) values each
+ builder.Append(data.data() + i, chunk_size, nullptr);
+ }
+ std::shared_ptr<Array> out;
+ builder.Finish(&out);
+ }
+ state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
+BENCHMARK(BM_BuildAdaptiveIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
+
+static void BM_BuildAdaptiveIntNoNullsScalarAppend(
+ benchmark::State& state) { // NOLINT non-const reference
+ int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
+ std::vector<int64_t> data;
+ for (int64_t i = 0; i < size; i++) {
+ data.push_back(i);
+ }
+ while (state.KeepRunning()) {
+ AdaptiveIntBuilder builder(default_memory_pool());
+ for (int64_t i = 0; i < size; i++) {
+ builder.Append(data[i]);
+ }
+ std::shared_ptr<Array> out;
+ builder.Finish(&out);
+ }
+ state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
+BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend)
+ ->Repetitions(3)
+ ->Unit(benchmark::kMillisecond);
+
+static void BM_BuildAdaptiveUIntNoNulls(
+ benchmark::State& state) { // NOLINT non-const reference
+ int64_t size = static_cast<int64_t>(std::numeric_limits<uint16_t>::max()) * 256;
+ int64_t chunk_size = size / 8;
+ std::vector<uint64_t> data;
+ for (uint64_t i = 0; i < static_cast<uint64_t>(size); i++) {
+ data.push_back(i);
+ }
+ while (state.KeepRunning()) {
+ AdaptiveUIntBuilder builder(default_memory_pool());
+ for (int64_t i = 0; i < size; i += chunk_size) {
+ // Append the input in 8 equal chunks of chunk_size (= size / 8) values each
+ builder.Append(data.data() + i, chunk_size, nullptr);
+ }
+ std::shared_ptr<Array> out;
+ builder.Finish(&out);
+ }
+ state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
+BENCHMARK(BM_BuildAdaptiveUIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index ab43c2a..6762e17 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -17,6 +17,7 @@
#include "arrow/builder.h"
+#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
@@ -248,6 +249,336 @@ template class PrimitiveBuilder<HalfFloatType>;
template class PrimitiveBuilder<FloatType>;
template class PrimitiveBuilder<DoubleType>;
+AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool)
+ : ArrayBuilder(pool, int64()), data_(nullptr), raw_data_(nullptr), int_size_(1) {}
+
+Status AdaptiveIntBuilderBase::Init(int64_t capacity) {
+ RETURN_NOT_OK(ArrayBuilder::Init(capacity));
+ data_ = std::make_shared<PoolBuffer>(pool_);
+
+ int64_t nbytes = capacity * int_size_;
+ RETURN_NOT_OK(data_->Resize(nbytes));
+ // TODO(emkornfield) valgrind complains without this
+ memset(data_->mutable_data(), 0, static_cast<size_t>(nbytes));
+
+ raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
+ return Status::OK();
+}
+
+Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
+ // XXX: Set floor size for now
+ if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; }
+
+ if (capacity_ == 0) {
+ RETURN_NOT_OK(Init(capacity));
+ } else {
+ RETURN_NOT_OK(ArrayBuilder::Resize(capacity));
+ const int64_t old_bytes = data_->size();
+ const int64_t new_bytes = capacity * int_size_;
+ RETURN_NOT_OK(data_->Resize(new_bytes));
+ raw_data_ = data_->mutable_data();
+ // TODO(emkornfield) valgrind complains without this
+ memset(
+ data_->mutable_data() + old_bytes, 0, static_cast<size_t>(new_bytes - old_bytes));
+ }
+ return Status::OK();
+}
+
+AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {}
+
+Status AdaptiveIntBuilder::Finish(std::shared_ptr<Array>* out) {
+ const int64_t bytes_required = length_ * int_size_;
+ if (bytes_required > 0 && bytes_required < data_->size()) {
+ // Trim buffers
+ RETURN_NOT_OK(data_->Resize(bytes_required));
+ }
+ switch (int_size_) {
+ case 1:
+ *out =
+ std::make_shared<Int8Array>(int8(), length_, data_, null_bitmap_, null_count_);
+ break;
+ case 2:
+ *out = std::make_shared<Int16Array>(
+ int16(), length_, data_, null_bitmap_, null_count_);
+ break;
+ case 4:
+ *out = std::make_shared<Int32Array>(
+ int32(), length_, data_, null_bitmap_, null_count_);
+ break;
+ case 8:
+ *out = std::make_shared<Int64Array>(
+ int64(), length_, data_, null_bitmap_, null_count_);
+ break;
+ default:
+ DCHECK(false);
+ return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
+ }
+
+ data_ = null_bitmap_ = nullptr;
+ capacity_ = length_ = null_count_ = 0;
+ return Status::OK();
+}
+
+Status AdaptiveIntBuilder::Append(
+ const int64_t* values, int64_t length, const uint8_t* valid_bytes) {
+ RETURN_NOT_OK(Reserve(length));
+
+ if (length > 0) {
+ if (int_size_ < 8) {
+ uint8_t new_int_size = int_size_;
+ for (int64_t i = 0; i < length; i++) {
+ if (valid_bytes == nullptr || valid_bytes[i]) {
+ new_int_size = expanded_int_size(values[i], new_int_size);
+ }
+ }
+ if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+ }
+ }
+
+ if (int_size_ == 8) {
+ std::memcpy(reinterpret_cast<int64_t*>(raw_data_) + length_, values,
+ sizeof(int64_t) * length);
+ } else {
+ // int_size_ may have changed, so we need to recheck
+ switch (int_size_) {
+ case 1: {
+ int8_t* data_ptr = reinterpret_cast<int8_t*>(raw_data_) + length_;
+ std::transform(values, values + length, data_ptr,
+ [](int64_t x) { return static_cast<int8_t>(x); });
+ } break;
+ case 2: {
+ int16_t* data_ptr = reinterpret_cast<int16_t*>(raw_data_) + length_;
+ std::transform(values, values + length, data_ptr,
+ [](int64_t x) { return static_cast<int16_t>(x); });
+ } break;
+ case 4: {
+ int32_t* data_ptr = reinterpret_cast<int32_t*>(raw_data_) + length_;
+ std::transform(values, values + length, data_ptr,
+ [](int64_t x) { return static_cast<int32_t>(x); });
+ } break;
+ default:
+ DCHECK(false);
+ }
+ }
+
+ // length_ is updated by this call
+ ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
+
+ return Status::OK();
+}
+
+template <typename new_type, typename old_type>
+typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+AdaptiveIntBuilder::ExpandIntSizeInternal() {
+ return Status::OK();
+}
+
+#define __LESS(a, b) (a) < (b)
+template <typename new_type, typename old_type>
+typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+AdaptiveIntBuilder::ExpandIntSizeInternal() {
+ int_size_ = sizeof(new_type);
+ RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
+
+ old_type* src = reinterpret_cast<old_type*>(raw_data_);
+ new_type* dst = reinterpret_cast<new_type*>(raw_data_);
+ // Copying backward (from the last element to the first) ensures that no
+ // element is overwritten before it has been read, so the copy stays in-place.
+ std::copy_backward(src, src + length_, dst + length_);
+
+ return Status::OK();
+}
+#undef __LESS
+
+template <typename new_type>
+Status AdaptiveIntBuilder::ExpandIntSizeN() {
+ switch (int_size_) {
+ case 1:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int8_t>()));
+ break;
+ case 2:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int16_t>()));
+ break;
+ case 4:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int32_t>()));
+ break;
+ case 8:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int64_t>()));
+ break;
+ default:
+ DCHECK(false);
+ }
+ return Status::OK();
+}
+
+Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) {
+ switch (new_int_size) {
+ case 1:
+ RETURN_NOT_OK((ExpandIntSizeN<int8_t>()));
+ break;
+ case 2:
+ RETURN_NOT_OK((ExpandIntSizeN<int16_t>()));
+ break;
+ case 4:
+ RETURN_NOT_OK((ExpandIntSizeN<int32_t>()));
+ break;
+ case 8:
+ RETURN_NOT_OK((ExpandIntSizeN<int64_t>()));
+ break;
+ default:
+ DCHECK(false);
+ }
+ return Status::OK();
+}
+
+AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool)
+ : AdaptiveIntBuilderBase(pool) {}
+
+Status AdaptiveUIntBuilder::Finish(std::shared_ptr<Array>* out) {
+ const int64_t bytes_required = length_ * int_size_;
+ if (bytes_required > 0 && bytes_required < data_->size()) {
+ // Trim buffers
+ RETURN_NOT_OK(data_->Resize(bytes_required));
+ }
+ switch (int_size_) {
+ case 1:
+ *out = std::make_shared<UInt8Array>(
+ uint8(), length_, data_, null_bitmap_, null_count_);
+ break;
+ case 2:
+ *out = std::make_shared<UInt16Array>(
+ uint16(), length_, data_, null_bitmap_, null_count_);
+ break;
+ case 4:
+ *out = std::make_shared<UInt32Array>(
+ uint32(), length_, data_, null_bitmap_, null_count_);
+ break;
+ case 8:
+ *out = std::make_shared<UInt64Array>(
+ uint64(), length_, data_, null_bitmap_, null_count_);
+ break;
+ default:
+ DCHECK(false);
+ return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
+ }
+
+ data_ = null_bitmap_ = nullptr;
+ capacity_ = length_ = null_count_ = 0;
+ return Status::OK();
+}
+
+Status AdaptiveUIntBuilder::Append(
+ const uint64_t* values, int64_t length, const uint8_t* valid_bytes) {
+ RETURN_NOT_OK(Reserve(length));
+
+ if (length > 0) {
+ if (int_size_ < 8) {
+ uint8_t new_int_size = int_size_;
+ for (int64_t i = 0; i < length; i++) {
+ if (valid_bytes == nullptr || valid_bytes[i]) {
+ new_int_size = expanded_uint_size(values[i], new_int_size);
+ }
+ }
+ if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+ }
+ }
+
+ if (int_size_ == 8) {
+ std::memcpy(reinterpret_cast<uint64_t*>(raw_data_) + length_, values,
+ sizeof(uint64_t) * length);
+ } else {
+ // int_size_ may have changed, so we need to recheck
+ switch (int_size_) {
+ case 1: {
+ uint8_t* data_ptr = reinterpret_cast<uint8_t*>(raw_data_) + length_;
+ std::transform(values, values + length, data_ptr,
+ [](uint64_t x) { return static_cast<uint8_t>(x); });
+ } break;
+ case 2: {
+ uint16_t* data_ptr = reinterpret_cast<uint16_t*>(raw_data_) + length_;
+ std::transform(values, values + length, data_ptr,
+ [](uint64_t x) { return static_cast<uint16_t>(x); });
+ } break;
+ case 4: {
+ uint32_t* data_ptr = reinterpret_cast<uint32_t*>(raw_data_) + length_;
+ std::transform(values, values + length, data_ptr,
+ [](uint64_t x) { return static_cast<uint32_t>(x); });
+ } break;
+ default:
+ DCHECK(false);
+ }
+ }
+
+ // length_ is updated by this call
+ ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
+
+ return Status::OK();
+}
+
+template <typename new_type, typename old_type>
+typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+AdaptiveUIntBuilder::ExpandIntSizeInternal() {
+ return Status::OK();
+}
+
+#define __LESS(a, b) (a) < (b)
+template <typename new_type, typename old_type>
+typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+AdaptiveUIntBuilder::ExpandIntSizeInternal() {
+ int_size_ = sizeof(new_type);
+ RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
+
+ old_type* src = reinterpret_cast<old_type*>(raw_data_);
+ new_type* dst = reinterpret_cast<new_type*>(raw_data_);
+ // Copying backward (from the last element to the first) ensures that no
+ // element is overwritten before it has been read, so the copy stays in-place.
+ std::copy_backward(src, src + length_, dst + length_);
+
+ return Status::OK();
+}
+#undef __LESS
+
+template <typename new_type>
+Status AdaptiveUIntBuilder::ExpandIntSizeN() {
+ switch (int_size_) {
+ case 1:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint8_t>()));
+ break;
+ case 2:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint16_t>()));
+ break;
+ case 4:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint32_t>()));
+ break;
+ case 8:
+ RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint64_t>()));
+ break;
+ default:
+ DCHECK(false);
+ }
+ return Status::OK();
+}
+
+Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) {
+ switch (new_int_size) {
+ case 1:
+ RETURN_NOT_OK((ExpandIntSizeN<uint8_t>()));
+ break;
+ case 2:
+ RETURN_NOT_OK((ExpandIntSizeN<uint16_t>()));
+ break;
+ case 4:
+ RETURN_NOT_OK((ExpandIntSizeN<uint32_t>()));
+ break;
+ case 8:
+ RETURN_NOT_OK((ExpandIntSizeN<uint64_t>()));
+ break;
+ default:
+ DCHECK(false);
+ }
+ return Status::OK();
+}
+
BooleanBuilder::BooleanBuilder(MemoryPool* pool)
: ArrayBuilder(pool, boolean()), data_(nullptr), raw_data_(nullptr) {}
http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 6876916..d77223e 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -20,6 +20,7 @@
#include <cstdint>
#include <functional>
+#include <limits>
#include <memory>
#include <string>
#include <vector>
@@ -247,6 +248,193 @@ using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
using FloatBuilder = NumericBuilder<FloatType>;
using DoubleBuilder = NumericBuilder<DoubleType>;
+class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
+ public:
+ explicit AdaptiveIntBuilderBase(MemoryPool* pool);
+
+ /// Append length slots whose validity is given by valid_bytes (a 0 byte indicates null); capacity is reserved first
+ Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
+ RETURN_NOT_OK(Reserve(length));
+ UnsafeAppendToBitmap(valid_bytes, length);
+ return Status::OK();
+ }
+
+ Status AppendNull() {
+ RETURN_NOT_OK(Reserve(1));
+ UnsafeAppendToBitmap(false);
+ return Status::OK();
+ }
+
+ std::shared_ptr<Buffer> data() const { return data_; }
+
+ Status Init(int64_t capacity) override;
+
+ /// Increase the capacity of the builder to accommodate at least the indicated
+ /// number of elements
+ Status Resize(int64_t capacity) override;
+
+ protected:
+ std::shared_ptr<PoolBuffer> data_;
+ uint8_t* raw_data_;
+
+ uint8_t int_size_;
+};
+
+ // Return the byte width (1, 2, 4 or 8) required to store val, never less than current_int_size
+inline uint8_t expanded_uint_size(uint64_t val, uint8_t current_int_size) {
+ if (current_int_size == 8 ||
+ (current_int_size < 8 &&
+ (val > static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())))) {
+ return 8;
+ } else if (current_int_size == 4 ||
+ (current_int_size < 4 &&
+ (val > static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())))) {
+ return 4;
+ } else if (current_int_size == 2 ||
+ (current_int_size == 1 &&
+ (val > static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())))) {
+ return 2;
+ } else {
+ return 1;
+ }
+}
+
+class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase {
+ public:
+ explicit AdaptiveUIntBuilder(MemoryPool* pool);
+
+ using ArrayBuilder::Advance;
+
+ /// Scalar append
+ Status Append(uint64_t val) {
+ RETURN_NOT_OK(Reserve(1));
+ BitUtil::SetBit(null_bitmap_data_, length_);
+
+ uint8_t new_int_size = expanded_uint_size(val, int_size_);
+ if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+
+ switch (int_size_) {
+ case 1:
+ reinterpret_cast<uint8_t*>(raw_data_)[length_++] = static_cast<uint8_t>(val);
+ break;
+ case 2:
+ reinterpret_cast<uint16_t*>(raw_data_)[length_++] = static_cast<uint16_t>(val);
+ break;
+ case 4:
+ reinterpret_cast<uint32_t*>(raw_data_)[length_++] = static_cast<uint32_t>(val);
+ break;
+ case 8:
+ reinterpret_cast<uint64_t*>(raw_data_)[length_++] = val;
+ break;
+ default:
+ return Status::NotImplemented("This code shall never be reached");
+ }
+ return Status::OK();
+ }
+
+ /// Vector append
+ ///
+ /// If passed, valid_bytes is of equal length to values, and any zero byte
+ /// will be considered as a null for that slot
+ Status Append(
+ const uint64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr);
+
+ Status ExpandIntSize(uint8_t new_int_size);
+ Status Finish(std::shared_ptr<Array>* out) override;
+
+ protected:
+ template <typename new_type, typename old_type>
+ typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+ ExpandIntSizeInternal();
+#define __LESS(a, b) (a) < (b)
+ template <typename new_type, typename old_type>
+ typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+ ExpandIntSizeInternal();
+#undef __LESS
+
+ template <typename new_type>
+ Status ExpandIntSizeN();
+};
+
+ // Return the byte width (1, 2, 4 or 8) required to store val, never less than current_int_size
+inline uint8_t expanded_int_size(int64_t val, uint8_t current_int_size) {
+ if (current_int_size == 8 ||
+ (current_int_size < 8 &&
+ (val > static_cast<int64_t>(std::numeric_limits<int32_t>::max()) ||
+ val < static_cast<int64_t>(std::numeric_limits<int32_t>::min())))) {
+ return 8;
+ } else if (current_int_size == 4 ||
+ (current_int_size < 4 &&
+ (val > static_cast<int64_t>(std::numeric_limits<int16_t>::max()) ||
+ val < static_cast<int64_t>(std::numeric_limits<int16_t>::min())))) {
+ return 4;
+ } else if (current_int_size == 2 ||
+ (current_int_size == 1 &&
+ (val > static_cast<int64_t>(std::numeric_limits<int8_t>::max()) ||
+ val < static_cast<int64_t>(std::numeric_limits<int8_t>::min())))) {
+ return 2;
+ } else {
+ return 1;
+ }
+}
+
+class ARROW_EXPORT AdaptiveIntBuilder : public AdaptiveIntBuilderBase {
+ public:
+ explicit AdaptiveIntBuilder(MemoryPool* pool);
+
+ using ArrayBuilder::Advance;
+
+ /// Scalar append
+ Status Append(int64_t val) {
+ RETURN_NOT_OK(Reserve(1));
+ BitUtil::SetBit(null_bitmap_data_, length_);
+
+ uint8_t new_int_size = expanded_int_size(val, int_size_);
+ if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+
+ switch (int_size_) {
+ case 1:
+ reinterpret_cast<int8_t*>(raw_data_)[length_++] = static_cast<int8_t>(val);
+ break;
+ case 2:
+ reinterpret_cast<int16_t*>(raw_data_)[length_++] = static_cast<int16_t>(val);
+ break;
+ case 4:
+ reinterpret_cast<int32_t*>(raw_data_)[length_++] = static_cast<int32_t>(val);
+ break;
+ case 8:
+ reinterpret_cast<int64_t*>(raw_data_)[length_++] = val;
+ break;
+ default:
+ return Status::NotImplemented("This code shall never be reached");
+ }
+ return Status::OK();
+ }
+
+ /// Vector append
+ ///
+ /// If passed, valid_bytes is of equal length to values, and any zero byte
+ /// will be considered as a null for that slot
+ Status Append(
+ const int64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr);
+
+ Status ExpandIntSize(uint8_t new_int_size);
+ Status Finish(std::shared_ptr<Array>* out) override;
+
+ protected:
+ template <typename new_type, typename old_type>
+ typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+ ExpandIntSizeInternal();
+#define __LESS(a, b) (a) < (b)
+ template <typename new_type, typename old_type>
+ typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+ ExpandIntSizeInternal();
+#undef __LESS
+
+ template <typename new_type>
+ Status ExpandIntSizeN();
+};
+
class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
public:
explicit BooleanBuilder(MemoryPool* pool);
@@ -271,7 +459,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
/// Scalar append
Status Append(bool val) {
- Reserve(1);
+ RETURN_NOT_OK(Reserve(1));
BitUtil::SetBit(null_bitmap_data_, length_);
if (val) {
BitUtil::SetBit(raw_data_, length_);