You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2020/12/15 12:25:36 UTC

[arrow] branch master updated: ARROW-10607: [C++][Parquet] Add parquet support for decimal256.

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e8de08  ARROW-10607: [C++][Parquet] Add parquet support for decimal256.
0e8de08 is described below

commit 0e8de085f4afd3c9aa9c9ed05405a6c6ef8c360a
Author: Micah Kornfield <mi...@google.com>
AuthorDate: Tue Dec 15 13:24:25 2020 +0100

    ARROW-10607: [C++][Parquet] Add parquet support for decimal256.
    
    - Refactor common code (DecimalSize, FromBigEndian) to places in arrow
    - Support writing Decimal256 as FLBA
    - Support reading Decimal256 from bytes and FLBA.  Integer types
      don't seem like they would be worthwhile to ever convert to Decimal256
      and the code path is hard to test.
    - Adds addition and shift operators to Decimal256 to support testing.
    
    Closes #8897 from emkornfield/parquet_bigdecimal
    
    Authored-by: Micah Kornfield <mi...@google.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/compute/kernels/codegen_internal.cc  |   2 +-
 .../arrow/compute/kernels/scalar_cast_numeric.cc   |  24 +-
 cpp/src/arrow/compute/kernels/vector_hash.cc       |   8 +-
 cpp/src/arrow/compute/kernels/vector_selection.cc  |   3 +-
 cpp/src/arrow/testing/util.cc                      |  71 +---
 cpp/src/arrow/type.cc                              |  22 ++
 cpp/src/arrow/type.h                               |   7 +
 cpp/src/arrow/util/basic_decimal.cc                |  67 ++++
 cpp/src/arrow/util/basic_decimal.h                 |  15 +
 cpp/src/arrow/util/decimal.cc                      |  44 ++-
 cpp/src/arrow/util/decimal.h                       |   5 +
 cpp/src/arrow/util/decimal_test.cc                 | 136 ++++++++
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc  |  44 ++-
 cpp/src/parquet/arrow/reader_internal.cc           | 379 +++++++++------------
 cpp/src/parquet/arrow/schema.cc                    |  13 +-
 cpp/src/parquet/arrow/schema_internal.cc           |   5 +-
 cpp/src/parquet/arrow/test_util.h                  |  61 +++-
 cpp/src/parquet/column_writer.cc                   |  51 +--
 cpp/src/parquet/schema.cc                          |   2 +-
 cpp/src/parquet/types.cc                           |  76 -----
 cpp/src/parquet/types.h                            |   6 -
 21 files changed, 612 insertions(+), 429 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.cc b/cpp/src/arrow/compute/kernels/codegen_internal.cc
