You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/06/22 21:32:45 UTC

arrow git commit: ARROW-1073: C++: Adaptive integer builder

Repository: arrow
Updated Branches:
  refs/heads/master 222628c9d -> 608b89e16


ARROW-1073: C++: Adaptive integer builder

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #723 from xhochy/ARROW-1073 and squashes the following commits:

5bab9c2f [Uwe L. Korn] ARROW-1073: C++: Adaptive integer builder


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/608b89e1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/608b89e1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/608b89e1

Branch: refs/heads/master
Commit: 608b89e1648ef8a116c3139606b0440122d2b75a
Parents: 222628c
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Thu Jun 22 17:32:41 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jun 22 17:32:41 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/array-test.cc        | 220 +++++++++++++++++++++
 cpp/src/arrow/builder-benchmark.cc |  66 +++++++
 cpp/src/arrow/builder.cc           | 331 ++++++++++++++++++++++++++++++++
 cpp/src/arrow/builder.h            | 190 +++++++++++++++++-
 4 files changed, 806 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 636d97f..beffa1b 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -1258,6 +1258,226 @@ TEST_F(TestFWBinaryArray, Slice) {
 }
 
 // ----------------------------------------------------------------------
+// AdaptiveInt tests
+
+class TestAdaptiveIntBuilder : public TestBuilder {
+ public:
+  void SetUp() {
+    TestBuilder::SetUp();
+    builder_ = std::make_shared<AdaptiveIntBuilder>(pool_);
+  }
+
+  void Done() { EXPECT_OK(builder_->Finish(&result_)); }
+
+ protected:
+  std::shared_ptr<AdaptiveIntBuilder> builder_;
+
+  std::shared_ptr<Array> expected_;
+  std::shared_ptr<Array> result_;
+};
+
+TEST_F(TestAdaptiveIntBuilder, TestInt8) {
+  builder_->Append(0);
+  builder_->Append(127);
+  builder_->Append(-128);
+
+  Done();
+
+  std::vector<int8_t> expected_values({0, 127, -128});
+  ArrayFromVector<Int8Type, int8_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestInt16) {
+  builder_->Append(0);
+  builder_->Append(128);
+  Done();
+
+  std::vector<int16_t> expected_values({0, 128});
+  ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(-129);
+  expected_values = {-129};
+  Done();
+
+  ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(std::numeric_limits<int16_t>::max());
+  builder_->Append(std::numeric_limits<int16_t>::min());
+  expected_values = {
+      std::numeric_limits<int16_t>::max(), std::numeric_limits<int16_t>::min()};
+  Done();
+
+  ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestInt32) {
+  builder_->Append(0);
+  builder_->Append(static_cast<int64_t>(std::numeric_limits<int16_t>::max()) + 1);
+  Done();
+
+  std::vector<int32_t> expected_values(
+      {0, static_cast<int32_t>(std::numeric_limits<int16_t>::max()) + 1});
+  ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(static_cast<int64_t>(std::numeric_limits<int16_t>::min()) - 1);
+  expected_values = {static_cast<int32_t>(std::numeric_limits<int16_t>::min()) - 1};
+  Done();
+
+  ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(std::numeric_limits<int32_t>::max());
+  builder_->Append(std::numeric_limits<int32_t>::min());
+  expected_values = {
+      std::numeric_limits<int32_t>::max(), std::numeric_limits<int32_t>::min()};
+  Done();
+
+  ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestInt64) {
+  builder_->Append(0);
+  builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1);
+  Done();
+
+  std::vector<int64_t> expected_values(
+      {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
+  ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1);
+  expected_values = {static_cast<int64_t>(std::numeric_limits<int32_t>::min()) - 1};
+  Done();
+
+  ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(std::numeric_limits<int64_t>::max());
+  builder_->Append(std::numeric_limits<int64_t>::min());
+  expected_values = {
+      std::numeric_limits<int64_t>::max(), std::numeric_limits<int64_t>::min()};
+  Done();
+
+  ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveIntBuilder, TestAppendVector) {
+  std::vector<int64_t> expected_values(
+      {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
+  builder_->Append(expected_values.data(), expected_values.size());
+  Done();
+
+  ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+class TestAdaptiveUIntBuilder : public TestBuilder {
+ public:
+  void SetUp() {
+    TestBuilder::SetUp();
+    builder_ = std::make_shared<AdaptiveUIntBuilder>(pool_);
+  }
+
+  void Done() { EXPECT_OK(builder_->Finish(&result_)); }
+
+ protected:
+  std::shared_ptr<AdaptiveUIntBuilder> builder_;
+
+  std::shared_ptr<Array> expected_;
+  std::shared_ptr<Array> result_;
+};
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt8) {
+  builder_->Append(0);
+  builder_->Append(255);
+
+  Done();
+
+  std::vector<uint8_t> expected_values({0, 255});
+  ArrayFromVector<UInt8Type, uint8_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt16) {
+  builder_->Append(0);
+  builder_->Append(256);
+  Done();
+
+  std::vector<uint16_t> expected_values({0, 256});
+  ArrayFromVector<UInt16Type, uint16_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(std::numeric_limits<uint16_t>::max());
+  expected_values = {std::numeric_limits<uint16_t>::max()};
+  Done();
+
+  ArrayFromVector<UInt16Type, uint16_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt32) {
+  builder_->Append(0);
+  builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint16_t>::max()) + 1);
+  Done();
+
+  std::vector<uint32_t> expected_values(
+      {0, static_cast<uint32_t>(std::numeric_limits<uint16_t>::max()) + 1});
+  ArrayFromVector<UInt32Type, uint32_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(std::numeric_limits<uint32_t>::max());
+  expected_values = {std::numeric_limits<uint32_t>::max()};
+  Done();
+
+  ArrayFromVector<UInt32Type, uint32_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestUInt64) {
+  builder_->Append(0);
+  builder_->Append(static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+  Done();
+
+  std::vector<uint64_t> expected_values(
+      {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1});
+  ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+
+  SetUp();
+  builder_->Append(std::numeric_limits<uint64_t>::max());
+  expected_values = {std::numeric_limits<uint64_t>::max()};
+  Done();
+
+  ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+TEST_F(TestAdaptiveUIntBuilder, TestAppendVector) {
+  std::vector<uint64_t> expected_values(
+      {0, static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1});
+  builder_->Append(expected_values.data(), expected_values.size());
+  Done();
+
+  ArrayFromVector<UInt64Type, uint64_t>(expected_values, &expected_);
+  ASSERT_TRUE(expected_->Equals(result_));
+}
+
+// ----------------------------------------------------------------------
 // List tests
 
 class TestListBuilder : public TestBuilder {

http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index b0c3cd1..62f2fd6 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -61,4 +61,70 @@ static void BM_BuildVectorNoNulls(
 
 BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
 
+static void BM_BuildAdaptiveIntNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
+  int64_t chunk_size = size / 8;
+  std::vector<int64_t> data;
+  for (int64_t i = 0; i < size; i++) {
+    data.push_back(i);
+  }
+  while (state.KeepRunning()) {
+    AdaptiveIntBuilder builder(default_memory_pool());
+    for (int64_t i = 0; i < size; i += chunk_size) {
+      // Append one ~8 MiB chunk; the whole input is ~64 MiB of int64 values
+      builder.Append(data.data() + i, chunk_size, nullptr);
+    }
+    std::shared_ptr<Array> out;
+    builder.Finish(&out);
+  }
+  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
+BENCHMARK(BM_BuildAdaptiveIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
+
+static void BM_BuildAdaptiveIntNoNullsScalarAppend(
+    benchmark::State& state) {  // NOLINT non-const reference
+  int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
+  std::vector<int64_t> data;
+  for (int64_t i = 0; i < size; i++) {
+    data.push_back(i);
+  }
+  while (state.KeepRunning()) {
+    AdaptiveIntBuilder builder(default_memory_pool());
+    for (int64_t i = 0; i < size; i++) {
+      builder.Append(data[i]);
+    }
+    std::shared_ptr<Array> out;
+    builder.Finish(&out);
+  }
+  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
+BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend)
+    ->Repetitions(3)
+    ->Unit(benchmark::kMillisecond);
+
+static void BM_BuildAdaptiveUIntNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  int64_t size = static_cast<int64_t>(std::numeric_limits<uint16_t>::max()) * 256;
+  int64_t chunk_size = size / 8;
+  std::vector<uint64_t> data;
+  for (uint64_t i = 0; i < static_cast<uint64_t>(size); i++) {
+    data.push_back(i);
+  }
+  while (state.KeepRunning()) {
+    AdaptiveUIntBuilder builder(default_memory_pool());
+    for (int64_t i = 0; i < size; i += chunk_size) {
+      // Append one ~16 MiB chunk; the whole input is ~128 MiB of uint64 values
+      builder.Append(data.data() + i, chunk_size, nullptr);
+    }
+    std::shared_ptr<Array> out;
+    builder.Finish(&out);
+  }
+  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
+BENCHMARK(BM_BuildAdaptiveUIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index ab43c2a..6762e17 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/builder.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <limits>
@@ -248,6 +249,336 @@ template class PrimitiveBuilder<HalfFloatType>;
 template class PrimitiveBuilder<FloatType>;
 template class PrimitiveBuilder<DoubleType>;
 
+AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool)
+    : ArrayBuilder(pool, int64()), data_(nullptr), raw_data_(nullptr), int_size_(1) {}
+
+Status AdaptiveIntBuilderBase::Init(int64_t capacity) {
+  RETURN_NOT_OK(ArrayBuilder::Init(capacity));
+  data_ = std::make_shared<PoolBuffer>(pool_);
+
+  int64_t nbytes = capacity * int_size_;
+  RETURN_NOT_OK(data_->Resize(nbytes));
+  // TODO(emkornfield) valgrind complains without this
+  memset(data_->mutable_data(), 0, static_cast<size_t>(nbytes));
+
+  raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
+  return Status::OK();
+}
+
+Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
+  // XXX: Set floor size for now
+  if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; }
+
+  if (capacity_ == 0) {
+    RETURN_NOT_OK(Init(capacity));
+  } else {
+    RETURN_NOT_OK(ArrayBuilder::Resize(capacity));
+    const int64_t old_bytes = data_->size();
+    const int64_t new_bytes = capacity * int_size_;
+    RETURN_NOT_OK(data_->Resize(new_bytes));
+    raw_data_ = data_->mutable_data();
+    // TODO(emkornfield) valgrind complains without this
+    memset(
+        data_->mutable_data() + old_bytes, 0, static_cast<size_t>(new_bytes - old_bytes));
+  }
+  return Status::OK();
+}
+
+AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {}
+
+Status AdaptiveIntBuilder::Finish(std::shared_ptr<Array>* out) {
+  const int64_t bytes_required = length_ * int_size_;
+  if (bytes_required > 0 && bytes_required < data_->size()) {
+    // Trim buffers
+    RETURN_NOT_OK(data_->Resize(bytes_required));
+  }
+  switch (int_size_) {
+    case 1:
+      *out =
+          std::make_shared<Int8Array>(int8(), length_, data_, null_bitmap_, null_count_);
+      break;
+    case 2:
+      *out = std::make_shared<Int16Array>(
+          int16(), length_, data_, null_bitmap_, null_count_);
+      break;
+    case 4:
+      *out = std::make_shared<Int32Array>(
+          int32(), length_, data_, null_bitmap_, null_count_);
+      break;
+    case 8:
+      *out = std::make_shared<Int64Array>(
+          int64(), length_, data_, null_bitmap_, null_count_);
+      break;
+    default:
+      DCHECK(false);
+      return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
+  }
+
+  data_ = null_bitmap_ = nullptr;
+  capacity_ = length_ = null_count_ = 0;
+  return Status::OK();
+}
+
+Status AdaptiveIntBuilder::Append(
+    const int64_t* values, int64_t length, const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(Reserve(length));
+
+  if (length > 0) {
+    if (int_size_ < 8) {
+      uint8_t new_int_size = int_size_;
+      for (int64_t i = 0; i < length; i++) {
+        if (valid_bytes == nullptr || valid_bytes[i]) {
+          new_int_size = expanded_int_size(values[i], new_int_size);
+        }
+      }
+      if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+    }
+  }
+
+  if (int_size_ == 8) {
+    std::memcpy(reinterpret_cast<int64_t*>(raw_data_) + length_, values,
+        sizeof(int64_t) * length);
+  } else {
+    // int_size_ may have changed, so we need to recheck
+    switch (int_size_) {
+      case 1: {
+        int8_t* data_ptr = reinterpret_cast<int8_t*>(raw_data_) + length_;
+        std::transform(values, values + length, data_ptr,
+            [](int64_t x) { return static_cast<int8_t>(x); });
+      } break;
+      case 2: {
+        int16_t* data_ptr = reinterpret_cast<int16_t*>(raw_data_) + length_;
+        std::transform(values, values + length, data_ptr,
+            [](int64_t x) { return static_cast<int16_t>(x); });
+      } break;
+      case 4: {
+        int32_t* data_ptr = reinterpret_cast<int32_t*>(raw_data_) + length_;
+        std::transform(values, values + length, data_ptr,
+            [](int64_t x) { return static_cast<int32_t>(x); });
+      } break;
+      default:
+        DCHECK(false);
+    }
+  }
+
+  // length_ is updated by this call
+  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
+
+  return Status::OK();
+}
+
+template <typename new_type, typename old_type>
+typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+AdaptiveIntBuilder::ExpandIntSizeInternal() {
+  return Status::OK();
+}
+
+#define __LESS(a, b) (a) < (b)
+template <typename new_type, typename old_type>
+typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+AdaptiveIntBuilder::ExpandIntSizeInternal() {
+  int_size_ = sizeof(new_type);
+  RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
+
+  old_type* src = reinterpret_cast<old_type*>(raw_data_);
+  new_type* dst = reinterpret_cast<new_type*>(raw_data_);
+  // Copying backward ensures that no element is overwritten before it has been
+  // read, so the widening copy can be done in place.
+  std::copy_backward(src, src + length_, dst + length_);
+
+  return Status::OK();
+}
+#undef __LESS
+
+template <typename new_type>
+Status AdaptiveIntBuilder::ExpandIntSizeN() {
+  switch (int_size_) {
+    case 1:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int8_t>()));
+      break;
+    case 2:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int16_t>()));
+      break;
+    case 4:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int32_t>()));
+      break;
+    case 8:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, int64_t>()));
+      break;
+    default:
+      DCHECK(false);
+  }
+  return Status::OK();
+}
+
+Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) {
+  switch (new_int_size) {
+    case 1:
+      RETURN_NOT_OK((ExpandIntSizeN<int8_t>()));
+      break;
+    case 2:
+      RETURN_NOT_OK((ExpandIntSizeN<int16_t>()));
+      break;
+    case 4:
+      RETURN_NOT_OK((ExpandIntSizeN<int32_t>()));
+      break;
+    case 8:
+      RETURN_NOT_OK((ExpandIntSizeN<int64_t>()));
+      break;
+    default:
+      DCHECK(false);
+  }
+  return Status::OK();
+}
+
+AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool)
+    : AdaptiveIntBuilderBase(pool) {}
+
+Status AdaptiveUIntBuilder::Finish(std::shared_ptr<Array>* out) {
+  const int64_t bytes_required = length_ * int_size_;
+  if (bytes_required > 0 && bytes_required < data_->size()) {
+    // Trim buffers
+    RETURN_NOT_OK(data_->Resize(bytes_required));
+  }
+  switch (int_size_) {
+    case 1:
+      *out = std::make_shared<UInt8Array>(
+          uint8(), length_, data_, null_bitmap_, null_count_);
+      break;
+    case 2:
+      *out = std::make_shared<UInt16Array>(
+          uint16(), length_, data_, null_bitmap_, null_count_);
+      break;
+    case 4:
+      *out = std::make_shared<UInt32Array>(
+          uint32(), length_, data_, null_bitmap_, null_count_);
+      break;
+    case 8:
+      *out = std::make_shared<UInt64Array>(
+          uint64(), length_, data_, null_bitmap_, null_count_);
+      break;
+    default:
+      DCHECK(false);
+      return Status::NotImplemented("Only ints of size 1,2,4,8 are supported");
+  }
+
+  data_ = null_bitmap_ = nullptr;
+  capacity_ = length_ = null_count_ = 0;
+  return Status::OK();
+}
+
+Status AdaptiveUIntBuilder::Append(
+    const uint64_t* values, int64_t length, const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(Reserve(length));
+
+  if (length > 0) {
+    if (int_size_ < 8) {
+      uint8_t new_int_size = int_size_;
+      for (int64_t i = 0; i < length; i++) {
+        if (valid_bytes == nullptr || valid_bytes[i]) {
+          new_int_size = expanded_uint_size(values[i], new_int_size);
+        }
+      }
+      if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+    }
+  }
+
+  if (int_size_ == 8) {
+    std::memcpy(reinterpret_cast<uint64_t*>(raw_data_) + length_, values,
+        sizeof(uint64_t) * length);
+  } else {
+    // int_size_ may have changed, so we need to recheck
+    switch (int_size_) {
+      case 1: {
+        uint8_t* data_ptr = reinterpret_cast<uint8_t*>(raw_data_) + length_;
+        std::transform(values, values + length, data_ptr,
+            [](uint64_t x) { return static_cast<uint8_t>(x); });
+      } break;
+      case 2: {
+        uint16_t* data_ptr = reinterpret_cast<uint16_t*>(raw_data_) + length_;
+        std::transform(values, values + length, data_ptr,
+            [](uint64_t x) { return static_cast<uint16_t>(x); });
+      } break;
+      case 4: {
+        uint32_t* data_ptr = reinterpret_cast<uint32_t*>(raw_data_) + length_;
+        std::transform(values, values + length, data_ptr,
+            [](uint64_t x) { return static_cast<uint32_t>(x); });
+      } break;
+      default:
+        DCHECK(false);
+    }
+  }
+
+  // length_ is updated by this call
+  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
+
+  return Status::OK();
+}
+
+template <typename new_type, typename old_type>
+typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+AdaptiveUIntBuilder::ExpandIntSizeInternal() {
+  return Status::OK();
+}
+
+#define __LESS(a, b) (a) < (b)
+template <typename new_type, typename old_type>
+typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+AdaptiveUIntBuilder::ExpandIntSizeInternal() {
+  int_size_ = sizeof(new_type);
+  RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
+
+  old_type* src = reinterpret_cast<old_type*>(raw_data_);
+  new_type* dst = reinterpret_cast<new_type*>(raw_data_);
+  // Copying backward ensures that no element is overwritten before it has been
+  // read, so the widening copy can be done in place.
+  std::copy_backward(src, src + length_, dst + length_);
+
+  return Status::OK();
+}
+#undef __LESS
+
+template <typename new_type>
+Status AdaptiveUIntBuilder::ExpandIntSizeN() {
+  switch (int_size_) {
+    case 1:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint8_t>()));
+      break;
+    case 2:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint16_t>()));
+      break;
+    case 4:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint32_t>()));
+      break;
+    case 8:
+      RETURN_NOT_OK((ExpandIntSizeInternal<new_type, uint64_t>()));
+      break;
+    default:
+      DCHECK(false);
+  }
+  return Status::OK();
+}
+
+Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) {
+  switch (new_int_size) {
+    case 1:
+      RETURN_NOT_OK((ExpandIntSizeN<uint8_t>()));
+      break;
+    case 2:
+      RETURN_NOT_OK((ExpandIntSizeN<uint16_t>()));
+      break;
+    case 4:
+      RETURN_NOT_OK((ExpandIntSizeN<uint32_t>()));
+      break;
+    case 8:
+      RETURN_NOT_OK((ExpandIntSizeN<uint64_t>()));
+      break;
+    default:
+      DCHECK(false);
+  }
+  return Status::OK();
+}
+
 BooleanBuilder::BooleanBuilder(MemoryPool* pool)
     : ArrayBuilder(pool, boolean()), data_(nullptr), raw_data_(nullptr) {}
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/608b89e1/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 6876916..d77223e 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <functional>
+#include <limits>
 #include <memory>
 #include <string>
 #include <vector>
