You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/02/02 22:19:04 UTC

[3/3] kudu git commit: KUDU-721: Support for Decimal type, Part 1 (Server, C++ Client)

KUDU-721: Support for Decimal type, Part 1 (Server, C++ Client)

Introduces the Decimal type to the server and
C++ client. Follow-on work will enhance the
client utilities, add support to the Java
client and other integrations, and add
documentation.

The decimal type has column type attributes to
support the “parameterized type”. The precision
and scale column type attributes are not stored
with each value, but instead are leveraged to map
to a correctly sized internal type
(DECIMAL32, DECIMAL64, DECIMAL128). These
internal types are represented and stored as
equivalently sized integers.

This also removes the int128 suffixes because
they break the client by using C++11 features.
They may be added back in a different way later.

Change-Id: I3b06142f43c66973f36376bd2c88ca6f8d9f7632
Reviewed-on: http://gerrit.cloudera.org:8080/8830
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Grant Henke <gr...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7b6cc845
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7b6cc845
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7b6cc845

Branch: refs/heads/master
Commit: 7b6cc8459d7f497b947ca091920f76324dc9505b
Parents: 0783467
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Nov 20 14:50:10 2017 -0600
Committer: Grant Henke <gr...@gmail.com>
Committed: Fri Feb 2 22:04:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/CMakeLists.txt                |   1 +
 src/kudu/client/predicate-test.cc             | 175 +++++++++++++++++++-
 src/kudu/client/scan_batch.cc                 |  33 ++++
 src/kudu/client/scan_batch.h                  |   5 +
 src/kudu/client/scan_predicate.cc             |   6 +-
 src/kudu/client/schema-internal.h             |  26 ++-
 src/kudu/client/schema.cc                     | 142 ++++++++++++++--
 src/kudu/client/schema.h                      | 116 +++++++++----
 src/kudu/client/value-internal.h              |  19 ++-
 src/kudu/client/value.cc                      |  51 +++++-
 src/kudu/client/value.h                       |  17 +-
 src/kudu/common/column_predicate-test.cc      |  62 +++++++
 src/kudu/common/column_predicate.cc           |   2 +
 src/kudu/common/common.proto                  |  13 ++
 src/kudu/common/partial_row-test.cc           |  31 +++-
 src/kudu/common/partial_row.cc                |  77 ++++++++-
 src/kudu/common/partial_row.h                 |   7 +-
 src/kudu/common/schema-test.cc                |  62 +++++++
 src/kudu/common/schema.cc                     |  26 ++-
 src/kudu/common/schema.h                      |  43 ++++-
 src/kudu/common/types.cc                      |   3 +
 src/kudu/common/types.h                       |  56 ++++++-
 src/kudu/common/wire_protocol.cc              |  23 ++-
 src/kudu/integration-tests/CMakeLists.txt     |   1 +
 src/kudu/integration-tests/all_types-itest.cc |  16 +-
 src/kudu/integration-tests/decimal-itest.cc   | 182 +++++++++++++++++++++
 src/kudu/util/CMakeLists.txt                  |   2 +
 src/kudu/util/decimal_util-test.cc            |  73 +++++++++
 src/kudu/util/decimal_util.cc                 |  83 ++++++++++
 src/kudu/util/decimal_util.h                  |  65 ++++++++
 src/kudu/util/int128-test.cc                  |  27 +--
 src/kudu/util/int128.h                        |  82 +---------
 32 files changed, 1356 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index 92a1c7e..fe31f77 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -193,6 +193,7 @@ install(FILES
 # Headers: util
 install(FILES
   ${CMAKE_CURRENT_BINARY_DIR}/../util/kudu_export.h
+  ../util/int128.h
   ../util/monotime.h
   ../util/slice.h
   ../util/status.h

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/predicate-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/predicate-test.cc b/src/kudu/client/predicate-test.cc
index 99148ad..623d59c 100644
--- a/src/kudu/client/predicate-test.cc
+++ b/src/kudu/client/predicate-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -165,7 +166,7 @@ class PredicateTest : public KuduTest {
       numeric_limits<T>::min(),
       numeric_limits<T>::min() + 1,
       -51,
-      50,
+      -50,
       0,
       49,
       50,
@@ -228,6 +229,37 @@ class PredicateTest : public KuduTest {
     };
   }
 
+  // Returns a vector of decimal(4, 2) numbers from -50.50 (inclusive) to 50.50
+  // (exclusive) (100 values) and boundary values.
+  vector<int128_t> CreateDecimalValues() {
+    vector<int128_t> values;
+    for (int i = -50; i < 50; i++) {
+      values.push_back(i * 100 + i);
+    }
+
+    values.push_back(-9999);
+    values.push_back(-9998);
+    values.push_back(9998);
+    values.push_back(9999);
+
+    return values;
+  }
+
+  /// Returns a vector of decimal numbers for creating test predicates.
+  vector<int128_t> CreateDecimalTestValues() {
+    return {
+        -9999,
+        -9998,
+        -5100,
+        -5000,
+        0,
+        4900,
+        5000,
+        9998,
+        9999,
+    };
+  }
+
   // Returns a vector of string values.
   vector<string> CreateStringValues() {
     return {
@@ -943,6 +975,147 @@ TEST_F(PredicateTest, TestDoublePredicates) {
   ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
 }
 
+TEST_F(PredicateTest, TestDecimalPredicates) {
+  KuduSchema schema;
+  {
+    KuduSchemaBuilder builder;
+    builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+    builder.AddColumn("value")->Type(KuduColumnSchema::DECIMAL)->Precision(4)->Scale(2);
+    CHECK_OK(builder.Build(&schema));
+  }
+  unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+  CHECK_OK(table_creator->table_name("table")
+               .schema(&schema)
+               .set_range_partition_columns({ "key" })
+               .num_replicas(1)
+               .Create());
+
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client_->OpenTable("table", &table));
+
+  shared_ptr<KuduSession> session = CreateSession();
+
+  vector<int128_t> values = CreateDecimalValues();
+  vector<int128_t> test_values = CreateDecimalTestValues();
+
+  int i = 0;
+  for (int128_t value : values) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
+    ASSERT_OK(insert->mutable_row()->SetUnscaledDecimal("value", value));
+    ASSERT_OK(session->Apply(insert.release()));
+  }
+  unique_ptr<KuduInsert> null_insert(table->NewInsert());
+  ASSERT_OK(null_insert->mutable_row()->SetInt64("key", i++));
+  ASSERT_OK(null_insert->mutable_row()->SetNull("value"));
+  ASSERT_OK(session->Apply(null_insert.release()));
+  ASSERT_OK(session->Flush());
+
+  ASSERT_EQ(values.size() + 1, CountRows(table, {}));
+
+  for (int128_t v : test_values) {
+    SCOPED_TRACE(strings::Substitute("test value: $0", v));
+
+    { // value = v
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value == v; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value", KuduPredicate::EQUAL,
+                                        KuduValue::FromDecimal(v, 2)),
+      }));
+    }
+
+    { // value >= v
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value >= v; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::GREATER_EQUAL,
+                                        KuduValue::FromDecimal(v, 2)),
+      }));
+    }
+
+    { // value <= v
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value <= v; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::LESS_EQUAL,
+                                        KuduValue::FromDecimal(v, 2)),
+      }));
+    }
+
+    { // value > v
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value > v; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::GREATER,
+                                        KuduValue::FromDecimal(v, 2)),
+      }));
+    }
+
+    { // value < v
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value < v; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::LESS,
+                                        KuduValue::FromDecimal(v, 2)),
+      }));
+    }
+
+    { // value >= 0
+      // value <= v
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value >= 0 && value <= v; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::GREATER_EQUAL,
+                                        KuduValue::FromDecimal(0, 2)),
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::LESS_EQUAL,
+                                        KuduValue::FromDecimal(v, 2)),
+      }));
+    }
+
+    { // value >= v
+      // value <= 0
+      int count = count_if(values.begin(), values.end(),
+                           [&] (int128_t value) { return value >= v && value <= 0; });
+      ASSERT_EQ(count, CountRows(table, {
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::GREATER_EQUAL,
+                                        KuduValue::FromDecimal(v, 2)),
+          table->NewComparisonPredicate("value",
+                                        KuduPredicate::LESS_EQUAL,
+                                        KuduValue::FromDecimal(0, 2)),
+      }));
+    }
+  }
+
+  // IN list predicates
+  std::random_shuffle(test_values.begin(), test_values.end());
+
+  for (auto end = test_values.begin(); end <= test_values.end(); end++) {
+    vector<KuduValue*> vals;
+
+    for (auto itr = test_values.begin(); itr != end; itr++) {
+      vals.push_back(KuduValue::FromDecimal(*itr, 2));
+    }
+
+    int count = CountMatchedRows<int128_t>(values, vector<int128_t>(test_values.begin(), end));
+    ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+  }
+
+  // IS NOT NULL predicate
+  ASSERT_EQ(values.size(),
+            CountRows(table, { table->NewIsNotNullPredicate("value") }));
+
+  // IS NULL predicate
+  ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
+}
+
 TEST_F(PredicateTest, TestStringPredicates) {
   shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::STRING);
   shared_ptr<KuduSession> session = CreateSession();

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/scan_batch.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_batch.cc b/src/kudu/client/scan_batch.cc
index cacf76b..c424986 100644
--- a/src/kudu/client/scan_batch.cc
+++ b/src/kudu/client/scan_batch.cc
@@ -30,6 +30,7 @@
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/logging.h"
 
 using std::string;
@@ -153,6 +154,12 @@ Status KuduScanBatch::RowPtr::GetDouble(const Slice& col_name, double* val) cons
   return Get<TypeTraits<DOUBLE> >(col_name, val);
 }
 
+Status KuduScanBatch::RowPtr::GetUnscaledDecimal(const Slice& col_name, int128_t* val) const {
+  int col_idx;
+  RETURN_NOT_OK(FindColumn(*schema_, col_name, &col_idx));
+  return GetUnscaledDecimal(col_idx, val);
+}
+
 Status KuduScanBatch::RowPtr::GetString(const Slice& col_name, Slice* val) const {
   return Get<TypeTraits<STRING> >(col_name, val);
 }
@@ -298,6 +305,32 @@ Status KuduScanBatch::RowPtr::Get<TypeTraits<STRING> >(int col_idx, Slice* val)
 template
 Status KuduScanBatch::RowPtr::Get<TypeTraits<BINARY> >(int col_idx, Slice* val) const;
 