index 641998f..a5941ea2 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.cc
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.cc
@@ -155,7 +155,7 @@ const std::vector<std::shared_ptr<DataType>>& PrimitiveTypes() {
 
 const std::vector<std::shared_ptr<DataType>>& ExampleParametricTypes() {
   static DataTypeVector example_parametric_types = {
-      decimal(12, 2),
+      decimal128(12, 2),
       duration(TimeUnit::SECOND),
       timestamp(TimeUnit::SECOND),
       time32(TimeUnit::SECOND),
diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
index 51b5609..f46cf7e 100644
--- a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc
@@ -591,11 +591,11 @@ std::shared_ptr<CastFunction> GetCastToFloating(std::string name) {
   return func;
 }
 
-std::shared_ptr<CastFunction> GetCastToDecimal() {
+std::shared_ptr<CastFunction> GetCastToDecimal128() {
   OutputType sig_out_ty(ResolveOutputFromOptions);
 
-  auto func = std::make_shared<CastFunction>("cast_decimal", Type::DECIMAL);
-  AddCommonCasts(Type::DECIMAL, sig_out_ty, func.get());
+  auto func = std::make_shared<CastFunction>("cast_decimal", Type::DECIMAL128);
+  AddCommonCasts(Type::DECIMAL128, sig_out_ty, func.get());
 
   // Cast from floating point
   DCHECK_OK(func->AddKernel(Type::FLOAT, {float32()}, sig_out_ty,
@@ -606,8 +606,19 @@ std::shared_ptr<CastFunction> GetCastToDecimal() {
   // Cast from other decimal
   auto exec = CastFunctor<Decimal128Type, Decimal128Type>::Exec;
   // We resolve the output type of this kernel from the CastOptions
-  DCHECK_OK(func->AddKernel(Type::DECIMAL, {InputType::Array(Type::DECIMAL)}, sig_out_ty,
-                            exec));
+  DCHECK_OK(func->AddKernel(Type::DECIMAL128, {InputType::Array(Type::DECIMAL128)},
+                            sig_out_ty, exec));
+  return func;
+}
+
+std::shared_ptr<CastFunction> GetCastToDecimal256() {
+  OutputType sig_out_ty(ResolveOutputFromOptions);
+
+  auto func = std::make_shared<CastFunction>("cast_decimal256", Type::DECIMAL256);
+  // Needed for Parquet conversion. Full implementation is ARROW-10606
+  // tracks full implementation.
+  AddCommonCasts(Type::DECIMAL256, sig_out_ty, func.get());
+
   return func;
 }
 
@@ -654,7 +665,8 @@ std::vector<std::shared_ptr<CastFunction>> GetNumericCasts() {
   functions.push_back(GetCastToFloating<FloatType>("cast_float"));
   functions.push_back(GetCastToFloating<DoubleType>("cast_double"));
 
-  functions.push_back(GetCastToDecimal());
+  functions.push_back(GetCastToDecimal128());
+  functions.push_back(GetCastToDecimal256());
 
   return functions;
 }
diff --git a/cpp/src/arrow/compute/kernels/vector_hash.cc b/cpp/src/arrow/compute/kernels/vector_hash.cc
index 0009fe5..34d18c2 100644
--- a/cpp/src/arrow/compute/kernels/vector_hash.cc
+++ b/cpp/src/arrow/compute/kernels/vector_hash.cc
@@ -620,9 +620,11 @@ void AddHashKernels(VectorFunction* func, VectorKernel base, OutputType out_ty)
     DCHECK_OK(func->AddKernel(base));
   }
 
-  base.init = GetHashInit<Action>(Type::DECIMAL);
-  base.signature = KernelSignature::Make({InputType::Array(Type::DECIMAL)}, out_ty);
-  DCHECK_OK(func->AddKernel(base));
+  for (auto t : {Type::DECIMAL128, Type::DECIMAL256}) {
+    base.init = GetHashInit<Action>(t);
+    base.signature = KernelSignature::Make({InputType::Array(t)}, out_ty);
+    DCHECK_OK(func->AddKernel(base));
+  }
 }
 
 const FunctionDoc unique_doc(
diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc
index 044bb29..fe3bcf5 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -2151,7 +2151,8 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
        TakeExec<VarBinaryImpl<LargeBinaryType>>},
       {InputType::Array(Type::FIXED_SIZE_BINARY), TakeExec<FSBImpl>},
       {InputType::Array(null()), NullTake},
-      {InputType::Array(Type::DECIMAL), TakeExec<FSBImpl>},
+      {InputType::Array(Type::DECIMAL128), TakeExec<FSBImpl>},
+      {InputType::Array(Type::DECIMAL256), TakeExec<FSBImpl>},
       {InputType::Array(Type::DICTIONARY), DictionaryTake},
       {InputType::Array(Type::EXTENSION), ExtensionTake},
       {InputType::Array(Type::LIST), TakeExec<ListImpl<ListType>>},
diff --git a/cpp/src/arrow/testing/util.cc b/cpp/src/arrow/testing/util.cc
index 9034d09..85885cc 100644
--- a/cpp/src/arrow/testing/util.cc
+++ b/cpp/src/arrow/testing/util.cc
@@ -38,6 +38,7 @@
 #endif
 
 #include "arrow/table.h"
+#include "arrow/type.h"
 #include "arrow/testing/random.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
@@ -78,78 +79,10 @@ std::string random_string(int64_t n, uint32_t seed) {
   return s;
 }
 
-int32_t DecimalSize(int32_t precision) {
-  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
-                          << precision;
-  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
-                           << precision;
-
-  switch (precision) {
-    case 1:
-    case 2:
-      return 1;  // 127
-    case 3:
-    case 4:
-      return 2;  // 32,767
-    case 5:
-    case 6:
-      return 3;  // 8,388,607
-    case 7:
-    case 8:
-    case 9:
-      return 4;  // 2,147,483,427
-    case 10:
-    case 11:
-      return 5;  // 549,755,813,887
-    case 12:
-    case 13:
-    case 14:
-      return 6;  // 140,737,488,355,327
-    case 15:
-    case 16:
-      return 7;  // 36,028,797,018,963,967
-    case 17:
-    case 18:
-      return 8;  // 9,223,372,036,854,775,807
-    case 19:
-    case 20:
-    case 21:
-      return 9;  // 2,361,183,241,434,822,606,847
-    case 22:
-    case 23:
-      return 10;  // 604,462,909,807,314,587,353,087
-    case 24:
-    case 25:
-    case 26:
-      return 11;  // 154,742,504,910,672,534,362,390,527
-    case 27:
-    case 28:
-      return 12;  // 39,614,081,257,132,168,796,771,975,167
-    case 29:
-    case 30:
-    case 31:
-      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
-    case 32:
-    case 33:
-      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
-    case 34:
-    case 35:
-      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
-    case 36:
-    case 37:
-    case 38:
-      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
-    default:
-      DCHECK(false);
-      break;
-  }
-  return -1;
-}
-
 void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) {
   std::default_random_engine gen(seed);
   std::uniform_int_distribution<uint32_t> d(0, std::numeric_limits<uint8_t>::max());
-  const int32_t required_bytes = DecimalSize(precision);
+  const int32_t required_bytes = DecimalType::DecimalSize(precision);
   constexpr int32_t byte_width = 16;
   std::fill(out, out + byte_width * n, '\0');
 
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index f0e3ef6..5e6fec8 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -773,6 +773,28 @@ std::vector<std::shared_ptr<Field>> StructType::GetAllFieldsByName(
   return result;
 }
 
+// Taken from the Apache Impala codebase. The comments next
+// to the return values are the maximum value that can be represented in 2's
+// complement with the returned number of bytes.
+int32_t DecimalType::DecimalSize(int32_t precision) {
+  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
+                          << precision;
+
+  // Generated in python with:
+  // >>> decimal_size = lambda prec: int(math.ceil((prec * math.log2(10) + 1) / 8))
+  // >>> [-1] + [decimal_size(i) for i in range(1, 77)]
+  constexpr int32_t kBytes[] = {
+      -1, 1,  1,  2,  2,  3,  3,  4,  4,  4,  5,  5,  6,  6,  6,  7,  7,  8,  8,  9,
+      9,  9,  10, 10, 11, 11, 11, 12, 12, 13, 13, 13, 14, 14, 15, 15, 16, 16, 16, 17,
+      17, 18, 18, 18, 19, 19, 20, 20, 21, 21, 21, 22, 22, 23, 23, 23, 24, 24, 25, 25,
+      26, 26, 26, 27, 27, 28, 28, 28, 29, 29, 30, 30, 31, 31, 31, 32, 32};
+
+  if (precision <= 76) {
+    return kBytes[precision];
+  }
+  return static_cast<int32_t>(std::ceil((precision / 8.0) * std::log2(10) + 1));
+}
+
 // ----------------------------------------------------------------------
 // Decimal128 type
 
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index d20d603..ed504c3 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -880,6 +880,11 @@ class ARROW_EXPORT DecimalType : public FixedSizeBinaryType {
   int32_t precision() const { return precision_; }
   int32_t scale() const { return scale_; }
 
+  /// \brief Returns the number of bytes needed for precision.
+  ///
+  /// precision must be >= 1
+  static int32_t DecimalSize(int32_t precision);
+
  protected:
   std::string ComputeFingerprint() const override;
 
@@ -905,6 +910,7 @@ class ARROW_EXPORT Decimal128Type : public DecimalType {
 
   static constexpr int32_t kMinPrecision = 1;
   static constexpr int32_t kMaxPrecision = 38;
+  static constexpr int32_t kByteWidth = 16;
 };
 
 /// \brief Concrete type class for 256-bit decimal data
@@ -925,6 +931,7 @@ class ARROW_EXPORT Decimal256Type : public DecimalType {
 
   static constexpr int32_t kMinPrecision = 1;
   static constexpr int32_t kMaxPrecision = 76;
+  static constexpr int32_t kByteWidth = 32;
 };
 
 /// \brief Concrete type class for union data
diff --git a/cpp/src/arrow/util/basic_decimal.cc b/cpp/src/arrow/util/basic_decimal.cc
index 1abcc2c..78d5b15 100644
--- a/cpp/src/arrow/util/basic_decimal.cc
+++ b/cpp/src/arrow/util/basic_decimal.cc
@@ -1041,6 +1041,50 @@ BasicDecimal256 BasicDecimal256::Abs(const BasicDecimal256& in) {
   return result.Abs();
 }
 
+BasicDecimal256& BasicDecimal256::operator+=(const BasicDecimal256& right) {
+  uint64_t carry = 0;
+  for (size_t i = 0; i < little_endian_array_.size(); i++) {
+    const uint64_t right_value = right.little_endian_array_[i];
+    uint64_t sum = right_value + carry;
+    carry = 0;
+    if (sum < right_value) {
+      carry += 1;
+    }
+    sum += little_endian_array_[i];
+    if (sum < little_endian_array_[i]) {
+      carry += 1;
+    }
+    little_endian_array_[i] = sum;
+  }
+  return *this;
+}
+
+BasicDecimal256& BasicDecimal256::operator<<=(uint32_t bits) {
+  if (bits == 0) {
+    return *this;
+  }
+  int cross_word_shift = bits / 64;
+  if (static_cast<size_t>(cross_word_shift) >= little_endian_array_.size()) {
+    little_endian_array_ = {0, 0, 0, 0};
+    return *this;
+  }
+  uint32_t in_word_shift = bits % 64;
+  for (int i = static_cast<int>(little_endian_array_.size() - 1); i >= cross_word_shift;
+       i--) {
+    // Account for shifts larger then 64 bits
+    little_endian_array_[i] = little_endian_array_[i - cross_word_shift];
+    little_endian_array_[i] <<= in_word_shift;
+    if (in_word_shift != 0 && i >= cross_word_shift + 1) {
+      little_endian_array_[i] |=
+          little_endian_array_[i - (cross_word_shift + 1)] >> (64 - in_word_shift);
+    }
+  }
+  for (int i = cross_word_shift - 1; i >= 0; i--) {
+    little_endian_array_[i] = 0;
+  }
+  return *this;
+}
+
 std::array<uint8_t, 32> BasicDecimal256::ToBytes() const {
   std::array<uint8_t, 32> out{{0}};
   ToBytes(out.data());
@@ -1091,6 +1135,12 @@ DecimalStatus BasicDecimal256::Rescale(int32_t original_scale, int32_t new_scale
   return DecimalRescale(*this, original_scale, new_scale, out);
 }
 
+bool BasicDecimal256::FitsInPrecision(int32_t precision) const {
+  DCHECK_GT(precision, 0);
+  DCHECK_LE(precision, 76);
+  return BasicDecimal256::Abs(*this) < ScaleMultipliersDecimal256[precision];
+}
+
 const BasicDecimal256& BasicDecimal256::GetScaleMultiplier(int32_t scale) {
   DCHECK_GE(scale, 0);
   DCHECK_LE(scale, 76);
@@ -1113,6 +1163,17 @@ bool operator<(const BasicDecimal256& left, const BasicDecimal256& right) {
                                 : lhs[1] != rhs[1] ? lhs[1] < rhs[1] : lhs[0] < rhs[0];
 }
 
+BasicDecimal256 operator-(const BasicDecimal256& operand) {
+  BasicDecimal256 result(operand);
+  return result.Negate();
+}
+
+BasicDecimal256 operator~(const BasicDecimal256& operand) {
+  const std::array<uint64_t, 4>& arr = operand.little_endian_array();
+  BasicDecimal256 result({~arr[0], ~arr[1], ~arr[2], ~arr[3]});
+  return result;
+}
+
 BasicDecimal256& BasicDecimal256::operator/=(const BasicDecimal256& right) {
   BasicDecimal256 remainder;
   auto s = Divide(right, this, &remainder);
@@ -1120,6 +1181,12 @@ BasicDecimal256& BasicDecimal256::operator/=(const BasicDecimal256& right) {
   return *this;
 }
 
+BasicDecimal256 operator+(const BasicDecimal256& left, const BasicDecimal256& right) {
+  BasicDecimal256 sum = left;
+  sum += right;
+  return sum;
+}
+
 BasicDecimal256 operator/(const BasicDecimal256& left, const BasicDecimal256& right) {
   BasicDecimal256 remainder;
   BasicDecimal256 result;
diff --git a/cpp/src/arrow/util/basic_decimal.h b/cpp/src/arrow/util/basic_decimal.h
index af58ee1..b62d894 100644
--- a/cpp/src/arrow/util/basic_decimal.h
+++ b/cpp/src/arrow/util/basic_decimal.h
@@ -224,6 +224,9 @@ class ARROW_EXPORT BasicDecimal256 {
   /// \brief Absolute value
   static BasicDecimal256 Abs(const BasicDecimal256& left);
 
+  /// \brief Add a number to this one. The result is truncated to 256 bits.
+  BasicDecimal256& operator+=(const BasicDecimal256& right);
+
   /// \brief Get the bits of the two's complement representation of the number. The 4
   /// elements are in little endian order. The bits within each uint64_t element are in
   /// native endian order. For example,
@@ -245,6 +248,11 @@ class ARROW_EXPORT BasicDecimal256 {
   DecimalStatus Rescale(int32_t original_scale, int32_t new_scale,
                         BasicDecimal256* out) const;
 
+  /// \brief Whether this number fits in the given precision
+  ///
+  /// Return true if the number of significant digits is less or equal to `precision`.
+  bool FitsInPrecision(int32_t precision) const;
+
   inline int64_t Sign() const {
     return 1 | (static_cast<int64_t>(little_endian_array_[3]) >> 63);
   }
@@ -269,6 +277,9 @@ class ARROW_EXPORT BasicDecimal256 {
   /// \param[out] remainder the remainder after the division
   DecimalStatus Divide(const BasicDecimal256& divisor, BasicDecimal256* result,
                        BasicDecimal256* remainder) const;
+  /// \brief Shift left by the given number of bits.
+  BasicDecimal256& operator<<=(uint32_t bits);
+
   /// \brief In-place division.
   BasicDecimal256& operator/=(const BasicDecimal256& right);
 
@@ -303,6 +314,10 @@ ARROW_EXPORT inline bool operator>=(const BasicDecimal256& left,
   return !operator<(left, right);
 }
 
+ARROW_EXPORT BasicDecimal256 operator-(const BasicDecimal256& operand);
+ARROW_EXPORT BasicDecimal256 operator~(const BasicDecimal256& operand);
+ARROW_EXPORT BasicDecimal256 operator+(const BasicDecimal256& left,
+                                       const BasicDecimal256& right);
 ARROW_EXPORT BasicDecimal256 operator*(const BasicDecimal256& left,
                                        const BasicDecimal256& right);
 ARROW_EXPORT BasicDecimal256 operator/(const BasicDecimal256& left,
diff --git a/cpp/src/arrow/util/decimal.cc b/cpp/src/arrow/util/decimal.cc
index 5249fd1..dcb2023 100644
--- a/cpp/src/arrow/util/decimal.cc
+++ b/cpp/src/arrow/util/decimal.cc
@@ -568,7 +568,7 @@ Result<Decimal128> Decimal128::FromBigEndian(const uint8_t* bytes, int32_t lengt
 
   int64_t high, low;
 
-  if (length < kMinDecimalBytes || length > kMaxDecimalBytes) {
+  if (ARROW_PREDICT_FALSE(length < kMinDecimalBytes || length > kMaxDecimalBytes)) {
     return Status::Invalid("Length of byte array passed to Decimal128::FromBigEndian ",
                            "was ", length, ", but must be between ", kMinDecimalBytes,
                            " and ", kMaxDecimalBytes);
@@ -718,6 +718,48 @@ Result<Decimal256> Decimal256::FromString(const char* s) {
   return FromString(util::string_view(s));
 }
 
+Result<Decimal256> Decimal256::FromBigEndian(const uint8_t* bytes, int32_t length) {
+  static constexpr int32_t kMinDecimalBytes = 1;
+  static constexpr int32_t kMaxDecimalBytes = 32;
+
+  std::array<uint64_t, 4> little_endian_array;
+
+  if (ARROW_PREDICT_FALSE(length < kMinDecimalBytes || length > kMaxDecimalBytes)) {
+    return Status::Invalid("Length of byte array passed to Decimal128::FromBigEndian ",
+                           "was ", length, ", but must be between ", kMinDecimalBytes,
+                           " and ", kMaxDecimalBytes);
+  }
+
+  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
+  // sign bit.
+  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
+
+  for (int word_idx = 0; word_idx < 4; word_idx++) {
+    const int32_t word_length = std::min(length, static_cast<int32_t>(sizeof(uint64_t)));
+
+    if (word_length == 8) {
+      // Full words can be assigned as is (and are UB with the shift below).
+      little_endian_array[word_idx] =
+          UInt64FromBigEndian(bytes + length - word_length, word_length);
+    } else {
+      // Sign extend the word its if necessary
+      uint64_t word = -1 * is_negative;
+      if (length > 0) {
+        // Incorporate the actual values if present.
+        // Shift left enough bits to make room for the incoming int64_t
+        word = SafeLeftShift(word, word_length * CHAR_BIT);
+        // Preserve the upper bits by inplace OR-ing the int64_t
+        word |= UInt64FromBigEndian(bytes + length - word_length, word_length);
+      }
+      little_endian_array[word_idx] = word;
+    }
+    // Move on to the next word.
+    length -= word_length;
+  }
+
+  return Decimal256(little_endian_array);
+}
+
 Status Decimal256::ToArrowStatus(DecimalStatus dstatus) const {
   return arrow::ToArrowStatus(dstatus, 256);
 }
diff --git a/cpp/src/arrow/util/decimal.h b/cpp/src/arrow/util/decimal.h
index 1c83b4b..16cf3f8 100644
--- a/cpp/src/arrow/util/decimal.h
+++ b/cpp/src/arrow/util/decimal.h
@@ -244,6 +244,11 @@ class ARROW_EXPORT Decimal256 : public BasicDecimal256 {
     return std::move(result);
   }
 
+  /// \brief Convert from a big-endian byte representation. The length must be
+  ///        between 1 and 32.
+  /// \return error status if the length is an invalid value
+  static Result<Decimal256> FromBigEndian(const uint8_t* data, int32_t length);
+
   friend ARROW_EXPORT std::ostream& operator<<(std::ostream& os,
                                                const Decimal256& decimal);
 
diff --git a/cpp/src/arrow/util/decimal_test.cc b/cpp/src/arrow/util/decimal_test.cc
index 06731be..40ae49d 100644
--- a/cpp/src/arrow/util/decimal_test.cc
+++ b/cpp/src/arrow/util/decimal_test.cc
@@ -1333,6 +1333,81 @@ TEST(Decimal256Test, Multiply) {
   }
 }
 
+TEST(Decimal256Test, Shift) {
+  {
+    // Values compared against python's implementation of shift.
+    Decimal256 v(967);
+    v <<= 16;
+    ASSERT_EQ(v, Decimal256("63373312"));
+    v <<= 66;
+    ASSERT_EQ(v, Decimal256("4676125070269385647763488768"));
+    v <<= 128;
+    ASSERT_EQ(v,
+              Decimal256(
+                  "1591202906929606242763855199532957938318305582067671727858104926208"));
+  }
+  {
+    // Values compared against python's implementation of shift.
+    Decimal256 v(0xEFFACDA);
+    v <<= 17;
+    ASSERT_EQ(v, Decimal256("32982558834688"));
+    v <<= 67;
+    ASSERT_EQ(v, Decimal256("4867366573756459829801535578046464"));
+    v <<= 129;
+    ASSERT_EQ(
+        v,
+        Decimal256(
+            "3312558036779413504434176328500812891073739806516698535430241719490183168"));
+    v <<= 43;
+    ASSERT_EQ(v, Decimal256(0));
+  }
+
+  {
+    // Values compared against python's implementation of shift.
+    Decimal256 v("-12346789123456789123456789");
+    v <<= 15;
+    ASSERT_EQ(v, Decimal256("-404579585997432065997432061952"))
+        << std::hex << v.little_endian_array()[0] << " " << v.little_endian_array()[1]
+        << " " << v.little_endian_array()[2] << " " << v.little_endian_array()[3] << "\n"
+        << Decimal256("-404579585997432065997432061952").little_endian_array()[0] << " "
+        << Decimal256("-404579585997432065997432061952").little_endian_array()[1] << " "
+        << Decimal256("-404579585997432065997432061952").little_endian_array()[2] << " "
+        << Decimal256("-404579585997432065997432061952").little_endian_array()[3];
+    v <<= 30;
+    ASSERT_EQ(v, Decimal256("-434414022622047565860171081516421480448"));
+    v <<= 66;
+    ASSERT_EQ(v,
+              Decimal256("-32054097189358332105678889809255994470201895906771963215872"));
+  }
+}
+
+TEST(Decimal256Test, Add) {
+  EXPECT_EQ(Decimal256(103), Decimal256(100) + Decimal256(3));
+  EXPECT_EQ(Decimal256(203), Decimal256(200) + Decimal256(3));
+  EXPECT_EQ(Decimal256(20401), Decimal256(20100) + Decimal256(301));
+  EXPECT_EQ(Decimal256(-19799), Decimal256(-20100) + Decimal256(301));
+  EXPECT_EQ(Decimal256(19799), Decimal256(20100) + Decimal256(-301));
+  EXPECT_EQ(Decimal256(-20401), Decimal256(-20100) + Decimal256(-301));
+  EXPECT_EQ(Decimal256("100000000000000000000000000000000001"),
+            Decimal256("99999999999999999999999999999999999") + Decimal256("2"));
+  EXPECT_EQ(Decimal256("120200000000000000000000000000002019"),
+            Decimal256("99999999999999999999999999999999999") +
+                Decimal256("20200000000000000000000000000002020"));
+
+  // Test some random numbers.
+  for (auto x : GetRandomNumbers<Int32Type>(16)) {
+    for (auto y : GetRandomNumbers<Int32Type>(16)) {
+      if (y == 0) {
+        continue;
+      }
+
+      Decimal256 result = Decimal256(x) + Decimal256(y);
+      ASSERT_EQ(Decimal256(static_cast<int64_t>(x) + y), result)
+          << " x: " << x << " y: " << y;
+    }
+  }
+}
+
 TEST(Decimal256Test, Divide) {
   ASSERT_EQ(Decimal256(33), Decimal256(100) / Decimal256(3));
   ASSERT_EQ(Decimal256(66), Decimal256(200) / Decimal256(3));
@@ -1419,6 +1494,67 @@ TEST(Decimal256Test, Rescale) {
   }
 }
 
+TEST(Decimal256, FromBigEndianTest) {
+  // We test out a variety of scenarios:
+  //
+  // * Positive values that are left shifted
+  //   and filled in with the same bit pattern
+  // * Negated of the positive values
+  // * Complement of the positive values
+  //
+  // For the positive values, we can call FromBigEndian
+  // with a length that is less than 16, whereas we must
+  // pass all 32 bytes for the negative and complement.
+  //
+  // We use a number of bit patterns to increase the coverage
+  // of scenarios
+  for (int32_t start : {1, 1, 15, /* 00001111 */
+                        85,       /* 01010101 */
+                        127 /* 01111111 */}) {
+    Decimal256 value(start);
+    for (int ii = 0; ii < 32; ++ii) {
+      auto native_endian = value.ToBytes();
+#if ARROW_LITTLE_ENDIAN
+      std::reverse(native_endian.begin(), native_endian.end());
+#endif
+      // Limit the number of bytes we are passing to make
+      // sure that it works correctly. That's why all of the
+      // 'start' values don't have a 1 in the most significant
+      // bit place
+      ASSERT_OK_AND_EQ(value,
+                       Decimal256::FromBigEndian(native_endian.data() + 31 - ii, ii + 1));
+
+      // Negate it
+      auto negated = -value;
+      native_endian = negated.ToBytes();
+#if ARROW_LITTLE_ENDIAN
+      // convert to big endian
+      std::reverse(native_endian.begin(), native_endian.end());
+#endif
+      // The sign bit is looked up in the MSB
+      ASSERT_OK_AND_EQ(negated,
+                       Decimal256::FromBigEndian(native_endian.data() + 31 - ii, ii + 1));
+
+      // Take the complement
+      auto complement = ~value;
+      native_endian = complement.ToBytes();
+#if ARROW_LITTLE_ENDIAN
+      // convert to big endian
+      std::reverse(native_endian.begin(), native_endian.end());
+#endif
+      ASSERT_OK_AND_EQ(complement, Decimal256::FromBigEndian(native_endian.data(), 32));
+
+      value <<= 8;
+      value += Decimal256(start);
+    }
+  }
+}
+
+TEST(Decimal256Test, TestFromBigEndianBadLength) {
+  ASSERT_RAISES(Invalid, Decimal128::FromBigEndian(nullptr, -1));
+  ASSERT_RAISES(Invalid, Decimal128::FromBigEndian(nullptr, 33));
+}
+
 class Decimal256ToStringTest : public ::testing::TestWithParam<ToStringTestParam> {};
 
 TEST_P(Decimal256ToStringTest, ToString) {
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index bbf95c5..45eeb09 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -65,6 +65,7 @@ using arrow::Buffer;
 using arrow::ChunkedArray;
 using arrow::DataType;
 using arrow::Datum;
+using arrow::DecimalType;
 using arrow::default_memory_pool;
 using arrow::ListArray;
 using arrow::PrimitiveArray;
@@ -158,10 +159,15 @@ std::shared_ptr<const LogicalType> get_logical_type(const DataType& type) {
           static_cast<const ::arrow::DictionaryType&>(type);
       return get_logical_type(*dict_type.value_type());
     }
-    case ArrowId::DECIMAL: {
+    case ArrowId::DECIMAL128: {
       const auto& dec_type = static_cast<const ::arrow::Decimal128Type&>(type);
       return LogicalType::Decimal(dec_type.precision(), dec_type.scale());
     }
+    case ArrowId::DECIMAL256: {
+      const auto& dec_type = static_cast<const ::arrow::Decimal256Type&>(type);
+      return LogicalType::Decimal(dec_type.precision(), dec_type.scale());
+    }
+
     default:
       break;
   }
@@ -193,7 +199,8 @@ ParquetType::type get_physical_type(const DataType& type) {
     case ArrowId::LARGE_STRING:
       return ParquetType::BYTE_ARRAY;
     case ArrowId::FIXED_SIZE_BINARY:
-    case ArrowId::DECIMAL:
+    case ArrowId::DECIMAL128:
+    case ArrowId::DECIMAL256:
       return ParquetType::FIXED_LEN_BYTE_ARRAY;
     case ArrowId::DATE32:
       return ParquetType::INT32;
@@ -456,10 +463,10 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
           byte_width =
               static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
           break;
-        case ::arrow::Type::DECIMAL: {
-          const auto& decimal_type =
-              static_cast<const ::arrow::Decimal128Type&>(values_type);
-          byte_width = ::parquet::internal::DecimalSize(decimal_type.precision());
+        case ::arrow::Type::DECIMAL128:
+        case ::arrow::Type::DECIMAL256: {
+          const auto& decimal_type = static_cast<const DecimalType&>(values_type);
+          byte_width = DecimalType::DecimalSize(decimal_type.precision());
         } break;
         default:
           break;
@@ -468,9 +475,10 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
     case ::arrow::Type::FIXED_SIZE_BINARY:
       byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
       break;
-    case ::arrow::Type::DECIMAL: {
-      const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
-      byte_width = ::parquet::internal::DecimalSize(decimal_type.precision());
+    case ::arrow::Type::DECIMAL128:
+    case ::arrow::Type::DECIMAL256: {
+      const auto& decimal_type = static_cast<const DecimalType&>(type);
+      byte_width = DecimalType::DecimalSize(decimal_type.precision());
     } break;
     default:
       break;
@@ -680,7 +688,9 @@ typedef ::testing::Types<
     ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
     DecimalWithPrecisionAndScale<5>, DecimalWithPrecisionAndScale<10>,
     DecimalWithPrecisionAndScale<19>, DecimalWithPrecisionAndScale<23>,
-    DecimalWithPrecisionAndScale<27>, DecimalWithPrecisionAndScale<38>>
+    DecimalWithPrecisionAndScale<27>, DecimalWithPrecisionAndScale<38>,
+    Decimal256WithPrecisionAndScale<39>, Decimal256WithPrecisionAndScale<56>,
+    Decimal256WithPrecisionAndScale<76>>
     TestTypes;
 
 TYPED_TEST_SUITE(TestParquetIO, TestTypes);
@@ -2465,6 +2475,20 @@ TEST(ArrowReadWrite, NestedRequiredField) {
                        /*row_group_size=*/8);
 }
 
+TEST(ArrowReadWrite, Decimal256) {
+  using ::arrow::Decimal256;
+  using ::arrow::field;
+
+  auto type = ::arrow::decimal256(8, 4);
+
+  const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", 
+                         "-9999.9999", "9999.9999"])";
+  auto array = ::arrow::ArrayFromJSON(type, json);
+  auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
+  auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
+  CheckSimpleRoundtrip(table, 2, props_store_schema);
+}
+
 TEST(ArrowReadWrite, NestedNullableField) {
   auto int_field = ::arrow::field("int_array", ::arrow::int32());
   auto int_array =
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 29d7384..6c387df 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -61,6 +61,8 @@ using arrow::BooleanArray;
 using arrow::ChunkedArray;
 using arrow::DataType;
 using arrow::Datum;
+using arrow::Decimal128Array;
+using arrow::Decimal256Array;
 using arrow::Field;
 using arrow::Int32Array;
 using arrow::ListArray;
@@ -369,225 +371,136 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool,
 }
 
 // ----------------------------------------------------------------------
-// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128
-
-static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) {
-  const int32_t length = stop - start;
-
-  DCHECK_GE(length, 0);
-  DCHECK_LE(length, 8);
-
-  switch (length) {
-    case 0:
-      return 0;
-    case 1:
-      return bytes[start];
-    case 2:
-      return FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-    case 3: {
-      const uint64_t first_two_bytes = FromBigEndian(SafeLoadAs<uint16_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_two_bytes << 8 | last_byte;
-    }
-    case 4:
-      return FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-    case 5: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 8 | last_byte;
-    }
-    case 6: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t last_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      return first_four_bytes << 16 | last_two_bytes;
-    }
-    case 7: {
-      const uint64_t first_four_bytes =
-          FromBigEndian(SafeLoadAs<uint32_t>(bytes + start));
-      const uint64_t second_two_bytes =
-          FromBigEndian(SafeLoadAs<uint16_t>(bytes + start + 4));
-      const uint64_t last_byte = bytes[stop - 1];
-      return first_four_bytes << 24 | second_two_bytes << 8 | last_byte;
-    }
-    case 8:
-      return FromBigEndian(SafeLoadAs<uint64_t>(bytes + start));
-    default: {
-      DCHECK(false);
-      return UINT64_MAX;
-    }
-  }
+// INT32 / INT64 / BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY -> Decimal128 || Decimal256
+
+template <typename DecimalType>
+Status RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
+                              uint8_t* out_buf) {
+  ARROW_ASSIGN_OR_RAISE(DecimalType t, DecimalType::FromBigEndian(value, byte_width));
+  t.ToBytes(out_buf);
+  return ::arrow::Status::OK();
 }
 
-static constexpr int32_t kMinDecimalBytes = 1;
-static constexpr int32_t kMaxDecimalBytes = 16;
-
-/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one
-/// uint64_t (low bits).
-static void BytesToIntegerPair(const uint8_t* bytes, const int32_t length,
-                               int64_t* out_high, uint64_t* out_low) {
-  DCHECK_GE(length, kMinDecimalBytes);
-  DCHECK_LE(length, kMaxDecimalBytes);
-
-  // XXX This code is copied from Decimal::FromBigEndian
+template <typename DecimalArrayType>
+struct DecimalTypeTrait;
 
-  int64_t high, low;
-
-  // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the
-  // sign bit.
-  const bool is_negative = static_cast<int8_t>(bytes[0]) < 0;
-
-  // 1. Extract the high bytes
-  // Stop byte of the high bytes
-  const int32_t high_bits_offset = std::max(0, length - 8);
-  const auto high_bits = BytesToInteger(bytes, 0, high_bits_offset);
-
-  if (high_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    high = high_bits;
-  } else {
-    high = -1 * (is_negative && length < kMaxDecimalBytes);
-    // Shift left enough bits to make room for the incoming int64_t
-    high = SafeLeftShift(high, high_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    high |= high_bits;
-  }
-
-  // 2. Extract the low bytes
-  // Stop byte of the low bytes
-  const int32_t low_bits_offset = std::min(length, 8);
-  const auto low_bits = BytesToInteger(bytes, high_bits_offset, length);
-
-  if (low_bits_offset == 8) {
-    // Avoid undefined shift by 64 below
-    low = low_bits;
-  } else {
-    // Sign extend the low bits if necessary
-    low = -1 * (is_negative && length < 8);
-    // Shift left enough bits to make room for the incoming int64_t
-    low = SafeLeftShift(low, low_bits_offset * CHAR_BIT);
-    // Preserve the upper bits by inplace OR-ing the int64_t
-    low |= low_bits;
-  }
-
-  *out_high = high;
-  *out_low = static_cast<uint64_t>(low);
-}
-
-static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width,
-                                          uint8_t* out_buf) {
-  // view the first 8 bytes as an unsigned 64-bit integer
-  auto low = reinterpret_cast<uint64_t*>(out_buf);
-
-  // view the second 8 bytes as a signed 64-bit integer
-  auto high = reinterpret_cast<int64_t*>(out_buf + sizeof(uint64_t));
-
-  // Convert the fixed size binary array bytes into a Decimal128 compatible layout
-  BytesToIntegerPair(value, byte_width, high, low);
-}
-
-template <typename T>
-Status ConvertToDecimal128(const Array& array, const std::shared_ptr<DataType>&,
-                           MemoryPool* pool, std::shared_ptr<Array>*) {
-  return Status::NotImplemented("not implemented");
-}
+template <>
+struct DecimalTypeTrait<::arrow::Decimal128Array> {
+  using value = ::arrow::Decimal128;
+};
 
 template <>
-Status ConvertToDecimal128<FLBAType>(const Array& array,
-                                     const std::shared_ptr<DataType>& type,
-                                     MemoryPool* pool, std::shared_ptr<Array>* out) {
-  const auto& fixed_size_binary_array =
-      static_cast<const ::arrow::FixedSizeBinaryArray&>(array);
-
-  // The byte width of each decimal value
-  const int32_t type_length =
-      static_cast<const ::arrow::Decimal128Type&>(*type).byte_width();
-
-  // number of elements in the entire array
-  const int64_t length = fixed_size_binary_array.length();
-
-  // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
-  // this will be different from the decimal array width because we write the minimum
-  // number of bytes necessary to represent a given precision
-  const int32_t byte_width =
-      static_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
-          .byte_width();
-  if (byte_width < kMinDecimalBytes || byte_width > kMaxDecimalBytes) {
-    return Status::Invalid("Invalid FIXED_LEN_BYTE_ARRAY length for Decimal128");
+struct DecimalTypeTrait<::arrow::Decimal256Array> {
+  using value = ::arrow::Decimal256;
+};
+
+template <typename DecimalArrayType, typename ParquetType>
+struct DecimalConverter {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>&,
+                                        MemoryPool* pool, std::shared_ptr<Array>*) {
+    return Status::NotImplemented("not implemented");
   }
-
-  // allocate memory for the decimal array
-  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
-
-  // raw bytes that we can write to
-  uint8_t* out_ptr = data->mutable_data();
-
-  // convert each FixedSizeBinary value to valid decimal bytes
-  const int64_t null_count = fixed_size_binary_array.null_count();
-  if (null_count > 0) {
-    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-      if (!fixed_size_binary_array.IsNull(i)) {
-        RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
+};
+
+template <typename DecimalArrayType>
+struct DecimalConverter<DecimalArrayType, FLBAType> {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>& type,
+                                        MemoryPool* pool, std::shared_ptr<Array>* out) {
+    const auto& fixed_size_binary_array =
+        checked_cast<const ::arrow::FixedSizeBinaryArray&>(array);
+
+    // The byte width of each decimal value
+    const int32_t type_length =
+        static_cast<const ::arrow::DecimalType&>(*type).byte_width();
+
+    // number of elements in the entire array
+    const int64_t length = fixed_size_binary_array.length();
+
+    // Get the byte width of the values in the FixedSizeBinaryArray. Most of the time
+    // this will be different from the decimal array width because we write the minimum
+    // number of bytes necessary to represent a given precision
+    const int32_t byte_width =
+        checked_cast<const ::arrow::FixedSizeBinaryType&>(*fixed_size_binary_array.type())
+            .byte_width();
+    // allocate memory for the decimal array
+    ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
+
+    // raw bytes that we can write to
+    uint8_t* out_ptr = data->mutable_data();
+
+    // convert each FixedSizeBinary value to valid decimal bytes
+    const int64_t null_count = fixed_size_binary_array.null_count();
+
+    using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
+    if (null_count > 0) {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        if (!fixed_size_binary_array.IsNull(i)) {
+          RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
+              fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
+        } else {
+          std::memset(out_ptr, 0, type_length);
+        }
+      }
+    } else {
+      for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
+        RETURN_NOT_OK(RawBytesToDecimalBytes<DecimalType>(
+            fixed_size_binary_array.GetValue(i), byte_width, out_ptr));
       }
     }
-  } else {
-    for (int64_t i = 0; i < length; ++i, out_ptr += type_length) {
-      RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr);
-    }
-  }
 
-  *out = std::make_shared<::arrow::Decimal128Array>(
-      type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);
+    *out = std::make_shared<DecimalArrayType>(
+        type, length, std::move(data), fixed_size_binary_array.null_bitmap(), null_count);
 
-  return Status::OK();
-}
+    return Status::OK();
+  }
+};
 
-template <>
-Status ConvertToDecimal128<ByteArrayType>(const Array& array,
-                                          const std::shared_ptr<DataType>& type,
-                                          MemoryPool* pool, std::shared_ptr<Array>* out) {
-  const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
-  const int64_t length = binary_array.length();
+template <typename DecimalArrayType>
+struct DecimalConverter<DecimalArrayType, ByteArrayType> {
+  static inline Status ConvertToDecimal(const Array& array,
+                                        const std::shared_ptr<DataType>& type,
+                                        MemoryPool* pool, std::shared_ptr<Array>* out) {
+    const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(array);
+    const int64_t length = binary_array.length();
 
-  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
-  const int64_t type_length = decimal_type.byte_width();
+    const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+    const int64_t type_length = decimal_type.byte_width();
 
-  ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
+    ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
 
-  // raw bytes that we can write to
-  uint8_t* out_ptr = data->mutable_data();
+    // raw bytes that we can write to
+    uint8_t* out_ptr = data->mutable_data();
 
-  const int64_t null_count = binary_array.null_count();
+    const int64_t null_count = binary_array.null_count();
 
-  // convert each BinaryArray value to valid decimal bytes
-  for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
-    int32_t record_len = 0;
-    const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
+    // convert each BinaryArray value to valid decimal bytes
+    for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
+      int32_t record_len = 0;
+      const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
 
-    if (record_len < 0 || record_len > type_length) {
-      return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
-    }
+      if (record_len < 0 || record_len > type_length) {
+        return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
+      }
 
-    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
-    out_ptr_view[0] = 0;
-    out_ptr_view[1] = 0;
+      auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+      out_ptr_view[0] = 0;
+      out_ptr_view[1] = 0;
 
-    // only convert rows that are not null if there are nulls, or
-    // all rows, if there are not
-    if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) {
-      if (record_len <= 0) {
-        return Status::Invalid("Invalid BYTE_ARRAY length for Decimal128");
+      // only convert rows that are not null if there are nulls, or
+      // all rows, if there are not
+      if ((null_count > 0 && !binary_array.IsNull(i)) || null_count <= 0) {
+        using DecimalType = typename DecimalTypeTrait<DecimalArrayType>::value;
+        RETURN_NOT_OK(
+            RawBytesToDecimalBytes<DecimalType>(record_loc, record_len, out_ptr));
       }
-      RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
     }
+    *out = std::make_shared<DecimalArrayType>(type, length, std::move(data),
+                                              binary_array.null_bitmap(), null_count);
+    return Status::OK();
   }
-
-  *out = std::make_shared<::arrow::Decimal128Array>(
-      type, length, std::move(data), binary_array.null_bitmap(), null_count);
-  return Status::OK();
-}
+};
 
 /// \brief Convert an Int32 or Int64 array into a Decimal128Array
 /// The parquet spec allows systems to write decimals in int32, int64 if the values are
@@ -599,7 +512,15 @@ template <
                                     std::is_same<ParquetIntegerType, Int64Type>::value>>
 static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                      const std::shared_ptr<DataType>& type, Datum* out) {
-  DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+  // Decimal128 and Decimal256 are only Arrow constructs.  Parquet does not
+  // specifically distinguish between decimal byte widths.
+  // Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never
+  // write Decimal values as integers and if the decimal value can fit in an
+  // integer it is wasteful to use Decimal256. Put another way, the only
+  // way an integer column could be construed as Decimal256 is if an arrow
+  // schema was stored as metadata in the file indicating the column was
+  // Decimal256. The current Arrow-Parquet C++ bindings will never do this.
+  DCHECK(type->id() == ::arrow::Type::DECIMAL128);
 
   const int64_t length = reader->values_written();
 
@@ -610,7 +531,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
 
   const auto values = reinterpret_cast<const ElementType*>(reader->values());
 
-  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+  const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*type);
   const int64_t type_length = decimal_type.byte_width();
 
   ARROW_ASSIGN_OR_RAISE(auto data, ::arrow::AllocateBuffer(length * type_length, pool));
@@ -622,21 +543,16 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
     // sign/zero extend int32_t values, otherwise a no-op
     const auto value = static_cast<int64_t>(values[i]);
 
-    auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
-
-    // No-op on little endian machines, byteswap on big endian
-    out_ptr_view[0] = FromLittleEndian(static_cast<uint64_t>(value));
-
-    // no need to byteswap here because we're sign/zero extending exactly 8 bytes
-    out_ptr_view[1] = static_cast<uint64_t>(value < 0 ? -1 : 0);
+    ::arrow::Decimal128 decimal(value);
+    decimal.ToBytes(out_ptr);
   }
 
   if (reader->nullable_values()) {
     std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
-    *out = std::make_shared<::arrow::Decimal128Array>(type, length, std::move(data),
-                                                      is_valid, reader->null_count());
+    *out = std::make_shared<Decimal128Array>(type, length, std::move(data), is_valid,
+                                             reader->null_count());
   } else {
-    *out = std::make_shared<::arrow::Decimal128Array>(type, length, std::move(data));
+    *out = std::make_shared<Decimal128Array>(type, length, std::move(data));
   }
   return Status::OK();
 }
@@ -647,20 +563,16 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
 /// 2. Allocating a buffer for the arrow::Decimal128Array
 /// 3. Converting the big-endian bytes in each BinaryArray entry to two integers
 ///    representing the high and low bits of each decimal value.
-template <typename ParquetType>
+template <typename DecimalArrayType, typename ParquetType>
 Status TransferDecimal(RecordReader* reader, MemoryPool* pool,
                        const std::shared_ptr<DataType>& type, Datum* out) {
-  if (type->id() != ::arrow::Type::DECIMAL128) {
-    return Status::NotImplemented("Only reading decimal128 types is currently supported");
-  }
-
   auto binary_reader = dynamic_cast<BinaryRecordReader*>(reader);
   DCHECK(binary_reader);
   ::arrow::ArrayVector chunks = binary_reader->GetBuilderChunks();
   for (size_t i = 0; i < chunks.size(); ++i) {
     std::shared_ptr<Array> chunk_as_decimal;
-    RETURN_NOT_OK(
-        ConvertToDecimal128<ParquetType>(*chunks[i], type, pool, &chunk_as_decimal));
+    auto fn = &DecimalConverter<DecimalArrayType, ParquetType>::ConvertToDecimal;
+    RETURN_NOT_OK(fn(*chunks[i], type, pool, &chunk_as_decimal));
     // Replace the chunk, which will hopefully also free memory as we go
     chunks[i] = chunk_as_decimal;
   }
@@ -723,29 +635,46 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
       RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result));
       result = chunked_result;
     } break;
-    case ::arrow::Type::DECIMAL: {
+    case ::arrow::Type::DECIMAL128: {
       switch (descr->physical_type()) {
         case ::parquet::Type::INT32: {
-          RETURN_NOT_OK(
-              DecimalIntegerTransfer<Int32Type>(reader, pool, value_type, &result));
+          auto fn = DecimalIntegerTransfer<Int32Type>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         case ::parquet::Type::INT64: {
-          RETURN_NOT_OK(
-              DecimalIntegerTransfer<Int64Type>(reader, pool, value_type, &result));
+          auto fn = &DecimalIntegerTransfer<Int64Type>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         case ::parquet::Type::BYTE_ARRAY: {
-          RETURN_NOT_OK(
-              TransferDecimal<ByteArrayType>(reader, pool, value_type, &result));
+          auto fn = &TransferDecimal<Decimal128Array, ByteArrayType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
-          RETURN_NOT_OK(TransferDecimal<FLBAType>(reader, pool, value_type, &result));
+          auto fn = &TransferDecimal<Decimal128Array, FLBAType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
         } break;
         default:
           return Status::Invalid(
-              "Physical type for decimal must be int32, int64, byte array, or fixed "
+              "Physical type for decimal128 must be int32, int64, byte array, or fixed "
               "length binary");
       }
     } break;
+    case ::arrow::Type::DECIMAL256:
+      switch (descr->physical_type()) {
+        case ::parquet::Type::BYTE_ARRAY: {
+          auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+        } break;
+        case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
+          auto fn = &TransferDecimal<Decimal256Array, FLBAType>;
+          RETURN_NOT_OK(fn(reader, pool, value_type, &result));
+        } break;
+        default:
+          return Status::Invalid(
+              "Physical type for decimal256 must be fixed length binary");
+      }
+      break;
+
     case ::arrow::Type::TIMESTAMP: {
       const ::arrow::TimestampType& timestamp_type =
           static_cast<::arrow::TimestampType&>(*value_type);
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index e81c427..391c57c 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -35,6 +35,7 @@
 #include "parquet/properties.h"
 #include "parquet/types.h"
 
+using arrow::DecimalType;
 using arrow::Field;
 using arrow::FieldVector;
 using arrow::KeyValueMetadata;
@@ -54,7 +55,6 @@ using ParquetType = parquet::Type;
 using parquet::ConvertedType;
 using parquet::LogicalType;
 
-using parquet::internal::DecimalSize;
 using parquet::internal::LevelInfo;
 
 namespace parquet {
@@ -297,12 +297,13 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
           static_cast<const ::arrow::FixedSizeBinaryType&>(*field->type());
       length = fixed_size_binary_type.byte_width();
     } break;
-    case ArrowTypeId::DECIMAL: {
+    case ArrowTypeId::DECIMAL128:
+    case ArrowTypeId::DECIMAL256: {
       type = ParquetType::FIXED_LEN_BYTE_ARRAY;
       const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
       precision = decimal_type.precision();
       scale = decimal_type.scale();
-      length = DecimalSize(precision);
+      length = DecimalType::DecimalSize(precision);
       PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
     } break;
     case ArrowTypeId::DATE32:
@@ -881,6 +882,12 @@ Result<bool> ApplyOriginalStorageMetadata(const Field& origin_field,
     modified = true;
   }
 
+  if (origin_type->id() == ::arrow::Type::DECIMAL256 &&
+      inferred_type->id() == ::arrow::Type::DECIMAL128) {
+    inferred->field = inferred->field->WithType(origin_type);
+    modified = true;
+  }
+
   // Restore field metadata
   std::shared_ptr<const KeyValueMetadata> field_metadata = origin_field.metadata();
   if (field_metadata != nullptr) {
diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc
index 3d83a38..fbdfa09 100644
--- a/cpp/src/parquet/arrow/schema_internal.cc
+++ b/cpp/src/parquet/arrow/schema_internal.cc
@@ -33,7 +33,10 @@ using ::arrow::internal::checked_cast;
 
 Result<std::shared_ptr<ArrowType>> MakeArrowDecimal(const LogicalType& logical_type) {
   const auto& decimal = checked_cast<const DecimalLogicalType&>(logical_type);
-  return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale());
+  if (decimal.precision() <= ::arrow::Decimal128Type::kMaxPrecision) {
+    return ::arrow::Decimal128Type::Make(decimal.precision(), decimal.scale());
+  }
+  return ::arrow::Decimal256Type::Make(decimal.precision(), decimal.scale());
 }
 
 Result<std::shared_ptr<ArrowType>> MakeArrowInt(const LogicalType& logical_type) {
diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h
index dd8b170..fb1d398 100644
--- a/cpp/src/parquet/arrow/test_util.h
+++ b/cpp/src/parquet/arrow/test_util.h
@@ -55,6 +55,16 @@ struct DecimalWithPrecisionAndScale {
   static constexpr int32_t scale = PRECISION - 1;
 };
 
+template <int32_t PRECISION>
+struct Decimal256WithPrecisionAndScale {
+  static_assert(PRECISION >= 1 && PRECISION <= 76, "Invalid precision value");
+
+  using type = ::arrow::Decimal256Type;
+  static constexpr ::arrow::Type::type type_id = ::arrow::Decimal256Type::type_id;
+  static constexpr int32_t precision = PRECISION;
+  static constexpr int32_t scale = PRECISION - 1;
+};
+
 template <class ArrowType>
 ::arrow::enable_if_floating_point<ArrowType, Status> NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
@@ -122,8 +132,8 @@ template <typename ArrowType>
 static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) {
   std::default_random_engine gen(seed);
   std::uniform_int_distribution<uint32_t> d(0, std::numeric_limits<uint8_t>::max());
-  const int32_t required_bytes = ::arrow::DecimalSize(precision);
-  constexpr int32_t byte_width = 16;
+  const int32_t required_bytes = ::arrow::DecimalType::DecimalSize(precision);
+  int32_t byte_width = precision <= 38 ? 16 : 32;
   std::fill(out, out + byte_width * n, '\0');
 
   for (int64_t i = 0; i < n; ++i, out += byte_width) {
@@ -159,6 +169,27 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   return builder.Finish(out);
 }
 
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+::arrow::enable_if_t<
+    std::is_same<ArrowType, Decimal256WithPrecisionAndScale<precision>>::value, Status>
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+  constexpr int32_t kDecimalPrecision = precision;
+  constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale<precision>::scale;
+
+  const auto type = ::arrow::decimal256(kDecimalPrecision, kDecimalScale);
+  ::arrow::Decimal256Builder builder(type);
+  const int32_t byte_width =
+      static_cast<const ::arrow::Decimal256Type&>(*type).byte_width();
+
+  constexpr int32_t seed = 0;
+
+  ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
+  random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
+
+  RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
+  return builder.Finish(out);
+}
+
 template <class ArrowType>
 ::arrow::enable_if_boolean<ArrowType, Status> NonNullArray(size_t size,
                                                            std::shared_ptr<Array>* out) {
@@ -322,6 +353,32 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,
   return builder.Finish(out);
 }
 
+template <typename ArrowType, int32_t precision = ArrowType::precision>
+::arrow::enable_if_t<
+    std::is_same<ArrowType, Decimal256WithPrecisionAndScale<precision>>::value, Status>
+NullableArray(size_t size, size_t num_nulls, uint32_t seed,
+              std::shared_ptr<::arrow::Array>* out) {
+  std::vector<uint8_t> valid_bytes(size, '\1');
+
+  for (size_t i = 0; i < num_nulls; ++i) {
+    valid_bytes[i * 2] = '\0';
+  }
+
+  constexpr int32_t kDecimalPrecision = precision;
+  constexpr int32_t kDecimalScale = Decimal256WithPrecisionAndScale<precision>::scale;
+  const auto type = ::arrow::decimal256(kDecimalPrecision, kDecimalScale);
+  const int32_t byte_width =
+      static_cast<const ::arrow::Decimal256Type&>(*type).byte_width();
+
+  ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
+
+  random_decimals(size, seed, precision, out_buf->mutable_data());
+
+  ::arrow::Decimal256Builder builder(type);
+  RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
+  return builder.Finish(out);
+}
+
 // This helper function only supports (size/2) nulls yet.
 template <class ArrowType>
 ::arrow::enable_if_boolean<ArrowType, Status> NullableArray(size_t size, size_t num_nulls,
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index e35eeac..ce44530 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1939,23 +1939,23 @@ struct SerializeFunctor<
 // ----------------------------------------------------------------------
 // Write Arrow to Decimal128
 
-// Requires a custom serializer because decimal128 in parquet are in big-endian
+// Requires a custom serializer because decimal in parquet are in big-endian
 // format. Thus, a temporary local buffer is required.
 template <typename ParquetType, typename ArrowType>
-struct SerializeFunctor<ParquetType, ArrowType,
-                        ::arrow::enable_if_decimal128<ArrowType>> {
-  Status Serialize(const ::arrow::Decimal128Array& array, ArrowWriteContext* ctx,
-                   FLBA* out) {
+struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_decimal<ArrowType>> {
+  Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
+                   ArrowWriteContext* ctx, FLBA* out) {
     AllocateScratch(array, ctx);
     auto offset = Offset(array);
 
     if (array.null_count() == 0) {
       for (int64_t i = 0; i < array.length(); i++) {
-        out[i] = FixDecimalEndianess(array.GetValue(i), offset);
+        out[i] = FixDecimalEndianess<ArrowType::kByteWidth>(array.GetValue(i), offset);
       }
     } else {
       for (int64_t i = 0; i < array.length(); i++) {
-        out[i] = array.IsValid(i) ? FixDecimalEndianess(array.GetValue(i), offset)
+        out[i] = array.IsValid(i) ? FixDecimalEndianess<ArrowType::kByteWidth>(
+                                        array.GetValue(i), offset)
                                   : FixedLenByteArray();
       }
     }
@@ -1964,26 +1964,38 @@ struct SerializeFunctor<ParquetType, ArrowType,
   }
 
   // Parquet's Decimal are stored with FixedLength values where the length is
-  // proportional to the precision. Arrow's Decimal are always stored with 16
+  // proportional to the precision. Arrow's Decimal are always stored with 16/32
   // bytes. Thus the internal FLBA pointer must be adjusted by the offset calculated
   // here.
-  int32_t Offset(const ::arrow::Decimal128Array& array) {
-    auto decimal_type = checked_pointer_cast<::arrow::Decimal128Type>(array.type());
-    return decimal_type->byte_width() - internal::DecimalSize(decimal_type->precision());
+  int32_t Offset(const Array& array) {
+    auto decimal_type = checked_pointer_cast<::arrow::DecimalType>(array.type());
+    return decimal_type->byte_width() -
+           ::arrow::DecimalType::DecimalSize(decimal_type->precision());
   }
 
-  void AllocateScratch(const ::arrow::Decimal128Array& array, ArrowWriteContext* ctx) {
+  void AllocateScratch(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
+                       ArrowWriteContext* ctx) {
     int64_t non_null_count = array.length() - array.null_count();
-    int64_t size = non_null_count * 16;
+    int64_t size = non_null_count * ArrowType::kByteWidth;
     scratch_buffer = AllocateBuffer(ctx->memory_pool, size);
     scratch = reinterpret_cast<int64_t*>(scratch_buffer->mutable_data());
   }
 
+  template <int byte_width>
   FixedLenByteArray FixDecimalEndianess(const uint8_t* in, int64_t offset) {
     const auto* u64_in = reinterpret_cast<const int64_t*>(in);
     auto out = reinterpret_cast<const uint8_t*>(scratch) + offset;
-    *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[1]);
-    *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[0]);
+    static_assert(byte_width == 16 || byte_width == 32,
+                  "only 16 and 32 byte Decimals supported");
+    if (byte_width == 32) {
+      *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[3]);
+      *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[2]);
+      *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[1]);
+      *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[0]);
+    } else {
+      *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[1]);
+      *scratch++ = ::arrow::BitUtil::ToBigEndian(u64_in[0]);
+    }
     return FixedLenByteArray(out);
   }
 
@@ -1991,15 +2003,6 @@ struct SerializeFunctor<ParquetType, ArrowType,
   int64_t* scratch;
 };
 
-template <typename ParquetType, typename ArrowType>
-struct SerializeFunctor<ParquetType, ArrowType,
-                        ::arrow::enable_if_decimal256<ArrowType>> {
-  Status Serialize(const ::arrow::Decimal256Array& array, ArrowWriteContext* ctx,
-                   FLBA* out) {
-    return Status::NotImplemented("Decimal256 serialization isn't implemented");
-  }
-};
-
 template <>
 Status TypedColumnWriterImpl<FLBAType>::WriteArrowDense(
     const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index b1be41f..1d73002 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -211,7 +211,7 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio
       break;
     default:
       ss << ConvertedTypeToString(converted_type);
-      ss << " can not be applied to a primitive type";
+      ss << " cannot be applied to a primitive type";
       throw ParquetException(ss.str());
   }
   // For forward compatibility, create an equivalent logical type
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index cd5b8e0..3d3e7c7 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -1570,80 +1570,4 @@ class LogicalType::Impl::Unknown final : public LogicalType::Impl::SimpleCompati
 
 GENERATE_MAKE(Unknown)
 
-namespace internal {
-
-/// \brief Compute the number of bytes required to represent a decimal of a
-/// given precision. Taken from the Apache Impala codebase. The comments next
-/// to the return values are the maximum value that can be represented in 2's
-/// complement with the returned number of bytes.
-int32_t DecimalSize(int32_t precision) {
-  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
-                          << precision;
-  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
-                           << precision;
-
-  switch (precision) {
-    case 1:
-    case 2:
-      return 1;  // 127
-    case 3:
-    case 4:
-      return 2;  // 32,767
-    case 5:
-    case 6:
-      return 3;  // 8,388,607
-    case 7:
-    case 8:
-    case 9:
-      return 4;  // 2,147,483,427
-    case 10:
-    case 11:
-      return 5;  // 549,755,813,887
-    case 12:
-    case 13:
-    case 14:
-      return 6;  // 140,737,488,355,327
-    case 15:
-    case 16:
-      return 7;  // 36,028,797,018,963,967
-    case 17:
-    case 18:
-      return 8;  // 9,223,372,036,854,775,807
-    case 19:
-    case 20:
-    case 21:
-      return 9;  // 2,361,183,241,434,822,606,847
-    case 22:
-    case 23:
-      return 10;  // 604,462,909,807,314,587,353,087
-    case 24:
-    case 25:
-    case 26:
-      return 11;  // 154,742,504,910,672,534,362,390,527
-    case 27:
-    case 28:
-      return 12;  // 39,614,081,257,132,168,796,771,975,167
-    case 29:
-    case 30:
-    case 31:
-      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
-    case 32:
-    case 33:
-      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
-    case 34:
-    case 35:
-      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
-    case 36:
-    case 37:
-    case 38:
-      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
-    default:
-      break;
-  }
-  DCHECK(false);
-  return -1;
-}
-
-}  // namespace internal
-
 }  // namespace parquet
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 21cef0f..953da83 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -704,10 +704,4 @@ PARQUET_EXPORT SortOrder::type GetSortOrder(ConvertedType::type converted,
 PARQUET_EXPORT SortOrder::type GetSortOrder(
     const std::shared_ptr<const LogicalType>& logical_type, Type::type primitive);
 
-namespace internal {
-
-PARQUET_EXPORT
-int32_t DecimalSize(int32_t precision);
-
-}  // namespace internal
 }  // namespace parquet