@@ -247,6 +248,193 @@ using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
 using FloatBuilder = NumericBuilder<FloatType>;
 using DoubleBuilder = NumericBuilder<DoubleType>;
 
+class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
+ public:
+  explicit AdaptiveIntBuilderBase(MemoryPool* pool);
+
+  /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
+  Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
+    RETURN_NOT_OK(Reserve(length));
+    UnsafeAppendToBitmap(valid_bytes, length);
+    return Status::OK();
+  }
+
+  Status AppendNull() {
+    RETURN_NOT_OK(Reserve(1));
+    UnsafeAppendToBitmap(false);
+    return Status::OK();
+  }
+
+  std::shared_ptr<Buffer> data() const { return data_; }
+
+  Status Init(int64_t capacity) override;
+
+  /// Increase the capacity of the builder to accommodate at least the indicated
+  /// number of elements
+  Status Resize(int64_t capacity) override;
+
+ protected:
+  std::shared_ptr<PoolBuffer> data_;
+  uint8_t* raw_data_;
+
+  uint8_t int_size_;
+};
+
+// Byte width (1, 2, 4 or 8) needed to store val; never less than current_int_size
+inline uint8_t expanded_uint_size(uint64_t val, uint8_t current_int_size) {
+  if (current_int_size == 8 ||
+      (current_int_size < 8 &&
+          (val > static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())))) {
+    return 8;
+  } else if (current_int_size == 4 ||
+             (current_int_size < 4 &&
+                 (val > static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())))) {
+    return 4;
+  } else if (current_int_size == 2 ||
+             (current_int_size == 1 &&
+                 (val > static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())))) {
+    return 2;
+  } else {
+    return 1;
+  }
+}
+
+class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase {
+ public:
+  explicit AdaptiveUIntBuilder(MemoryPool* pool);
+
+  using ArrayBuilder::Advance;
+
+  /// Scalar append
+  Status Append(uint64_t val) {
+    RETURN_NOT_OK(Reserve(1));
+    BitUtil::SetBit(null_bitmap_data_, length_);
+
+    uint8_t new_int_size = expanded_uint_size(val, int_size_);
+    if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+
+    switch (int_size_) {
+      case 1:
+        reinterpret_cast<uint8_t*>(raw_data_)[length_++] = static_cast<uint8_t>(val);
+        break;
+      case 2:
+        reinterpret_cast<uint16_t*>(raw_data_)[length_++] = static_cast<uint16_t>(val);
+        break;
+      case 4:
+        reinterpret_cast<uint32_t*>(raw_data_)[length_++] = static_cast<uint32_t>(val);
+        break;
+      case 8:
+        reinterpret_cast<uint64_t*>(raw_data_)[length_++] = val;
+        break;
+      default:
+        return Status::NotImplemented("This code shall never be reached");
+    }
+    return Status::OK();
+  }
+
+  /// Vector append
+  ///
+  /// If passed, valid_bytes is of equal length to values, and any zero byte
+  /// will be considered as a null for that slot
+  Status Append(
+      const uint64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr);
+
+  Status ExpandIntSize(uint8_t new_int_size);
+  Status Finish(std::shared_ptr<Array>* out) override;
+
+ protected:
+  template <typename new_type, typename old_type>
+  typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+  ExpandIntSizeInternal();
+#define __LESS(a, b) (a) < (b)
+  template <typename new_type, typename old_type>
+  typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+  ExpandIntSizeInternal();
+#undef __LESS
+
+  template <typename new_type>
+  Status ExpandIntSizeN();
+};
+
+// Byte width (1, 2, 4 or 8) needed to store val; never less than current_int_size
+inline uint8_t expanded_int_size(int64_t val, uint8_t current_int_size) {
+  if (current_int_size == 8 ||
+      (current_int_size < 8 &&
+          (val > static_cast<int64_t>(std::numeric_limits<int32_t>::max()) ||
+              val < static_cast<int64_t>(std::numeric_limits<int32_t>::min())))) {
+    return 8;
+  } else if (current_int_size == 4 ||
+             (current_int_size < 4 &&
+                 (val > static_cast<int64_t>(std::numeric_limits<int16_t>::max()) ||
+                     val < static_cast<int64_t>(std::numeric_limits<int16_t>::min())))) {
+    return 4;
+  } else if (current_int_size == 2 ||
+             (current_int_size == 1 &&
+                 (val > static_cast<int64_t>(std::numeric_limits<int8_t>::max()) ||
+                     val < static_cast<int64_t>(std::numeric_limits<int8_t>::min())))) {
+    return 2;
+  } else {
+    return 1;
+  }
+}
+
+class ARROW_EXPORT AdaptiveIntBuilder : public AdaptiveIntBuilderBase {
+ public:
+  explicit AdaptiveIntBuilder(MemoryPool* pool);
+
+  using ArrayBuilder::Advance;
+
+  /// Scalar append
+  Status Append(int64_t val) {
+    RETURN_NOT_OK(Reserve(1));
+    BitUtil::SetBit(null_bitmap_data_, length_);
+
+    uint8_t new_int_size = expanded_int_size(val, int_size_);
+    if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); }
+
+    switch (int_size_) {
+      case 1:
+        reinterpret_cast<int8_t*>(raw_data_)[length_++] = static_cast<int8_t>(val);
+        break;
+      case 2:
+        reinterpret_cast<int16_t*>(raw_data_)[length_++] = static_cast<int16_t>(val);
+        break;
+      case 4:
+        reinterpret_cast<int32_t*>(raw_data_)[length_++] = static_cast<int32_t>(val);
+        break;
+      case 8:
+        reinterpret_cast<int64_t*>(raw_data_)[length_++] = val;
+        break;
+      default:
+        return Status::NotImplemented("This code shall never be reached");
+    }
+    return Status::OK();
+  }
+
+  /// Vector append
+  ///
+  /// If passed, valid_bytes is of equal length to values, and any zero byte
+  /// will be considered as a null for that slot
+  Status Append(
+      const int64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr);
+
+  Status ExpandIntSize(uint8_t new_int_size);
+  Status Finish(std::shared_ptr<Array>* out) override;
+
+ protected:
+  template <typename new_type, typename old_type>
+  typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
+  ExpandIntSizeInternal();
+#define __LESS(a, b) (a) < (b)
+  template <typename new_type, typename old_type>
+  typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type
+  ExpandIntSizeInternal();
+#undef __LESS
+
+  template <typename new_type>
+  Status ExpandIntSizeN();
+};
+
 class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
  public:
   explicit BooleanBuilder(MemoryPool* pool);
@@ -271,7 +459,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
 
   /// Scalar append
   Status Append(bool val) {
-    Reserve(1);
+    RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
     if (val) {
       BitUtil::SetBit(raw_data_, length_);