+Status KuduScanBatch::RowPtr::GetUnscaledDecimal(int col_idx, int128_t* val) const {
+  const ColumnSchema& col = schema_->column(col_idx);
+  const DataType col_type = col.type_info()->type();
+  switch (col_type) {
+    case DECIMAL32:
+      int32_t i32_val;
+      RETURN_NOT_OK(Get<TypeTraits<DECIMAL32> >(col_idx, &i32_val));
+      *val = i32_val;
+      return Status::OK();
+    case DECIMAL64:
+      int64_t i64_val;
+      RETURN_NOT_OK(Get<TypeTraits<DECIMAL64> >(col_idx, &i64_val));
+      *val = i64_val;
+      return Status::OK();
+    case DECIMAL128:
+      int128_t i128_val;
+      RETURN_NOT_OK(Get<TypeTraits<DECIMAL128> >(col_idx, &i128_val));
+      *val = i128_val;
+      return Status::OK();
+    default:
+      return Status::InvalidArgument(
+          Substitute("invalid type $0 provided for column '$1' (expected DECIMAL)",
+                     col.type_info()->name(), col.name()));
+  }
+}
+
 string KuduScanBatch::RowPtr::ToString() const {
   // Client-users calling ToString() will likely expect it to not be redacted.
   ScopedDisableRedaction no_redaction;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/scan_batch.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_batch.h b/src/kudu/client/scan_batch.h
index 15b458e..b89264b 100644
--- a/src/kudu/client/scan_batch.h
+++ b/src/kudu/client/scan_batch.h
@@ -32,6 +32,7 @@
 #include "kudu/client/stubs.h"
 #endif
 
+#include "kudu/util/int128.h"
 #include "kudu/util/kudu_export.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -194,6 +195,8 @@ class KUDU_EXPORT KuduScanBatch::RowPtr {
 
   Status GetFloat(const Slice& col_name, float* val) const WARN_UNUSED_RESULT;
   Status GetDouble(const Slice& col_name, double* val) const WARN_UNUSED_RESULT;
+
+  Status GetUnscaledDecimal(const Slice& col_name, int128_t* val) const WARN_UNUSED_RESULT;
   ///@}
 
   /// @name Getters for integral type columns by column index.
@@ -224,6 +227,8 @@ class KUDU_EXPORT KuduScanBatch::RowPtr {
 
   Status GetFloat(int col_idx, float* val) const WARN_UNUSED_RESULT;
   Status GetDouble(int col_idx, double* val) const WARN_UNUSED_RESULT;
+
+  Status GetUnscaledDecimal(int col_idx, int128_t* val) const WARN_UNUSED_RESULT;
   ///@}
 
   /// @name Getters for string/binary column by column name.

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/scan_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_predicate.cc b/src/kudu/client/scan_predicate.cc
index c58490c..736d078 100644
--- a/src/kudu/client/scan_predicate.cc
+++ b/src/kudu/client/scan_predicate.cc
@@ -77,7 +77,8 @@ ComparisonPredicateData::~ComparisonPredicateData() {
 Status ComparisonPredicateData::AddToScanSpec(ScanSpec* spec, Arena* arena) {
   void* val_void;
   RETURN_NOT_OK(val_->data_->CheckTypeAndGetPointer(col_.name(),
-                                                    col_.type_info()->physical_type(),
+                                                    col_.type_info()->type(),
+                                                    col_.type_attributes(),
                                                     &val_void));
   switch (op_) {
     case KuduPredicate::LESS_EQUAL: {
@@ -136,7 +137,8 @@ Status InListPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
     // passed to the ColumnPredicate::InList constructor. The constructor for
     // ColumnPredicate::InList will assume ownership of the pointers via a swap.
     RETURN_NOT_OK(value->data_->CheckTypeAndGetPointer(col_.name(),
-                                                       col_.type_info()->physical_type(),
+                                                       col_.type_info()->type(),
+                                                       col_.type_attributes(),
                                                        &val_void));
     vals_list.push_back(val_void);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/schema-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema-internal.h b/src/kudu/client/schema-internal.h
index c5a9e44..769adec 100644
--- a/src/kudu/client/schema-internal.h
+++ b/src/kudu/client/schema-internal.h
@@ -39,16 +39,30 @@ KuduColumnStorageAttributes::CompressionType FromInternalCompressionType(
     kudu::CompressionType type);
 
 kudu::DataType ToInternalDataType(
-    KuduColumnSchema::DataType type);
-KuduColumnSchema::DataType FromInternalDataType(
-    kudu::DataType type);
+    KuduColumnSchema::DataType type,
+    KuduColumnTypeAttributes attributes);
+KuduColumnSchema::DataType FromInternalDataType(kudu::DataType type);
 
+class KuduColumnTypeAttributes::Data {
+ public:
+  Data(int8_t precision, int8_t scale)
+      : precision(precision),
+        scale(scale) {
+  }
+
+  int8_t precision;
+  int8_t scale;
+};
 
 class KuduColumnSpec::Data {
  public:
   explicit Data(std::string name)
       : name(std::move(name)),
         has_type(false),
+        has_precision(false),
+        precision(-1),
+        has_scale(false),
+        scale(-1),
         has_encoding(false),
         has_compression(false),
         has_block_size(false),
@@ -69,6 +83,12 @@ class KuduColumnSpec::Data {
   bool has_type;
   KuduColumnSchema::DataType type;
 
+  bool has_precision;
+  int8_t precision;
+
+  bool has_scale;
+  int8_t scale;
+
   bool has_encoding;
   KuduColumnStorageAttributes::EncodingType encoding;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/schema.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index 968160b..f29281a 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -38,6 +38,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/decimal_util.h"
 #include "kudu/util/compression/compression.pb.h"
 #include "kudu/util/slice.h"
 
@@ -110,7 +111,8 @@ KuduColumnStorageAttributes::CompressionType FromInternalCompressionType(
   }
 }
 
-kudu::DataType ToInternalDataType(KuduColumnSchema::DataType type) {
+kudu::DataType ToInternalDataType(KuduColumnSchema::DataType type,
+                                  KuduColumnTypeAttributes attributes) {
   switch (type) {
     case KuduColumnSchema::INT8: return kudu::INT8;
     case KuduColumnSchema::INT16: return kudu::INT16;
@@ -122,6 +124,16 @@ kudu::DataType ToInternalDataType(KuduColumnSchema::DataType type) {
     case KuduColumnSchema::STRING: return kudu::STRING;
     case KuduColumnSchema::BINARY: return kudu::BINARY;
     case KuduColumnSchema::BOOL: return kudu::BOOL;
+    case KuduColumnSchema::DECIMAL:
+      if (attributes.precision() <= kMaxDecimal32Precision) {
+        return kudu::DECIMAL32;
+      } else if (attributes.precision() <= kMaxDecimal64Precision) {
+        return kudu::DECIMAL64;
+      } else if (attributes.precision() <= kMaxDecimal128Precision) {
+        return kudu::DECIMAL128;
+      } else {
+        LOG(FATAL) << "Unsupported decimal type precision: " << attributes.precision();
+      }
     default: LOG(FATAL) << "Unexpected data type: " << type;
   }
 }
@@ -138,11 +150,56 @@ KuduColumnSchema::DataType FromInternalDataType(kudu::DataType type) {
     case kudu::STRING: return KuduColumnSchema::STRING;
     case kudu::BINARY: return KuduColumnSchema::BINARY;
     case kudu::BOOL: return KuduColumnSchema::BOOL;
+    case kudu::DECIMAL32: return KuduColumnSchema::DECIMAL;
+    case kudu::DECIMAL64: return KuduColumnSchema::DECIMAL;
+    case kudu::DECIMAL128: return KuduColumnSchema::DECIMAL;
     default: LOG(FATAL) << "Unexpected internal data type: " << type;
   }
 }
 
 ////////////////////////////////////////////////////////////
+// KuduColumnTypeAttributes
+////////////////////////////////////////////////////////////
+
+KuduColumnTypeAttributes::KuduColumnTypeAttributes()
+    : data_(new Data(0, 0)) {
+}
+
+KuduColumnTypeAttributes::KuduColumnTypeAttributes(const KuduColumnTypeAttributes& other)
+    : data_(nullptr) {
+  CopyFrom(other);
+}
+
+KuduColumnTypeAttributes::KuduColumnTypeAttributes(int8_t precision, int8_t scale)
+    : data_(new Data(precision, scale)) {
+}
+
+KuduColumnTypeAttributes::~KuduColumnTypeAttributes() {
+  delete data_;
+}
+
+KuduColumnTypeAttributes& KuduColumnTypeAttributes::operator=(
+    const KuduColumnTypeAttributes& other) {
+  if (&other != this) {
+    CopyFrom(other);
+  }
+  return *this;
+}
+
+void KuduColumnTypeAttributes::CopyFrom(const KuduColumnTypeAttributes& other) {
+  delete data_;
+  data_ = new Data(*other.data_);
+}
+
+int8_t KuduColumnTypeAttributes::precision() const {
+  return data_->precision;
+}
+
+int8_t KuduColumnTypeAttributes::scale() const {
+  return data_->scale;
+}
+
+////////////////////////////////////////////////////////////
 // KuduColumnSpec
 ////////////////////////////////////////////////////////////
 
@@ -187,6 +244,18 @@ KuduColumnSpec* KuduColumnSpec::BlockSize(int32_t block_size) {
   return this;
 }
 
+KuduColumnSpec* KuduColumnSpec::Precision(int8_t precision) {
+  data_->has_precision = true;
+  data_->precision = precision;
+  return this;
+}
+
+KuduColumnSpec* KuduColumnSpec::Scale(int8_t scale) {
+  data_->has_scale = true;
+  data_->scale = scale;
+  return this;
+}
+
 KuduColumnSpec* KuduColumnSpec::PrimaryKey() {
   data_->primary_key = true;
   return this;
@@ -230,18 +299,56 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* col) const {
   if (!data_->has_type) {
     return Status::InvalidArgument("no type provided for column", data_->name);
   }
-  DataType internal_type = ToInternalDataType(data_->type);
 
+  if (data_->type == KuduColumnSchema::DECIMAL) {
+    if (!data_->has_precision) {
+      return Status::InvalidArgument("no precision provided for decimal column", data_->name);
+    }
+    if (data_->precision < kMinDecimalPrecision ||
+        data_->precision > kMaxDecimalPrecision) {
+      return Status::InvalidArgument(
+          strings::Substitute("precision must be between $0 and $1",
+                              kMinDecimalPrecision,
+                              kMaxDecimalPrecision), data_->name);
+    }
+    if (data_->has_scale) {
+      if (data_->scale < kMinDecimalScale) {
+        return Status::InvalidArgument(
+            strings::Substitute("scale is less than the minimum value of $0",
+                                kMinDecimalScale), data_->name);
+      }
+      if (data_->scale > data_->precision) {  // scale == precision is allowed
+        return Status::InvalidArgument(
+            strings::Substitute("scale is greater than the precision value of $0",
+                                data_->precision), data_->name);
+      }
+    }
+  } else {
+    if (data_->has_precision) {
+      return Status::InvalidArgument(
+          strings::Substitute("precision is not valid on a $0 column", data_->type), data_->name);
+    }
+    if (data_->has_scale) {
+      return Status::InvalidArgument(
+          strings::Substitute("scale is not valid on a $0 column", data_->type), data_->name);
+    }
+  }
+
+  int8_t precision = (data_->has_precision) ? data_->precision : 0;
+  int8_t scale = (data_->has_scale) ? data_->scale : kDefaultDecimalScale;
+
+  KuduColumnTypeAttributes type_attrs(precision, scale);
+  DataType internal_type = ToInternalDataType(data_->type, type_attrs);
   bool nullable = data_->has_nullable ? data_->nullable : true;
 
   void* default_val = nullptr;
   // TODO: distinguish between DEFAULT NULL and no default?
   if (data_->has_default) {
+    ColumnTypeAttributes internal_type_attrs(precision, scale);
     RETURN_NOT_OK(data_->default_val->data_->CheckTypeAndGetPointer(
-                      data_->name, internal_type, &default_val));
+        data_->name, internal_type, internal_type_attrs,  &default_val));
   }
 
-
   // Encoding and compression
   KuduColumnStorageAttributes::EncodingType encoding =
     KuduColumnStorageAttributes::AUTO_ENCODING;
@@ -262,7 +369,8 @@ Status KuduColumnSpec::ToColumnSchema(KuduColumnSchema* col) const {
 
   *col = KuduColumnSchema(data_->name, data_->type, nullable,
                           default_val,
-                          KuduColumnStorageAttributes(encoding, compression, block_size));
+                          KuduColumnStorageAttributes(encoding, compression, block_size),
+                          type_attrs);
 
   return Status::OK();
 }
@@ -450,20 +558,23 @@ Status KuduSchemaBuilder::Build(KuduSchema* schema) {
 // KuduColumnSchema
 ////////////////////////////////////////////////////////////
 
-std::string KuduColumnSchema::DataTypeToString(DataType type) {
-  return DataType_Name(ToInternalDataType(type));
-}
-
 KuduColumnSchema::KuduColumnSchema(const std::string &name,
                                    DataType type,
                                    bool is_nullable,
                                    const void* default_value,
-                                   KuduColumnStorageAttributes attributes) {
+                                   KuduColumnStorageAttributes storage_attributes,
+                                   KuduColumnTypeAttributes type_attributes) {
   ColumnStorageAttributes attr_private;
-  attr_private.encoding = ToInternalEncodingType(attributes.encoding());
-  attr_private.compression = ToInternalCompressionType(attributes.compression());
-  col_ = new ColumnSchema(name, ToInternalDataType(type), is_nullable,
-                          default_value, default_value, attr_private);
+  attr_private.encoding = ToInternalEncodingType(storage_attributes.encoding());
+  attr_private.compression = ToInternalCompressionType(
+      storage_attributes.compression());
+  ColumnTypeAttributes type_attr_private;
+  type_attr_private.precision = type_attributes.precision();
+  type_attr_private.scale = type_attributes.scale();
+  col_ = new ColumnSchema(name, ToInternalDataType(type, type_attributes),
+                          is_nullable,
+                          default_value, default_value, attr_private,
+                          type_attr_private);
 }
 
 KuduColumnSchema::KuduColumnSchema(const KuduColumnSchema& other)
@@ -568,9 +679,10 @@ KuduColumnSchema KuduSchema::Column(size_t idx) const {
   ColumnSchema col(schema_->column(idx));
   KuduColumnStorageAttributes attrs(FromInternalEncodingType(col.attributes().encoding),
                                     FromInternalCompressionType(col.attributes().compression));
+  KuduColumnTypeAttributes type_attrs(col.type_attributes().precision, col.type_attributes().scale);
   return KuduColumnSchema(col.name(), FromInternalDataType(col.type_info()->type()),
                           col.is_nullable(), col.read_default_value(),
-                          attrs);
+                          attrs, type_attrs);
 }
 
 KuduPartialRow* KuduSchema::NewRow() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index 5358e15..4861cfc 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -60,6 +60,49 @@ class WriteRpc;
 class KuduSchema;
 class KuduValue;
 
+/// @brief Representation of column type attributes.
+class KUDU_EXPORT KuduColumnTypeAttributes {
+ public:
+  KuduColumnTypeAttributes();
+
+  /// Create a KuduColumnTypeAttributes object as a copy of the other one.
+  ///
+  /// @param [in] other
+  ///   The other KuduColumnTypeAttributes object to use as a reference.
+  KuduColumnTypeAttributes(const KuduColumnTypeAttributes& other);
+
+  /// Create a KuduColumnTypeAttributes object
+  ///
+  /// @param [in] precision
+  ///   The precision of a decimal column.
+  /// @param [in] scale
+  ///   The scale of a decimal column.
+  KuduColumnTypeAttributes(int8_t precision, int8_t scale);
+
+  ~KuduColumnTypeAttributes();
+
+  /// @name Assign/copy KuduColumnTypeAttributes.
+  ///
+  /// @param [in] other
+  ///   The source KuduColumnTypeAttributes object to use as a reference.
+  ///
+  ///@{
+  KuduColumnTypeAttributes& operator=(const KuduColumnTypeAttributes& other);
+  void CopyFrom(const KuduColumnTypeAttributes& other);
+  ///@}
+
+  /// @return Precision for the column type.
+  int8_t precision() const;
+
+  /// @return Scale for the column type.
+  int8_t scale() const;
+
+ private:
+  class KUDU_NO_EXPORT Data;
+  // Owned.
+  Data* data_;
+};
+
 /// @brief Representation of column storage attributes.
 class KUDU_EXPORT KuduColumnStorageAttributes {
  public:
@@ -113,7 +156,7 @@ class KUDU_EXPORT KuduColumnStorageAttributes {
     return encoding_;
   }
 
-  /// @return Comporession type for the column storage.
+  /// @return Compression type for the column storage.
   const CompressionType compression() const {
     return compression_;
   }
@@ -142,37 +185,10 @@ class KUDU_EXPORT KuduColumnSchema {
     DOUBLE = 7,
     BINARY = 8,
     UNIXTIME_MICROS = 9,
+    DECIMAL = 10,
     TIMESTAMP = UNIXTIME_MICROS //!< deprecated, use UNIXTIME_MICROS
   };
 
-  /// @param [in] type
-  ///   Column data type.
-  /// @return String representation of the column data type.
-  static std::string DataTypeToString(DataType type);
-
-  /// @deprecated Use KuduSchemaBuilder instead.
-  ///
-  /// @todo KUDU-809: make this hard-to-use constructor private.
-  ///   Clients should use the Builder API. Currently only the Python API
-  ///   uses this old API.
-  ///
-  /// @param [in] name
-  ///   The name of the column.
-  /// @param [in] type
-  ///   The type of the column.
-  /// @param [in] is_nullable
-  ///   Whether the column is nullable.
-  /// @param [in] default_value
-  ///   Default value for the column.
-  /// @param [in] attributes
-  ///   Column storage attributes.
-  KuduColumnSchema(const std::string &name,
-                   DataType type,
-                   bool is_nullable = false,
-                   const void* default_value = NULL,
-                   KuduColumnStorageAttributes attributes = KuduColumnStorageAttributes())
-      ATTRIBUTE_DEPRECATED("use KuduSchemaBuilder instead");
-
   /// Construct KuduColumnSchema object as a copy of another object.
   ///
   /// @param [in] other
@@ -225,6 +241,14 @@ class KUDU_EXPORT KuduColumnSchema {
 
   KuduColumnSchema();
 
+  /// This constructor is private because clients should use the Builder API.
+  KuduColumnSchema(const std::string &name,
+                   DataType type,
+                   bool is_nullable = false,
+                   const void* default_value = NULL,
+                   KuduColumnStorageAttributes storage_attributes = KuduColumnStorageAttributes(),
+                   KuduColumnTypeAttributes type_attributes = KuduColumnTypeAttributes());
+
   // Owned.
   ColumnSchema* col_;
 };
@@ -289,6 +313,40 @@ class KUDU_EXPORT KuduColumnSpec {
   /// @return Pointer to the modified object.
   KuduColumnSpec* BlockSize(int32_t block_size);
 
+  /// @name Operations only relevant for decimal columns.
+  ///
+  ///@{
+  /// Set the precision for the column.
+  ///
+  /// Clients must specify a precision for decimal columns.
+  /// Precision is the total number of digits that can be
+  /// represented by the column, regardless of the location of the decimal point.
+  /// For example, representing integer values up to 9999, and fractional
+  /// values up to 99.99, both require a precision of 4. You can also represent
+  /// corresponding negative values, without any change in the precision.
+  /// For example, the range -9999 to 9999 still only requires a precision of 4.
+  ///
+  /// The precision must be between 1 and 38.
+  ///
+  /// @return Pointer to the modified object.
+  KuduColumnSpec* Precision(int8_t precision);
+
+  /// Set the scale for the column.
+  ///
+  /// Clients can specify a scale for decimal columns.
+  /// Scale represents the number of fractional digits. This value must be less
+  /// than or equal to precision. A scale of 0 produces integral values,
+  /// with no fractional part. If precision and scale are equal, all the digits
+  /// come after the decimal point; for example, with a precision and scale of 4
+  /// all the values lie between -0.9999 and 0.9999.
+  ///
+  /// The scale must be between 0 and the column's precision (inclusive).
+  /// If no scale is provided a default scale of 0 is used.
+  ///
+  /// @return Pointer to the modified object.
+  KuduColumnSpec* Scale(int8_t scale);
+  ///@}
+
   /// @name Operations only relevant for Create Table
   ///
   ///@{

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/value-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/value-internal.h b/src/kudu/client/value-internal.h
index 33aeeb8..fbeb974 100644
--- a/src/kudu/client/value-internal.h
+++ b/src/kudu/client/value-internal.h
@@ -21,7 +21,9 @@
 
 #include "kudu/client/value.h"
 #include "kudu/common/types.h"
+#include "kudu/common/schema.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -34,15 +36,19 @@ class KuduValue::Data {
     INT,
     FLOAT,
     DOUBLE,
-    SLICE
+    SLICE,
+    DECIMAL
   };
   Type type_;
   union {
     int64_t int_val_;
     float float_val_;
     double double_val_;
+    int128_t decimal_val_;
   };
   Slice slice_val_;
+  // Scale is only used with DECIMAL types.
+  int8_t scale_;
 
   // Check that this value can be converted to the given datatype 't',
   // and return a pointer to the underlying value in '*val_void'.
@@ -50,10 +56,14 @@ class KuduValue::Data {
   // 'col_name' is used to generate reasonable error messages in the case
   // that the type cannot be coerced.
   //
+  // 'type_attributes' is used to ensure the value matches and fits the
+  // column's specified "parameterized type".
+  //
   // The returned pointer in *val_void is only guaranteed to live as long
   // as this KuduValue object.
   Status CheckTypeAndGetPointer(const std::string& col_name,
                                 DataType t,
+                                ColumnTypeAttributes type_attributes,
                                 void** val_void);
 
   // Return this KuduValue as a Slice. The KuduValue object retains ownership
@@ -76,6 +86,13 @@ class KuduValue::Data {
   Status CheckAndPointToInt(const std::string& col_name,
                             size_t int_size, void** val_void);
 
+  // Check that this value is a decimal constant, with a scale that
+  // matches the type_attributes, and within the valid range.
+  // Set *val_void to point to it if so.
+  Status CheckAndPointToDecimal(const std::string& col_name,
+                                DataType t, ColumnTypeAttributes type_attributes,
+                                void** val_void);
+
   // Check that this value is a string constant, and set *val_void to
   // point to it if so.
   Status CheckAndPointToString(const std::string& col_name,

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/value.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/value.cc b/src/kudu/client/value.cc
index ea5bd6f..7314ed4 100644
--- a/src/kudu/client/value.cc
+++ b/src/kudu/client/value.cc
@@ -24,9 +24,12 @@
 #include "kudu/client/value-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/decimal_util.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/status.h"
 
 using std::string;
@@ -56,6 +59,8 @@ KuduValue* KuduValue::Clone() const {
       return KuduValue::FromFloat(data_->float_val_);
     case Data::SLICE:
       return KuduValue::CopyString(data_->slice_val_);
+    case Data::DECIMAL:
+      return KuduValue::FromDecimal(data_->decimal_val_, data_->scale_);
   }
   LOG(FATAL);
 }
@@ -93,6 +98,15 @@ KuduValue* KuduValue::FromBool(bool v) {
   return new KuduValue(d);
 }
 
+KuduValue* KuduValue::FromDecimal(int128_t dv, int8_t scale) {
+  auto d = new Data;
+  d->type_ = Data::DECIMAL;
+  d->decimal_val_ = dv;
+  d->scale_ = scale;
+
+  return new KuduValue(d);
+}
+
 KuduValue* KuduValue::CopyString(Slice s) {
   auto copy = new uint8_t[s.size()];
   memcpy(copy, s.data(), s.size());
@@ -106,13 +120,15 @@ KuduValue* KuduValue::CopyString(Slice s) {
 
 Status KuduValue::Data::CheckTypeAndGetPointer(const string& col_name,
                                                DataType t,
+                                               ColumnTypeAttributes type_attributes,
                                                void** val_void) {
   const TypeInfo* ti = GetTypeInfo(t);
-  switch (ti->physical_type()) {
+  switch (t) {
     case kudu::INT8:
     case kudu::INT16:
     case kudu::INT32:
     case kudu::INT64:
+    case kudu::UNIXTIME_MICROS:
       RETURN_NOT_OK(CheckAndPointToInt(col_name, ti->size(), val_void));
       break;
 
@@ -130,7 +146,14 @@ Status KuduValue::Data::CheckTypeAndGetPointer(const string& col_name,
       *val_void = &double_val_;
       break;
 
+    case kudu::DECIMAL32:
+    case kudu::DECIMAL64:
+    case kudu::DECIMAL128:
+      RETURN_NOT_OK(CheckAndPointToDecimal(col_name, t, type_attributes, val_void));
+      break;
+
     case kudu::BINARY:
+    case kudu::STRING:
       RETURN_NOT_OK(CheckAndPointToString(col_name, val_void));
       break;
 
@@ -190,6 +213,28 @@ Status KuduValue::Data::CheckAndPointToInt(const string& col_name,
   return Status::OK();
 }
 
+Status KuduValue::Data::CheckAndPointToDecimal(const string& col_name,
+                                               DataType type,
+                                               ColumnTypeAttributes type_attributes,
+                                               void** val_void) {
+  RETURN_NOT_OK(CheckValType(col_name, KuduValue::Data::DECIMAL, "decimal"));
+  // The value's scale must match the column's scale exactly. TODO: Coerce when possible
+  if (scale_ != type_attributes.scale) {
+    return Status::InvalidArgument(
+        Substitute("value scale $0 does not match the decimal column '$1' (expected $2)",
+                   scale_, col_name, type_attributes.scale));
+  }
+  int128_t max_val = MaxUnscaledDecimal(type_attributes.precision);
+  int128_t min_val = -max_val;
+  if (decimal_val_ < min_val || decimal_val_ > max_val) {
+    return Status::InvalidArgument(
+        Substitute("value $0 out of range for decimal column '$1'",
+                   DecimalToString(decimal_val_, type_attributes.scale), col_name));
+  }
+  *val_void = &decimal_val_;
+  return Status::OK();
+}
+
 Status KuduValue::Data::CheckAndPointToString(const string& col_name,
                                               void** val_void) {
   RETURN_NOT_OK(CheckValType(col_name, KuduValue::Data::SLICE, "string"));
@@ -209,6 +254,10 @@ const Slice KuduValue::Data::GetSlice() {
     case DOUBLE:
       return Slice(reinterpret_cast<uint8_t*>(&double_val_),
                    sizeof(double));
+    case DECIMAL:
+      return Slice(reinterpret_cast<uint8_t*>(&decimal_val_),
+                   sizeof(int128_t));
+
     case SLICE:
       return slice_val_;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/client/value.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/value.h b/src/kudu/client/value.h
index e7c9ebd..4e146c4 100644
--- a/src/kudu/client/value.h
+++ b/src/kudu/client/value.h
@@ -24,8 +24,9 @@
 #else
 #include "kudu/client/stubs.h"
 #endif
-#include "kudu/util/slice.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/kudu_export.h"
+#include "kudu/util/slice.h"
 
 namespace kudu {
 namespace client {
@@ -51,6 +52,20 @@ class KUDU_EXPORT KuduValue {
   static KuduValue* FromBool(bool b);
   ///@}
 
+  /// Construct a decimal KuduValue from the raw value and scale.
+  ///
+  /// The validity of the decimal value is not checked until the
+  /// KuduValue is used by Kudu.
+  ///
+  /// @param [in] dv
+  ///   The raw decimal value to build the KuduValue from.
+  /// @param [in] scale
+  ///   The decimal value's scale. (Must match the column's scale exactly)
+  /// @return A new KuduValue object.
+  ///@{
+  static KuduValue* FromDecimal(int128_t dv, int8_t scale);
+  ///@}
+
   /// Construct a KuduValue by copying the value of the given Slice.
   ///
   /// @param [in] s

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/column_predicate-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc
index 7548e9c..d96cc41 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -30,6 +30,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/test_util.h"
@@ -889,6 +890,9 @@ TEST_F(TestColumnPredicate, TestLess) {
     ColumnSchema micros("micros", UNIXTIME_MICROS);
     ColumnSchema f32("f32", FLOAT);
     ColumnSchema f64("f64", DOUBLE);
+    ColumnSchema d32("d32", DECIMAL32);
+    ColumnSchema d64("d64", DECIMAL64);
+    ColumnSchema d128("d128", DECIMAL128);
     ColumnSchema string("string", STRING);
     ColumnSchema binary("binary", BINARY);
 
@@ -914,6 +918,15 @@ TEST_F(TestColumnPredicate, TestLess) {
               ColumnPredicate::Range(f64, nullptr, TypeTraits<DOUBLE>::min_value())
                               .predicate_type());
     ASSERT_EQ(PredicateType::None,
+              ColumnPredicate::Range(d32, nullptr, TypeTraits<DECIMAL32>::min_value())
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::None,
+              ColumnPredicate::Range(d64, nullptr, TypeTraits<DECIMAL64>::min_value())
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::None,
+              ColumnPredicate::Range(d128, nullptr, TypeTraits<DECIMAL128>::min_value())
+                               .predicate_type());
+    ASSERT_EQ(PredicateType::None,
               ColumnPredicate::Range(string, nullptr, TypeTraits<STRING>::min_value())
                               .predicate_type());
     ASSERT_EQ(PredicateType::None,
@@ -929,6 +942,9 @@ TEST_F(TestColumnPredicate, TestGreaterThanEquals) {
     ColumnSchema micros("micros", UNIXTIME_MICROS);
     ColumnSchema f32("f32", FLOAT);
     ColumnSchema f64("f64", DOUBLE);
+    ColumnSchema d32("d32", DECIMAL32);
+    ColumnSchema d64("d64", DECIMAL64);
+    ColumnSchema d128("d128", DECIMAL128);
     ColumnSchema string("string", STRING);
     ColumnSchema binary("binary", BINARY);
 
@@ -954,6 +970,15 @@ TEST_F(TestColumnPredicate, TestGreaterThanEquals) {
               ColumnPredicate::Range(f64, TypeTraits<DOUBLE>::min_value(), nullptr)
                               .predicate_type());
     ASSERT_EQ(PredicateType::IsNotNull,
+              ColumnPredicate::Range(d32, TypeTraits<DECIMAL32>::min_value(), nullptr)
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::IsNotNull,
+              ColumnPredicate::Range(d64, TypeTraits<DECIMAL64>::min_value(), nullptr)
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::IsNotNull,
+              ColumnPredicate::Range(d128, TypeTraits<DECIMAL128>::min_value(), nullptr)
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::IsNotNull,
               ColumnPredicate::Range(string, TypeTraits<STRING>::min_value(), nullptr)
                               .predicate_type());
     ASSERT_EQ(PredicateType::IsNotNull,
@@ -981,6 +1006,15 @@ TEST_F(TestColumnPredicate, TestGreaterThanEquals) {
     ASSERT_EQ(PredicateType::Equality,
               ColumnPredicate::Range(f64, TypeTraits<DOUBLE>::max_value(), nullptr)
                               .predicate_type());
+    ASSERT_EQ(PredicateType::Equality,
+              ColumnPredicate::Range(d32, TypeTraits<DECIMAL32>::max_value(), nullptr)
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::Equality,
+              ColumnPredicate::Range(d64, TypeTraits<DECIMAL64>::max_value(), nullptr)
+                              .predicate_type());
+    ASSERT_EQ(PredicateType::Equality,
+              ColumnPredicate::Range(d128, TypeTraits<DECIMAL128>::max_value(), nullptr)
+                              .predicate_type());
 
     Slice s = "foo";
     ASSERT_EQ(PredicateType::Range,
@@ -1054,11 +1088,15 @@ TEST_F(TestColumnPredicate, TestSelectivity) {
   int64_t one_64 = 1;
   double_t one_d = 1.0;
   Slice one_s("one", 3);
+  int128_t one_dec = 1;
 
   ColumnSchema column_i32("a", INT32, true);
   ColumnSchema column_i64("b", INT64, true);
   ColumnSchema column_d("c", DOUBLE, true);
   ColumnSchema column_s("d", STRING, true);
+  ColumnSchema column_d32("e", DECIMAL32, true);
+  ColumnSchema column_d64("f", DECIMAL64, true);
+  ColumnSchema column_d128("g", DECIMAL128, true);
 
   // Predicate type
   ASSERT_LT(SelectivityComparator(ColumnPredicate::IsNull(column_i32),
@@ -1073,6 +1111,15 @@ TEST_F(TestColumnPredicate, TestSelectivity) {
   ASSERT_LT(SelectivityComparator(ColumnPredicate::Range(column_i32, &one_32, nullptr),
                                   ColumnPredicate::IsNotNull(column_i32)),
             0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Range(column_d32, &one_dec, nullptr),
+                                  ColumnPredicate::IsNotNull(column_d32)),
+            0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Range(column_d64, &one_dec, nullptr),
+                                  ColumnPredicate::IsNotNull(column_d64)),
+            0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Range(column_d128, &one_dec, nullptr),
+                                  ColumnPredicate::IsNotNull(column_d128)),
+            0);
 
   // Size of column type
   ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_i32, &one_32),
@@ -1090,6 +1137,21 @@ TEST_F(TestColumnPredicate, TestSelectivity) {
   ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_d, &one_d),
                                   ColumnPredicate::Equality(column_s, &one_s)),
             0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_d32, &one_dec),
+                                  ColumnPredicate::Equality(column_i64, &one_64)),
+            0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_d32, &one_dec),
+                                  ColumnPredicate::Equality(column_d, &one_d)),
+            0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_i32, &one_32),
+                                  ColumnPredicate::Equality(column_s, &one_s)),
+            0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_d32, &one_dec),
+                                  ColumnPredicate::Equality(column_d64, &one_dec)),
+            0);
+  ASSERT_LT(SelectivityComparator(ColumnPredicate::Equality(column_d64, &one_dec),
+                                  ColumnPredicate::Equality(column_d128, &one_dec)),
+            0);
 }
 
 TEST_F(TestColumnPredicate, TestRedaction) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/column_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc
index 8a1dea3..d2bb583 100644
--- a/src/kudu/common/column_predicate.cc
+++ b/src/kudu/common/column_predicate.cc
@@ -167,6 +167,8 @@ void ColumnPredicate::SetToNone() {
   upper_ = nullptr;
 }
 
+// TODO: For decimal columns, use column_.type_attributes().precision
+// to calculate the "true" max/min values for improved simplification.
 void ColumnPredicate::Simplify() {
   auto type_info = column_.type_info();
   switch (predicate_type_) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index faa4d58..4879b18 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -51,6 +51,9 @@ enum DataType {
   BINARY = 12;
   UNIXTIME_MICROS = 13;
   INT128 = 14;
+  DECIMAL32 = 15;
+  DECIMAL64 = 16;
+  DECIMAL128 = 17;
 }
 
 enum EncodingType {
@@ -65,6 +68,14 @@ enum EncodingType {
   BIT_SHUFFLE = 6;
 }
 
+// Holds detailed attributes for the column. Only certain fields will be set,
+// depending on the type of the column.
+message ColumnTypeAttributesPB {
+  // For decimal columns
+  optional int32 precision = 1;
+  optional int32 scale = 2;
+}
+
 // TODO: Differentiate between the schema attributes
 // that are only relevant to the server (e.g.,
 // encoding and compression) and those that also
@@ -95,6 +106,8 @@ message ColumnSchemaPB {
   optional EncodingType encoding = 8 [default=AUTO_ENCODING];
   optional CompressionType compression = 9 [default=DEFAULT_COMPRESSION];
   optional int32 cfile_block_size = 10 [default=0];
+
+  optional ColumnTypeAttributesPB type_attributes = 11;
 }
 
 message ColumnSchemaDeltaPB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/partial_row-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row-test.cc b/src/kudu/common/partial_row-test.cc
index e1d3af9..07d8658 100644
--- a/src/kudu/common/partial_row-test.cc
+++ b/src/kudu/common/partial_row-test.cc
@@ -39,7 +39,9 @@ class PartialRowTest : public KuduTest {
     : schema_({ ColumnSchema("key", INT32),
                 ColumnSchema("int_val", INT32),
                 ColumnSchema("string_val", STRING, true),
-                ColumnSchema("binary_val", BINARY, true) },
+                ColumnSchema("binary_val", BINARY, true),
+                ColumnSchema("decimal_val", DECIMAL32, true, nullptr, nullptr,
+                             ColumnStorageAttributes(), ColumnTypeAttributes(6, 2)) },
               1) {
     SeedRandom();
   }
@@ -203,6 +205,33 @@ TEST_F(PartialRowTest, UnitTest) {
   EXPECT_OK(row.Unset("binary_val"));
   EXPECT_EQ("int32 int_val=99999", row.ToString());
 
+  // Unset the column by index
+  EXPECT_OK(row.Unset(1));
+  EXPECT_EQ("", row.ToString());
+
+  // Set a decimal column
+  EXPECT_OK(row.SetUnscaledDecimal("decimal_val", 123456));
+  EXPECT_TRUE(row.IsColumnSet(4));
+  EXPECT_EQ("decimal decimal_val=123456_D32", row.ToString());
+
+  // Set the max decimal value for the decimal_val column
+  EXPECT_OK(row.SetUnscaledDecimal("decimal_val", 999999));
+  EXPECT_EQ("decimal decimal_val=999999_D32", row.ToString());
+
+  // Set the min decimal value for the decimal_val column
+  EXPECT_OK(row.SetUnscaledDecimal("decimal_val", -999999));
+  EXPECT_EQ("decimal decimal_val=-999999_D32", row.ToString());
+
+  // Set a value that's too large for the decimal_val column
+  s = row.SetUnscaledDecimal("decimal_val", 1000000);
+  EXPECT_EQ("Invalid argument: value 10000.00 out of range for decimal column 'decimal_val'",
+            s.ToString());
+
+  // Set a value that's too small for the decimal_val column
+  s = row.SetUnscaledDecimal("decimal_val", -1000000);
+  EXPECT_EQ("Invalid argument: value -10000.00 out of range for decimal column 'decimal_val'",
+            s.ToString());
+
   // Even though the storage is actually the same at the moment, we shouldn't be
   // able to set string columns with SetBinary and vice versa.
   EXPECT_FALSE(row.SetBinaryCopy("string_val", "oops").ok());

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/partial_row.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.cc b/src/kudu/common/partial_row.cc
index c52da89..de9dd6b 100644
--- a/src/kudu/common/partial_row.cc
+++ b/src/kudu/common/partial_row.cc
@@ -31,6 +31,8 @@
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
+#include "kudu/util/decimal_util.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/memory/overwrite.h"
 #include "kudu/util/status.h"
@@ -39,7 +41,6 @@ using std::string;
 using strings::Substitute;
 
 namespace kudu {
-
 namespace {
 inline Status FindColumn(const Schema& schema, const Slice& col_name, int* idx) {
   StringPiece sp(reinterpret_cast<const char*>(col_name.data()), col_name.size());
@@ -197,6 +198,21 @@ Status KuduPartialRow::Set(int32_t column_idx, const uint8_t* val) {
       RETURN_NOT_OK(SetUnixTimeMicros(column_idx, *reinterpret_cast<const int64_t*>(val)));
       break;
     };
+    case DECIMAL32: {
+      RETURN_NOT_OK(Set<TypeTraits<DECIMAL32> >(column_idx,
+                                                *reinterpret_cast<const int32_t*>(val)));
+      break;
+    };
+    case DECIMAL64: {
+      RETURN_NOT_OK(Set<TypeTraits<DECIMAL64> >(column_idx,
+                                                *reinterpret_cast<const int64_t*>(val)));
+      break;
+    };
+    case DECIMAL128: {
+      RETURN_NOT_OK(Set<TypeTraits<DECIMAL128> >(column_idx,
+                                                 *reinterpret_cast<const int128_t*>(val)));
+      break;
+    };
     default: {
       return Status::InvalidArgument("Unknown column type in schema",
                                      column_schema.ToString());
@@ -254,6 +270,11 @@ Status KuduPartialRow::SetFloat(const Slice& col_name, float val) {
 Status KuduPartialRow::SetDouble(const Slice& col_name, double val) {
   return Set<TypeTraits<DOUBLE> >(col_name, val);
 }
+Status KuduPartialRow::SetUnscaledDecimal(const Slice& col_name, int128_t val) {
+  int col_idx;
+  RETURN_NOT_OK(FindColumn(*schema_, col_name, &col_idx));
+  return SetUnscaledDecimal(col_idx, val);
+}
 Status KuduPartialRow::SetBool(int col_idx, bool val) {
   return Set<TypeTraits<BOOL> >(col_idx, val);
 }
@@ -278,6 +299,30 @@ Status KuduPartialRow::SetFloat(int col_idx, float val) {
 Status KuduPartialRow::SetDouble(int col_idx, double val) {
   return Set<TypeTraits<DOUBLE> >(col_idx, val);
 }
+Status KuduPartialRow::SetUnscaledDecimal(int col_idx, int128_t val) {
+  const ColumnSchema& col = schema_->column(col_idx);
+  const DataType col_type = col.type_info()->type();
+
+  int128_t max_val = MaxUnscaledDecimal(col.type_attributes().precision);
+  int128_t min_val = -max_val;
+  if (val < min_val || val > max_val) {
+    return Status::InvalidArgument(
+        Substitute("value $0 out of range for decimal column '$1'",
+                   DecimalToString(val, col.type_attributes().scale), col.name()));
+  }
+  switch (col_type) {
+    case DECIMAL32:
+      return Set<TypeTraits<DECIMAL32> >(col_idx, static_cast<int32_t>(val));
+    case DECIMAL64:
+      return Set<TypeTraits<DECIMAL64> >(col_idx, static_cast<int64_t>(val));
+    case DECIMAL128:
+      return Set<TypeTraits<DECIMAL128> >(col_idx, static_cast<int128_t>(val));
+    default:
+      return Status::InvalidArgument(
+          Substitute("invalid type $0 provided for column '$1' (expected DECIMAL)",
+                     col.type_info()->name(), col.name()));
+  }
+}
 
 Status KuduPartialRow::SetBinary(const Slice& col_name, const Slice& val) {
   return SetBinaryCopy(col_name, val);
@@ -555,6 +600,11 @@ Status KuduPartialRow::GetFloat(const Slice& col_name, float* val) const {
 Status KuduPartialRow::GetDouble(const Slice& col_name, double* val) const {
   return Get<TypeTraits<DOUBLE> >(col_name, val);
 }
+Status KuduPartialRow::GetUnscaledDecimal(const Slice &col_name, int128_t *val) {
+  int col_idx;
+  RETURN_NOT_OK(FindColumn(*schema_, col_name, &col_idx));
+  return GetUnscaledDecimal(col_idx, val);
+}
 Status KuduPartialRow::GetString(const Slice& col_name, Slice* val) const {
   return Get<TypeTraits<STRING> >(col_name, val);
 }
@@ -586,6 +636,31 @@ Status KuduPartialRow::GetFloat(int col_idx, float* val) const {
 Status KuduPartialRow::GetDouble(int col_idx, double* val) const {
   return Get<TypeTraits<DOUBLE> >(col_idx, val);
 }
+Status KuduPartialRow::GetUnscaledDecimal(int col_idx, int128_t *val) {
+  const ColumnSchema& col = schema_->column(col_idx);
+  const DataType col_type = col.type_info()->type();
+  switch (col_type) {
+    case DECIMAL32:
+      int32_t i32_val;
+      RETURN_NOT_OK(Get<TypeTraits<DECIMAL32> >(col_idx, &i32_val));
+      *val = i32_val;
+      return Status::OK();
+    case DECIMAL64:
+      int64_t i64_val;
+      RETURN_NOT_OK(Get<TypeTraits<DECIMAL64> >(col_idx, &i64_val));
+      *val = i64_val;
+      return Status::OK();
+    case DECIMAL128:
+      int128_t i128_val;
+      RETURN_NOT_OK(Get<TypeTraits<DECIMAL128> >(col_idx, &i128_val));
+      *val = i128_val;
+      return Status::OK();
+    default:
+      return Status::InvalidArgument(
+          Substitute("invalid type $0 provided for column '$1' (expected DECIMAL)",
+                     col.type_info()->name(), col.name()));
+  }
+}
 Status KuduPartialRow::GetString(int col_idx, Slice* val) const {
   return Get<TypeTraits<STRING> >(col_idx, val);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/partial_row.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 845c4e0..ce40835 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -33,9 +33,10 @@
 #include "kudu/client/stubs.h"
 #endif
 
+#include "kudu/util/int128.h"
 #include "kudu/util/kudu_export.h"
-#include "kudu/util/status.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/status.h"
 
 /// @cond
 namespace kudu {
@@ -104,6 +105,7 @@ class KUDU_EXPORT KuduPartialRow {
 
   Status SetFloat(const Slice& col_name, float val) WARN_UNUSED_RESULT;
   Status SetDouble(const Slice& col_name, double val) WARN_UNUSED_RESULT;
+  Status SetUnscaledDecimal(const Slice& col_name, int128_t val) WARN_UNUSED_RESULT;
   ///@}
 
   /// @name Setters for integral type columns by index.
@@ -132,6 +134,7 @@ class KUDU_EXPORT KuduPartialRow {
 
   Status SetFloat(int col_idx, float val) WARN_UNUSED_RESULT;
   Status SetDouble(int col_idx, double val) WARN_UNUSED_RESULT;
+  Status SetUnscaledDecimal(int col_idx, int128_t val) WARN_UNUSED_RESULT;
   ///@}
 
   /// @name Setters for binary/string columns by name (copying).
@@ -353,6 +356,7 @@ class KUDU_EXPORT KuduPartialRow {
 
   Status GetFloat(const Slice& col_name, float* val) const WARN_UNUSED_RESULT;
   Status GetDouble(const Slice& col_name, double* val) const WARN_UNUSED_RESULT;
+  Status GetUnscaledDecimal(const Slice& col_name, int128_t* val);
   ///@}
 
   /// @name Getters for column of integral type by column index.
@@ -383,6 +387,7 @@ class KUDU_EXPORT KuduPartialRow {
 
   Status GetFloat(int col_idx, float* val) const WARN_UNUSED_RESULT;
   Status GetDouble(int col_idx, double* val) const WARN_UNUSED_RESULT;
+  Status GetUnscaledDecimal(int col_idx, int128_t* val);
   ///@}
 
   /// @name Getters for string/binary column by column name.

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/schema-test.cc b/src/kudu/common/schema-test.cc
index c965428..6724580 100644
--- a/src/kudu/common/schema-test.cc
+++ b/src/kudu/common/schema-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/common/schema.h"
 
+#include <cstddef>
 #include <cstdint>
 #include <memory>
 #include <string>
@@ -35,6 +36,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/hexdump.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -99,6 +101,66 @@ TEST_F(TestSchema, TestSchema) {
   EXPECT_EQ("uint32 NULLABLE", schema.column(1).TypeToString());
 }
 
+// Test basic functionality of Schema definition with decimal columns
+TEST_F(TestSchema, TestSchemaWithDecimal) {
+  ColumnSchema col1("key", STRING);
+  ColumnSchema col2("decimal32val", DECIMAL32, false,
+                    NULL, NULL, ColumnStorageAttributes(),
+                    ColumnTypeAttributes(9, 4));
+  ColumnSchema col3("decimal64val", DECIMAL64, true,
+                    NULL, NULL, ColumnStorageAttributes(),
+                    ColumnTypeAttributes(18, 10));
+  ColumnSchema col4("decimal128val", DECIMAL128, true,
+                    NULL, NULL, ColumnStorageAttributes(),
+                    ColumnTypeAttributes(38, 2));
+
+  vector<ColumnSchema> cols = { col1, col2, col3, col4 };
+  Schema schema(cols, 1);
+
+  ASSERT_EQ(sizeof(Slice) + sizeof(int32_t) +
+                sizeof(int64_t) + sizeof(int128_t),
+            schema.byte_size());
+
+  EXPECT_EQ("Schema [\n"
+                "\tkey[string NOT NULL],\n"
+                "\tdecimal32val[decimal(9, 4) NOT NULL],\n"
+                "\tdecimal64val[decimal(18, 10) NULLABLE],\n"
+                "\tdecimal128val[decimal(38, 2) NULLABLE]\n"
+                "]",
+            schema.ToString());
+
+  EXPECT_EQ("decimal(9, 4) NOT NULL", schema.column(1).TypeToString());
+  EXPECT_EQ("decimal(18, 10) NULLABLE", schema.column(2).TypeToString());
+  EXPECT_EQ("decimal(38, 2) NULLABLE", schema.column(3).TypeToString());
+}
+
+// Test Schema::Equals respects decimal column attributes
+TEST_F(TestSchema, TestSchemaEqualsWithDecimal) {
+  ColumnSchema col1("key", STRING);
+  ColumnSchema col_18_10("decimal64val", DECIMAL64, true,
+                         NULL, NULL, ColumnStorageAttributes(),
+                         ColumnTypeAttributes(18, 10));
+  ColumnSchema col_18_9("decimal64val", DECIMAL64, true,
+                        NULL, NULL, ColumnStorageAttributes(),
+                        ColumnTypeAttributes(18, 9));
+  ColumnSchema col_17_10("decimal64val", DECIMAL64, true,
+                         NULL, NULL, ColumnStorageAttributes(),
+                         ColumnTypeAttributes(17, 10));
+  ColumnSchema col_17_9("decimal64val", DECIMAL64, true,
+                        NULL, NULL, ColumnStorageAttributes(),
+                        ColumnTypeAttributes(17, 9));
+
+  Schema schema_18_10({ col1, col_18_10 }, 1);
+  Schema schema_18_9({ col1, col_18_9 }, 1);
+  Schema schema_17_10({ col1, col_17_10 }, 1);
+  Schema schema_17_9({ col1, col_17_9 }, 1);
+
+  EXPECT_TRUE(schema_18_10.Equals(schema_18_10));
+  EXPECT_FALSE(schema_18_10.Equals(schema_18_9));
+  EXPECT_FALSE(schema_18_10.Equals(schema_17_10));
+  EXPECT_FALSE(schema_18_10.Equals(schema_17_9));
+}
+
 TEST_F(TestSchema, TestSwap) {
   Schema schema1({ ColumnSchema("col1", STRING),
                    ColumnSchema("col2", STRING),

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/schema.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc
index 026aeab..2645d12 100644
--- a/src/kudu/common/schema.cc
+++ b/src/kudu/common/schema.cc
@@ -46,6 +46,29 @@ static const ColumnId kFirstColumnId(0);
 static const ColumnId  kFirstColumnId(10);
 #endif
 
+bool ColumnTypeAttributes::EqualsForType(ColumnTypeAttributes other,
+                                         DataType type) const {
+  switch (type) {
+    case DECIMAL32:
+    case DECIMAL64:
+    case DECIMAL128:
+      return precision == other.precision && scale == other.scale;
+    default:
+      return true; // Other types don't use ColumnTypeAttributes, so they always match.
+  }
+}
+
+string ColumnTypeAttributes::ToStringForType(DataType type) const {
+  switch (type) {
+    case DECIMAL32:
+    case DECIMAL64:
+    case DECIMAL128:
+      return strings::Substitute("($0, $1)", precision, scale); // "(precision, scale)"
+    default:
+      return ""; // Other types get no attribute suffix.
+  }
+}
+
 string ColumnStorageAttributes::ToString() const {
   return strings::Substitute("encoding=$0, compression=$1, cfile_block_size=$2",
                              EncodingType_Name(encoding),
@@ -99,8 +122,9 @@ string ColumnSchema::ToString() const {
 }
 
 string ColumnSchema::TypeToString() const {
-  return strings::Substitute("$0 $1",
+  return strings::Substitute("$0$1 $2",
                              type_info_->name(),
+                             type_attributes().ToStringForType(type_info()->type()),
                              is_nullable_ ? "NULLABLE" : "NOT NULL");
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 362b1a7..2fd76d0 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -84,6 +84,35 @@ struct ColumnId {
   int32_t t;
 };
 
+// Struct for storing column type attributes such as precision and scale
+// for decimal types. Type attributes describe logical features of
+// the column; these features are usually relative to the column's type.
+struct ColumnTypeAttributes {
+ public:
+  ColumnTypeAttributes()
+      : precision(0),
+        scale(0) {
+  }
+
+  ColumnTypeAttributes(int8_t precision, int8_t scale)
+      : precision(precision),
+        scale(scale) {
+  }
+
+  // Does `other` represent equivalent attributes for `type`?
+  // For example, if type == DECIMAL64 then attributes are equivalent iff
+  // they have the same precision and scale.
+  bool EqualsForType(ColumnTypeAttributes other, DataType type) const;
+
+  // Return a string representation appropriate for `type`.
+  // This is meant to be postfixed to the name of a primitive type to describe
+  // the full type, e.g. decimal(10, 4).
+  std::string ToStringForType(DataType type) const;
+
+  int8_t precision;
+  int8_t scale;
+};
+
 // Class for storing column attributes such as compression and
 // encoding.  Column attributes describe the physical storage and
 // representation of bytes, as opposed to a purely logical description
@@ -171,12 +200,14 @@ class ColumnSchema {
   ColumnSchema(std::string name, DataType type, bool is_nullable = false,
                const void* read_default = NULL,
                const void* write_default = NULL,
-               ColumnStorageAttributes attributes = ColumnStorageAttributes())
+               ColumnStorageAttributes attributes = ColumnStorageAttributes(),
+               ColumnTypeAttributes type_attributes = ColumnTypeAttributes())
       : name_(std::move(name)),
         type_info_(GetTypeInfo(type)),
         is_nullable_(is_nullable),
         read_default_(read_default ? new Variant(type, read_default) : NULL),
-        attributes_(attributes) {
+        attributes_(attributes),
+        type_attributes_(type_attributes) {
     if (write_default == read_default) {
       write_default_ = read_default_;
     } else if (write_default != NULL) {
@@ -251,7 +282,8 @@ class ColumnSchema {
   bool EqualsType(const ColumnSchema &other) const {
     if (this == &other) return true;
     return is_nullable_ == other.is_nullable_ &&
-           type_info()->type() == other.type_info()->type();
+           type_info()->type() == other.type_info()->type() &&
+           type_attributes().EqualsForType(other.type_attributes(), type_info()->type());
   }
 
   // compare types in Equals function
@@ -305,6 +337,10 @@ class ColumnSchema {
     return attributes_;
   }
 
+  const ColumnTypeAttributes& type_attributes() const {
+    return type_attributes_;
+  }
+
   int Compare(const void *lhs, const void *rhs) const {
     return type_info_->Compare(lhs, rhs);
   }
@@ -354,6 +390,7 @@ class ColumnSchema {
   std::shared_ptr<Variant> read_default_;
   std::shared_ptr<Variant> write_default_;
   ColumnStorageAttributes attributes_;
+  ColumnTypeAttributes type_attributes_;
 };
 
 // The schema for a set of rows.

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/types.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/types.cc b/src/kudu/common/types.cc
index 0a4052b..12a09d2 100644
--- a/src/kudu/common/types.cc
+++ b/src/kudu/common/types.cc
@@ -85,6 +85,9 @@ class TypeInfoResolver {
     AddMapping<DOUBLE>();
     AddMapping<BINARY>();
     AddMapping<INT128>();
+    AddMapping<DECIMAL32>();
+    AddMapping<DECIMAL64>();
+    AddMapping<DECIMAL128>();
   }
 
   template<DataType type> void AddMapping() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/types.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/types.h b/src/kudu/common/types.h
index 7e9ac97..6919d0d 100644
--- a/src/kudu/common/types.h
+++ b/src/kudu/common/types.h
@@ -341,10 +341,10 @@ struct DataTypeTraits<INT128> {
     return AreIntegersConsecutive<INT128>(a, b);
   }
   static const cpp_type* min_value() {
-    return &MathLimits<cpp_type>::kMin;
+    return &INT128_MIN;
   }
   static const cpp_type* max_value() {
-    return &MathLimits<cpp_type>::kMin;
+    return &INT128_MAX;
   }
 };
 
@@ -534,6 +534,48 @@ struct DataTypeTraits<UNIXTIME_MICROS> : public DerivedTypeTraits<INT64>{
   }
 };
 
+template<>
+struct DataTypeTraits<DECIMAL32> : public DerivedTypeTraits<INT32>{
+  static const char* name() {
+    return "decimal";
+  }
+  // AppendDebugStringForValue appends the string representation of the
+  // underlying integer value followed by the "_D32" suffix, as there's no
+  // "full" type information available to format it.
+  static void AppendDebugStringForValue(const void *val, std::string *str) {
+    DataTypeTraits<physical_type>::AppendDebugStringForValue(val, str);
+    str->append("_D32");
+  }
+};
+
+template<>
+struct DataTypeTraits<DECIMAL64> : public DerivedTypeTraits<INT64>{
+  static const char* name() {
+    return "decimal";
+  }
+  // AppendDebugStringForValue appends the string representation of the
+  // underlying integer value followed by the "_D64" suffix, as there's no
+  // "full" type information available to format it.
+  static void AppendDebugStringForValue(const void *val, std::string *str) {
+    DataTypeTraits<physical_type>::AppendDebugStringForValue(val, str);
+    str->append("_D64");
+  }
+};
+
+template<>
+struct DataTypeTraits<DECIMAL128> : public DerivedTypeTraits<INT128>{
+  static const char* name() {
+    return "decimal";
+  }
+  // AppendDebugStringForValue appends the string representation of the
+  // underlying integer value followed by the "_D128" suffix, as there's no
+  // "full" type information available to format it.
+  static void AppendDebugStringForValue(const void *val, std::string *str) {
+    DataTypeTraits<physical_type>::AppendDebugStringForValue(val, str);
+    str->append("_D128");
+  }
+};
+
 // Instantiate this template to get static access to the type traits.
 template<DataType datatype>
 struct TypeTraits : public DataTypeTraits<datatype> {
@@ -590,12 +632,14 @@ class Variant {
       case UINT16:
         numeric_.u16 = *static_cast<const uint16_t *>(value);
         break;
+      case DECIMAL32:
       case INT32:
         numeric_.i32 = *static_cast<const int32_t *>(value);
         break;
       case UINT32:
         numeric_.u32 = *static_cast<const uint32_t *>(value);
         break;
+      case DECIMAL64:
       case UNIXTIME_MICROS:
       case INT64:
         numeric_.i64 = *static_cast<const int64_t *>(value);
@@ -603,6 +647,7 @@ class Variant {
       case UINT64:
         numeric_.u64 = *static_cast<const uint64_t *>(value);
         break;
+      case DECIMAL128:
       case INT128:
         numeric_.i128 = *static_cast<const int128_t *>(value);
         break;
@@ -666,12 +711,15 @@ class Variant {
       case UINT8:        return &(numeric_.u8);
       case INT16:        return &(numeric_.i16);
       case UINT16:       return &(numeric_.u16);
+      case DECIMAL32:
       case INT32:        return &(numeric_.i32);
       case UINT32:       return &(numeric_.u32);
+      case DECIMAL64:
+      case UNIXTIME_MICROS:
       case INT64:        return &(numeric_.i64);
-      case INT128:       return &(numeric_.i128);
-      case UNIXTIME_MICROS:    return &(numeric_.i64);
       case UINT64:       return &(numeric_.u64);
+      case DECIMAL128:
+      case INT128:       return &(numeric_.i128);
       case FLOAT:        return (&numeric_.float_val);
       case DOUBLE:       return (&numeric_.double_val);
       case STRING:

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 25b530a..0a5ce2a 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -209,8 +209,16 @@ Status SchemaFromPB(const SchemaPB& pb, Schema *schema) {
 void ColumnSchemaToPB(const ColumnSchema& col_schema, ColumnSchemaPB *pb, int flags) {
   pb->Clear();
   pb->set_name(col_schema.name());
-  pb->set_type(col_schema.type_info()->type());
   pb->set_is_nullable(col_schema.is_nullable());
+  DataType type = col_schema.type_info()->type();
+  pb->set_type(type);
+  // Only serialize precision and scale for decimal types.
+  if (type == DataType::DECIMAL32 ||
+      type == DataType::DECIMAL64 ||
+      type == DataType::DECIMAL128) {
+    pb->mutable_type_attributes()->set_precision(col_schema.type_attributes().precision);
+    pb->mutable_type_attributes()->set_scale(col_schema.type_attributes().scale);
+  }
   if (!(flags & SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES)) {
     pb->set_encoding(col_schema.attributes().encoding);
     pb->set_compression(col_schema.attributes().compression);
@@ -259,6 +267,17 @@ ColumnSchema ColumnSchemaFromPB(const ColumnSchemaPB& pb) {
     }
   }
 
+  ColumnTypeAttributes type_attributes;
+  if (pb.has_type_attributes()) {
+    const ColumnTypeAttributesPB& typeAttributesPB = pb.type_attributes();
+    if (typeAttributesPB.has_precision()) {
+      type_attributes.precision = typeAttributesPB.precision();
+    }
+    if (typeAttributesPB.has_scale()) {
+      type_attributes.scale = typeAttributesPB.scale();
+    }
+  }
+
   ColumnStorageAttributes attributes;
   if (pb.has_encoding()) {
     attributes.encoding = pb.encoding();
@@ -271,7 +290,7 @@ ColumnSchema ColumnSchemaFromPB(const ColumnSchemaPB& pb) {
   }
   return ColumnSchema(pb.name(), pb.type(), pb.is_nullable(),
                       read_default_ptr, write_default_ptr,
-                      attributes);
+                      attributes, type_attributes);
 }
 
 void ColumnSchemaDeltaToPB(const ColumnSchemaDelta& col_delta, ColumnSchemaDeltaPB *pb) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index a6a6938..17fbc68 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -66,6 +66,7 @@ ADD_KUDU_TEST(client-stress-test
 ADD_KUDU_TEST(consistency-itest)
 ADD_KUDU_TEST(create-table-itest)
 ADD_KUDU_TEST(create-table-stress-test)
+ADD_KUDU_TEST(decimal-itest)
 ADD_KUDU_TEST(delete_table-itest)
 ADD_KUDU_TEST(delete_tablet-itest)
 ADD_KUDU_TEST(disk_failure-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/integration-tests/all_types-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/all_types-itest.cc b/src/kudu/integration-tests/all_types-itest.cc
index f9c9309..3d355f7 100644
--- a/src/kudu/integration-tests/all_types-itest.cc
+++ b/src/kudu/integration-tests/all_types-itest.cc
@@ -45,6 +45,7 @@
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/util/bitmap.h"
+#include "kudu/util/int128.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -240,6 +241,7 @@ struct ExpectedVals {
   bool expected_bool_val;
   float expected_float_val;
   double expected_double_val;
+  int128_t expected_decimal_val;
 };
 
 // Integration that writes, scans and verifies all types.
@@ -269,6 +271,7 @@ class AllTypesItest : public KuduTest {
     builder.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
     builder.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE);
     builder.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY);
+    builder.AddColumn("decimal_val")->Type(KuduColumnSchema::DECIMAL)->Precision(18)->Scale(8);
     CHECK_OK(builder.Build(&schema_));
   }
 
@@ -336,6 +339,7 @@ class AllTypesItest : public KuduTest {
     RETURN_NOT_OK(row->SetDouble("double_val", double_val));
     RETURN_NOT_OK(row->SetFloat("float_val", double_val));
     RETURN_NOT_OK(row->SetBool("bool_val", int_val % 2));
+    RETURN_NOT_OK(row->SetUnscaledDecimal("decimal_val", int_val));
     VLOG(1) << "Inserting row[" << split_idx << "," << row_idx << "]" << insert->ToString();
     RETURN_NOT_OK(session->Apply(insert));
     return Status::OK();
@@ -372,6 +376,7 @@ class AllTypesItest : public KuduTest {
     projection->push_back("double_val");
     projection->push_back("float_val");
     projection->push_back("bool_val");
+    projection->push_back("decimal_val");
   }
 
   ExpectedVals GetExpectedValsForRow(int split_idx, int row_idx) {
@@ -388,6 +393,7 @@ class AllTypesItest : public KuduTest {
     vals.expected_bool_val = expected_int_val % 2;
     vals.expected_float_val = expected_int_val;
     vals.expected_double_val = expected_int_val;
+    vals.expected_decimal_val = expected_int_val;
     return vals;
   }
 
@@ -426,6 +432,9 @@ class AllTypesItest : public KuduTest {
     float float_val;
     ASSERT_OK(row.GetFloat("float_val", &float_val));
     ASSERT_EQ(float_val, vals.expected_float_val);
+    int128_t decimal_val;
+    ASSERT_OK(row.GetUnscaledDecimal("decimal_val", &decimal_val));
+    ASSERT_EQ(decimal_val, vals.expected_decimal_val);
   }
 
   typedef std::function<Status (KuduScanner* scanner)> ScannerSetup;
@@ -565,7 +574,8 @@ TYPED_TEST(AllTypesItest, TestTimestampPadding) {
           row_stride += kPaddedTimestampSize;
           break;
         default:
-          int col_size = GetTypeInfo(ToInternalDataType(col_schema.type()))->size();
+          int col_size = GetTypeInfo(ToInternalDataType(col_schema.type(),
+                                                        KuduColumnTypeAttributes(18, 8)))->size();
           projection_offsets.push_back(col_size);
           row_stride += col_size;
       }
@@ -588,7 +598,6 @@ TYPED_TEST(AllTypesItest, TestTimestampPadding) {
           ASSERT_OK(this->setup_.VerifyRowKeyRaw(row_data, num_tablet, *total_rows_in_tablet + i));
         } else {
           ExpectedVals vals = this->GetExpectedValsForRow(num_tablet, *total_rows_in_tablet + i);
-
           switch (col_schema.type()) {
             case KuduColumnSchema::INT8:
               ASSERT_EQ(*reinterpret_cast<const int8_t*>(row_data), vals.expected_int8_val);
@@ -620,6 +629,9 @@ TYPED_TEST(AllTypesItest, TestTimestampPadding) {
             case KuduColumnSchema::DOUBLE:
               ASSERT_EQ(*reinterpret_cast<const double*>(row_data), vals.expected_double_val);
               break;
+            case KuduColumnSchema::DECIMAL:
+              ASSERT_EQ(*reinterpret_cast<const int64_t*>(row_data), vals.expected_decimal_val);
+              break;
             default:
               LOG(FATAL) << "Unexpected type: " << col_schema.type();
           }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7b6cc845/src/kudu/integration-tests/decimal-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/decimal-itest.cc b/src/kudu/integration-tests/decimal-itest.cc
new file mode 100644
index 0000000..5fdbd11
--- /dev/null
+++ b/src/kudu/integration-tests/decimal-itest.cc
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/client/value.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/decimal_util.h"
+#include "kudu/util/int128.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace client {
+
+using sp::shared_ptr;
+
+class DecimalItest : public ExternalMiniClusterITestBase {
+};
+
+// Tests writing and reading various decimal columns with various
+// precisions and scales to ensure the values match expectations.
+TEST_F(DecimalItest, TestDecimalTypes) {
+  const int kNumServers = 3;
+  const int kNumTablets = 3;
+  const char* const kTableName = "decimal-table";
+  NO_FATALS(StartCluster({}, {}, kNumServers));
+
+  // Create Schema
+  KuduSchemaBuilder builder;
+  builder.AddColumn("key")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal64Precision)->NotNull()->PrimaryKey();
+  builder.AddColumn("numeric32")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal32Precision);
+  builder.AddColumn("scale32")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal32Precision)
+      ->Scale(kMaxDecimal32Precision);
+  builder.AddColumn("numeric64")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal64Precision);
+  builder.AddColumn("scale64")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal64Precision)
+      ->Scale(kMaxDecimal64Precision);
+  builder.AddColumn("numeric128")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal128Precision);
+  builder.AddColumn("scale128")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(kMaxDecimal128Precision)
+      ->Scale(kMaxDecimal128Precision);
+  builder.AddColumn("default")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(5)->Scale(2)
+      ->Default(KuduValue::FromDecimal(12345, 2));
+  builder.AddColumn("alteredDefault")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(5)->Scale(2);
+  builder.AddColumn("money")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(6)->Scale(2);
+  builder.AddColumn("small")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(2);
+  builder.AddColumn("null")->Type(KuduColumnSchema::DECIMAL)
+      ->Precision(8);
+  KuduSchema schema;
+  ASSERT_OK(builder.Build(&schema));
+
+  // Create Table
+  gscoped_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+      .schema(&schema)
+      .num_replicas(kNumServers)
+      .add_hash_partitions({ "key" }, kNumTablets)
+      .Create());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+
+  // Alter Default Value
+  gscoped_ptr<client::KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+  table_alterer->AlterColumn("alteredDefault")->Default(KuduValue::FromDecimal(456789, 2));
+  ASSERT_OK(table_alterer->Alter());
+
+  // Insert Row
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+  KuduInsert* insert = table->NewInsert();
+  KuduPartialRow* write = insert->mutable_row();
+  ASSERT_OK(write->SetUnscaledDecimal("key", 1));
+  ASSERT_OK(write->SetUnscaledDecimal("numeric32", 123456789));
+  ASSERT_OK(write->SetUnscaledDecimal("scale32", 123456789));
+  ASSERT_OK(write->SetUnscaledDecimal("numeric64", 12345678910111213L));
+  ASSERT_OK(write->SetUnscaledDecimal("scale64", 12345678910111213L));
+  ASSERT_OK(write->SetUnscaledDecimal("numeric128",
+      static_cast<int128_t>(12345678910111213L) * 1000000000000000000L));
+  ASSERT_OK(write->SetUnscaledDecimal("scale128",
+      static_cast<int128_t>(12345678910111213L) * 1000000000000000000L));
+  ASSERT_OK(write->SetUnscaledDecimal("money", 123400));
+  // Test a value that's too large for the column's precision.
+  Status s = write->SetUnscaledDecimal("small", 999);
+  EXPECT_EQ("Invalid argument: value 999 out of range for decimal column 'small'",
+            s.ToString());
+  ASSERT_OK(session->Apply(insert));
+  ASSERT_OK(session->Flush());
+
+  // Read Rows
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.SetFaultTolerant());
+  ASSERT_OK(scanner.Open());
+  while (scanner.HasMoreRows()) {
+    KuduScanBatch batch;
+    CHECK_OK(scanner.NextBatch(&batch));
+    for (const KuduScanBatch::RowPtr& read : batch) {
+      // Verify the row
+      int128_t key;
+      ASSERT_OK(read.GetUnscaledDecimal("key", &key));
+      ASSERT_EQ("1", DecimalToString(key, kDefaultDecimalScale));
+      int128_t numeric32;
+      ASSERT_OK(read.GetUnscaledDecimal("numeric32", &numeric32));
+      ASSERT_EQ("123456789", DecimalToString(numeric32, kDefaultDecimalScale));
+      int128_t scale32;
+      ASSERT_OK(read.GetUnscaledDecimal("scale32", &scale32));
+      ASSERT_EQ("0.123456789",
+                DecimalToString(scale32, kMaxDecimal32Precision));
+      int128_t numeric64;
+      ASSERT_OK(read.GetUnscaledDecimal("numeric64", &numeric64));
+      ASSERT_EQ("12345678910111213", DecimalToString(numeric64, kDefaultDecimalScale));
+      int128_t scale64;
+      ASSERT_OK(read.GetUnscaledDecimal("scale64", &scale64));
+      ASSERT_EQ("0.012345678910111213",
+                DecimalToString(scale64, kMaxDecimal64Precision));
+      int128_t numeric128;
+      ASSERT_OK(read.GetUnscaledDecimal("numeric128", &numeric128));
+      ASSERT_EQ("12345678910111213000000000000000000",
+                DecimalToString(numeric128, kDefaultDecimalScale));
+      int128_t scale128;
+      ASSERT_OK(read.GetUnscaledDecimal("scale128", &scale128));
+      ASSERT_EQ("0.00012345678910111213000000000000000000",
+                DecimalToString(scale128, kMaxDecimal128Precision));
+      int128_t money;
+      ASSERT_OK(read.GetUnscaledDecimal("money", &money));
+      ASSERT_EQ("1234.00", DecimalToString(money, 2));
+      int128_t defaulted;
+      ASSERT_OK(read.GetUnscaledDecimal("default", &defaulted));
+      ASSERT_EQ("123.45", DecimalToString(defaulted, 2));
+      int128_t altered;
+      ASSERT_OK(read.GetUnscaledDecimal("alteredDefault", &altered));
+      ASSERT_EQ("4567.89", DecimalToString(altered, 2));
+      ASSERT_TRUE(read.IsNull("null"));
+      // Try to read a decimal as an integer.
+      int32_t int32;
+      Status s = read.GetInt32("numeric32", &int32);
+      EXPECT_EQ("Invalid argument: invalid type int32 provided for column "
+                "'numeric32' (expected decimal)", s.ToString());
+    }
+  }
+}
+
+} // namespace client
+} // namespace kudu