Posted to commits@arrow.apache.org by we...@apache.org on 2018/11/30 18:31:20 UTC

[arrow] branch master updated: ARROW-3893: [C++] Improve adaptive int builder performance

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e87110f  ARROW-3893: [C++] Improve adaptive int builder performance
e87110f is described below

commit e87110f174ec0374f3c59f7fdafe3a22433e7ce4
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Fri Nov 30 12:29:17 2018 -0600

    ARROW-3893: [C++] Improve adaptive int builder performance
    
    The strategy is two-pronged:
    - scalar appends to the int builder go to a small uint64_t scratch space, deferring width detection and conversion until the scratch space is full
    - bulk int width detection is much improved
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #3040 from pitrou/ARROW-3893-adaptive-int-builder-perf and squashes the following commits:
    
    61a101b12 <Antoine Pitrou> ARROW-3893:  Improve adaptive int builder performance
---
 cpp/src/arrow/CMakeLists.txt             |   1 +
 cpp/src/arrow/array-test.cc              | 107 ++++++--
 cpp/src/arrow/builder-benchmark.cc       |  37 ++-
 cpp/src/arrow/builder.cc                 | 223 ++++++++++-------
 cpp/src/arrow/builder.h                  | 126 +++-------
 cpp/src/arrow/util/CMakeLists.txt        |   2 +
 cpp/src/arrow/util/int-util-benchmark.cc | 109 +++++++++
 cpp/src/arrow/util/int-util-test.cc      | 379 +++++++++++++++++++++++++++++
 cpp/src/arrow/util/int-util.cc           | 406 +++++++++++++++++++++++++++++++
 cpp/src/arrow/util/int-util.h            |  69 ++++++
 10 files changed, 1245 insertions(+), 214 deletions(-)
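
A minimal standalone sketch of the first prong (illustrative names only,
not the actual Arrow classes): scalar appends land in a small fixed-size
uint64_t scratch array, and width detection plus narrowing run once per
batch instead of once per appended value.

    #include <cstdint>
    #include <vector>

    // Sketch only: buffer scalar appends in a small scratch array and
    // defer int-width detection until the buffer fills up.
    class PendingIntSketch {
     public:
      void Append(int64_t val) {
        pending_[pending_pos_++] = static_cast<uint64_t>(val);
        if (pending_pos_ == kPendingSize) Commit();
      }
      void Finish() { Commit(); }

     private:
      void Commit() {
        // In the real builder, bulk width detection and downcasting
        // happen here, amortized over up to kPendingSize values.
        committed_.insert(committed_.end(), pending_, pending_ + pending_pos_);
        pending_pos_ = 0;
      }

      static constexpr int32_t kPendingSize = 1024;  // mirrors pending_size_ below
      uint64_t pending_[kPendingSize];
      int32_t pending_pos_ = 0;
      std::vector<uint64_t> committed_;
    };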

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 8ff17a2..a56079f 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -48,6 +48,7 @@ set(ARROW_SRCS
   util/compression.cc
   util/cpu-info.cc
   util/decimal.cc
+  util/int-util.cc
   util/io-util.cc
   util/logging.cc
   util/key_value_metadata.cc
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index ab03ced..5866058 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -1199,7 +1199,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt8) {
 
   std::vector<int8_t> expected_values({0, 127, -128});
   ArrayFromVector<Int8Type, int8_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 }
 
 TEST_F(TestAdaptiveIntBuilder, TestInt16) {
@@ -1209,7 +1209,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt16) {
 
   std::vector<int16_t> expected_values({0, 128});
   ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 
   SetUp();
   ASSERT_OK(builder_->Append(-129));
@@ -1217,7 +1217,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt16) {
   Done();
 
   ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 
   SetUp();
   ASSERT_OK(builder_->Append(std::numeric_limits<int16_t>::max()));
@@ -1227,7 +1227,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt16) {
   Done();
 
   ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 }
 
 TEST_F(TestAdaptiveIntBuilder, TestInt32) {
@@ -1239,7 +1239,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt32) {
   std::vector<int32_t> expected_values(
       {0, static_cast<int32_t>(std::numeric_limits<int16_t>::max()) + 1});
   ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 
   SetUp();
   ASSERT_OK(
@@ -1248,7 +1248,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt32) {
   Done();
 
   ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 
   SetUp();
   ASSERT_OK(builder_->Append(std::numeric_limits<int32_t>::max()));
@@ -1258,7 +1258,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt32) {
   Done();
 
   ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 }
 
 TEST_F(TestAdaptiveIntBuilder, TestInt64) {
@@ -1270,7 +1270,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt64) {
   std::vector<int64_t> expected_values(
       {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
   ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 
   SetUp();
   ASSERT_OK(
@@ -1279,7 +1279,7 @@ TEST_F(TestAdaptiveIntBuilder, TestInt64) {
   Done();
 
   ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 
   SetUp();
   ASSERT_OK(builder_->Append(std::numeric_limits<int64_t>::max()));
@@ -1289,17 +1289,58 @@ TEST_F(TestAdaptiveIntBuilder, TestInt64) {
   Done();
 
   ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  AssertArraysEqual(*expected_, *result_);
 }
 
 TEST_F(TestAdaptiveIntBuilder, TestAppendValues) {
-  std::vector<int64_t> expected_values(
-      {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
-  ASSERT_OK(builder_->AppendValues(expected_values.data(), expected_values.size()));
-  Done();
-
-  ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
-  ASSERT_TRUE(expected_->Equals(result_));
+  {
+    std::vector<int64_t> expected_values(
+        {0, static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1});
+    ASSERT_OK(builder_->AppendValues(expected_values.data(), expected_values.size()));
+    Done();
+
+    ArrayFromVector<Int64Type, int64_t>(expected_values, &expected_);
+    AssertArraysEqual(*expected_, *result_);
+  }
+  {
+    SetUp();
+    std::vector<int64_t> values(
+        {0, std::numeric_limits<int32_t>::min(), std::numeric_limits<int32_t>::max()});
+    ASSERT_OK(builder_->AppendValues(values.data(), values.size()));
+    Done();
+
+    std::vector<int32_t> expected_values(
+        {0, std::numeric_limits<int32_t>::min(), std::numeric_limits<int32_t>::max()});
+
+    ArrayFromVector<Int32Type, int32_t>(expected_values, &expected_);
+    AssertArraysEqual(*expected_, *result_);
+  }
+  {
+    SetUp();
+    std::vector<int64_t> values(
+        {0, std::numeric_limits<int16_t>::min(), std::numeric_limits<int16_t>::max()});
+    ASSERT_OK(builder_->AppendValues(values.data(), values.size()));
+    Done();
+
+    std::vector<int16_t> expected_values(
+        {0, std::numeric_limits<int16_t>::min(), std::numeric_limits<int16_t>::max()});
+
+    ArrayFromVector<Int16Type, int16_t>(expected_values, &expected_);
+    AssertArraysEqual(*expected_, *result_);
+  }
+  {
+    SetUp();
+    std::vector<int64_t> values(
+        {0, std::numeric_limits<int8_t>::min(), std::numeric_limits<int8_t>::max()});
+    ASSERT_OK(builder_->AppendValues(values.data(), values.size()));
+    Done();
+
+    std::vector<int8_t> expected_values(
+        {0, std::numeric_limits<int8_t>::min(), std::numeric_limits<int8_t>::max()});
+
+    ArrayFromVector<Int8Type, int8_t>(expected_values, &expected_);
+    AssertArraysEqual(*expected_, *result_);
+  }
 }
 
 TEST_F(TestAdaptiveIntBuilder, TestAssertZeroPadded) {
@@ -1311,15 +1352,23 @@ TEST_F(TestAdaptiveIntBuilder, TestAssertZeroPadded) {
 
 TEST_F(TestAdaptiveIntBuilder, TestAppendNull) {
   int64_t size = 1000;
-  for (unsigned index = 0; index < size; ++index) {
+  ASSERT_OK(builder_->Append(127));
+  for (unsigned index = 1; index < size - 1; ++index) {
     ASSERT_OK(builder_->AppendNull());
   }
+  ASSERT_OK(builder_->Append(-128));
 
   Done();
 
-  for (unsigned index = 0; index < size; ++index) {
-    ASSERT_TRUE(result_->IsNull(index));
-  }
+  std::vector<bool> expected_valid(size, false);
+  expected_valid[0] = true;
+  expected_valid[size - 1] = true;
+  std::vector<int8_t> expected_values(size);
+  expected_values[0] = 127;
+  expected_values[size - 1] = -128;
+  std::shared_ptr<Array> expected;
+  ArrayFromVector<Int8Type, int8_t>(expected_valid, expected_values, &expected_);
+  AssertArraysEqual(*expected_, *result_);
 }
 
 TEST_F(TestAdaptiveIntBuilder, TestAppendNulls) {
@@ -1438,15 +1487,23 @@ TEST_F(TestAdaptiveUIntBuilder, TestAssertZeroPadded) {
 
 TEST_F(TestAdaptiveUIntBuilder, TestAppendNull) {
   int64_t size = 1000;
-  for (unsigned index = 0; index < size; ++index) {
+  ASSERT_OK(builder_->Append(254));
+  for (unsigned index = 1; index < size - 1; ++index) {
     ASSERT_OK(builder_->AppendNull());
   }
+  ASSERT_OK(builder_->Append(255));
 
   Done();
 
-  for (unsigned index = 0; index < size; ++index) {
-    ASSERT_TRUE(result_->IsNull(index));
-  }
+  std::vector<bool> expected_valid(size, false);
+  expected_valid[0] = true;
+  expected_valid[size - 1] = true;
+  std::vector<uint8_t> expected_values(size);
+  expected_values[0] = 254;
+  expected_values[size - 1] = 255;
+  std::shared_ptr<Array> expected;
+  ArrayFromVector<UInt8Type, uint8_t>(expected_valid, expected_values, &expected_);
+  AssertArraysEqual(*expected_, *result_);
 }
 
 TEST_F(TestAdaptiveUIntBuilder, TestAppendNulls) {
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index 4acede1..f96728d 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -69,14 +69,14 @@ static void BM_BuildAdaptiveIntNoNulls(
     benchmark::State& state) {  // NOLINT non-const reference
   int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
   int64_t chunk_size = size / 8;
-  std::vector<int64_t> data;
+  std::vector<int64_t> data(size);
   for (int64_t i = 0; i < size; i++) {
-    data.push_back(i);
+    data[i] = i;
   }
   while (state.KeepRunning()) {
     AdaptiveIntBuilder builder;
     for (int64_t i = 0; i < size; i += chunk_size) {
-      // Build up an array of 512 MiB in size
+      // Build up an array of 128 MiB in size
       ABORT_NOT_OK(builder.AppendValues(data.data() + i, chunk_size, nullptr));
     }
     std::shared_ptr<Array> out;
@@ -88,9 +88,9 @@ static void BM_BuildAdaptiveIntNoNulls(
 static void BM_BuildAdaptiveIntNoNullsScalarAppend(
     benchmark::State& state) {  // NOLINT non-const reference
   int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
-  std::vector<int64_t> data;
+  std::vector<int64_t> data(size);
   for (int64_t i = 0; i < size; i++) {
-    data.push_back(i);
+    data[i] = i;
   }
   while (state.KeepRunning()) {
     AdaptiveIntBuilder builder;
@@ -107,14 +107,14 @@ static void BM_BuildAdaptiveUIntNoNulls(
     benchmark::State& state) {  // NOLINT non-const reference
   int64_t size = static_cast<int64_t>(std::numeric_limits<uint16_t>::max()) * 256;
   int64_t chunk_size = size / 8;
-  std::vector<uint64_t> data;
+  std::vector<uint64_t> data(size);
   for (uint64_t i = 0; i < static_cast<uint64_t>(size); i++) {
-    data.push_back(i);
+    data[i] = i;
   }
   while (state.KeepRunning()) {
     AdaptiveUIntBuilder builder;
     for (int64_t i = 0; i < size; i += chunk_size) {
-      // Build up an array of 512 MiB in size
+      // Build up an array of 128 MiB in size
       ABORT_NOT_OK(builder.AppendValues(data.data() + i, chunk_size, nullptr));
     }
     std::shared_ptr<Array> out;
@@ -123,6 +123,24 @@ static void BM_BuildAdaptiveUIntNoNulls(
   state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
 }
 
+static void BM_BuildAdaptiveUIntNoNullsScalarAppend(
+    benchmark::State& state) {  // NOLINT non-const reference
+  int64_t size = static_cast<int64_t>(std::numeric_limits<int16_t>::max()) * 256;
+  std::vector<uint64_t> data(size);
+  for (uint64_t i = 0; i < static_cast<uint64_t>(size); i++) {
+    data[i] = i;
+  }
+  while (state.KeepRunning()) {
+    AdaptiveUIntBuilder builder;
+    for (int64_t i = 0; i < size; i++) {
+      ABORT_NOT_OK(builder.Append(data[i]));
+    }
+    std::shared_ptr<Array> out;
+    ABORT_NOT_OK(builder.Finish(&out));
+  }
+  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t));
+}
+
 static void BM_BuildBooleanArrayNoNulls(
     benchmark::State& state) {  // NOLINT non-const reference
   // 2 MiB block
@@ -349,6 +367,9 @@ BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend)
 BENCHMARK(BM_BuildAdaptiveUIntNoNulls)
     ->Repetitions(kRepetitions)
     ->Unit(benchmark::kMicrosecond);
+BENCHMARK(BM_BuildAdaptiveUIntNoNullsScalarAppend)
+    ->Repetitions(kRepetitions)
+    ->Unit(benchmark::kMicrosecond);
 
 BENCHMARK(BM_BuildBinaryArray)->Repetitions(kRepetitions)->Unit(benchmark::kMicrosecond);
 BENCHMARK(BM_BuildFixedSizeBinaryArray)
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 0d9e0e0..0e10be7 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -34,6 +34,7 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/hashing.h"
+#include "arrow/util/int-util.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
@@ -295,12 +296,19 @@ template class PrimitiveBuilder<FloatType>;
 template class PrimitiveBuilder<DoubleType>;
 
 AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool)
-    : ArrayBuilder(int64(), pool), data_(nullptr), raw_data_(nullptr), int_size_(1) {}
+    : ArrayBuilder(int64(), pool),
+      data_(nullptr),
+      raw_data_(nullptr),
+      int_size_(1),
+      pending_pos_(0),
+      pending_has_nulls_(false) {}
 
 void AdaptiveIntBuilderBase::Reset() {
   ArrayBuilder::Reset();
   data_.reset();
   raw_data_ = nullptr;
+  pending_pos_ = 0;
+  pending_has_nulls_ = false;
 }
 
 Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
@@ -321,6 +329,8 @@ Status AdaptiveIntBuilderBase::Resize(int64_t capacity) {
 AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {}
 
 Status AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  RETURN_NOT_OK(CommitPendingData());
+
   std::shared_ptr<DataType> output_type;
   switch (int_size_) {
     case 1:
@@ -350,62 +360,91 @@ Status AdaptiveIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
   return Status::OK();
 }
 
-Status AdaptiveIntBuilder::AppendValues(const int64_t* values, int64_t length,
-                                        const uint8_t* valid_bytes) {
-  RETURN_NOT_OK(Reserve(length));
+Status AdaptiveIntBuilder::CommitPendingData() {
+  if (pending_pos_ == 0) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(Reserve(pending_pos_));
+  const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr;
+  RETURN_NOT_OK(AppendValuesInternal(reinterpret_cast<const int64_t*>(pending_data_),
+                                     pending_pos_, valid_bytes));
+  pending_has_nulls_ = false;
+  pending_pos_ = 0;
+  return Status::OK();
+}
 
-  if (length > 0) {
-    if (int_size_ < 8) {
-      uint8_t new_int_size = int_size_;
-      for (int64_t i = 0; i < length; i++) {
-        if (valid_bytes == nullptr || valid_bytes[i]) {
-          new_int_size = internal::ExpandedIntSize(values[i], new_int_size);
-        }
-      }
-      if (new_int_size != int_size_) {
-        RETURN_NOT_OK(ExpandIntSize(new_int_size));
-      }
+static constexpr int64_t kAdaptiveIntChunkSize = 8192;
+
+Status AdaptiveIntBuilder::AppendValuesInternal(const int64_t* values, int64_t length,
+                                                const uint8_t* valid_bytes) {
+  while (length > 0) {
+    // In case `length` is very large, we don't want to thrash the cache by
+    // scanning it twice (first to detect int width, second to copy the data).
+    // Instead, process data in L2-cacheable chunks.
+    const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);
+
+    uint8_t new_int_size;
+    new_int_size = internal::DetectIntWidth(values, valid_bytes, chunk_size, int_size_);
+
+    DCHECK_GE(new_int_size, int_size_);
+    if (new_int_size > int_size_) {
+      // This updates int_size_
+      RETURN_NOT_OK(ExpandIntSize(new_int_size));
     }
-  }
 
-  if (int_size_ == 8) {
-    std::memcpy(reinterpret_cast<int64_t*>(raw_data_) + length_, values,
-                sizeof(int64_t) * length);
-  } else {
-#ifdef _MSC_VER
-#pragma warning(push)
-#pragma warning(disable : 4996)
-#endif
-    // int_size_ may have changed, so we need to recheck
     switch (int_size_) {
-      case 1: {
-        int8_t* data_ptr = reinterpret_cast<int8_t*>(raw_data_) + length_;
-        std::transform(values, values + length, data_ptr,
-                       [](int64_t x) { return static_cast<int8_t>(x); });
-      } break;
-      case 2: {
-        int16_t* data_ptr = reinterpret_cast<int16_t*>(raw_data_) + length_;
-        std::transform(values, values + length, data_ptr,
-                       [](int64_t x) { return static_cast<int16_t>(x); });
-      } break;
-      case 4: {
-        int32_t* data_ptr = reinterpret_cast<int32_t*>(raw_data_) + length_;
-        std::transform(values, values + length, data_ptr,
-                       [](int64_t x) { return static_cast<int32_t>(x); });
-      } break;
+      case 1:
+        internal::DowncastInts(values, reinterpret_cast<int8_t*>(raw_data_) + length_,
+                               chunk_size);
+        break;
+      case 2:
+        internal::DowncastInts(values, reinterpret_cast<int16_t*>(raw_data_) + length_,
+                               chunk_size);
+        break;
+      case 4:
+        internal::DowncastInts(values, reinterpret_cast<int32_t*>(raw_data_) + length_,
+                               chunk_size);
+        break;
+      case 8:
+        internal::DowncastInts(values, reinterpret_cast<int64_t*>(raw_data_) + length_,
+                               chunk_size);
+        break;
       default:
         DCHECK(false);
     }
-#ifdef _MSC_VER
-#pragma warning(pop)
-#endif
+
+    // This updates length_
+    ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
+    values += chunk_size;
+    if (valid_bytes != nullptr) {
+      valid_bytes += chunk_size;
+    }
+    length -= chunk_size;
   }
 
-  // length_ is update by these
-  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
   return Status::OK();
 }
 
+Status AdaptiveUIntBuilder::CommitPendingData() {
+  if (pending_pos_ == 0) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(Reserve(pending_pos_));
+  const uint8_t* valid_bytes = pending_has_nulls_ ? pending_valid_ : nullptr;
+  RETURN_NOT_OK(AppendValuesInternal(pending_data_, pending_pos_, valid_bytes));
+  pending_has_nulls_ = false;
+  pending_pos_ = 0;
+  return Status::OK();
+}
+
+Status AdaptiveIntBuilder::AppendValues(const int64_t* values, int64_t length,
+                                        const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(CommitPendingData());
+  RETURN_NOT_OK(Reserve(length));
+
+  return AppendValuesInternal(values, length, valid_bytes);
+}
+
 template <typename new_type, typename old_type>
 typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
 AdaptiveIntBuilder::ExpandIntSizeInternal() {
@@ -418,9 +457,10 @@ typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::typ
 AdaptiveIntBuilder::ExpandIntSizeInternal() {
   int_size_ = sizeof(new_type);
   RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type)));
-
-  old_type* src = reinterpret_cast<old_type*>(raw_data_);
+  raw_data_ = reinterpret_cast<uint8_t*>(data_->mutable_data());
+  const old_type* src = reinterpret_cast<old_type*>(raw_data_);
   new_type* dst = reinterpret_cast<new_type*>(raw_data_);
+
   // By doing the backward copy, we ensure that no element is overwritten during
   // the copy process and the copy stays in-place.
   std::copy_backward(src, src + length_, dst + length_);
@@ -474,6 +514,8 @@ AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool)
     : AdaptiveIntBuilderBase(pool) {}
 
 Status AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
+  RETURN_NOT_OK(CommitPendingData());
+
   std::shared_ptr<DataType> output_type;
   switch (int_size_) {
     case 1:
@@ -503,62 +545,61 @@ Status AdaptiveUIntBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
   return Status::OK();
 }
 
-Status AdaptiveUIntBuilder::AppendValues(const uint64_t* values, int64_t length,
-                                         const uint8_t* valid_bytes) {
-  RETURN_NOT_OK(Reserve(length));
+Status AdaptiveUIntBuilder::AppendValuesInternal(const uint64_t* values, int64_t length,
+                                                 const uint8_t* valid_bytes) {
+  while (length > 0) {
+    // See AdaptiveIntBuilder::AppendValuesInternal
+    const int64_t chunk_size = std::min(length, kAdaptiveIntChunkSize);
 
-  if (length > 0) {
-    if (int_size_ < 8) {
-      uint8_t new_int_size = int_size_;
-      for (int64_t i = 0; i < length; i++) {
-        if (valid_bytes == nullptr || valid_bytes[i]) {
-          new_int_size = internal::ExpandedUIntSize(values[i], new_int_size);
-        }
-      }
-      if (new_int_size != int_size_) {
-        RETURN_NOT_OK(ExpandIntSize(new_int_size));
-      }
+    uint8_t new_int_size;
+    new_int_size = internal::DetectUIntWidth(values, valid_bytes, chunk_size, int_size_);
+
+    DCHECK_GE(new_int_size, int_size_);
+    if (new_int_size > int_size_) {
+      // This updates int_size_
+      RETURN_NOT_OK(ExpandIntSize(new_int_size));
     }
-  }
 
-  if (int_size_ == 8) {
-    std::memcpy(reinterpret_cast<uint64_t*>(raw_data_) + length_, values,
-                sizeof(uint64_t) * length);
-  } else {
-#ifdef _MSC_VER
-#pragma warning(push)
-#pragma warning(disable : 4996)
-#endif
-    // int_size_ may have changed, so we need to recheck
     switch (int_size_) {
-      case 1: {
-        uint8_t* data_ptr = reinterpret_cast<uint8_t*>(raw_data_) + length_;
-        std::transform(values, values + length, data_ptr,
-                       [](uint64_t x) { return static_cast<uint8_t>(x); });
-      } break;
-      case 2: {
-        uint16_t* data_ptr = reinterpret_cast<uint16_t*>(raw_data_) + length_;
-        std::transform(values, values + length, data_ptr,
-                       [](uint64_t x) { return static_cast<uint16_t>(x); });
-      } break;
-      case 4: {
-        uint32_t* data_ptr = reinterpret_cast<uint32_t*>(raw_data_) + length_;
-        std::transform(values, values + length, data_ptr,
-                       [](uint64_t x) { return static_cast<uint32_t>(x); });
-      } break;
+      case 1:
+        internal::DowncastUInts(values, reinterpret_cast<uint8_t*>(raw_data_) + length_,
+                                chunk_size);
+        break;
+      case 2:
+        internal::DowncastUInts(values, reinterpret_cast<uint16_t*>(raw_data_) + length_,
+                                chunk_size);
+        break;
+      case 4:
+        internal::DowncastUInts(values, reinterpret_cast<uint32_t*>(raw_data_) + length_,
+                                chunk_size);
+        break;
+      case 8:
+        internal::DowncastUInts(values, reinterpret_cast<uint64_t*>(raw_data_) + length_,
+                                chunk_size);
+        break;
       default:
         DCHECK(false);
     }
-#ifdef _MSC_VER
-#pragma warning(pop)
-#endif
+
+    // This updates length_
+    ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, chunk_size);
+    values += chunk_size;
+    if (valid_bytes != nullptr) {
+      valid_bytes += chunk_size;
+    }
+    length -= chunk_size;
   }
 
-  // length_ is update by these
-  ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
   return Status::OK();
 }
 
+Status AdaptiveUIntBuilder::AppendValues(const uint64_t* values, int64_t length,
+                                         const uint8_t* valid_bytes) {
+  RETURN_NOT_OK(Reserve(length));
+
+  return AppendValuesInternal(values, length, valid_bytes);
+}
+
 template <typename new_type, typename old_type>
 typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
 AdaptiveUIntBuilder::ExpandIntSizeInternal() {
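
From the caller's side, the behavior exercised by the TestAppendValues cases
above can be summarized in a short usage sketch (hedged: this assumes the
Status-returning Finish() declared in builder.h at the time of this commit):

    #include <memory>
    #include <vector>

    #include "arrow/builder.h"
    #include "arrow/status.h"

    // All values fit in int8, so Finish() should yield an Int8Array,
    // per TestAppendValues above.
    arrow::Status BuildNarrowArray(std::shared_ptr<arrow::Array>* out) {
      arrow::AdaptiveIntBuilder builder;
      std::vector<int64_t> values = {0, -128, 127};
      // Scalar appends go to the pending scratch buffer; width detection
      // is deferred until CommitPendingData() runs (buffer full or Finish()).
      for (int64_t v : values) {
        ARROW_RETURN_NOT_OK(builder.Append(v));
      }
      return builder.Finish(out);
    }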
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 183e7e9..34cac55 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -432,6 +432,7 @@ class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
 
   /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
   Status AppendNulls(const uint8_t* valid_bytes, int64_t length) {
+    ARROW_RETURN_NOT_OK(CommitPendingData());
     ARROW_RETURN_NOT_OK(Reserve(length));
     memset(data_->mutable_data() + length_ * int_size_, 0, int_size_ * length);
     UnsafeAppendToBitmap(valid_bytes, length);
@@ -439,9 +440,14 @@ class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
   }
 
   Status AppendNull() {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    memset(data_->mutable_data() + length_ * int_size_, 0, int_size_);
-    UnsafeAppendToBitmap(false);
+    pending_data_[pending_pos_] = 0;
+    pending_valid_[pending_pos_] = 0;
+    pending_has_nulls_ = true;
+    ++pending_pos_;
+
+    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
+      return CommitPendingData();
+    }
     return Status::OK();
   }
 
@@ -449,54 +455,18 @@ class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder {
   Status Resize(int64_t capacity) override;
 
  protected:
+  virtual Status CommitPendingData() = 0;
+
   std::shared_ptr<ResizableBuffer> data_;
   uint8_t* raw_data_;
-
   uint8_t int_size_;
-};
 
-// TODO investigate AdaptiveIntBuilder / AdaptiveUIntBuilder performance
-
-// Check if we would need to expand the underlying storage type
-inline uint8_t ExpandedIntSize(int64_t val, uint8_t current_int_size) {
-  if (current_int_size == 8 ||
-      (current_int_size < 8 &&
-       (val > static_cast<int64_t>(std::numeric_limits<int32_t>::max()) ||
-        val < static_cast<int64_t>(std::numeric_limits<int32_t>::min())))) {
-    return 8;
-  } else if (current_int_size == 4 ||
-             (current_int_size < 4 &&
-              (val > static_cast<int64_t>(std::numeric_limits<int16_t>::max()) ||
-               val < static_cast<int64_t>(std::numeric_limits<int16_t>::min())))) {
-    return 4;
-  } else if (current_int_size == 2 ||
-             (current_int_size == 1 &&
-              (val > static_cast<int64_t>(std::numeric_limits<int8_t>::max()) ||
-               val < static_cast<int64_t>(std::numeric_limits<int8_t>::min())))) {
-    return 2;
-  } else {
-    return 1;
-  }
-}
-
-// Check if we would need to expand the underlying storage type
-inline uint8_t ExpandedUIntSize(uint64_t val, uint8_t current_int_size) {
-  if (current_int_size == 8 ||
-      (current_int_size < 8 &&
-       (val > static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())))) {
-    return 8;
-  } else if (current_int_size == 4 ||
-             (current_int_size < 4 &&
-              (val > static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())))) {
-    return 4;
-  } else if (current_int_size == 2 ||
-             (current_int_size == 1 &&
-              (val > static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())))) {
-    return 2;
-  } else {
-    return 1;
-  }
-}
+  static constexpr int32_t pending_size_ = 1024;
+  uint8_t pending_valid_[pending_size_];
+  uint64_t pending_data_[pending_size_];
+  int32_t pending_pos_;
+  bool pending_has_nulls_;
+};
 
 }  // namespace internal
 
@@ -509,29 +479,12 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
 
   /// Scalar append
   Status Append(const uint64_t val) {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    BitUtil::SetBit(null_bitmap_data_, length_);
+    pending_data_[pending_pos_] = val;
+    pending_valid_[pending_pos_] = 1;
+    ++pending_pos_;
 
-    uint8_t new_int_size = internal::ExpandedUIntSize(val, int_size_);
-    if (new_int_size != int_size_) {
-      ARROW_RETURN_NOT_OK(ExpandIntSize(new_int_size));
-    }
-
-    switch (int_size_) {
-      case 1:
-        reinterpret_cast<uint8_t*>(raw_data_)[length_++] = static_cast<uint8_t>(val);
-        break;
-      case 2:
-        reinterpret_cast<uint16_t*>(raw_data_)[length_++] = static_cast<uint16_t>(val);
-        break;
-      case 4:
-        reinterpret_cast<uint32_t*>(raw_data_)[length_++] = static_cast<uint32_t>(val);
-        break;
-      case 8:
-        reinterpret_cast<uint64_t*>(raw_data_)[length_++] = val;
-        break;
-      default:
-        return Status::NotImplemented("This code shall never be reached");
+    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
+      return CommitPendingData();
     }
     return Status::OK();
   }
@@ -548,8 +501,12 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
   Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
 
  protected:
+  Status CommitPendingData() override;
   Status ExpandIntSize(uint8_t new_int_size);
 
+  Status AppendValuesInternal(const uint64_t* values, int64_t length,
+                              const uint8_t* valid_bytes);
+
   template <typename new_type, typename old_type>
   typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
   ExpandIntSizeInternal();
@@ -572,29 +529,14 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
 
   /// Scalar append
   Status Append(const int64_t val) {
-    ARROW_RETURN_NOT_OK(Reserve(1));
-    BitUtil::SetBit(null_bitmap_data_, length_);
+    auto v = static_cast<uint64_t>(val);
 
-    uint8_t new_int_size = internal::ExpandedIntSize(val, int_size_);
-    if (new_int_size != int_size_) {
-      ARROW_RETURN_NOT_OK(ExpandIntSize(new_int_size));
-    }
+    pending_data_[pending_pos_] = v;
+    pending_valid_[pending_pos_] = 1;
+    ++pending_pos_;
 
-    switch (int_size_) {
-      case 1:
-        reinterpret_cast<int8_t*>(raw_data_)[length_++] = static_cast<int8_t>(val);
-        break;
-      case 2:
-        reinterpret_cast<int16_t*>(raw_data_)[length_++] = static_cast<int16_t>(val);
-        break;
-      case 4:
-        reinterpret_cast<int32_t*>(raw_data_)[length_++] = static_cast<int32_t>(val);
-        break;
-      case 8:
-        reinterpret_cast<int64_t*>(raw_data_)[length_++] = val;
-        break;
-      default:
-        return Status::NotImplemented("This code shall never be reached");
+    if (ARROW_PREDICT_FALSE(pending_pos_ >= pending_size_)) {
+      return CommitPendingData();
     }
     return Status::OK();
   }
@@ -611,8 +553,12 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
   Status FinishInternal(std::shared_ptr<ArrayData>* out) override;
 
  protected:
+  Status CommitPendingData() override;
   Status ExpandIntSize(uint8_t new_int_size);
 
+  Status AppendValuesInternal(const int64_t* values, int64_t length,
+                              const uint8_t* valid_bytes);
+
   template <typename new_type, typename old_type>
   typename std::enable_if<sizeof(old_type) >= sizeof(new_type), Status>::type
   ExpandIntSizeInternal();
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index d785eee..6b9c359 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -91,6 +91,7 @@ ADD_ARROW_TEST(checked-cast-test)
 ADD_ARROW_TEST(compression-test)
 ADD_ARROW_TEST(decimal-test)
 ADD_ARROW_TEST(hashing-test)
+ADD_ARROW_TEST(int-util-test)
 ADD_ARROW_TEST(key-value-metadata-test)
 ADD_ARROW_TEST(lazy-test)
 ADD_ARROW_TEST(logging-test)
@@ -105,6 +106,7 @@ ADD_ARROW_BENCHMARK(bit-util-benchmark)
 ADD_ARROW_BENCHMARK(compression-benchmark)
 ADD_ARROW_BENCHMARK(decimal-benchmark)
 ADD_ARROW_BENCHMARK(hashing-benchmark)
+ADD_ARROW_BENCHMARK(int-util-benchmark)
 ADD_ARROW_BENCHMARK(lazy-benchmark)
 ADD_ARROW_BENCHMARK(number-parsing-benchmark)
 ADD_ARROW_BENCHMARK(utf8-util-benchmark)
diff --git a/cpp/src/arrow/util/int-util-benchmark.cc b/cpp/src/arrow/util/int-util-benchmark.cc
new file mode 100644
index 0000000..3feb2ee
--- /dev/null
+++ b/cpp/src/arrow/util/int-util-benchmark.cc
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/util/int-util.h"
+
+namespace arrow {
+namespace internal {
+
+std::vector<uint64_t> GetUIntSequence(int n_values, uint64_t addend = 0) {
+  std::vector<uint64_t> values(n_values);
+  for (int i = 0; i < n_values; ++i) {
+    values[i] = static_cast<uint64_t>(i) + addend;
+  }
+  return values;
+}
+
+std::vector<int64_t> GetIntSequence(int n_values, uint64_t addend = 0) {
+  std::vector<int64_t> values(n_values);
+  for (int i = 0; i < n_values; ++i) {
+    values[i] = static_cast<int64_t>(i) + addend;
+  }
+  return values;
+}
+
+std::vector<uint8_t> GetValidBytes(int n_values) {
+  std::vector<uint8_t> valid_bytes(n_values);
+  for (int i = 0; i < n_values; ++i) {
+    valid_bytes[i] = (i % 3 == 0) ? 1 : 0;
+  }
+  return valid_bytes;
+}
+
+static void BM_DetectUIntWidthNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const auto values = GetUIntSequence(0x12345);
+
+  while (state.KeepRunning()) {
+    auto result = DetectUIntWidth(values.data(), static_cast<int64_t>(values.size()));
+    benchmark::DoNotOptimize(result);
+  }
+  state.SetBytesProcessed(state.iterations() * values.size() * sizeof(uint64_t));
+}
+
+static void BM_DetectUIntWidthNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const auto values = GetUIntSequence(0x12345);
+  const auto valid_bytes = GetValidBytes(0x12345);
+
+  while (state.KeepRunning()) {
+    auto result = DetectUIntWidth(values.data(), valid_bytes.data(),
+                                  static_cast<int64_t>(values.size()));
+    benchmark::DoNotOptimize(result);
+  }
+  state.SetBytesProcessed(state.iterations() * values.size() * sizeof(uint64_t));
+}
+
+static void BM_DetectIntWidthNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const auto values = GetIntSequence(0x12345, -0x1234);
+
+  while (state.KeepRunning()) {
+    auto result = DetectIntWidth(values.data(), static_cast<int64_t>(values.size()));
+    benchmark::DoNotOptimize(result);
+  }
+  state.SetBytesProcessed(state.iterations() * values.size() * sizeof(uint64_t));
+}
+
+static void BM_DetectIntWidthNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const auto values = GetIntSequence(0x12345, -0x1234);
+  const auto valid_bytes = GetValidBytes(0x12345);
+
+  while (state.KeepRunning()) {
+    auto result = DetectIntWidth(values.data(), valid_bytes.data(),
+                                 static_cast<int64_t>(values.size()));
+    benchmark::DoNotOptimize(result);
+  }
+  state.SetBytesProcessed(state.iterations() * values.size() * sizeof(uint64_t));
+}
+
+BENCHMARK(BM_DetectUIntWidthNoNulls)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+BENCHMARK(BM_DetectUIntWidthNulls)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+BENCHMARK(BM_DetectIntWidthNoNulls)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+BENCHMARK(BM_DetectIntWidthNulls)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/int-util-test.cc b/cpp/src/arrow/util/int-util-test.cc
new file mode 100644
index 0000000..51fd96e
--- /dev/null
+++ b/cpp/src/arrow/util/int-util-test.cc
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <random>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/test-util.h"
+#include "arrow/util/int-util.h"
+
+namespace arrow {
+namespace internal {
+
+static std::vector<uint8_t> all_widths = {1, 2, 4, 8};
+
+template <typename T>
+void CheckUIntWidth(const std::vector<T>& values, uint8_t expected_width) {
+  for (const uint8_t min_width : all_widths) {
+    uint8_t width =
+        DetectUIntWidth(values.data(), static_cast<int64_t>(values.size()), min_width);
+    ASSERT_EQ(width, std::max(min_width, expected_width));
+    width = DetectUIntWidth(values.data(), nullptr, static_cast<int64_t>(values.size()),
+                            min_width);
+    ASSERT_EQ(width, std::max(min_width, expected_width));
+  }
+}
+
+template <typename T>
+void CheckUIntWidth(const std::vector<T>& values, const std::vector<uint8_t>& valid_bytes,
+                    uint8_t expected_width) {
+  for (const uint8_t min_width : all_widths) {
+    uint8_t width = DetectUIntWidth(values.data(), valid_bytes.data(),
+                                    static_cast<int64_t>(values.size()), min_width);
+    ASSERT_EQ(width, std::max(min_width, expected_width));
+  }
+}
+
+template <typename T>
+void CheckIntWidth(const std::vector<T>& values, uint8_t expected_width) {
+  for (const uint8_t min_width : all_widths) {
+    uint8_t width =
+        DetectIntWidth(values.data(), static_cast<int64_t>(values.size()), min_width);
+    ASSERT_EQ(width, std::max(min_width, expected_width));
+    width = DetectIntWidth(values.data(), nullptr, static_cast<int64_t>(values.size()),
+                           min_width);
+    ASSERT_EQ(width, std::max(min_width, expected_width));
+  }
+}
+
+template <typename T>
+void CheckIntWidth(const std::vector<T>& values, const std::vector<uint8_t>& valid_bytes,
+                   uint8_t expected_width) {
+  for (const uint8_t min_width : all_widths) {
+    uint8_t width = DetectIntWidth(values.data(), valid_bytes.data(),
+                                   static_cast<int64_t>(values.size()), min_width);
+    ASSERT_EQ(width, std::max(min_width, expected_width));
+  }
+}
+
+template <typename T>
+std::vector<T> MakeRandomVector(const std::vector<T>& base_values, int n_values) {
+  std::default_random_engine gen(42);
+  std::uniform_int_distribution<int> index_dist(0,
+                                                static_cast<int>(base_values.size() - 1));
+
+  std::vector<T> values(n_values);
+  for (int i = 0; i < n_values; ++i) {
+    values[i] = base_values[index_dist(gen)];
+  }
+  return values;
+}
+
+template <typename T>
+std::vector<std::pair<std::vector<T>, std::vector<uint8_t>>> AlmostAllNullValues(
+    int n_values, T null_value, T non_null_value) {
+  std::vector<std::pair<std::vector<T>, std::vector<uint8_t>>> vectors;
+  vectors.reserve(n_values);
+  for (int i = 0; i < n_values; ++i) {
+    std::vector<T> values(n_values, null_value);
+    std::vector<uint8_t> valid_bytes(n_values, 0);
+    values[i] = non_null_value;
+    valid_bytes[i] = 1;
+    vectors.push_back({std::move(values), std::move(valid_bytes)});
+  }
+  return vectors;
+}
+
+template <typename T>
+std::vector<std::vector<T>> AlmostAllZeros(int n_values, T nonzero_value) {
+  std::vector<std::vector<T>> vectors;
+  vectors.reserve(n_values);
+  for (int i = 0; i < n_values; ++i) {
+    std::vector<T> values(n_values, 0);
+    values[i] = nonzero_value;
+    vectors.push_back(std::move(values));
+  }
+  return vectors;
+}
+
+std::vector<uint64_t> valid_uint8 = {0, 0x7f, 0xff};
+std::vector<uint64_t> valid_uint16 = {0, 0x7f, 0xff, 0x1000, 0xffff};
+std::vector<uint64_t> valid_uint32 = {0, 0x7f, 0xff, 0x10000, 0xffffffffULL};
+std::vector<uint64_t> valid_uint64 = {0, 0x100000000ULL, 0xffffffffffffffffULL};
+
+TEST(UIntWidth, NoNulls) {
+  std::vector<uint64_t> values{0, 0x7f, 0xff};
+  CheckUIntWidth(values, 1);
+
+  values = {0, 0x100};
+  CheckUIntWidth(values, 2);
+
+  values = {0, 0xffff};
+  CheckUIntWidth(values, 2);
+
+  values = {0, 0x10000};
+  CheckUIntWidth(values, 4);
+
+  values = {0, 0xffffffffULL};
+  CheckUIntWidth(values, 4);
+
+  values = {0, 0x100000000ULL};
+  CheckUIntWidth(values, 8);
+
+  values = {0, 0xffffffffffffffffULL};
+  CheckUIntWidth(values, 8);
+}
+
+TEST(UIntWidth, Nulls) {
+  std::vector<uint8_t> valid10{true, false};
+  std::vector<uint8_t> valid01{false, true};
+
+  std::vector<uint64_t> values{0, 0xff};
+  CheckUIntWidth(values, valid01, 1);
+  CheckUIntWidth(values, valid10, 1);
+
+  values = {0, 0x100};
+  CheckUIntWidth(values, valid01, 2);
+  CheckUIntWidth(values, valid10, 1);
+
+  values = {0, 0xffff};
+  CheckUIntWidth(values, valid01, 2);
+  CheckUIntWidth(values, valid10, 1);
+
+  values = {0, 0x10000};
+  CheckUIntWidth(values, valid01, 4);
+  CheckUIntWidth(values, valid10, 1);
+
+  values = {0, 0xffffffffULL};
+  CheckUIntWidth(values, valid01, 4);
+  CheckUIntWidth(values, valid10, 1);
+
+  values = {0, 0x100000000ULL};
+  CheckUIntWidth(values, valid01, 8);
+  CheckUIntWidth(values, valid10, 1);
+
+  values = {0, 0xffffffffffffffffULL};
+  CheckUIntWidth(values, valid01, 8);
+  CheckUIntWidth(values, valid10, 1);
+}
+
+TEST(UIntWidth, NoNullsMany) {
+  constexpr int N = 40;
+  for (const auto& values : AlmostAllZeros<uint64_t>(N, 0xff)) {
+    CheckUIntWidth(values, 1);
+  }
+  for (const auto& values : AlmostAllZeros<uint64_t>(N, 0xffff)) {
+    CheckUIntWidth(values, 2);
+  }
+  for (const auto& values : AlmostAllZeros<uint64_t>(N, 0xffffffffULL)) {
+    CheckUIntWidth(values, 4);
+  }
+  for (const auto& values : AlmostAllZeros<uint64_t>(N, 0xffffffffffffffffULL)) {
+    CheckUIntWidth(values, 8);
+  }
+  auto values = MakeRandomVector(valid_uint8, N);
+  CheckUIntWidth(values, 1);
+
+  values = MakeRandomVector(valid_uint16, N);
+  CheckUIntWidth(values, 2);
+
+  values = MakeRandomVector(valid_uint32, N);
+  CheckUIntWidth(values, 4);
+
+  values = MakeRandomVector(valid_uint64, N);
+  CheckUIntWidth(values, 8);
+}
+
+TEST(UIntWidth, NullsMany) {
+  constexpr uint64_t huge = 0x123456789abcdefULL;
+  constexpr int N = 40;
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, 0, 0xff)) {
+    CheckUIntWidth(p.first, p.second, 1);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, huge, 0xff)) {
+    CheckUIntWidth(p.first, p.second, 1);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, 0, 0xffff)) {
+    CheckUIntWidth(p.first, p.second, 2);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, huge, 0xffff)) {
+    CheckUIntWidth(p.first, p.second, 2);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, 0, 0xffffffffULL)) {
+    CheckUIntWidth(p.first, p.second, 4);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, huge, 0xffffffffULL)) {
+    CheckUIntWidth(p.first, p.second, 4);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, 0, 0xffffffffffffffffULL)) {
+    CheckUIntWidth(p.first, p.second, 8);
+  }
+  for (const auto& p : AlmostAllNullValues<uint64_t>(N, huge, 0xffffffffffffffffULL)) {
+    CheckUIntWidth(p.first, p.second, 8);
+  }
+}
+
+TEST(IntWidth, NoNulls) {
+  std::vector<int64_t> values{0, 0x7f, -0x80};
+  CheckIntWidth(values, 1);
+
+  values = {0, 0x80};
+  CheckIntWidth(values, 2);
+
+  values = {0, -0x81};
+  CheckIntWidth(values, 2);
+
+  values = {0, 0x7fff, -0x8000};
+  CheckIntWidth(values, 2);
+
+  values = {0, 0x8000};
+  CheckIntWidth(values, 4);
+
+  values = {0, -0x8001};
+  CheckIntWidth(values, 4);
+
+  values = {0, 0x7fffffffLL, -0x80000000LL};
+  CheckIntWidth(values, 4);
+
+  values = {0, 0x80000000LL};
+  CheckIntWidth(values, 8);
+
+  values = {0, -0x80000001LL};
+  CheckIntWidth(values, 8);
+
+  values = {0, 0x7fffffffffffffffLL, -0x7fffffffffffffffLL - 1};
+  CheckIntWidth(values, 8);
+}
+
+TEST(IntWidth, Nulls) {
+  std::vector<uint8_t> valid100{true, false, false};
+  std::vector<uint8_t> valid010{false, true, false};
+  std::vector<uint8_t> valid001{false, false, true};
+
+  std::vector<int64_t> values{0, 0x7f, -0x80};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 1);
+  CheckIntWidth(values, valid001, 1);
+
+  values = {0, 0x80, -0x81};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 2);
+  CheckIntWidth(values, valid001, 2);
+
+  values = {0, 0x7fff, -0x8000};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 2);
+  CheckIntWidth(values, valid001, 2);
+
+  values = {0, 0x8000, -0x8001};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 4);
+  CheckIntWidth(values, valid001, 4);
+
+  values = {0, 0x7fffffffLL, -0x80000000LL};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 4);
+  CheckIntWidth(values, valid001, 4);
+
+  values = {0, 0x80000000LL, -0x80000001LL};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 8);
+  CheckIntWidth(values, valid001, 8);
+
+  values = {0, 0x7fffffffffffffffLL, -0x7fffffffffffffffLL - 1};
+  CheckIntWidth(values, valid100, 1);
+  CheckIntWidth(values, valid010, 8);
+  CheckIntWidth(values, valid001, 8);
+}
+
+TEST(IntWidth, NoNullsMany) {
+  constexpr int N = 40;
+  // 1 byte wide
+  for (const int64_t value : {0x7f, -0x80}) {
+    for (const auto& values : AlmostAllZeros<int64_t>(N, value)) {
+      CheckIntWidth(values, 1);
+    }
+  }
+  // 2 bytes wide
+  for (const int64_t value : {0x80, -0x81, 0x7fff, -0x8000}) {
+    for (const auto& values : AlmostAllZeros<int64_t>(N, value)) {
+      CheckIntWidth(values, 2);
+    }
+  }
+  // 4 bytes wide
+  for (const int64_t value : {0x8000LL, -0x8001LL, 0x7fffffffLL, -0x80000000LL}) {
+    for (const auto& values : AlmostAllZeros<int64_t>(N, value)) {
+      CheckIntWidth(values, 4);
+    }
+  }
+  // 8 bytes wide
+  for (const int64_t value : {0x80000000LL, -0x80000001LL, 0x7fffffffffffffffLL}) {
+    for (const auto& values : AlmostAllZeros<int64_t>(N, value)) {
+      CheckIntWidth(values, 8);
+    }
+  }
+}
+
+TEST(IntWidth, NullsMany) {
+  constexpr int64_t huge = 0x123456789abcdefLL;
+  constexpr int N = 40;
+  // 1 byte wide
+  for (const int64_t value : {0x7f, -0x80}) {
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, 0, value)) {
+      CheckIntWidth(p.first, p.second, 1);
+    }
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, huge, value)) {
+      CheckIntWidth(p.first, p.second, 1);
+    }
+  }
+  // 2 bytes wide
+  for (const int64_t value : {0x80, -0x81, 0x7fff, -0x8000}) {
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, 0, value)) {
+      CheckIntWidth(p.first, p.second, 2);
+    }
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, huge, value)) {
+      CheckIntWidth(p.first, p.second, 2);
+    }
+  }
+  // 4 bytes wide
+  for (const int64_t value : {0x8000LL, -0x8001LL, 0x7fffffffLL, -0x80000000LL}) {
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, 0, value)) {
+      CheckIntWidth(p.first, p.second, 4);
+    }
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, huge, value)) {
+      CheckIntWidth(p.first, p.second, 4);
+    }
+  }
+  // 8 bytes wide
+  for (const int64_t value : {0x80000000LL, -0x80000001LL, 0x7fffffffffffffffLL}) {
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, 0, value)) {
+      CheckIntWidth(p.first, p.second, 8);
+    }
+    for (const auto& p : AlmostAllNullValues<int64_t>(N, huge, value)) {
+      CheckIntWidth(p.first, p.second, 8);
+    }
+  }
+}
+
+}  // namespace internal
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/int-util.cc b/cpp/src/arrow/util/int-util.cc
new file mode 100644
index 0000000..ced1cd1
--- /dev/null
+++ b/cpp/src/arrow/util/int-util.cc
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/int-util.h"
+
+#include <algorithm>
+#include <cstring>
+#include <limits>
+
+#include "arrow/util/bit-util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace internal {
+
+static constexpr uint64_t max_uint8 =
+    static_cast<uint64_t>(std::numeric_limits<uint8_t>::max());
+static constexpr uint64_t max_uint16 =
+    static_cast<uint64_t>(std::numeric_limits<uint16_t>::max());
+static constexpr uint64_t max_uint32 =
+    static_cast<uint64_t>(std::numeric_limits<uint32_t>::max());
+static constexpr uint64_t max_uint64 = std::numeric_limits<uint64_t>::max();
+
+static constexpr uint64_t mask_uint8 = ~0xffULL;
+static constexpr uint64_t mask_uint16 = ~0xffffULL;
+static constexpr uint64_t mask_uint32 = ~0xffffffffULL;
+
+//
+// Unsigned integer width detection
+//
+
+static const uint64_t max_uints[] = {0, max_uint8, max_uint16, 0,         max_uint32,
+                                     0, 0,         0,          max_uint64};
+
+// Check if we would need to expand the underlying storage type
+inline uint8_t ExpandedUIntWidth(uint64_t val, uint8_t current_width) {
+  // Optimize for the common case where width doesn't change
+  if (ARROW_PREDICT_TRUE(val <= max_uints[current_width])) {
+    return current_width;
+  }
+  if (current_width == 1 && val <= max_uint8) {
+    return 1;
+  } else if (current_width <= 2 && val <= max_uint16) {
+    return 2;
+  } else if (current_width <= 4 && val <= max_uint32) {
+    return 4;
+  } else {
+    return 8;
+  }
+}
+
+uint8_t DetectUIntWidth(const uint64_t* values, int64_t length, uint8_t min_width) {
+  uint8_t width = min_width;
+  if (min_width < 8) {
+    auto p = values;
+    const auto end = p + length;
+    while (p <= end - 16) {
+      // This is probably SIMD-izable
+      auto u = p[0];
+      auto v = p[1];
+      auto w = p[2];
+      auto x = p[3];
+      u |= p[4];
+      v |= p[5];
+      w |= p[6];
+      x |= p[7];
+      u |= p[8];
+      v |= p[9];
+      w |= p[10];
+      x |= p[11];
+      u |= p[12];
+      v |= p[13];
+      w |= p[14];
+      x |= p[15];
+      p += 16;
+      width = ExpandedUIntWidth(u | v | w | x, width);
+      if (ARROW_PREDICT_FALSE(width == 8)) {
+        break;
+      }
+    }
+    if (p <= end - 8) {
+      auto u = p[0];
+      auto v = p[1];
+      auto w = p[2];
+      auto x = p[3];
+      u |= p[4];
+      v |= p[5];
+      w |= p[6];
+      x |= p[7];
+      p += 8;
+      width = ExpandedUIntWidth(u | v | w | x, width);
+    }
+    while (p < end) {
+      width = ExpandedUIntWidth(*p++, width);
+    }
+  }
+  return width;
+}
+
+uint8_t DetectUIntWidth(const uint64_t* values, const uint8_t* valid_bytes,
+                        int64_t length, uint8_t min_width) {
+  if (valid_bytes == nullptr) {
+    return DetectUIntWidth(values, length, min_width);
+  }
+  uint8_t width = min_width;
+  if (min_width < 8) {
+    auto p = values;
+    const auto end = p + length;
+    auto b = valid_bytes;
+
+#define MASK(p, b, i) p[i] * (b[i] != 0)
+
+    while (p <= end - 8) {
+      // This is probably SIMD-izable
+      auto u = MASK(p, b, 0);
+      auto v = MASK(p, b, 1);
+      auto w = MASK(p, b, 2);
+      auto x = MASK(p, b, 3);
+      u |= MASK(p, b, 4);
+      v |= MASK(p, b, 5);
+      w |= MASK(p, b, 6);
+      x |= MASK(p, b, 7);
+      b += 8;
+      p += 8;
+      width = ExpandedUIntWidth(u | v | w | x, width);
+      if (ARROW_PREDICT_FALSE(width == 8)) {
+        break;
+      }
+    }
+    uint64_t mask = 0;
+    while (p < end) {
+      mask |= MASK(p, b, 0);
+      ++b;
+      ++p;
+    }
+    width = ExpandedUIntWidth(mask, width);
+
+#undef MASK
+  }
+  return width;
+}
+
+//
+// Signed integer width detection
+//
+
+uint8_t DetectIntWidth(const int64_t* values, int64_t length, uint8_t min_width) {
+  if (min_width == 8) {
+    return min_width;
+  }
+  uint8_t width = min_width;
+
+  auto p = values;
+  const auto end = p + length;
+  // Strategy: to determine whether `x` is between -0x80 and 0x7f,
+  // we determine whether `x + 0x80` is between 0x00 and 0xff.  The
+  // latter can be done with a simple AND mask with ~0xff and, more
+  // importantly, can be computed in a single step over multiple ORed
+  // values (so we can branch once every N items instead of once every item).
+  // This strategy could probably lend itself to explicit SIMD-ization,
+  // if more performance is needed.
+  constexpr uint64_t addend8 = 0x80ULL;
+  constexpr uint64_t addend16 = 0x8000ULL;
+  constexpr uint64_t addend32 = 0x80000000ULL;
+
+  auto test_one_item = [&](uint64_t addend, uint64_t test_mask) -> bool {
+    auto v = *p++;
+    if (ARROW_PREDICT_FALSE(((v + addend) & test_mask) != 0)) {
+      --p;
+      return false;
+    } else {
+      return true;
+    }
+  };
+
+  auto test_four_items = [&](uint64_t addend, uint64_t test_mask) -> bool {
+    auto mask = (p[0] + addend) | (p[1] + addend) | (p[2] + addend) | (p[3] + addend);
+    p += 4;
+    if (ARROW_PREDICT_FALSE((mask & test_mask) != 0)) {
+      p -= 4;
+      return false;
+    } else {
+      return true;
+    }
+  };
+
+  if (width == 1) {
+    while (p <= end - 4) {
+      if (!test_four_items(addend8, mask_uint8)) {
+        width = 2;
+        goto width2;
+      }
+    }
+    while (p < end) {
+      if (!test_one_item(addend8, mask_uint8)) {
+        width = 2;
+        goto width2;
+      }
+    }
+    return 1;
+  }
+width2:
+  if (width == 2) {
+    while (p <= end - 4) {
+      if (!test_four_items(addend16, mask_uint16)) {
+        width = 4;
+        goto width4;
+      }
+    }
+    while (p < end) {
+      if (!test_one_item(addend16, mask_uint16)) {
+        width = 4;
+        goto width4;
+      }
+    }
+    return 2;
+  }
+width4:
+  if (width == 4) {
+    while (p <= end - 4) {
+      if (!test_four_items(addend32, mask_uint32)) {
+        width = 8;
+        goto width8;
+      }
+    }
+    while (p < end) {
+      if (!test_one_item(addend32, mask_uint32)) {
+        width = 8;
+        goto width8;
+      }
+    }
+    return 4;
+  }
+width8:
+  return 8;
+}
+
+uint8_t DetectIntWidth(const int64_t* values, const uint8_t* valid_bytes, int64_t length,
+                       uint8_t min_width) {
+  if (valid_bytes == nullptr) {
+    return DetectIntWidth(values, length, min_width);
+  }
+
+  if (min_width == 8) {
+    return min_width;
+  }
+  uint8_t width = min_width;
+
+  auto p = values;
+  const auto end = p + length;
+  auto b = valid_bytes;
+  // Strategy is similar to the no-nulls case above, but we also
+  // have to zero any incoming items that have a zero validity byte.
+  constexpr uint64_t addend8 = 0x80ULL;
+  constexpr uint64_t addend16 = 0x8000ULL;
+  constexpr uint64_t addend32 = 0x80000000ULL;
+
+#define MASK(p, b, addend, i) (p[i] + addend) * (b[i] != 0)
+
+  auto test_one_item = [&](uint64_t addend, uint64_t test_mask) -> bool {
+    auto v = MASK(p, b, addend, 0);
+    ++b;
+    ++p;
+    if (ARROW_PREDICT_FALSE((v & test_mask) != 0)) {
+      --b;
+      --p;
+      return false;
+    } else {
+      return true;
+    }
+  };
+
+  auto test_eight_items = [&](uint64_t addend, uint64_t test_mask) -> bool {
+    auto mask1 = MASK(p, b, addend, 0) | MASK(p, b, addend, 1) | MASK(p, b, addend, 2) |
+                 MASK(p, b, addend, 3);
+    auto mask2 = MASK(p, b, addend, 4) | MASK(p, b, addend, 5) | MASK(p, b, addend, 6) |
+                 MASK(p, b, addend, 7);
+    b += 8;
+    p += 8;
+    if (ARROW_PREDICT_FALSE(((mask1 | mask2) & test_mask) != 0)) {
+      b -= 8;
+      p -= 8;
+      return false;
+    } else {
+      return true;
+    }
+  };
+
+#undef MASK
+
+  if (width == 1) {
+    while (p <= end - 8) {
+      if (!test_eight_items(addend8, mask_uint8)) {
+        width = 2;
+        goto width2;
+      }
+    }
+    while (p < end) {
+      if (!test_one_item(addend8, mask_uint8)) {
+        width = 2;
+        goto width2;
+      }
+    }
+    return 1;
+  }
+width2:
+  if (width == 2) {
+    while (p <= end - 8) {
+      if (!test_eight_items(addend16, mask_uint16)) {
+        width = 4;
+        goto width4;
+      }
+    }
+    while (p < end) {
+      if (!test_one_item(addend16, mask_uint16)) {
+        width = 4;
+        goto width4;
+      }
+    }
+    return 2;
+  }
+width4:
+  if (width == 4) {
+    while (p <= end - 8) {
+      if (!test_eight_items(addend32, mask_uint32)) {
+        width = 8;
+        goto width8;
+      }
+    }
+    while (p < end) {
+      if (!test_one_item(addend32, mask_uint32)) {
+        width = 8;
+        goto width8;
+      }
+    }
+    return 4;
+  }
+width8:
+  return 8;
+}
+
+template <typename Source, typename Dest>
+inline void DowncastIntsInternal(const Source* src, Dest* dest, int64_t length) {
+  while (length >= 4) {
+    dest[0] = static_cast<Dest>(src[0]);
+    dest[1] = static_cast<Dest>(src[1]);
+    dest[2] = static_cast<Dest>(src[2]);
+    dest[3] = static_cast<Dest>(src[3]);
+    length -= 4;
+    src += 4;
+    dest += 4;
+  }
+  while (length > 0) {
+    *dest++ = static_cast<Dest>(*src++);
+    --length;
+  }
+}
+
+void DowncastInts(const int64_t* source, int8_t* dest, int64_t length) {
+  DowncastIntsInternal(source, dest, length);
+}
+
+void DowncastInts(const int64_t* source, int16_t* dest, int64_t length) {
+  DowncastIntsInternal(source, dest, length);
+}
+
+void DowncastInts(const int64_t* source, int32_t* dest, int64_t length) {
+  DowncastIntsInternal(source, dest, length);
+}
+
+void DowncastInts(const int64_t* source, int64_t* dest, int64_t length) {
+  memcpy(dest, source, length * sizeof(int64_t));
+}
+
+void DowncastUInts(const uint64_t* source, uint8_t* dest, int64_t length) {
+  DowncastIntsInternal(source, dest, length);
+}
+
+void DowncastUInts(const uint64_t* source, uint16_t* dest, int64_t length) {
+  DowncastIntsInternal(source, dest, length);
+}
+
+void DowncastUInts(const uint64_t* source, uint32_t* dest, int64_t length) {
+  DowncastIntsInternal(source, dest, length);
+}
+
+void DowncastUInts(const uint64_t* source, uint64_t* dest, int64_t length) {
+  memcpy(dest, source, length * sizeof(int64_t));
+}
+
+}  // namespace internal
+}  // namespace arrow
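
The bias-and-mask trick in DetectIntWidth above is easiest to see on a single
value: x fits in int8 exactly when (x + 0x80), computed in unsigned 64-bit
arithmetic, has no bits set above the low byte; and because the check is a
mask test, several biased values can be ORed together and tested with one
branch. A self-contained check of the single-value form:

    #include <cassert>
    #include <cstdint>

    // Range test from DetectIntWidth: x is in [-0x80, 0x7f] iff
    // ((x + 0x80) & ~0xff) == 0 in uint64_t arithmetic.
    bool FitsInInt8(int64_t x) {
      constexpr uint64_t addend8 = 0x80ULL;
      constexpr uint64_t mask_uint8 = ~0xffULL;
      return ((static_cast<uint64_t>(x) + addend8) & mask_uint8) == 0;
    }

    int main() {
      assert(FitsInInt8(0x7f) && FitsInInt8(-0x80));    // boundary values fit
      assert(!FitsInInt8(0x80) && !FitsInInt8(-0x81));  // just outside
      return 0;
    }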
diff --git a/cpp/src/arrow/util/int-util.h b/cpp/src/arrow/util/int-util.h
new file mode 100644
index 0000000..68355d3
--- /dev/null
+++ b/cpp/src/arrow/util/int-util.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_UTIL_INT_UTIL_H
+#define ARROW_UTIL_INT_UTIL_H
+
+#include <cstdint>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace internal {
+
+ARROW_EXPORT
+uint8_t DetectUIntWidth(const uint64_t* values, int64_t length, uint8_t min_width = 1);
+
+ARROW_EXPORT
+uint8_t DetectUIntWidth(const uint64_t* values, const uint8_t* valid_bytes,
+                        int64_t length, uint8_t min_width = 1);
+
+ARROW_EXPORT
+uint8_t DetectIntWidth(const int64_t* values, int64_t length, uint8_t min_width = 1);
+
+ARROW_EXPORT
+uint8_t DetectIntWidth(const int64_t* values, const uint8_t* valid_bytes, int64_t length,
+                       uint8_t min_width = 1);
+
+ARROW_EXPORT
+void DowncastInts(const int64_t* source, int8_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastInts(const int64_t* source, int16_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastInts(const int64_t* source, int32_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastInts(const int64_t* source, int64_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastUInts(const uint64_t* source, uint8_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastUInts(const uint64_t* source, uint16_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastUInts(const uint64_t* source, uint32_t* dest, int64_t length);
+
+ARROW_EXPORT
+void DowncastUInts(const uint64_t* source, uint64_t* dest, int64_t length);
+
+}  // namespace internal
+}  // namespace arrow
+
+#endif  // ARROW_UTIL_INT_UTIL_H
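
Putting the two new entry points together, a usage sketch (assuming this
header is on the include path, as in the builder code above):

    #include <cstdint>
    #include <vector>

    #include "arrow/util/int-util.h"

    int main() {
      std::vector<int64_t> values = {0, -128, 127};
      const auto length = static_cast<int64_t>(values.size());
      // Detect the narrowest signed width that holds all values (1 here),
      // then downcast into a buffer of that width.
      uint8_t width = arrow::internal::DetectIntWidth(values.data(), length);
      if (width == 1) {
        std::vector<int8_t> narrow(values.size());
        arrow::internal::DowncastInts(values.data(), narrow.data(), length);
      }
      return 0;
    }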