You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/09/12 01:49:17 UTC

arrow git commit: ARROW-1519: [C++] Move DecimalUtil functions to methods on the Int128 class

Repository: arrow
Updated Branches:
  refs/heads/master 9ede7fbab -> b590c2454


ARROW-1519: [C++] Move DecimalUtil functions to methods on the Int128 class

This mostly moves code around so that third parties have a single place from which to access all Int128 functionality; the original impetus was `parquet-cpp` decimal reads/writes.

Author: Phillip Cloud <cp...@gmail.com>
Author: Wes McKinney <we...@twosigma.com>

Closes #1083 from cpcloud/ARROW-1519 and squashes the following commits:

27ae41dc [Wes McKinney] Rename Int128 to Decimal128
dd401c6d [Phillip Cloud] IWYU
51a3f240 [Phillip Cloud] Use a bool for negativity
94b5b37a [Phillip Cloud] Doc for Int128::FromString
db4d2070 [Phillip Cloud] Add doc and const
c06ebe83 [Phillip Cloud] Add DCHECK for Int128 pointer in StringToInteger
bec1c8b9 [Phillip Cloud] Add DCHECK for Int128 initial value in StringToInteger
d9829c9b [Phillip Cloud] Refactor Int128 construction from string
d7c2068c [Phillip Cloud] ARROW-1519: [C++] Move DecimalUtil functions to methods on the Int128 class


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b590c245
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b590c245
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b590c245

Branch: refs/heads/master
Commit: b590c245493084e584a7f79defc395282fd5c2a4
Parents: 9ede7fb
Author: Phillip Cloud <cp...@gmail.com>
Authored: Mon Sep 11 21:49:13 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Sep 11 21:49:13 2017 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                      |   1 -
 cpp/src/arrow/array-test.cc             |  20 +-
 cpp/src/arrow/array.cc                  |   5 +-
 cpp/src/arrow/builder.cc                |   3 +-
 cpp/src/arrow/builder.h                 |   4 +-
 cpp/src/arrow/compare.cc                |   1 -
 cpp/src/arrow/pretty_print-test.cc      |   7 +-
 cpp/src/arrow/python/arrow_to_pandas.cc |   5 +-
 cpp/src/arrow/python/builtin_convert.cc |   5 +-
 cpp/src/arrow/python/helpers.cc         |   2 +-
 cpp/src/arrow/python/pandas_to_arrow.cc |   5 +-
 cpp/src/arrow/python/python-test.cc     |   2 -
 cpp/src/arrow/util/CMakeLists.txt       |   2 +-
 cpp/src/arrow/util/decimal-test.cc      | 140 +++---
 cpp/src/arrow/util/decimal.cc           | 617 ++++++++++++++++++++++++---
 cpp/src/arrow/util/decimal.h            | 118 ++++-
 cpp/src/arrow/util/int128.cc            | 527 -----------------------
 cpp/src/arrow/util/int128.h             | 129 ------
 18 files changed, 755 insertions(+), 838 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 24735ac..577a4bb 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -755,7 +755,6 @@ set(ARROW_SRCS
   src/arrow/util/compression.cc
   src/arrow/util/cpu-info.cc
   src/arrow/util/decimal.cc
-  src/arrow/util/int128.cc
   src/arrow/util/key_value_metadata.cc
 )
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index a9596c5..39db715 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -34,7 +34,6 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 
 namespace arrow {
 
@@ -2568,7 +2567,7 @@ TEST(TestUnionArrayAdHoc, TestSliceEquals) {
   CheckUnion(batch->column(2));
 }
 
-using DecimalVector = std::vector<Int128>;
+using DecimalVector = std::vector<Decimal128>;
 
 class DecimalTest : public ::testing::TestWithParam<int> {
  public:
@@ -2634,8 +2633,8 @@ class DecimalTest : public ::testing::TestWithParam<int> {
 
 TEST_P(DecimalTest, NoNulls) {
   int32_t precision = GetParam();
-  std::vector<Int128> draw = {Int128(1), Int128(-2), Int128(2389), Int128(4),
-                              Int128(-12348)};
+  std::vector<Decimal128> draw = {Decimal128(1), Decimal128(-2), Decimal128(2389),
+                                  Decimal128(4), Decimal128(-12348)};
   std::vector<uint8_t> valid_bytes = {true, true, true, true, true};
   this->TestCreate(precision, draw, valid_bytes, 0);
   this->TestCreate(precision, draw, valid_bytes, 2);
@@ -2643,14 +2642,15 @@ TEST_P(DecimalTest, NoNulls) {
 
 TEST_P(DecimalTest, WithNulls) {
   int32_t precision = GetParam();
-  std::vector<Int128> draw = {Int128(1),  Int128(2), Int128(-1), Int128(4),
-                              Int128(-1), Int128(1), Int128(2)};
-  Int128 big;
-  ASSERT_OK(DecimalUtil::FromString("230342903942.234234", &big));
+  std::vector<Decimal128> draw = {Decimal128(1), Decimal128(2),  Decimal128(-1),
+                                  Decimal128(4), Decimal128(-1), Decimal128(1),
+                                  Decimal128(2)};
+  Decimal128 big;
+  ASSERT_OK(Decimal128::FromString("230342903942.234234", &big));
   draw.push_back(big);
 
-  Int128 big_negative;
-  ASSERT_OK(DecimalUtil::FromString("-23049302932.235234", &big_negative));
+  Decimal128 big_negative;
+  ASSERT_OK(Decimal128::FromString("-23049302932.235234", &big_negative));
   draw.push_back(big_negative);
 
   std::vector<uint8_t> valid_bytes = {true, true, false, true, false,

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 2d37274..a1d3bed 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -29,7 +29,6 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/visitor.h"
@@ -319,8 +318,8 @@ DecimalArray::DecimalArray(const std::shared_ptr<internal::ArrayData>& data)
 
 std::string DecimalArray::FormatValue(int64_t i) const {
   const auto& type_ = static_cast<const DecimalType&>(*type());
-  Int128 value(GetValue(i));
-  return DecimalUtil::ToString(value, type_.precision(), type_.scale());
+  Decimal128 value(GetValue(i));
+  return value.ToString(type_.precision(), type_.scale());
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 5945677..7f0f402 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -35,7 +35,6 @@
 #include "arrow/util/cpu-info.h"
 #include "arrow/util/decimal.h"
 #include "arrow/util/hash-util.h"
-#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
@@ -1119,7 +1118,7 @@ DecimalBuilder::DecimalBuilder(MemoryPool* pool, const std::shared_ptr<DataType>
     : DecimalBuilder(type, pool) {}
 #endif
 
-Status DecimalBuilder::Append(const Int128& value) {
+Status DecimalBuilder::Append(const Decimal128& value) {
   RETURN_NOT_OK(FixedSizeBinaryBuilder::Reserve(1));
   std::array<uint8_t, 16> bytes;
   RETURN_NOT_OK(value.ToBytes(&bytes));

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index bf7b317..7df8899 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -38,7 +38,7 @@
 namespace arrow {
 
 class Array;
-class Int128;
+class Decimal128;
 
 namespace internal {
 
@@ -778,7 +778,7 @@ class ARROW_EXPORT DecimalBuilder : public FixedSizeBinaryBuilder {
 
   using FixedSizeBinaryBuilder::Append;
 
-  Status Append(const Int128& val);
+  Status Append(const Decimal128& val);
 
   Status Finish(std::shared_ptr<Array>* out) override;
 };

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index 876e7f7..2aeb03b 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -29,7 +29,6 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
-#include "arrow/util/decimal.h"
 #include "arrow/util/logging.h"
 #include "arrow/visitor_inline.h"
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/pretty_print-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index ab0bc16..8b9a24f 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -32,7 +32,6 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 
 namespace arrow {
 
@@ -116,12 +115,12 @@ TEST_F(TestPrettyPrint, DecimalType) {
 
   DecimalBuilder builder(type);
 
-  Int128 val;
+  Decimal128 val;
 
-  ASSERT_OK(DecimalUtil::FromString("123.4567", &val));
+  ASSERT_OK(Decimal128::FromString("123.4567", &val));
   ASSERT_OK(builder.Append(val));
 
-  ASSERT_OK(DecimalUtil::FromString("456.7891", &val));
+  ASSERT_OK(Decimal128::FromString("456.7891", &val));
   ASSERT_OK(builder.Append(val));
   ASSERT_OK(builder.AppendNull());
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/python/arrow_to_pandas.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index 117bf23..148963f 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -37,7 +37,6 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/parallel.h"
@@ -606,8 +605,8 @@ static Status ConvertTimes(PandasOptions options, const ChunkedArray& data,
 static Status RawDecimalToString(const uint8_t* bytes, int precision, int scale,
                                  std::string* result) {
   DCHECK_NE(result, nullptr);
-  Int128 decimal(bytes);
-  *result = DecimalUtil::ToString(decimal, precision, scale);
+  Decimal128 decimal(bytes);
+  *result = decimal.ToString(precision, scale);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/python/builtin_convert.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc
index eb17f4e..747b872 100644
--- a/cpp/src/arrow/python/builtin_convert.cc
+++ b/cpp/src/arrow/python/builtin_convert.cc
@@ -28,7 +28,6 @@
 #include "arrow/api.h"
 #include "arrow/status.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 
 #include "arrow/python/helpers.h"
@@ -589,8 +588,8 @@ class DecimalConverter
       std::string string;
       RETURN_NOT_OK(PythonDecimalToString(item.obj(), &string));
 
-      Int128 value;
-      RETURN_NOT_OK(DecimalUtil::FromString(string, &value));
+      Decimal128 value;
+      RETURN_NOT_OK(Decimal128::FromString(string, &value));
       return typed_builder_->Append(value);
     }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/python/helpers.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/helpers.cc b/cpp/src/arrow/python/helpers.cc
index deda93f..fb2fed7 100644
--- a/cpp/src/arrow/python/helpers.cc
+++ b/cpp/src/arrow/python/helpers.cc
@@ -103,7 +103,7 @@ Status InferDecimalPrecisionAndScale(PyObject* python_decimal, int* precision,
   auto size = str.size;
 
   std::string c_string(bytes, size);
-  return DecimalUtil::FromString(c_string, nullptr, precision, scale);
+  return Decimal128::FromString(c_string, nullptr, precision, scale);
 }
 
 Status DecimalFromString(PyObject* decimal_constructor, const std::string& decimal_string,

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/python/pandas_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/pandas_to_arrow.cc b/cpp/src/arrow/python/pandas_to_arrow.cc
index 1f96f4e..64f753e 100644
--- a/cpp/src/arrow/python/pandas_to_arrow.cc
+++ b/cpp/src/arrow/python/pandas_to_arrow.cc
@@ -39,7 +39,6 @@
 #include "arrow/type_traits.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/visitor_inline.h"
@@ -659,8 +658,8 @@ Status PandasConverter::ConvertDecimals() {
       std::string string;
       RETURN_NOT_OK(PythonDecimalToString(object, &string));
 
-      Int128 value;
-      RETURN_NOT_OK(DecimalUtil::FromString(string, &value));
+      Decimal128 value;
+      RETURN_NOT_OK(Decimal128::FromString(string, &value));
       RETURN_NOT_OK(builder.Append(value));
     } else if (PandasObjectIsNull(object)) {
       RETURN_NOT_OK(builder.AppendNull());

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/python/python-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc
index 0241ff0..e1796c0 100644
--- a/cpp/src/arrow/python/python-test.cc
+++ b/cpp/src/arrow/python/python-test.cc
@@ -29,8 +29,6 @@
 #include "arrow/python/builtin_convert.h"
 #include "arrow/python/helpers.h"
 
-#include "arrow/util/decimal.h"
-
 namespace arrow {
 namespace py {
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index 0705820..44be1c9 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -32,8 +32,8 @@ install(FILES
   compression_zlib.h
   compression_zstd.h
   cpu-info.h
+  decimal.h
   hash-util.h
-  int128.h
   key_value_metadata.h
   logging.h
   macros.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/util/decimal-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/decimal-test.cc b/cpp/src/arrow/util/decimal-test.cc
index 97af088..6f5118e 100644
--- a/cpp/src/arrow/util/decimal-test.cc
+++ b/cpp/src/arrow/util/decimal-test.cc
@@ -24,31 +24,29 @@
 #include "arrow/status.h"
 #include "arrow/test-util.h"
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 
 namespace arrow {
-namespace DecimalUtil {
 
 class DecimalTestFixture : public ::testing::Test {
  public:
   DecimalTestFixture() : integer_value_(23423445), string_value_("234.23445") {}
-  Int128 integer_value_;
+  Decimal128 integer_value_;
   std::string string_value_;
 };
 
 TEST_F(DecimalTestFixture, TestToString) {
-  Int128 decimal(this->integer_value_);
+  Decimal128 decimal(this->integer_value_);
   int precision = 8;
   int scale = 5;
-  std::string result = ToString(decimal, precision, scale);
+  std::string result = decimal.ToString(precision, scale);
   ASSERT_EQ(result, this->string_value_);
 }
 
 TEST_F(DecimalTestFixture, TestFromString) {
-  Int128 expected(this->integer_value_);
-  Int128 result;
+  Decimal128 expected(this->integer_value_);
+  Decimal128 result;
   int precision, scale;
-  ASSERT_OK(FromString(this->string_value_, &result, &precision, &scale));
+  ASSERT_OK(Decimal128::FromString(this->string_value_, &result, &precision, &scale));
   ASSERT_EQ(result, expected);
   ASSERT_EQ(precision, 8);
   ASSERT_EQ(scale, 5);
@@ -56,10 +54,10 @@ TEST_F(DecimalTestFixture, TestFromString) {
 
 TEST_F(DecimalTestFixture, TestStringStartingWithPlus) {
   std::string plus_value("+234.234");
-  Int128 out;
+  Decimal128 out;
   int scale;
   int precision;
-  ASSERT_OK(FromString(plus_value, &out, &precision, &scale));
+  ASSERT_OK(Decimal128::FromString(plus_value, &out, &precision, &scale));
   ASSERT_EQ(234234, out);
   ASSERT_EQ(6, precision);
   ASSERT_EQ(3, scale);
@@ -67,39 +65,20 @@ TEST_F(DecimalTestFixture, TestStringStartingWithPlus) {
 
 TEST_F(DecimalTestFixture, TestStringStartingWithPlus128) {
   std::string plus_value("+2342394230592.232349023094");
-  Int128 expected_value("2342394230592232349023094");
-  Int128 out;
+  Decimal128 expected_value("2342394230592232349023094");
+  Decimal128 out;
   int scale;
   int precision;
-  ASSERT_OK(FromString(plus_value, &out, &precision, &scale));
+  ASSERT_OK(Decimal128::FromString(plus_value, &out, &precision, &scale));
   ASSERT_EQ(expected_value, out);
   ASSERT_EQ(25, precision);
   ASSERT_EQ(12, scale);
 }
 
-TEST(DecimalTest, TestStringToInt32) {
-  Int128 value;
-  StringToInteger("123", "456", 1, &value);
-  ASSERT_EQ(value, 123456);
-}
-
-TEST(DecimalTest, TestStringToInt64) {
-  Int128 value;
-  StringToInteger("123456789", "456", -1, &value);
-  ASSERT_EQ(value, -123456789456);
-}
-
-TEST(DecimalTest, TestStringToInt128) {
-  Int128 value;
-  StringToInteger("123456789", "456789123", 1, &value);
-  ASSERT_EQ(value.high_bits(), 0);
-  ASSERT_EQ(value.low_bits(), 123456789456789123);
-}
-
-TEST(DecimalTest, TestFromString128) {
-  static const std::string string_value("-23049223942343532412");
-  Int128 result(string_value);
-  Int128 expected(static_cast<int64_t>(-230492239423435324));
+TEST(DecimalTest, TestFromStringDecimal128) {
+  std::string string_value("-23049223942343532412");
+  Decimal128 result(string_value);
+  Decimal128 expected(static_cast<int64_t>(-230492239423435324));
   ASSERT_EQ(result, expected * 100 - 12);
 
   // Sanity check that our number is actually using more than 64 bits
@@ -108,9 +87,9 @@ TEST(DecimalTest, TestFromString128) {
 
 TEST(DecimalTest, TestFromDecimalString128) {
   std::string string_value("-23049223942343.532412");
-  Int128 result;
-  ASSERT_OK(FromString(string_value, &result));
-  Int128 expected(static_cast<int64_t>(-230492239423435324));
+  Decimal128 result;
+  ASSERT_OK(Decimal128::FromString(string_value, &result));
+  Decimal128 expected(static_cast<int64_t>(-230492239423435324));
   expected *= 100;
   expected -= 12;
   ASSERT_EQ(result, expected);
@@ -120,109 +99,109 @@ TEST(DecimalTest, TestFromDecimalString128) {
 }
 
 TEST(DecimalTest, TestDecimal32SignedRoundTrip) {
-  Int128 expected("-3402692");
+  Decimal128 expected("-3402692");
 
   std::array<uint8_t, 16> bytes;
   ASSERT_OK(expected.ToBytes(&bytes));
 
-  Int128 result(bytes.data());
+  Decimal128 result(bytes.data());
   ASSERT_EQ(expected, result);
 }
 
 TEST(DecimalTest, TestDecimal64SignedRoundTrip) {
-  Int128 expected;
+  Decimal128 expected;
   std::string string_value("-34034293045.921");
-  ASSERT_OK(FromString(string_value, &expected));
+  ASSERT_OK(Decimal128::FromString(string_value, &expected));
 
   std::array<uint8_t, 16> bytes;
   ASSERT_OK(expected.ToBytes(&bytes));
 
-  Int128 result(bytes.data());
+  Decimal128 result(bytes.data());
 
   ASSERT_EQ(expected, result);
 }
 
 TEST(DecimalTest, TestDecimalStringAndBytesRoundTrip) {
-  Int128 expected;
+  Decimal128 expected;
   std::string string_value("-340282366920938463463374607431.711455");
-  ASSERT_OK(FromString(string_value, &expected));
+  ASSERT_OK(Decimal128::FromString(string_value, &expected));
 
   std::string expected_string_value("-340282366920938463463374607431711455");
-  Int128 expected_underlying_value(expected_string_value);
+  Decimal128 expected_underlying_value(expected_string_value);
 
   ASSERT_EQ(expected, expected_underlying_value);
 
   std::array<uint8_t, 16> bytes;
   ASSERT_OK(expected.ToBytes(&bytes));
 
-  Int128 result(bytes.data());
+  Decimal128 result(bytes.data());
 
   ASSERT_EQ(expected, result);
 }
 
 TEST(DecimalTest, TestInvalidInputMinus) {
   std::string invalid_value("-");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputDot) {
   std::string invalid_value("0.0.0");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputEmbeddedMinus) {
   std::string invalid_value("0-13-32");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputSingleChar) {
   std::string invalid_value("a");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputWithValidSubstring) {
   std::string invalid_value("-23092.235-");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   auto msg = status.message();
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputWithMinusPlus) {
   std::string invalid_value("-+23092.235");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputWithPlusMinus) {
   std::string invalid_value("+-23092.235");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalTest, TestInvalidInputWithLeadingZeros) {
   std::string invalid_value("00a");
-  Int128 out;
-  Status status = DecimalUtil::FromString(invalid_value, &out);
+  Decimal128 out;
+  Status status = Decimal128::FromString(invalid_value, &out);
   ASSERT_RAISES(Invalid, status);
 }
 
 TEST(DecimalZerosTest, LeadingZerosNoDecimalPoint) {
   std::string string_value("0000000");
-  Int128 d;
+  Decimal128 d;
   int precision;
   int scale;
-  ASSERT_OK(FromString(string_value, &d, &precision, &scale));
+  ASSERT_OK(Decimal128::FromString(string_value, &d, &precision, &scale));
   ASSERT_EQ(precision, 7);
   ASSERT_EQ(scale, 0);
   ASSERT_EQ(d, 0);
@@ -230,10 +209,10 @@ TEST(DecimalZerosTest, LeadingZerosNoDecimalPoint) {
 
 TEST(DecimalZerosTest, LeadingZerosDecimalPoint) {
   std::string string_value("000.0000");
-  Int128 d;
+  Decimal128 d;
   int precision;
   int scale;
-  ASSERT_OK(FromString(string_value, &d, &precision, &scale));
+  ASSERT_OK(Decimal128::FromString(string_value, &d, &precision, &scale));
   // We explicitly do not support this for now, otherwise this would be ASSERT_EQ
   ASSERT_NE(precision, 7);
 
@@ -243,44 +222,43 @@ TEST(DecimalZerosTest, LeadingZerosDecimalPoint) {
 
 TEST(DecimalZerosTest, NoLeadingZerosDecimalPoint) {
   std::string string_value(".00000");
-  Int128 d;
+  Decimal128 d;
   int precision;
   int scale;
-  ASSERT_OK(FromString(string_value, &d, &precision, &scale));
+  ASSERT_OK(Decimal128::FromString(string_value, &d, &precision, &scale));
   ASSERT_EQ(precision, 5);
   ASSERT_EQ(scale, 5);
   ASSERT_EQ(d, 0);
 }
 
 template <typename T>
-class Int128Test : public ::testing::Test {
+class Decimal128Test : public ::testing::Test {
  public:
-  Int128Test() : value_(42) {}
+  Decimal128Test() : value_(42) {}
   const T value_;
 };
 
-using Int128Types =
+using Decimal128Types =
     ::testing::Types<char, unsigned char, short, unsigned short,  // NOLINT
                      int, unsigned int, long, unsigned long,      // NOLINT
                      long long, unsigned long long                // NOLINT
                      >;
 
-TYPED_TEST_CASE(Int128Test, Int128Types);
+TYPED_TEST_CASE(Decimal128Test, Decimal128Types);
 
-TYPED_TEST(Int128Test, ConstructibleFromAnyIntegerType) {
-  Int128 value(this->value_);
+TYPED_TEST(Decimal128Test, ConstructibleFromAnyIntegerType) {
+  Decimal128 value(this->value_);
   ASSERT_EQ(42, value.low_bits());
 }
 
-TEST(Int128TestTrue, ConstructibleFromBool) {
-  Int128 value(true);
+TEST(Decimal128TestTrue, ConstructibleFromBool) {
+  Decimal128 value(true);
   ASSERT_EQ(1, value.low_bits());
 }
 
-TEST(Int128TestFalse, ConstructibleFromBool) {
-  Int128 value(false);
+TEST(Decimal128TestFalse, ConstructibleFromBool) {
+  Decimal128 value(false);
   ASSERT_EQ(0, value.low_bits());
 }
 
-}  // namespace DecimalUtil
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/util/decimal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/decimal.cc b/cpp/src/arrow/util/decimal.cc
index 7dd5807..123482e 100644
--- a/cpp/src/arrow/util/decimal.cc
+++ b/cpp/src/arrow/util/decimal.cc
@@ -17,40 +17,140 @@
 
 #include <algorithm>
 #include <cctype>
-#include <cstdlib>
+#include <cmath>
+#include <cstring>
+#include <limits>
 #include <sstream>
 
-#include "arrow/util/bit-util.h"
+#ifdef _MSC_VER
+#include <intrin.h>
+#pragma intrinsic(_BitScanReverse)
+#endif
+
 #include "arrow/util/decimal.h"
-#include "arrow/util/int128.h"
 #include "arrow/util/logging.h"
 
 namespace arrow {
-namespace DecimalUtil {
 
-void StringToInteger(const std::string& whole, const std::string& fractional, int8_t sign,
-                     Int128* out) {
-  DCHECK(sign == -1 || sign == 1);
-  DCHECK_NE(out, nullptr);
-  DCHECK(!whole.empty() || !fractional.empty());
-  *out = Int128(whole + fractional) * sign;
+static constexpr uint64_t kIntMask = 0xFFFFFFFF;
+static constexpr auto kCarryBit = static_cast<uint64_t>(1) << static_cast<uint64_t>(32);
+
+Decimal128::Decimal128(const std::string& str) : Decimal128() {
+  Status status(Decimal128::FromString(str, this));
+  DCHECK(status.ok()) << status.message();
+}
+
+Decimal128::Decimal128(const uint8_t* bytes)
+    : Decimal128(reinterpret_cast<const int64_t*>(bytes)[0],
+                 reinterpret_cast<const uint64_t*>(bytes)[1]) {}
+
+Status Decimal128::ToBytes(std::array<uint8_t, 16>* out) const {
+  if (out == nullptr) {
+    return Status::Invalid("Cannot fill nullptr of bytes from Decimal128");
+  }
+
+  const uint64_t raw[] = {static_cast<uint64_t>(high_bits_), low_bits_};
+  const auto* raw_data = reinterpret_cast<const uint8_t*>(raw);
+  std::copy(raw_data, raw_data + out->size(), out->begin());
+  return Status::OK();
+}
+
+std::string Decimal128::ToString(int precision, int scale) const {
+  using std::size_t;
+
+  const bool is_negative = *this < 0;
+
+  // Decimal values are sent to clients as strings so in the interest of
+  // speed the string will be created without using stringstream with the
+  // whole/fractional_part().
+  size_t last_char_idx = precision + (scale > 0)  // Add a space for decimal place
+                         + (scale == precision)   // Add a space for leading 0
+                         + is_negative;           // Add a space for negative sign
+
+  std::string str(last_char_idx, '0');
+
+  // Start filling in the values in reverse order by taking the last digit
+  // of the value. Use a positive value and worry about the sign later. At this
+  // point the last_char_idx points to the string terminator.
+  Decimal128 remaining_value(*this);
+
+  const auto first_digit_idx = static_cast<size_t>(is_negative);
+  if (is_negative) {
+    remaining_value.Negate();
+  }
+
+  if (scale > 0) {
+    int remaining_scale = scale;
+    do {
+      str[--last_char_idx] =
+          static_cast<char>(remaining_value % 10 + '0');  // Ascii offset
+      remaining_value /= 10;
+    } while (--remaining_scale > 0);
+    str[--last_char_idx] = '.';
+    DCHECK_GT(last_char_idx, first_digit_idx) << "Not enough space remaining";
+  }
+
+  do {
+    str[--last_char_idx] = static_cast<char>(remaining_value % 10 + '0');  // Ascii offset
+    remaining_value /= 10;
+    if (remaining_value == 0) {
+      // Trim any extra leading 0's.
+      if (last_char_idx > first_digit_idx) {
+        str.erase(0, last_char_idx - first_digit_idx);
+      }
+
+      break;
+    }
+    // For safety, enforce string length independent of remaining_value.
+  } while (last_char_idx > first_digit_idx);
+
+  if (is_negative) {
+    str[0] = '-';
+  }
+
+  return str;
+}
+
+static void StringToInteger(const std::string& str, Decimal128* out) {
+  using std::size_t;
+
+  DCHECK_NE(out, nullptr) << "Decimal128 output variable cannot be nullptr";
+  DCHECK_EQ(*out, 0)
+      << "When converting a string to Decimal128 the initial output must be 0";
+
+  const size_t length = str.length();
+
+  DCHECK_GT(length, 0) << "length of parsed decimal string should be greater than 0";
+
+  size_t posn = 0;
+
+  while (posn < length) {
+    const size_t group = std::min(static_cast<size_t>(18), length - posn);
+    const auto chunk = static_cast<int64_t>(std::stoll(str.substr(posn, group)));
+    const auto multiple =
+        static_cast<int64_t>(std::pow(10.0, static_cast<double>(group)));
+
+    *out *= multiple;
+    *out += chunk;
+
+    posn += group;
+  }
 }
 
-Status FromString(const std::string& s, Int128* out, int* precision, int* scale) {
+Status Decimal128::FromString(const std::string& s, Decimal128* out, int* precision,
+                              int* scale) {
   // Implements this regex: "(\\+?|-?)((0*)(\\d*))(\\.(\\d+))?";
   if (s.empty()) {
     return Status::Invalid("Empty string cannot be converted to decimal");
   }
 
-  int8_t sign = 1;
   std::string::const_iterator charp = s.cbegin();
   std::string::const_iterator end = s.cend();
 
   char first_char = *charp;
+  bool is_negative = false;
   if (first_char == '+' || first_char == '-') {
-    if (first_char == '-') {
-      sign = -1;
-    }
+    is_negative = first_char == '-';
     ++charp;
   }
 
@@ -70,7 +170,7 @@ Status FromString(const std::string& s, Int128* out, int* precision, int* scale)
   // all zeros and no decimal point
   if (charp == end) {
     if (out != nullptr) {
-      *out = Int128(0);
+      *out = 0;
     }
 
     // Not sure what other libraries assign precision to for this case (this case of
@@ -146,64 +246,469 @@ Status FromString(const std::string& s, Int128* out, int* precision, int* scale)
   }
 
   if (out != nullptr) {
-    StringToInteger(whole_part, fractional_part, sign, out);
+    // zero out in case we've passed in a previously used value
+    *out = 0;
+    StringToInteger(whole_part + fractional_part, out);
+    if (is_negative) {
+      out->Negate();
+    }
   }
 
   return Status::OK();
 }
 
-std::string ToString(const Int128& decimal_value, int precision, int scale) {
-  const bool is_negative = decimal_value < 0;
+/// \brief Negate this value in place (two's complement: invert all bits, add one).
+/// \return a reference to this object
+Decimal128& Decimal128::Negate() {
+  const uint64_t negated_low = ~low_bits_ + 1;
+  // The "+ 1" only carries into the high word when the low word wraps to zero.
+  const int64_t carry = negated_low == 0 ? 1 : 0;
+  high_bits_ = ~high_bits_ + carry;
+  low_bits_ = negated_low;
+  return *this;
+}
 
-  // Decimal values are sent to clients as strings so in the interest of
-  // speed the string will be created without the using stringstream with the
-  // whole/fractional_part().
-  size_t last_char_idx = precision + (scale > 0)  // Add a space for decimal place
-                         + (scale == precision)   // Add a space for leading 0
-                         + is_negative;           // Add a space for negative sign
-  std::string str(last_char_idx, '0');
+/// \brief Add right to this value in place, propagating the carry from the
+/// low word into the high word.
+Decimal128& Decimal128::operator+=(const Decimal128& right) {
+  const uint64_t previous_low = low_bits_;
+  low_bits_ += right.low_bits_;
+  high_bits_ += right.high_bits_;
+  // An unsigned sum smaller than one of its operands means the low word wrapped.
+  if (low_bits_ < previous_low) {
+    ++high_bits_;
+  }
+  return *this;
+}
 
-  // Start filling in the values in reverse order by taking the last digit
-  // of the value. Use a positive value and worry about the sign later. At this
-  // point the last_char_idx points to the string terminator.
-  Int128 remaining_value(decimal_value);
+/// \brief Subtract right from this value in place, propagating the borrow
+/// from the low word into the high word.
+Decimal128& Decimal128::operator-=(const Decimal128& right) {
+  const uint64_t previous_low = low_bits_;
+  low_bits_ -= right.low_bits_;
+  high_bits_ -= right.high_bits_;
+  // An unsigned difference larger than the minuend means the low word wrapped.
+  if (low_bits_ > previous_low) {
+    --high_bits_;
+  }
+  return *this;
+}
 
-  const auto first_digit_idx = static_cast<size_t>(is_negative);
-  if (is_negative) {
-    remaining_value.Negate();
+/// \brief Divide this value by right in place; the remainder is discarded.
+/// \return a reference to this object
+Decimal128& Decimal128::operator/=(const Decimal128& right) {
+  Decimal128 remainder;
+  // Perform the division outside of DCHECK so that the computation never
+  // depends on whether the assertion macro evaluates its argument.
+  const Status s = Divide(right, this, &remainder);
+  static_cast<void>(s);  // silence unused warnings if DCHECK expands to nothing
+  DCHECK(s.ok());
+  return *this;
+}
+
+/// \brief Convert to char by truncating the low 64-bit word.
+///
+/// Where the DCHECK macros are active, this asserts that the high word is
+/// either 0 or -1 and that the low word does not exceed the maximum char value.
+/// NOTE(review): a small negative value has high_bits_ == -1 and a very large
+/// low_bits_, so it looks like it would trip the second check -- confirm
+/// whether negative values are meant to be accepted here.
+Decimal128::operator char() const {
+  DCHECK(high_bits_ == 0 || high_bits_ == -1)
+      << "Trying to cast an Decimal128 greater than the value range of a "
+         "char. high_bits_ must be equal to 0 or -1, got: "
+      << high_bits_;
+  DCHECK_LE(low_bits_, std::numeric_limits<char>::max())
+      << "low_bits_ too large for C type char, got: " << low_bits_;
+  return static_cast<char>(low_bits_);
+}
+
+/// \brief Bitwise OR of both 64-bit words with the corresponding words of right.
+Decimal128& Decimal128::operator|=(const Decimal128& right) {
+  high_bits_ = high_bits_ | right.high_bits_;
+  low_bits_ = low_bits_ | right.low_bits_;
+  return *this;
+}
+
+/// \brief Bitwise AND of both 64-bit words with the corresponding words of right.
+Decimal128& Decimal128::operator&=(const Decimal128& right) {
+  high_bits_ = high_bits_ & right.high_bits_;
+  low_bits_ = low_bits_ & right.low_bits_;
+  return *this;
+}
+
+/// \brief Shift this value left by the given number of bits, in place.
+///
+/// Bits shifted out of the top are discarded; shifting by 128 or more yields 0.
+/// All shifts are performed on unsigned operands because left-shifting a
+/// negative signed value is undefined behaviour before C++20.
+Decimal128& Decimal128::operator<<=(uint32_t bits) {
+  if (bits != 0) {
+    if (bits < 64) {
+      // The top `bits` bits of the low word move into the bottom of the high word.
+      const uint64_t shifted_high = static_cast<uint64_t>(high_bits_) << bits;
+      high_bits_ = static_cast<int64_t>(shifted_high | (low_bits_ >> (64 - bits)));
+      low_bits_ <<= bits;
+    } else if (bits < 128) {
+      // The whole low word moves into the high word; the low word becomes zero.
+      high_bits_ = static_cast<int64_t>(low_bits_ << (bits - 64));
+      low_bits_ = 0;
+    } else {
+      high_bits_ = 0;
+      low_bits_ = 0;
+    }
   }
+  return *this;
+}
 
-  if (scale > 0) {
-    int remaining_scale = scale;
-    do {
-      str[--last_char_idx] =
-          static_cast<char>(remaining_value % 10 + '0');  // Ascii offset
-      remaining_value /= 10;
-    } while (--remaining_scale > 0);
-    str[--last_char_idx] = '.';
-    DCHECK_GT(last_char_idx, first_digit_idx) << "Not enough space remaining";
+/// \brief Shift this value right by the given number of bits, in place.
+///
+/// NOTE(review): for shifts of 64 bits or more the vacated bits are filled
+/// with the sign, but for shifts below 64 the high word is shifted as an
+/// unsigned value (zero fill). That existing behaviour is preserved here --
+/// confirm which fill is intended for negative values.
+Decimal128& Decimal128::operator>>=(uint32_t bits) {
+  if (bits != 0) {
+    if (bits < 64) {
+      low_bits_ >>= bits;
+      // Move the bottom `bits` bits of the high word into the top of the low
+      // word. The shift is done on an unsigned copy: left-shifting a negative
+      // signed value is undefined behaviour before C++20.
+      low_bits_ |= static_cast<uint64_t>(high_bits_) << (64 - bits);
+      high_bits_ = static_cast<int64_t>(static_cast<uint64_t>(high_bits_) >> bits);
+    } else if (bits < 128) {
+      low_bits_ = static_cast<uint64_t>(high_bits_ >> (bits - 64));
+      high_bits_ = static_cast<int64_t>(high_bits_ >= 0L ? 0L : -1L);
+    } else {
+      // Everything is shifted out: only the sign remains in both words.
+      high_bits_ = static_cast<int64_t>(high_bits_ >= 0L ? 0L : -1L);
+      low_bits_ = static_cast<uint64_t>(high_bits_);
+    }
   }
+  return *this;
+}
 
-  do {
-    str[--last_char_idx] = static_cast<char>(remaining_value % 10 + '0');  // Ascii offset
-    remaining_value /= 10;
-    if (remaining_value == 0) {
-      // Trim any extra leading 0's.
-      if (last_char_idx > first_digit_idx) {
-        str.erase(0, last_char_idx - first_digit_idx);
-      }
+/// \brief Multiply this value by right in place; the product is truncated to
+/// its low 128 bits.
+Decimal128& Decimal128::operator*=(const Decimal128& right) {
+  // Break the left and right numbers into 32 bit chunks
+  // so that we can multiply them without overflow.
+  // Index 0 is the most significant chunk, index 3 the least significant.
+  const uint64_t L0 = static_cast<uint64_t>(high_bits_) >> 32;
+  const uint64_t L1 = static_cast<uint64_t>(high_bits_) & kIntMask;
+  const uint64_t L2 = low_bits_ >> 32;
+  const uint64_t L3 = low_bits_ & kIntMask;
+
+  const uint64_t R0 = static_cast<uint64_t>(right.high_bits_) >> 32;
+  const uint64_t R1 = static_cast<uint64_t>(right.high_bits_) & kIntMask;
+  const uint64_t R2 = right.low_bits_ >> 32;
+  const uint64_t R3 = right.low_bits_ & kIntMask;
+
+  // The lowest 32 bits of the result come straight from L3 * R3.
+  uint64_t product = L3 * R3;
+  const uint64_t lowest_chunk = product & kIntMask;
+
+  // `sum` collects every term that is weighted by 2^32.
+  uint64_t sum = product >> 32;
+
+  // This addition cannot wrap: (2^32 - 1) + (2^32 - 1)^2 < 2^64.
+  sum += L2 * R3;
+
+  product = L3 * R2;
+  sum += product;
+
+  // If the addition above wrapped, 2^64 * 2^32 = 2^96 was lost, which is
+  // bit 32 of the high word. It must be counted exactly once.
+  uint64_t high = sum < product ? kCarryBit : 0;
+
+  high += sum >> 32;
+  high += L1 * R3 + L2 * R2 + L3 * R1;
+  high += (L0 * R3 + L1 * R2 + L2 * R1 + L3 * R0) << 32;
+
+  low_bits_ = lowest_chunk + (sum << 32);
+  high_bits_ = static_cast<int64_t>(high);
+  return *this;
+}
+
+/// Expands the given value into an array of ints so that we can work on
+/// it. The array will be converted to an absolute value and the was_negative
+/// flag will be set appropriately. The array will remove leading zeros from
+/// the value.
+/// \param value the number to expand
+/// \param array an array of length 4 to set with the value, most significant
+/// 32 bit word first
+/// \param was_negative a flag for whether the value was original negative
+/// \result the output length of the array (0 when the value is zero)
+static int64_t FillInArray(const Decimal128& value, uint32_t* array, bool& was_negative) {
+  uint64_t high;
+  uint64_t low;
+  const int64_t highbits = value.high_bits();
+  const uint64_t lowbits = value.low_bits();
+
+  if (highbits < 0) {
+    // Two's complement negation of the 128 bit value to get its magnitude.
+    low = ~lowbits + 1;
+    high = static_cast<uint64_t>(~highbits);
+    if (low == 0) {
+      ++high;
+    }
+    was_negative = true;
+  } else {
+    low = lowbits;
+    high = static_cast<uint64_t>(highbits);
+    was_negative = false;
+  }
+
+  if (high != 0) {
+    if (high > std::numeric_limits<uint32_t>::max()) {
+      array[0] = static_cast<uint32_t>(high >> 32);
+      array[1] = static_cast<uint32_t>(high);
+      array[2] = static_cast<uint32_t>(low >> 32);
+      array[3] = static_cast<uint32_t>(low);
+      return 4;
+    }
+
+    array[0] = static_cast<uint32_t>(high);
+    array[1] = static_cast<uint32_t>(low >> 32);
+    array[2] = static_cast<uint32_t>(low);
+    return 3;
+  }
 
+  // Strictly greater: a value of exactly 2^32 - 1 fits in one word, and
+  // emitting two words for it would leave a zero in the leading position.
+  if (low > std::numeric_limits<uint32_t>::max()) {
+    array[0] = static_cast<uint32_t>(low >> 32);
+    array[1] = static_cast<uint32_t>(low);
+    return 2;
+  }
+
+  if (low == 0) {
+    return 0;
+  }
+
+  array[0] = static_cast<uint32_t>(low);
+  return 1;
+}
+
+/// \brief Find last set bit in a 32 bit integer. Bit 1 is the LSB and bit 32 is the MSB.
+/// \param value the integer to inspect
+/// \return the 1-based position of the most significant set bit, or 0 if
+/// value is 0
+static int64_t FindLastSetBit(uint32_t value) {
+  if (value == 0) {
+    // Neither __builtin_clz nor _BitScanReverse gives a defined result for 0.
+    return 0;
+  }
+#if defined(__clang__) || defined(__GNUC__)
+  // __builtin_clz counts the zero bits above the highest set bit, so the
+  // 1-based position of that bit is 32 minus the count.
+  return 32 - __builtin_clz(value);
+#elif defined(_MSC_VER)
+  unsigned long index;                                         // NOLINT
+  _BitScanReverse(&index, static_cast<unsigned long>(value));  // NOLINT
+  return static_cast<int64_t>(index + 1UL);
+#else
+  // Portable fallback: count how many right shifts it takes to reach zero.
+  int64_t position = 0;
+  while (value != 0) {
+    value >>= 1;
+    ++position;
+  }
+  return position;
+#endif
+}
+
+/// Shift a multi-word number left by a number of bits. Word 0 is the most
+/// significant word; bits leaving the top of word 0 are dropped.
+/// \param array the number to shift, must have length elements
+/// \param length the number of entries in the array
+/// \param bits the number of bits to shift (0 <= bits < 32)
+static void ShiftArrayLeft(uint32_t* array, int64_t length, int64_t bits) {
+  if (length <= 0 || bits == 0) {
+    return;
+  }
+
+  const int64_t spill_shift = 32 - bits;
+  const int64_t last = length - 1;
+  for (int64_t idx = 0; idx < last; ++idx) {
+    // Top bits of the next (less significant) word move into this word.
+    const uint32_t spill = array[idx + 1] >> spill_shift;
+    array[idx] = (array[idx] << bits) | spill;
+  }
+  array[last] <<= bits;
+}
+
+/// Shift a multi-word number right by a number of bits. Word 0 is the most
+/// significant word; bits leaving the bottom of the last word are dropped.
+/// \param array the number to shift, must have length elements
+/// \param length the number of entries in the array
+/// \param bits the number of bits to shift (0 <= bits < 32)
+static void ShiftArrayRight(uint32_t* array, int64_t length, int64_t bits) {
+  if (length <= 0 || bits == 0) {
+    return;
+  }
+
+  const int64_t spill_shift = 32 - bits;
+  for (int64_t idx = length - 1; idx > 0; --idx) {
+    // Bottom bits of the previous (more significant) word move into this word.
+    const uint32_t spill = array[idx - 1] << spill_shift;
+    array[idx] = (array[idx] >> bits) | spill;
+  }
+  array[0] >>= bits;
+}
+
+/// \brief Apply the signs to a quotient and remainder that were computed on
+/// absolute values: the remainder takes the sign of the dividend, and the
+/// quotient is negated when exactly one of the operands was negative.
+static void FixDivisionSigns(Decimal128* result, Decimal128* remainder,
+                             bool dividend_was_negative, bool divisor_was_negative) {
+  if (dividend_was_negative) {
+    remainder->Negate();
+  }
+
+  const bool signs_differ =
+      dividend_was_negative ? !divisor_was_negative : divisor_was_negative;
+  if (signs_differ) {
+    result->Negate();
+  }
+}
+
+/// \brief Build a Decimal128 from a list of ints.
+///
+/// The words are read most significant first. Lengths 0 through 4 are
+/// accepted directly; length 5 is accepted only when the leading word is zero.
+/// \param value the Decimal128 to assign
+/// \param array the 32 bit words of the number, array[0] most significant
+/// \param length the number of words in array
+/// \return Status::Invalid for a non-zero leading word with length 5 or for
+/// any other length, Status::OK otherwise
+static Status BuildFromArray(Decimal128* value, uint32_t* array, int64_t length) {
+  switch (length) {
+    case 0:
+      *value = {static_cast<int64_t>(0)};
+      break;
+    case 1:
+      *value = {static_cast<int64_t>(array[0])};
+      break;
+    case 2:
+      // Two words fill the low 64 bits; the high 64 bits are zero.
+      *value = {static_cast<int64_t>(0),
+                (static_cast<uint64_t>(array[0]) << 32) + array[1]};
+      break;
+    case 3:
+      *value = {static_cast<int64_t>(array[0]),
+                (static_cast<uint64_t>(array[1]) << 32) + array[2]};
+      break;
+    case 4:
+      *value = {(static_cast<int64_t>(array[0]) << 32) + array[1],
+                (static_cast<uint64_t>(array[2]) << 32) + array[3]};
+      break;
+    case 5:
+      // A fifth word only fits in 128 bits when it is a leading zero.
+      if (array[0] != 0) {
+        return Status::Invalid("Can't build Decimal128 with 5 ints.");
+      }
+      *value = {(static_cast<int64_t>(array[1]) << 32) + array[2],
+                (static_cast<uint64_t>(array[3]) << 32) + array[4]};
       break;
+    default:
+      return Status::Invalid("Unsupported length for building Decimal128");
+  }
+
+  return Status::OK();
+}
+
+/// \brief Long division of a multi-word dividend by a divisor that fits into
+/// a single 32 bit value.
+///
+/// The dividend words are processed most significant first; each step yields
+/// one 32 bit word of the quotient. The signs are applied at the end.
+static Status SingleDivide(const uint32_t* dividend, int64_t dividend_length,
+                           uint32_t divisor, Decimal128* remainder,
+                           bool dividend_was_negative, bool divisor_was_negative,
+                           Decimal128* result) {
+  uint32_t quotient_words[5];
+  uint64_t running = 0;
+  for (int64_t idx = 0; idx < dividend_length; ++idx) {
+    // Bring down the next word, then split into quotient word and remainder.
+    running = (running << 32) + dividend[idx];
+    quotient_words[idx] = static_cast<uint32_t>(running / divisor);
+    running = running % divisor;
+  }
+
+  RETURN_NOT_OK(BuildFromArray(result, quotient_words, dividend_length));
+  *remainder = static_cast<int64_t>(running);
+  FixDivisionSigns(result, remainder, dividend_was_negative, divisor_was_negative);
+  return Status::OK();
+}
+
+/// \brief Divide this value by divisor, producing a quotient and a remainder.
+///
+/// Both operands are expanded into arrays of 32 bit words holding their
+/// absolute values. The quotient is then computed one 32 bit word at a time
+/// and the signs are applied at the end by FixDivisionSigns.
+/// \param divisor the number to divide by
+/// \param result receives the quotient
+/// \param remainder receives the remainder
+/// \return Status::Invalid when the divisor is zero, otherwise the status of
+/// building the outputs
+Status Decimal128::Divide(const Decimal128& divisor, Decimal128* result,
+                          Decimal128* remainder) const {
+  // Split the dividend and divisor into integer pieces so that we can
+  // work on them.
+  uint32_t dividend_array[5];
+  uint32_t divisor_array[4];
+  bool dividend_was_negative;
+  bool divisor_was_negative;
+  // leave an extra zero before the dividend
+  dividend_array[0] = 0;
+  int64_t dividend_length =
+      FillInArray(*this, dividend_array + 1, dividend_was_negative) + 1;
+  int64_t divisor_length = FillInArray(divisor, divisor_array, divisor_was_negative);
+
+  // Handle some of the easy cases.
+  // dividend_length counts the extra leading zero, so this branch is taken
+  // when the dividend has fewer words than the divisor: the quotient is 0
+  // and the remainder is the dividend itself.
+  if (dividend_length <= divisor_length) {
+    *remainder = *this;
+    *result = 0;
+    return Status::OK();
+  }
+
+  if (divisor_length == 0) {
+    return Status::Invalid("Division by 0 in Decimal128");
+  }
+
+  if (divisor_length == 1) {
+    return SingleDivide(dividend_array, dividend_length, divisor_array[0], remainder,
+                        dividend_was_negative, divisor_was_negative, result);
+  }
+
+  int64_t result_length = dividend_length - divisor_length;
+  uint32_t result_array[4];
+
+  // Normalize by shifting both by a multiple of 2 so that
+  // the digit guessing is better. The requirement is that
+  // divisor_array[0] is greater than 2**31.
+  int64_t normalize_bits = 32 - FindLastSetBit(divisor_array[0]);
+  ShiftArrayLeft(divisor_array, divisor_length, normalize_bits);
+  ShiftArrayLeft(dividend_array, dividend_length, normalize_bits);
+
+  // compute each digit in the result
+  for (int64_t j = 0; j < result_length; ++j) {
+    // Guess the next digit. At worst it is two too large
+    uint32_t guess = std::numeric_limits<uint32_t>::max();
+    auto high_dividend =
+        static_cast<uint64_t>(dividend_array[j]) << 32 | dividend_array[j + 1];
+    // When the leading words are equal the guess stays at the maximum word
+    // value; otherwise estimate it from the top two dividend words.
+    if (dividend_array[j] != divisor_array[0]) {
+      guess = static_cast<uint32_t>(high_dividend / divisor_array[0]);
     }
-    // For safety, enforce string length independent of remaining_value.
-  } while (last_char_idx > first_digit_idx);
 
-  if (is_negative) {
-    str[0] = '-';
+    // catch all of the cases where guess is two too large and most of the
+    // cases where it is one too large
+    auto rhat = static_cast<uint32_t>(high_dividend -
+                                      guess * static_cast<uint64_t>(divisor_array[0]));
+    while (static_cast<uint64_t>(divisor_array[1]) * guess >
+           (static_cast<uint64_t>(rhat) << 32) + dividend_array[j + 2]) {
+      --guess;
+      rhat += divisor_array[0];
+      // rhat wrapped around 32 bits, so the test above can no longer succeed
+      if (static_cast<uint64_t>(rhat) < divisor_array[0]) {
+        break;
+      }
+    }
+
+    // subtract off the guess * divisor from the dividend
+    // (least significant word first; mult carries the product overflow and
+    // the borrow into the next more significant word)
+    uint64_t mult = 0;
+    for (int64_t i = divisor_length - 1; i >= 0; --i) {
+      mult += static_cast<uint64_t>(guess) * divisor_array[i];
+      uint32_t prev = dividend_array[j + i + 1];
+      dividend_array[j + i + 1] -= static_cast<uint32_t>(mult);
+      mult >>= 32;
+      // the unsigned subtraction wrapped: record a borrow
+      if (dividend_array[j + i + 1] > prev) {
+        ++mult;
+      }
+    }
+    uint32_t prev = dividend_array[j];
+    dividend_array[j] -= static_cast<uint32_t>(mult);
+
+    // if guess was too big, we add back divisor
+    if (dividend_array[j] > prev) {
+      --guess;
+
+      uint32_t carry = 0;
+      for (int64_t i = divisor_length - 1; i >= 0; --i) {
+        uint64_t sum =
+            static_cast<uint64_t>(divisor_array[i]) + dividend_array[j + i + 1] + carry;
+        dividend_array[j + i + 1] = static_cast<uint32_t>(sum);
+        carry = static_cast<uint32_t>(sum >> 32);
+      }
+      dividend_array[j] += carry;
+    }
+
+    result_array[j] = guess;
   }
 
-  return str;
+  // denormalize the remainder
+  ShiftArrayRight(dividend_array, dividend_length, normalize_bits);
+
+  // return result and remainder
+  RETURN_NOT_OK(BuildFromArray(result, result_array, result_length));
+  RETURN_NOT_OK(BuildFromArray(remainder, dividend_array, dividend_length));
+  FixDivisionSigns(result, remainder, dividend_was_negative, divisor_was_negative);
+  return Status::OK();
+}
+
+/// \brief Two values are equal when both of their 64-bit words are equal.
+bool operator==(const Decimal128& left, const Decimal128& right) {
+  if (left.high_bits() != right.high_bits()) {
+    return false;
+  }
+  return left.low_bits() == right.low_bits();
+}
+
+/// \brief Negation of operator==.
+bool operator!=(const Decimal128& left, const Decimal128& right) {
+  return !(left == right);
+}
+
+/// \brief Ordering: the signed high words decide, and ties are broken by the
+/// unsigned low words.
+bool operator<(const Decimal128& left, const Decimal128& right) {
+  if (left.high_bits() != right.high_bits()) {
+    return left.high_bits() < right.high_bits();
+  }
+  return left.low_bits() < right.low_bits();
+}
+
+/// \brief left <= right is defined as "not left > right".
+bool operator<=(const Decimal128& left, const Decimal128& right) {
+  return !(left > right);
+}
+
+/// \brief left > right is defined as right < left.
+bool operator>(const Decimal128& left, const Decimal128& right) {
+  return right < left;
+}
+
+/// \brief left >= right is defined as "not left < right".
+bool operator>=(const Decimal128& left, const Decimal128& right) {
+  return !(left < right);
+}
+
+/// \brief Unary minus: returns a negated copy of operand.
+Decimal128 operator-(const Decimal128& operand) {
+  Decimal128 negated = operand;
+  negated.Negate();
+  return negated;
+}
+
+/// \brief Sum of two values, computed with Decimal128::operator+= on a copy.
+Decimal128 operator+(const Decimal128& left, const Decimal128& right) {
+  Decimal128 sum = left;
+  sum += right;
+  return sum;
+}
+
+/// \brief Difference of two values, computed with Decimal128::operator-= on a copy.
+Decimal128 operator-(const Decimal128& left, const Decimal128& right) {
+  Decimal128 difference = left;
+  difference -= right;
+  return difference;
+}
+
+/// \brief Product of two values, computed with Decimal128::operator*= on a copy.
+Decimal128 operator*(const Decimal128& left, const Decimal128& right) {
+  Decimal128 product = left;
+  product *= right;
+  return product;
+}
+
+/// \brief Quotient of left divided by right, as computed by Decimal128::Divide.
+Decimal128 operator/(const Decimal128& left, const Decimal128& right) {
+  Decimal128 remainder;
+  Decimal128 result;
+  // Perform the division outside of DCHECK so that the computation never
+  // depends on whether the assertion macro evaluates its argument.
+  const Status s = left.Divide(right, &result, &remainder);
+  static_cast<void>(s);  // silence unused warnings if DCHECK expands to nothing
+  DCHECK(s.ok());
+  return result;
+}
+
+/// \brief Remainder of left divided by right, as computed by Decimal128::Divide.
+Decimal128 operator%(const Decimal128& left, const Decimal128& right) {
+  Decimal128 remainder;
+  Decimal128 result;
+  // See operator/: the division must not live inside the assertion.
+  const Status s = left.Divide(right, &result, &remainder);
+  static_cast<void>(s);  // silence unused warnings if DCHECK expands to nothing
+  DCHECK(s.ok());
+  return remainder;
 }
 
-}  // namespace DecimalUtil
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/util/decimal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/decimal.h b/cpp/src/arrow/util/decimal.h
index 2e94877..ff30d44 100644
--- a/cpp/src/arrow/util/decimal.h
+++ b/cpp/src/arrow/util/decimal.h
@@ -18,25 +18,125 @@
 #ifndef ARROW_DECIMAL_H
 #define ARROW_DECIMAL_H
 
+#include <array>
+#include <cstdint>
 #include <string>
+#include <type_traits>
 
 #include "arrow/status.h"
+#include "arrow/util/visibility.h"
 
 namespace arrow {
 
-class Int128;
+/// Represents a signed 128-bit integer in two's complement.
+/// Calculations wrap around and overflow is ignored.
+///
+/// For a discussion of the algorithms, look at Knuth's volume 2,
+/// Semi-numerical Algorithms section 4.3.1.
+///
+/// Adapted from the Apache ORC C++ implementation
+class ARROW_EXPORT Decimal128 {
+ public:
+  /// \brief Create a Decimal128 from the two's complement representation.
+  constexpr Decimal128(int64_t high, uint64_t low) : high_bits_(high), low_bits_(low) {}
 
-namespace DecimalUtil {
+  /// \brief Empty constructor creates a Decimal128 with a value of 0.
+  constexpr Decimal128() : Decimal128(0, 0) {}
 
-ARROW_EXPORT void StringToInteger(const std::string& whole, const std::string& fractional,
-                                  int8_t sign, Int128* out);
+  /// \brief Convert any integer value into a Decimal128.
+  ///
+  /// The high word is filled with the sign only for signed types, so that
+  /// unsigned values of 2^63 and above stay positive.
+  template <typename T,
+            typename = typename std::enable_if<std::is_integral<T>::value, T>::type>
+  constexpr Decimal128(T value)
+      : Decimal128(
+            (std::is_signed<T>::value && static_cast<int64_t>(value) < 0) ? -1 : 0,
+            static_cast<uint64_t>(value)) {}
 
-ARROW_EXPORT Status FromString(const std::string& string, Int128* out,
-                               int* precision = nullptr, int* scale = nullptr);
+  /// \brief Parse the number from a base 10 string representation.
+  explicit Decimal128(const std::string& value);
 
-ARROW_EXPORT std::string ToString(const Int128& decimal_value, int precision, int scale);
+  /// \brief Create a Decimal128 from an array of bytes
+  explicit Decimal128(const uint8_t* bytes);
+
+  /// \brief Negate the current value
+  Decimal128& Negate();
+
+  /// \brief Add a number to this one. The result is truncated to 128 bits.
+  Decimal128& operator+=(const Decimal128& right);
+
+  /// \brief Subtract a number from this one. The result is truncated to 128 bits.
+  Decimal128& operator-=(const Decimal128& right);
+
+  /// \brief Multiply this number by another number. The result is truncated to 128 bits.
+  Decimal128& operator*=(const Decimal128& right);
+
+  /// Divide this number by divisor and store the quotient and remainder in
+  /// the output arguments. This operation is not destructive.
+  /// The answer rounds to zero. Signs work like:
+  ///   21 /  5 ->  4,  1
+  ///  -21 /  5 -> -4, -1
+  ///   21 / -5 -> -4,  1
+  ///  -21 / -5 ->  4, -1
+  /// @param divisor the number to divide by
+  /// @param result the quotient of the division
+  /// @param remainder the remainder after the division
+  Status Divide(const Decimal128& divisor, Decimal128* result,
+                Decimal128* remainder) const;
+
+  /// \brief In-place division.
+  Decimal128& operator/=(const Decimal128& right);
+
+  /// \brief Cast the value to char. This is used when converting the value to a string.
+  explicit operator char() const;
+
+  /// \brief Bitwise or between two Decimal128.
+  Decimal128& operator|=(const Decimal128& right);
+
+  /// \brief Bitwise and between two Decimal128.
+  Decimal128& operator&=(const Decimal128& right);
+
+  /// \brief Shift left by the given number of bits.
+  Decimal128& operator<<=(uint32_t bits);
+
+  /// \brief Shift right by the given number of bits.
+  /// NOTE(review): the original comment was truncated after "Negative values
+  /// will"; presumably sign extension was meant -- confirm against the
+  /// implementation.
+  Decimal128& operator>>=(uint32_t bits);
+
+  /// \brief Get the high bits of the two's complement representation of the number.
+  int64_t high_bits() const { return high_bits_; }
+
+  /// \brief Get the low bits of the two's complement representation of the number.
+  uint64_t low_bits() const { return low_bits_; }
+
+  /// \brief Put the raw bytes of the value into a 16 byte std::array.
+  Status ToBytes(std::array<uint8_t, 16>* out) const;
+
+  /// \brief Convert the Decimal128 value to a base 10 decimal string with the given
+  /// precision
+  /// and scale.
+  std::string ToString(int precision, int scale) const;
+
+  /// \brief Convert a decimal string to a Decimal128 value, optionally including
+  /// precision
+  /// and scale if they're passed in and not null.
+  static Status FromString(const std::string& s, Decimal128* out,
+                           int* precision = nullptr, int* scale = nullptr);
+
+ private:
+  int64_t high_bits_;
+  uint64_t low_bits_;
+};
+
+ARROW_EXPORT bool operator==(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT bool operator!=(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT bool operator<(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT bool operator<=(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT bool operator>(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT bool operator>=(const Decimal128& left, const Decimal128& right);
+
+ARROW_EXPORT Decimal128 operator-(const Decimal128& operand);
+ARROW_EXPORT Decimal128 operator+(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT Decimal128 operator-(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT Decimal128 operator*(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT Decimal128 operator/(const Decimal128& left, const Decimal128& right);
+ARROW_EXPORT Decimal128 operator%(const Decimal128& left, const Decimal128& right);
 
-}  // namespace DecimalUtil
 }  // namespace arrow
 
-#endif  // ARROW_DECIMAL_H
+#endif  //  ARROW_DECIMAL_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/util/int128.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/int128.cc b/cpp/src/arrow/util/int128.cc
deleted file mode 100644
index d5659a8..0000000
--- a/cpp/src/arrow/util/int128.cc
+++ /dev/null
@@ -1,527 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <algorithm>
-#include <cmath>
-#include <cstring>
-#include <limits>
-
-#ifdef _MSC_VER
-#include <intrin.h>
-#pragma intrinsic(_BitScanReverse)
-#endif
-
-#include "arrow/util/int128.h"
-#include "arrow/util/logging.h"
-
-namespace arrow {
-
-static constexpr uint64_t kIntMask = 0xFFFFFFFF;
-static constexpr auto kCarryBit = static_cast<uint64_t>(1) << static_cast<uint64_t>(32);
-
-Int128::Int128(const std::string& str) : Int128() {
-  const size_t length = str.length();
-
-  if (length > 0) {
-    bool is_negative = str[0] == '-';
-    auto posn = static_cast<size_t>(is_negative);
-
-    while (posn < length) {
-      const size_t group = std::min(static_cast<size_t>(18), length - posn);
-      const auto chunk = static_cast<int64_t>(std::stoll(str.substr(posn, group)));
-      const auto multiple =
-          static_cast<int64_t>(std::pow(10.0, static_cast<double>(group)));
-
-      *this *= multiple;
-      *this += chunk;
-
-      posn += group;
-    }
-
-    if (is_negative) {
-      Negate();
-    }
-  }
-}
-
-Int128::Int128(const uint8_t* bytes)
-    : Int128(reinterpret_cast<const int64_t*>(bytes)[0],
-             reinterpret_cast<const uint64_t*>(bytes)[1]) {}
-
-Status Int128::ToBytes(std::array<uint8_t, 16>* out) const {
-  if (out == nullptr) {
-    return Status::Invalid("Cannot fill nullptr of bytes from Int128");
-  }
-
-  const uint64_t raw[] = {static_cast<uint64_t>(high_bits_), low_bits_};
-  const auto* raw_data = reinterpret_cast<const uint8_t*>(raw);
-  std::copy(raw_data, raw_data + out->size(), out->begin());
-  return Status::OK();
-}
-
-Int128& Int128::Negate() {
-  low_bits_ = ~low_bits_ + 1;
-  high_bits_ = ~high_bits_;
-  if (low_bits_ == 0) {
-    ++high_bits_;
-  }
-  return *this;
-}
-
-Int128& Int128::operator+=(const Int128& right) {
-  const uint64_t sum = low_bits_ + right.low_bits_;
-  high_bits_ += right.high_bits_;
-  if (sum < low_bits_) {
-    ++high_bits_;
-  }
-  low_bits_ = sum;
-  return *this;
-}
-
-Int128& Int128::operator-=(const Int128& right) {
-  const uint64_t diff = low_bits_ - right.low_bits_;
-  high_bits_ -= right.high_bits_;
-  if (diff > low_bits_) {
-    --high_bits_;
-  }
-  low_bits_ = diff;
-  return *this;
-}
-
-Int128& Int128::operator/=(const Int128& right) {
-  Int128 remainder;
-  DCHECK(Divide(right, this, &remainder).ok());
-  return *this;
-}
-
-Int128::operator char() const {
-  DCHECK(high_bits_ == 0 || high_bits_ == -1)
-      << "Trying to cast an Int128 greater than the value range of a "
-         "char. high_bits_ must be equal to 0 or -1, got: "
-      << high_bits_;
-  DCHECK_LE(low_bits_, std::numeric_limits<char>::max())
-      << "low_bits_ too large for C type char, got: " << low_bits_;
-  return static_cast<char>(low_bits_);
-}
-
-Int128& Int128::operator|=(const Int128& right) {
-  low_bits_ |= right.low_bits_;
-  high_bits_ |= right.high_bits_;
-  return *this;
-}
-
-Int128& Int128::operator&=(const Int128& right) {
-  low_bits_ &= right.low_bits_;
-  high_bits_ &= right.high_bits_;
-  return *this;
-}
-
-Int128& Int128::operator<<=(uint32_t bits) {
-  if (bits != 0) {
-    if (bits < 64) {
-      high_bits_ <<= bits;
-      high_bits_ |= (low_bits_ >> (64 - bits));
-      low_bits_ <<= bits;
-    } else if (bits < 128) {
-      high_bits_ = static_cast<int64_t>(low_bits_) << (bits - 64);
-      low_bits_ = 0;
-    } else {
-      high_bits_ = 0;
-      low_bits_ = 0;
-    }
-  }
-  return *this;
-}
-
-Int128& Int128::operator>>=(uint32_t bits) {
-  if (bits != 0) {
-    if (bits < 64) {
-      low_bits_ >>= bits;
-      low_bits_ |= static_cast<uint64_t>(high_bits_ << (64 - bits));
-      high_bits_ = static_cast<int64_t>(static_cast<uint64_t>(high_bits_) >> bits);
-    } else if (bits < 128) {
-      low_bits_ = static_cast<uint64_t>(high_bits_ >> (bits - 64));
-      high_bits_ = static_cast<int64_t>(high_bits_ >= 0L ? 0L : -1L);
-    } else {
-      high_bits_ = static_cast<int64_t>(high_bits_ >= 0L ? 0L : -1L);
-      low_bits_ = static_cast<uint64_t>(high_bits_);
-    }
-  }
-  return *this;
-}
-
-Int128& Int128::operator*=(const Int128& right) {
-  // Break the left and right numbers into 32 bit chunks
-  // so that we can multiply them without overflow.
-  const uint64_t L0 = static_cast<uint64_t>(high_bits_) >> 32;
-  const uint64_t L1 = static_cast<uint64_t>(high_bits_) & kIntMask;
-  const uint64_t L2 = low_bits_ >> 32;
-  const uint64_t L3 = low_bits_ & kIntMask;
-
-  const uint64_t R0 = static_cast<uint64_t>(right.high_bits_) >> 32;
-  const uint64_t R1 = static_cast<uint64_t>(right.high_bits_) & kIntMask;
-  const uint64_t R2 = right.low_bits_ >> 32;
-  const uint64_t R3 = right.low_bits_ & kIntMask;
-
-  uint64_t product = L3 * R3;
-  low_bits_ = product & kIntMask;
-
-  uint64_t sum = product >> 32;
-
-  product = L2 * R3;
-  sum += product;
-
-  product = L3 * R2;
-  sum += product;
-
-  low_bits_ += sum << 32;
-
-  high_bits_ = static_cast<int64_t>(sum < product ? kCarryBit : 0);
-  if (sum < product) {
-    high_bits_ += kCarryBit;
-  }
-
-  high_bits_ += static_cast<int64_t>(sum >> 32);
-  high_bits_ += L1 * R3 + L2 * R2 + L3 * R1;
-  high_bits_ += (L0 * R3 + L1 * R2 + L2 * R1 + L3 * R0) << 32;
-  return *this;
-}
-
-/// Expands the given value into an array of ints so that we can work on
-/// it. The array will be converted to an absolute value and the wasNegative
-/// flag will be set appropriately. The array will remove leading zeros from
-/// the value.
-/// \param array an array of length 4 to set with the value
-/// \param was_negative a flag for whether the value was original negative
-/// \result the output length of the array
-static int64_t FillInArray(const Int128& value, uint32_t* array, bool& was_negative) {
-  uint64_t high;
-  uint64_t low;
-  const int64_t highbits = value.high_bits();
-  const uint64_t lowbits = value.low_bits();
-
-  if (highbits < 0) {
-    low = ~lowbits + 1;
-    high = static_cast<uint64_t>(~highbits);
-    if (low == 0) {
-      ++high;
-    }
-    was_negative = true;
-  } else {
-    low = lowbits;
-    high = static_cast<uint64_t>(highbits);
-    was_negative = false;
-  }
-
-  if (high != 0) {
-    if (high > std::numeric_limits<uint32_t>::max()) {
-      array[0] = static_cast<uint32_t>(high >> 32);
-      array[1] = static_cast<uint32_t>(high);
-      array[2] = static_cast<uint32_t>(low >> 32);
-      array[3] = static_cast<uint32_t>(low);
-      return 4;
-    }
-
-    array[0] = static_cast<uint32_t>(high);
-    array[1] = static_cast<uint32_t>(low >> 32);
-    array[2] = static_cast<uint32_t>(low);
-    return 3;
-  }
-
-  if (low >= std::numeric_limits<uint32_t>::max()) {
-    array[0] = static_cast<uint32_t>(low >> 32);
-    array[1] = static_cast<uint32_t>(low);
-    return 2;
-  }
-
-  if (low == 0) {
-    return 0;
-  }
-
-  array[0] = static_cast<uint32_t>(low);
-  return 1;
-}
-
-/// \brief Find last set bit in a 32 bit integer. Bit 1 is the LSB and bit 32 is the MSB.
-static int64_t FindLastSetBit(uint32_t value) {
-#if defined(__clang__) || defined(__GNUC__)
-  // Count leading zeros
-  return __builtin_clz(value) + 1;
-#elif defined(_MSC_VER)
-  unsigned long index;                                         // NOLINT
-  _BitScanReverse(&index, static_cast<unsigned long>(value));  // NOLINT
-  return static_cast<int64_t>(index + 1UL);
-#endif
-}
-
-/// \brief Shift a multi-word number left in place.
-///
-/// The top bits of each word move into the bottom of the word before it; bits shifted
-/// out of array[0] are discarded. Nothing happens when length <= 0 or bits == 0.
-/// \param array the number to shift, must have length elements
-/// \param length the number of entries in the array
-/// \param bits the number of bits to shift (0 <= bits < 32)
-static void ShiftArrayLeft(uint32_t* array, int64_t length, int64_t bits) {
-  if (length <= 0 || bits == 0) {
-    return;
-  }
-
-  const int64_t carry_shift = 32 - bits;
-  const int64_t last = length - 1;
-  for (int64_t word = 0; word < last; ++word) {
-    const uint32_t kept = array[word] << bits;
-    const uint32_t carried_in = array[word + 1] >> carry_shift;
-    array[word] = kept | carried_in;
-  }
-  array[last] <<= bits;
-}
-
-/// \brief Shift a multi-word number right in place.
-///
-/// The bottom bits of each word move into the top of the word after it; bits shifted
-/// out of the last word are discarded. Nothing happens when length <= 0 or bits == 0.
-/// \param array the number to shift, must have length elements
-/// \param length the number of entries in the array
-/// \param bits the number of bits to shift (0 <= bits < 32)
-static void ShiftArrayRight(uint32_t* array, int64_t length, int64_t bits) {
-  if (length <= 0 || bits == 0) {
-    return;
-  }
-
-  const int64_t carry_shift = 32 - bits;
-  int64_t word = length - 1;
-  while (word > 0) {
-    const uint32_t kept = array[word] >> bits;
-    const uint32_t carried_in = array[word - 1] << carry_shift;
-    array[word] = kept | carried_in;
-    --word;
-  }
-  array[0] >>= bits;
-}
-
-/// \brief Apply the signs of the original operands to an unsigned division outcome.
-///
-/// The quotient is negated when exactly one operand was negative; the remainder is
-/// negated when the dividend was negative.
-static void FixDivisionSigns(Int128* result, Int128* remainder,
-                             bool dividend_was_negative, bool divisor_was_negative) {
-  const bool signs_agree = (dividend_was_negative == divisor_was_negative);
-  if (!signs_agree) {
-    result->Negate();
-  }
-
-  if (!dividend_was_negative) {
-    return;
-  }
-  remainder->Negate();
-}
-
-/// \brief Build a Int128 from a list of ints.
-///
-/// \param value receives the assembled number
-/// \param array 32-bit words, most significant word first
-/// \param length number of words in array; lengths 0 through 5 are accepted, and a
-///        5-word input must start with a zero word
-/// \return Status::Invalid for any other length or for a nonzero first word of five
-static Status BuildFromArray(Int128* value, uint32_t* array, int64_t length) {
-  // Joins two 32-bit words into one 64-bit word. This is done in unsigned arithmetic
-  // so that a leading word with its top bit set is never left-shifted as a signed
-  // value; the high half is converted to int64_t only after it has been assembled.
-  const auto join = [](uint32_t upper, uint32_t lower) {
-    return (static_cast<uint64_t>(upper) << 32) | lower;
-  };
-
-  switch (length) {
-    case 0:
-      *value = {static_cast<int64_t>(0)};
-      break;
-    case 1:
-      *value = {static_cast<int64_t>(array[0])};
-      break;
-    case 2:
-      *value = {static_cast<int64_t>(0), join(array[0], array[1])};
-      break;
-    case 3:
-      *value = {static_cast<int64_t>(array[0]), join(array[1], array[2])};
-      break;
-    case 4:
-      *value = {static_cast<int64_t>(join(array[0], array[1])),
-                join(array[2], array[3])};
-      break;
-    case 5:
-      // Only 128 bits can be stored, so the fifth (most significant) word must be 0.
-      if (array[0] != 0) {
-        return Status::Invalid("Can't build Int128 with 5 ints.");
-      }
-      *value = {static_cast<int64_t>(join(array[1], array[2])),
-                join(array[3], array[4])};
-      break;
-    default:
-      return Status::Invalid("Unsupported length for building Int128");
-  }
-
-  return Status::OK();
-}
-
-/// \brief Divide a multi-word dividend by a divisor that fits in one 32 bit word.
-///
-/// Processes the dividend one word at a time from the most significant end, carrying
-/// the running remainder into the next step.
-/// \param dividend the dividend magnitude as 32-bit words
-/// \param dividend_length the number of words in dividend (the local quotient buffer
-///        holds 5 words)
-/// \param divisor the single-word divisor magnitude
-/// \param remainder receives the remainder, with its sign fixed up
-/// \param dividend_was_negative whether the original dividend was negative
-/// \param divisor_was_negative whether the original divisor was negative
-/// \param result receives the quotient, with its sign fixed up
-static Status SingleDivide(const uint32_t* dividend, int64_t dividend_length,
-                           uint32_t divisor, Int128* remainder,
-                           bool dividend_was_negative, bool divisor_was_negative,
-                           Int128* result) {
-  uint32_t quotient_words[5];
-  uint64_t running = 0;
-  int64_t word = 0;
-  while (word < dividend_length) {
-    running = (running << 32) + dividend[word];
-    quotient_words[word] = static_cast<uint32_t>(running / divisor);
-    running = running % divisor;
-    ++word;
-  }
-
-  RETURN_NOT_OK(BuildFromArray(result, quotient_words, dividend_length));
-  *remainder = static_cast<int64_t>(running);
-  FixDivisionSigns(result, remainder, dividend_was_negative, divisor_was_negative);
-  return Status::OK();
-}
-
-/// \brief Compute the quotient and remainder of *this divided by divisor.
-///
-/// Both operands are split into 32-bit words holding their magnitudes, divided with
-/// word-at-a-time long division, and the signs are reapplied at the end.
-/// \param divisor the number to divide by
-/// \param result receives the quotient
-/// \param remainder receives the remainder
-/// \return Status::Invalid when divisor is zero, or any error from BuildFromArray
-Status Int128::Divide(const Int128& divisor, Int128* result, Int128* remainder) const {
-  // Split the dividend and divisor into integer pieces so that we can
-  // work on them.
-  uint32_t dividend_array[5];
-  uint32_t divisor_array[4];
-  bool dividend_was_negative;
-  bool divisor_was_negative;
-  // leave an extra zero before the dividend
-  dividend_array[0] = 0;
-  // dividend_length counts the extra leading zero word, hence the + 1.
-  int64_t dividend_length =
-      FillInArray(*this, dividend_array + 1, dividend_was_negative) + 1;
-  int64_t divisor_length = FillInArray(divisor, divisor_array, divisor_was_negative);
-
-  // Handle some of the easy cases.
-  // Because dividend_length includes the extra word, this branch is taken when the
-  // dividend occupies fewer words than the divisor: the quotient is set to 0 and the
-  // remainder to the dividend itself.
-  if (dividend_length <= divisor_length) {
-    *remainder = *this;
-    *result = 0;
-    return Status::OK();
-  }
-
-  // dividend_length is at least 1, so a zero divisor always reaches this check.
-  if (divisor_length == 0) {
-    return Status::Invalid("Division by 0 in Int128");
-  }
-
-  if (divisor_length == 1) {
-    return SingleDivide(dividend_array, dividend_length, divisor_array[0], remainder,
-                        dividend_was_negative, divisor_was_negative, result);
-  }
-
-  // One quotient word is produced per iteration of the main loop below.
-  int64_t result_length = dividend_length - divisor_length;
-  uint32_t result_array[4];
-
-  // Normalize by shifting both operands left by the same number of bits so that
-  // the digit guessing is better. The intent is for divisor_array[0] to end up with
-  // its top bit set (i.e. to be at least 2**31).
-  // NOTE(review): this relies on FindLastSetBit returning the 1-based index of the
-  // most significant set bit -- confirm that helper's behavior.
-  int64_t normalize_bits = 32 - FindLastSetBit(divisor_array[0]);
-  ShiftArrayLeft(divisor_array, divisor_length, normalize_bits);
-  ShiftArrayLeft(dividend_array, dividend_length, normalize_bits);
-
-  // compute each digit in the result
-  for (int64_t j = 0; j < result_length; ++j) {
-    // Guess the next digit. At worst it is two too large
-    uint32_t guess = std::numeric_limits<uint32_t>::max();
-    auto high_dividend =
-        static_cast<uint64_t>(dividend_array[j]) << 32 | dividend_array[j + 1];
-    // When the leading words are equal the guess stays at the maximum word value
-    // instead of being computed by the division below.
-    if (dividend_array[j] != divisor_array[0]) {
-      guess = static_cast<uint32_t>(high_dividend / divisor_array[0]);
-    }
-
-    // catch all of the cases where guess is two too large and most of the
-    // cases where it is one too large
-    // rhat is the low 32 bits of what is left of high_dividend after removing
-    // guess * divisor_array[0].
-    auto rhat = static_cast<uint32_t>(high_dividend -
-                                      guess * static_cast<uint64_t>(divisor_array[0]));
-    while (static_cast<uint64_t>(divisor_array[1]) * guess >
-           (static_cast<uint64_t>(rhat) << 32) + dividend_array[j + 2]) {
-      --guess;
-      rhat += divisor_array[0];
-      // The 32-bit addition wrapped around, so stop adjusting the guess.
-      if (static_cast<uint64_t>(rhat) < divisor_array[0]) {
-        break;
-      }
-    }
-
-    // subtract off the guess * divisor from the dividend
-    // mult carries the upper half of each partial product, plus any borrow, into
-    // the next more significant word.
-    uint64_t mult = 0;
-    for (int64_t i = divisor_length - 1; i >= 0; --i) {
-      mult += static_cast<uint64_t>(guess) * divisor_array[i];
-      uint32_t prev = dividend_array[j + i + 1];
-      dividend_array[j + i + 1] -= static_cast<uint32_t>(mult);
-      mult >>= 32;
-      // The word grew after a subtraction, which means it wrapped: record a borrow.
-      if (dividend_array[j + i + 1] > prev) {
-        ++mult;
-      }
-    }
-    uint32_t prev = dividend_array[j];
-    dividend_array[j] -= static_cast<uint32_t>(mult);
-
-    // if guess was too big, we add back divisor
-    // (detected by the top word wrapping around in the subtraction above)
-    if (dividend_array[j] > prev) {
-      --guess;
-
-      uint32_t carry = 0;
-      for (int64_t i = divisor_length - 1; i >= 0; --i) {
-        uint64_t sum =
-            static_cast<uint64_t>(divisor_array[i]) + dividend_array[j + i + 1] + carry;
-        dividend_array[j + i + 1] = static_cast<uint32_t>(sum);
-        carry = static_cast<uint32_t>(sum >> 32);
-      }
-      dividend_array[j] += carry;
-    }
-
-    result_array[j] = guess;
-  }
-
-  // denormalize the remainder
-  // (what is left in dividend_array is the remainder, still shifted left)
-  ShiftArrayRight(dividend_array, dividend_length, normalize_bits);
-
-  // return result and remainder
-  RETURN_NOT_OK(BuildFromArray(result, result_array, result_length));
-  RETURN_NOT_OK(BuildFromArray(remainder, dividend_array, dividend_length));
-  FixDivisionSigns(result, remainder, dividend_was_negative, divisor_was_negative);
-  return Status::OK();
-}
-
-/// \brief Two values are equal when both 64-bit halves match.
-bool operator==(const Int128& left, const Int128& right) {
-  if (left.high_bits() != right.high_bits()) {
-    return false;
-  }
-  return left.low_bits() == right.low_bits();
-}
-
-/// \brief Negation of operator==.
-bool operator!=(const Int128& left, const Int128& right) {
-  const bool equal = (left == right);
-  return !equal;
-}
-
-/// \brief Signed ordering: the high halves decide unless they are equal, in which case
-/// the unsigned low halves break the tie.
-bool operator<(const Int128& left, const Int128& right) {
-  if (left.high_bits() != right.high_bits()) {
-    return left.high_bits() < right.high_bits();
-  }
-  return left.low_bits() < right.low_bits();
-}
-
-/// \brief left <= right exactly when left is not greater than right.
-bool operator<=(const Int128& left, const Int128& right) {
-  const bool greater = (left > right);
-  return !greater;
-}
-
-/// \brief left > right is operator< with the operands swapped.
-bool operator>(const Int128& left, const Int128& right) {
-  return right < left;
-}
-
-/// \brief left >= right exactly when left is not less than right.
-bool operator>=(const Int128& left, const Int128& right) {
-  const bool less = (left < right);
-  return !less;
-}
-
-/// \brief Unary minus: returns a negated copy and leaves operand untouched.
-Int128 operator-(const Int128& operand) {
-  Int128 negated = operand;
-  negated.Negate();
-  return negated;
-}
-
-/// \brief Sum of two values, computed on a copy of left via operator+=.
-Int128 operator+(const Int128& left, const Int128& right) {
-  Int128 sum = left;
-  sum += right;
-  return sum;
-}
-
-/// \brief Difference of two values, computed on a copy of left via operator-=.
-Int128 operator-(const Int128& left, const Int128& right) {
-  Int128 difference = left;
-  difference -= right;
-  return difference;
-}
-
-/// \brief Product of two values, computed on a copy of left via operator*=.
-Int128 operator*(const Int128& left, const Int128& right) {
-  Int128 product = left;
-  product *= right;
-  return product;
-}
-
-/// \brief Quotient of left divided by right, as computed by Int128::Divide.
-///
-/// If Divide reports an error (it does so for a zero divisor) and the debug check is
-/// compiled out, the default-constructed result, 0, is returned.
-Int128 operator/(const Int128& left, const Int128& right) {
-  Int128 remainder;
-  Int128 result;
-  // Run the division outside of DCHECK: a debug-only assertion must not be the only
-  // place where the work happens, or builds that discard the macro's argument would
-  // skip the division entirely.
-  const Status status = left.Divide(right, &result, &remainder);
-  DCHECK(status.ok());
-  static_cast<void>(status);  // silences unused-variable warnings if DCHECK is a no-op
-  return result;
-}
-
-/// \brief Remainder of left divided by right, as computed by Int128::Divide.
-///
-/// If Divide reports an error (it does so for a zero divisor) and the debug check is
-/// compiled out, the default-constructed remainder, 0, is returned.
-Int128 operator%(const Int128& left, const Int128& right) {
-  Int128 remainder;
-  Int128 result;
-  // Run the division outside of DCHECK: a debug-only assertion must not be the only
-  // place where the work happens, or builds that discard the macro's argument would
-  // skip the division entirely.
-  const Status status = left.Divide(right, &result, &remainder);
-  DCHECK(status.ok());
-  static_cast<void>(status);  // silences unused-variable warnings if DCHECK is a no-op
-  return remainder;
-}
-
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/b590c245/cpp/src/arrow/util/int128.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/int128.h b/cpp/src/arrow/util/int128.h
deleted file mode 100644
index dd2ec52..0000000
--- a/cpp/src/arrow/util/int128.h
+++ /dev/null
@@ -1,129 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef ARROW_INT128_H
-#define ARROW_INT128_H
-
-#include <array>
-#include <cstdint>
-#include <string>
-#include <type_traits>
-
-#include "arrow/status.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-/// Represents a signed 128-bit integer in two's complement, stored as a signed
-/// high 64-bit half and an unsigned low 64-bit half.
-/// Calculations wrap around and overflow is ignored.
-///
-/// For a discussion of the algorithms, look at Knuth's volume 2,
-/// Seminumerical Algorithms, section 4.3.1.
-///
-/// Adapted from the Apache ORC C++ implementation
-class ARROW_EXPORT Int128 {
- public:
-  /// \brief Create an Int128 from the two's complement representation.
-  /// \param high the most significant 64 bits, which carry the sign
-  /// \param low the least significant 64 bits
-  constexpr Int128(int64_t high, uint64_t low) : high_bits_(high), low_bits_(low) {}
-
-  /// \brief Empty constructor creates an Int128 with a value of 0.
-  constexpr Int128() : Int128(0, 0) {}
-
-  /// \brief Convert any integer value into an Int128.
-  ///
-  /// The high half is filled with the sign of the value after it has been cast to
-  /// int64_t. Note that this means a uint64_t argument with its top bit set produces
-  /// a negative Int128.
-  template <typename T,
-            typename = typename std::enable_if<std::is_integral<T>::value, T>::type>
-  constexpr Int128(T value)
-      : Int128(static_cast<int64_t>(value) >= 0 ? 0 : -1, static_cast<uint64_t>(value)) {}
-
-  /// \brief Parse the number from a base 10 string representation.
-  explicit Int128(const std::string& value);
-
-  /// \brief Create an Int128 from an array of bytes
-  /// NOTE(review): presumably reads 16 bytes, mirroring ToBytes -- confirm against
-  /// the implementation, which is not part of this header.
-  explicit Int128(const uint8_t* bytes);
-
-  /// \brief Negate the current value
-  Int128& Negate();
-
-  /// \brief Add a number to this one. The result is truncated to 128 bits.
-  Int128& operator+=(const Int128& right);
-
-  /// \brief Subtract a number from this one. The result is truncated to 128 bits.
-  Int128& operator-=(const Int128& right);
-
-  /// \brief Multiply this number by another number. The result is truncated to 128 bits.
-  Int128& operator*=(const Int128& right);
-
-  /// \brief Divide this number by divisor, producing a quotient and a remainder.
-  ///
-  /// This operation is not destructive: *this is left unchanged.
-  /// The answer rounds to zero. Signs work like:
-  ///   21 /  5 ->  4,  1
-  ///  -21 /  5 -> -4, -1
-  ///   21 / -5 -> -4,  1
-  ///  -21 / -5 ->  4, -1
-  /// \param divisor the number to divide by
-  /// \param result the quotient of the division
-  /// \param remainder the remainder after the division
-  Status Divide(const Int128& divisor, Int128* result, Int128* remainder) const;
-
-  /// \brief In-place division.
-  Int128& operator/=(const Int128& right);
-
-  /// \brief Cast the value to char. This is used when converting the value to a string.
-  explicit operator char() const;
-
-  /// \brief Bitwise or between two Int128.
-  Int128& operator|=(const Int128& right);
-
-  /// \brief Bitwise and between two Int128.
-  Int128& operator&=(const Int128& right);
-
-  /// \brief Shift left by the given number of bits.
-  Int128& operator<<=(uint32_t bits);
-
-  /// \brief Shift right by the given number of bits.
-  /// NOTE(review): the original sentence about negative values was cut off here;
-  /// presumably they are sign-extended -- confirm against the implementation.
-  Int128& operator>>=(uint32_t bits);
-
-  /// \brief Get the high bits of the two's complement representation of the number.
-  int64_t high_bits() const { return high_bits_; }
-
-  /// \brief Get the low bits of the two's complement representation of the number.
-  uint64_t low_bits() const { return low_bits_; }
-
-  /// \brief Put the raw bytes of the value into a 16-byte std::array.
-  /// \param out the array that receives the bytes
-  Status ToBytes(std::array<uint8_t, 16>* out) const;
-
- private:
-  // Most significant 64 bits; signed, so this half carries the sign of the value.
-  int64_t high_bits_;
-  // Least significant 64 bits.
-  uint64_t low_bits_;
-};
-
-ARROW_EXPORT bool operator==(const Int128& left, const Int128& right);
-ARROW_EXPORT bool operator!=(const Int128& left, const Int128& right);
-ARROW_EXPORT bool operator<(const Int128& left, const Int128& right);
-ARROW_EXPORT bool operator<=(const Int128& left, const Int128& right);
-ARROW_EXPORT bool operator>(const Int128& left, const Int128& right);
-ARROW_EXPORT bool operator>=(const Int128& left, const Int128& right);
-
-ARROW_EXPORT Int128 operator-(const Int128& operand);
-ARROW_EXPORT Int128 operator+(const Int128& left, const Int128& right);
-ARROW_EXPORT Int128 operator-(const Int128& left, const Int128& right);
-ARROW_EXPORT Int128 operator*(const Int128& left, const Int128& right);
-ARROW_EXPORT Int128 operator/(const Int128& left, const Int128& right);
-ARROW_EXPORT Int128 operator%(const Int128& left, const Int128& right);
-
-}  // namespace arrow
-
-#endif  //  ARROW_INT128_H