You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/02/13 22:28:18 UTC

[1/6] kudu git commit: [docs] Document how to recover from a majority failed tablet

Repository: kudu
Updated Branches:
  refs/heads/master 5d10a56f9 -> cbd34fa85


[docs] Document how to recover from a majority failed tablet

This adds some docs on how to recover when a tablet can no longer find
a majority due to the permanent failure of replicas.

I tested this procedure by failing tablets in various ways:
- deleting important bits like cmeta or tablet metadata
- deleting entire data dirs
- tombstoning 2/3 replicas (and disabling tombstoned voting)
and I was always able to recover using these instructions.

Change-Id: Ic6326f65d029a1cd75e487b16ce5be4baea2f215
Reviewed-on: http://gerrit.cloudera.org:8080/8402
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Will Berkeley <wd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/51218713
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/51218713
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/51218713

Branch: refs/heads/master
Commit: 51218713a1084c9e6d50e2a93bd79f81a4a9aea0
Parents: 5d10a56
Author: Will Berkeley <wd...@apache.org>
Authored: Thu Oct 26 15:15:46 2017 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Tue Feb 13 21:07:53 2018 +0000

----------------------------------------------------------------------
 docs/administration.adoc | 65 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/51218713/docs/administration.adoc
----------------------------------------------------------------------
diff --git a/docs/administration.adoc b/docs/administration.adoc
index becdebe..076fa99 100644
--- a/docs/administration.adoc
+++ b/docs/administration.adoc
@@ -840,3 +840,68 @@ leading to lower storage volume and reduced read parallelism. Since removing
 data directories is not currently supported in Kudu, the administrator should
 schedule a window to bring the node down for maintenance and
 <<rebuilding_kudu,rebuild the node>> at their convenience.
+
+[[tablet_majority_down_recovery]]
+=== Bringing a tablet that has lost a majority of replicas back online
+
+If a tablet has permanently lost a majority of its replicas, it cannot recover
+automatically and operator intervention is required. The steps below may cause
+recent edits to the tablet to be lost, potentially resulting in permanent data
+loss. Only attempt the procedure below if it is impossible to bring
+a majority back online.
+
+Suppose a tablet has lost a majority of its replicas. The first step in
+diagnosing and fixing the problem is to examine the tablet's state using ksck:
+
+[source,bash]
+----
+$ kudu cluster ksck --tablets=e822cab6c0584bc0858219d1539a17e6 master-00,master-01,master-02
+Connected to the Master
+Fetched info from all 5 Tablet Servers
+Tablet e822cab6c0584bc0858219d1539a17e6 of table 'my_table' is unavailable: 2 replica(s) not RUNNING
+  638a20403e3e4ae3b55d4d07d920e6de (tserver-00:7150): RUNNING
+  9a56fa85a38a4edc99c6229cba68aeaa (tserver-01:7150): bad state
+    State:       FAILED
+    Data state:  TABLET_DATA_READY
+    Last status: <failure message>
+  c311fef7708a4cf9bb11a3e4cbcaab8c (tserver-02:7150): bad state
+    State:       FAILED
+    Data state:  TABLET_DATA_READY
+    Last status: <failure message>
+----
+
+This output shows that, for tablet `e822cab6c0584bc0858219d1539a17e6`, the two
+tablet replicas on `tserver-01` and `tserver-02` failed. The remaining replica
+is not the leader, so the leader replica failed as well. This means the chance
+of data loss is higher since the remaining replica on `tserver-00` may have
+been lagging. In general, to accept the potential data loss and restore the
+tablet from the remaining replicas, divide the tablet replicas into two groups:
+
+1. Healthy replicas: Those in `RUNNING` state as reported by ksck
+2. Unhealthy replicas
+
+For example, in the above ksck output, the replica on tablet server `tserver-00`
+is healthy, while the replicas on `tserver-01` and `tserver-02` are unhealthy.
+On each tablet server with a healthy replica, alter the consensus configuration
+to remove unhealthy replicas. In the typical case of 1 out of 3 surviving
+replicas, there will be only one healthy replica, so the consensus configuration
+will be rewritten to include only the healthy replica.
+
+[source,bash]
+----
+$ kudu remote_replica unsafe_change_config tserver-00:7150 <tablet-id> <tserver-00-uuid>
+----
+
+where `<tablet-id>` is `e822cab6c0584bc0858219d1539a17e6` and
+`<tserver-00-uuid>` is the uuid of `tserver-00`,
+`638a20403e3e4ae3b55d4d07d920e6de`.
+
+Once the healthy replicas' consensus configurations have been forced to exclude
+the unhealthy replicas, the healthy replicas will be able to elect a leader.
+The tablet will become available for writes, though it will still be
+under-replicated. Shortly after the tablet becomes available, the leader master
+will notice that it is under-replicated, and will cause the tablet to
+re-replicate until the proper replication factor is restored. The unhealthy
+replicas will be tombstoned by the master, causing their remaining data to be
+deleted.
+


[3/6] kudu git commit: Add back KuduColumnSchema DataTypeToString

Posted by gr...@apache.org.
Add back KuduColumnSchema DataTypeToString

KuduColumnSchema’s DataTypeToString method
was accidentally removed in KUDU-721.

This method/functionality should be added back
to prevent a compatibility break. However, it’s
implimentation has been updated to print based
on the public DataType instead of converting to the
internal type and printing that.

Change-Id: I2bf6879f5ab623366a962c6a3d51efc49978a53c
Reviewed-on: http://gerrit.cloudera.org:8080/9240
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/118e8f34
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/118e8f34
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/118e8f34

Branch: refs/heads/master
Commit: 118e8f34c6d72c049e422df827f53341687dd275
Parents: c9e3bd8
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Feb 7 12:03:32 2018 -0600
Committer: Grant Henke <gr...@gmail.com>
Committed: Tue Feb 13 21:52:14 2018 +0000

----------------------------------------------------------------------
 src/kudu/client/schema.cc | 28 ++++++++++++++++++++++++++++
 src/kudu/client/schema.h  |  5 +++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/118e8f34/src/kudu/client/schema.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.cc b/src/kudu/client/schema.cc
index c8e5323..8ac73e9 100644
--- a/src/kudu/client/schema.cc
+++ b/src/kudu/client/schema.cc
@@ -558,6 +558,34 @@ Status KuduSchemaBuilder::Build(KuduSchema* schema) {
 // KuduColumnSchema
 ////////////////////////////////////////////////////////////
 
+std::string KuduColumnSchema::DataTypeToString(DataType type) {
+  switch (type) {
+    case INT8:
+      return "INT8";
+    case INT16:
+      return "INT16";
+    case INT32:
+      return "INT32";
+    case INT64:
+      return "INT64";
+    case STRING:
+      return "STRING";
+    case BOOL:
+      return "BOOL";
+    case FLOAT:
+      return "FLOAT";
+    case DOUBLE:
+      return "DOUBLE";
+    case BINARY:
+      return "BINARY";
+    case UNIXTIME_MICROS:
+      return "UNIXTIME_MICROS";
+    case DECIMAL:
+      return "DECIMAL";
+  }
+  LOG(FATAL) << "Unhandled type " << type;
+}
+
 KuduColumnSchema::KuduColumnSchema(const std::string &name,
                                    DataType type,
                                    bool is_nullable,

http://git-wip-us.apache.org/repos/asf/kudu/blob/118e8f34/src/kudu/client/schema.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/schema.h b/src/kudu/client/schema.h
index d6084ca..9163076 100644
--- a/src/kudu/client/schema.h
+++ b/src/kudu/client/schema.h
@@ -189,6 +189,11 @@ class KUDU_EXPORT KuduColumnSchema {
     TIMESTAMP = UNIXTIME_MICROS //!< deprecated, use UNIXTIME_MICROS
   };
 
+  /// @param [in] type
+  ///   Column data type.
+  /// @return String representation of the column data type.
+  static std::string DataTypeToString(DataType type);
+
   /// Construct KuduColumnSchema object as a copy of another object.
   ///
   /// @param [in] other


[4/6] kudu git commit: KUDU-721: [Java] Add DECIMAL column type support

Posted by gr...@apache.org.
http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 1081cc6..f655222 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -47,9 +48,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.util.CapturingLogAppender;
+import org.apache.kudu.util.DecimalUtil;
 
 public class TestKuduClient extends BaseKuduTest {
   private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
@@ -86,6 +89,21 @@ public class TestKuduClient extends BaseKuduTest {
     return new Schema(columns);
   }
 
+  private Schema createSchemaWithDecimalColumns() {
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.DECIMAL).key(true)
+        .typeAttributes(
+            new ColumnTypeAttributesBuilder()
+                .precision(org.apache.kudu.util.DecimalUtil.MAX_DECIMAL64_PRECISION).build()
+        ).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.DECIMAL).nullable(true)
+        .typeAttributes(
+            new ColumnTypeAttributesBuilder()
+                .precision(org.apache.kudu.util.DecimalUtil.MAX_DECIMAL128_PRECISION).build()
+        ).build());
+    return new Schema(columns);
+  }
+
   private static CreateTableOptions createTableOptions() {
     return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"));
   }
@@ -426,6 +444,48 @@ public class TestKuduClient extends BaseKuduTest {
   }
 
   /**
+   * Test inserting and retrieving decimal columns.
+   */
+  @Test(timeout = 100000)
+  public void testDecimalColumns() throws Exception {
+    Schema schema = createSchemaWithDecimalColumns();
+    syncClient.createTable(tableName, schema, createTableOptions());
+
+    List<Long> timestamps = new ArrayList<>();
+
+    KuduSession session = syncClient.newSession();
+    KuduTable table = syncClient.openTable(tableName);
+
+    // Verify ColumnTypeAttributes
+    assertEquals(DecimalUtil.MAX_DECIMAL128_PRECISION,
+        table.getSchema().getColumn("c1").getTypeAttributes().getPrecision());
+
+    for (int i = 0; i < 9; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addDecimal("key", BigDecimal.valueOf(i));
+      if (i % 2 == 1) {
+        row.addDecimal("c1", BigDecimal.valueOf(i));
+      }
+      session.apply(insert);
+    }
+    session.flush();
+
+    List<String> rowStrings = scanTableToStrings(table);
+    assertEquals(9, rowStrings.size());
+    for (int i = 0; i < rowStrings.size(); i++) {
+      StringBuilder expectedRow = new StringBuilder();
+      expectedRow.append(String.format("DECIMAL key(18, 0)=%s, DECIMAL c1(38, 0)=", String.valueOf(i)));
+      if (i % 2 == 1) {
+        expectedRow.append(String.valueOf(i));
+      } else {
+        expectedRow.append("NULL");
+      }
+      assertEquals(expectedRow.toString(), rowStrings.get(i));
+    }
+  }
+
+  /**
    * Test scanning with predicates.
    */
   @Test

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
index d192be8..e7c40c0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduPredicate.java
@@ -24,6 +24,7 @@ import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS;
 import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
 import static org.apache.kudu.client.KuduPredicate.PredicateType.RANGE;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
 
 import com.google.common.base.Preconditions;
@@ -33,6 +34,7 @@ import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
+import org.apache.kudu.util.DecimalUtil;
 
 public class TestKuduPredicate {
 
@@ -63,6 +65,21 @@ public class TestKuduPredicate {
   private static final ColumnSchema binaryCol =
       new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build();
 
+  private static final ColumnSchema decimal32Col =
+      new ColumnSchema.ColumnSchemaBuilder("decimal32", Type.DECIMAL)
+          .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL32_PRECISION, 2))
+          .build();
+
+  private static final ColumnSchema decimal64Col =
+      new ColumnSchema.ColumnSchemaBuilder("decimal64", Type.DECIMAL)
+          .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 2))
+          .build();
+
+  private static final ColumnSchema decimal128Col =
+      new ColumnSchema.ColumnSchemaBuilder("decimal128", Type.DECIMAL)
+          .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 2))
+          .build();
+
   private static KuduPredicate intRange(int lower, int upper) {
     Preconditions.checkArgument(lower < upper);
     return new KuduPredicate(RANGE, intCol, Bytes.fromInt(lower), Bytes.fromInt(upper));
@@ -844,6 +861,66 @@ public class TestKuduPredicate {
               KuduPredicate.newInListPredicate(doubleCol, ImmutableList.of(14d, 18d, 20d)),
               KuduPredicate.newInListPredicate(doubleCol, ImmutableList.of(14d, 18d)));
 
+    testMerge(KuduPredicate.newComparisonPredicate(decimal32Col, GREATER_EQUAL,
+        BigDecimal.valueOf(12345, 2)),
+        KuduPredicate.newComparisonPredicate(decimal32Col, LESS,
+            BigDecimal.valueOf(67890,2)),
+        new KuduPredicate(RANGE,
+            decimal32Col,
+            Bytes.fromBigDecimal(BigDecimal.valueOf(12345, 2),
+                DecimalUtil.MAX_DECIMAL32_PRECISION),
+            Bytes.fromBigDecimal(BigDecimal.valueOf(67890, 2),
+                DecimalUtil.MAX_DECIMAL32_PRECISION)));
+
+    testMerge(KuduPredicate.newInListPredicate(decimal32Col, ImmutableList.of(
+            BigDecimal.valueOf(12345, 2),
+            BigDecimal.valueOf(45678, 2))),
+        KuduPredicate.newInListPredicate(decimal32Col, ImmutableList.of(
+            BigDecimal.valueOf(45678, 2),
+            BigDecimal.valueOf(98765, 2))),
+        KuduPredicate.newInListPredicate(decimal32Col, ImmutableList.of(
+            BigDecimal.valueOf(45678, 2))));
+
+    testMerge(KuduPredicate.newInListPredicate(decimal64Col, ImmutableList.of(
+        BigDecimal.valueOf(12345678910L, 2),
+        BigDecimal.valueOf(34567891011L, 2))),
+        KuduPredicate.newInListPredicate(decimal64Col, ImmutableList.of(
+            BigDecimal.valueOf(34567891011L, 2),
+            BigDecimal.valueOf(98765432111L, 2))),
+        KuduPredicate.newInListPredicate(decimal64Col, ImmutableList.of(
+            BigDecimal.valueOf(34567891011L, 2))));
+
+    testMerge(KuduPredicate.newComparisonPredicate(decimal64Col, GREATER_EQUAL,
+        BigDecimal.valueOf(12345678910L, 2)),
+        KuduPredicate.newComparisonPredicate(decimal64Col, LESS,
+            BigDecimal.valueOf(67890101112L,2)),
+        new KuduPredicate(RANGE,
+            decimal64Col,
+            Bytes.fromBigDecimal(BigDecimal.valueOf(12345678910L, 2),
+                DecimalUtil.MAX_DECIMAL64_PRECISION),
+            Bytes.fromBigDecimal(BigDecimal.valueOf(67890101112L, 2),
+                DecimalUtil.MAX_DECIMAL64_PRECISION)));
+
+    testMerge(KuduPredicate.newInListPredicate(decimal128Col, ImmutableList.of(
+        new BigDecimal("1234567891011121314.15"),
+        new BigDecimal("3456789101112131415.16"))),
+        KuduPredicate.newInListPredicate(decimal128Col, ImmutableList.of(
+            new BigDecimal("3456789101112131415.16"),
+            new BigDecimal("9876543212345678910.11"))),
+        KuduPredicate.newInListPredicate(decimal128Col, ImmutableList.of(
+            new BigDecimal("3456789101112131415.16"))));
+
+    testMerge(KuduPredicate.newComparisonPredicate(decimal128Col, GREATER_EQUAL,
+        new BigDecimal("1234567891011121314.15")),
+        KuduPredicate.newComparisonPredicate(decimal128Col, LESS,
+            new BigDecimal("67891011121314151617.18")),
+        new KuduPredicate(RANGE,
+            decimal128Col,
+            Bytes.fromBigDecimal(new BigDecimal("1234567891011121314.15"),
+                DecimalUtil.MAX_DECIMAL128_PRECISION),
+            Bytes.fromBigDecimal(new BigDecimal("67891011121314151617.18"),
+                DecimalUtil.MAX_DECIMAL128_PRECISION)));
+
     testMerge(KuduPredicate.newComparisonPredicate(binaryCol, GREATER_EQUAL,
                                                    new byte[] { 0, 1, 2, 3, 4, 5, 6 }),
               KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] { 10 }),
@@ -871,11 +948,13 @@ public class TestKuduPredicate {
                         KuduPredicate.newComparisonPredicate(floatCol, LESS, Math.nextAfter(12.345f, Float.POSITIVE_INFINITY)));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(doubleCol, LESS_EQUAL, 12.345),
                         KuduPredicate.newComparisonPredicate(doubleCol, LESS, Math.nextAfter(12.345, Float.POSITIVE_INFINITY)));
+    Assert.assertEquals(
+        KuduPredicate.newComparisonPredicate(decimal32Col, LESS_EQUAL, BigDecimal.valueOf(12345,2)),
+        KuduPredicate.newComparisonPredicate(decimal32Col, LESS, BigDecimal.valueOf(12346,2)));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(stringCol, LESS_EQUAL, "a"),
                         KuduPredicate.newComparisonPredicate(stringCol, LESS, "a\0"));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, LESS_EQUAL, new byte[] { (byte) 10 }),
                         KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] { (byte) 10, (byte) 0 }));
-
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(byteCol, LESS_EQUAL, Byte.MAX_VALUE),
                         KuduPredicate.newIsNotNullPredicate(byteCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(shortCol, LESS_EQUAL, Short.MAX_VALUE),
@@ -908,6 +987,9 @@ public class TestKuduPredicate {
                         KuduPredicate.newComparisonPredicate(floatCol, GREATER, 12.345f));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(doubleCol, GREATER_EQUAL, Math.nextAfter(12.345, Float.MAX_VALUE)),
                         KuduPredicate.newComparisonPredicate(doubleCol, GREATER, 12.345));
+    Assert.assertEquals(
+        KuduPredicate.newComparisonPredicate(decimal32Col, GREATER_EQUAL, BigDecimal.valueOf(12346, 2)),
+        KuduPredicate.newComparisonPredicate(decimal32Col, GREATER, BigDecimal.valueOf(12345, 2)));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(stringCol, GREATER_EQUAL, "a\0"),
                         KuduPredicate.newComparisonPredicate(stringCol, GREATER, "a"));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, GREATER_EQUAL, new byte[] { (byte) 10, (byte) 0 }),
@@ -945,6 +1027,15 @@ public class TestKuduPredicate {
                         KuduPredicate.none(floatCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(doubleCol, LESS, Double.NEGATIVE_INFINITY),
                         KuduPredicate.none(doubleCol));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(decimal32Col, LESS,
+        DecimalUtil.minValue(DecimalUtil.MAX_DECIMAL32_PRECISION, 2)),
+        KuduPredicate.none(decimal32Col));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(decimal64Col, LESS,
+        DecimalUtil.minValue(DecimalUtil.MAX_DECIMAL64_PRECISION, 2)),
+        KuduPredicate.none(decimal64Col));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(decimal128Col, LESS,
+        DecimalUtil.minValue(DecimalUtil.MAX_DECIMAL128_PRECISION, 2)),
+        KuduPredicate.none(decimal128Col));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(stringCol, LESS, ""),
                         KuduPredicate.none(stringCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, LESS, new byte[] {}),
@@ -965,6 +1056,15 @@ public class TestKuduPredicate {
                         KuduPredicate.newIsNotNullPredicate(floatCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(doubleCol, GREATER_EQUAL, Double.NEGATIVE_INFINITY),
                         KuduPredicate.newIsNotNullPredicate(doubleCol));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(decimal32Col, GREATER_EQUAL,
+        DecimalUtil.minValue(DecimalUtil.MAX_DECIMAL32_PRECISION, 2)),
+        KuduPredicate.newIsNotNullPredicate(decimal32Col));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(decimal64Col, GREATER_EQUAL,
+        DecimalUtil.minValue(DecimalUtil.MAX_DECIMAL64_PRECISION, 2)),
+        KuduPredicate.newIsNotNullPredicate(decimal64Col));
+    Assert.assertEquals(KuduPredicate.newComparisonPredicate(decimal128Col, GREATER_EQUAL,
+        DecimalUtil.minValue(DecimalUtil.MAX_DECIMAL128_PRECISION, 2)),
+        KuduPredicate.newIsNotNullPredicate(decimal128Col));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(stringCol, GREATER_EQUAL, ""),
                         KuduPredicate.newIsNotNullPredicate(stringCol));
     Assert.assertEquals(KuduPredicate.newComparisonPredicate(binaryCol, GREATER_EQUAL, new byte[] {}),
@@ -1000,6 +1100,15 @@ public class TestKuduPredicate {
                         KuduPredicate.newComparisonPredicate(floatCol, EQUAL, 123.456f).toString());
     Assert.assertEquals("`double` = 123.456",
                         KuduPredicate.newComparisonPredicate(doubleCol, EQUAL, 123.456).toString());
+    Assert.assertEquals("`decimal32` = 123.45",
+        KuduPredicate.newComparisonPredicate(decimal32Col, EQUAL,
+            BigDecimal.valueOf(12345, 2)).toString());
+    Assert.assertEquals("`decimal64` = 123456789.10",
+        KuduPredicate.newComparisonPredicate(decimal64Col, EQUAL,
+            BigDecimal.valueOf(12345678910L, 2)).toString());
+    Assert.assertEquals("`decimal128` = 1234567891011121314.15",
+        KuduPredicate.newComparisonPredicate(decimal128Col, EQUAL,
+            new BigDecimal("1234567891011121314.15")).toString());
     Assert.assertEquals("`string` = \"my string\"",
                         KuduPredicate.newComparisonPredicate(stringCol, EQUAL, "my string").toString());
     Assert.assertEquals("`binary` = 0xAB01CD", KuduPredicate.newComparisonPredicate(

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
index b4dd756..44e3c45 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
@@ -51,6 +52,7 @@ public class TestPartialRow {
     assertEquals(ByteBuffer.wrap(new byte[] { 5, 6, 7, 8, 9 }), partialRow.getBinary("binary-bytebuffer"));
     assertTrue(partialRow.isSet("null"));
     assertTrue(partialRow.isNull("null"));
+    assertEquals(BigDecimal.valueOf(12345, 3), partialRow.getDecimal("decimal"));
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -199,6 +201,32 @@ public class TestPartialRow {
     partialRow.isSet(999);
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testAddInvalidPrecisionDecimal() {
+    PartialRow partialRow = getPartialRowWithAllTypes();
+    partialRow.addDecimal("decimal", BigDecimal.valueOf(123456, 3));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testAddInvalidScaleDecimal() {
+    PartialRow partialRow = getPartialRowWithAllTypes();
+    partialRow.addDecimal("decimal", BigDecimal.valueOf(12345, 4));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testAddInvalidCoercedScaleDecimal() {
+    PartialRow partialRow = getPartialRowWithAllTypes();
+    partialRow.addDecimal("decimal", BigDecimal.valueOf(12345, 2));
+  }
+
+  @Test
+  public void testAddCoercedScaleAndPrecisionDecimal() {
+    PartialRow partialRow = getPartialRowWithAllTypes();
+    partialRow.addDecimal("decimal", BigDecimal.valueOf(222, 1));
+    BigDecimal decimal = partialRow.getDecimal("decimal");
+    assertEquals("22.200", decimal.toString());
+  }
+
   @Test
   public void testToString() {
     Schema schema = BaseKuduTest.getSchemaWithAllTypes();
@@ -228,12 +256,96 @@ public class TestPartialRow {
     assertEquals("(int8 int8=42, int32 int32=42, double double=52.35, " +
                      "string string=\"fun with ütf\\0\", binary binary-bytebuffer=[2, 3, 4])",
                  row.toString());
+
+    row.addDecimal("decimal", BigDecimal.valueOf(12345, 3));
+    assertEquals("(int8 int8=42, int32 int32=42, double double=52.35, " +
+            "string string=\"fun with ütf\\0\", binary binary-bytebuffer=[2, 3, 4], " +
+            "decimal(5, 3) decimal=12.345)",
+        row.toString());
+  }
+
+  @Test
+  public void testIncrementColumn() {
+    PartialRow partialRow = getPartialRowWithAllTypes();
+
+    // Boolean
+    int boolIndex = getColumnIndex(partialRow, "bool");
+    partialRow.addBoolean(boolIndex, false);
+    assertTrue(partialRow.incrementColumn(boolIndex));
+    assertEquals(true, partialRow.getBoolean(boolIndex));
+    assertFalse(partialRow.incrementColumn(boolIndex));
+
+    // Int8
+    int int8Index = getColumnIndex(partialRow, "int8");
+    partialRow.addByte(int8Index, (byte)(Byte.MAX_VALUE - 1));
+    assertTrue(partialRow.incrementColumn(int8Index));
+    assertEquals(Byte.MAX_VALUE, partialRow.getByte(int8Index));
+    assertFalse(partialRow.incrementColumn(int8Index));
+
+    // Int16
+    int int16Index = getColumnIndex(partialRow, "int16");
+    partialRow.addShort(int16Index, (short)(Short.MAX_VALUE - 1));
+    assertTrue(partialRow.incrementColumn(int16Index));
+    assertEquals(Short.MAX_VALUE, partialRow.getShort(int16Index));
+    assertFalse(partialRow.incrementColumn(int16Index));
+
+    // Int32
+    int int32Index = getColumnIndex(partialRow, "int32");
+    partialRow.addInt(int32Index, Integer.MAX_VALUE - 1);
+    assertTrue(partialRow.incrementColumn(int32Index));
+    assertEquals(Integer.MAX_VALUE, partialRow.getInt(int32Index));
+    assertFalse(partialRow.incrementColumn(int32Index));
+
+    // Int64
+    int int64Index = getColumnIndex(partialRow, "int64");
+    partialRow.addLong(int64Index, Long.MAX_VALUE - 1);
+    assertTrue(partialRow.incrementColumn(int64Index));
+    assertEquals(Long.MAX_VALUE, partialRow.getLong(int64Index));
+    assertFalse(partialRow.incrementColumn(int64Index));
+
+    // Float
+    int floatIndex = getColumnIndex(partialRow, "float");
+    partialRow.addFloat(floatIndex, Float.MAX_VALUE);
+    assertTrue(partialRow.incrementColumn(floatIndex));
+    assertEquals(Float.POSITIVE_INFINITY, partialRow.getFloat(floatIndex), 0.0f);
+    assertFalse(partialRow.incrementColumn(floatIndex));
+
+    // Float
+    int doubleIndex = getColumnIndex(partialRow, "double");
+    partialRow.addDouble(doubleIndex, Double.MAX_VALUE);
+    assertTrue(partialRow.incrementColumn(doubleIndex));
+    assertEquals(Double.POSITIVE_INFINITY, partialRow.getDouble(doubleIndex), 0.0);
+    assertFalse(partialRow.incrementColumn(doubleIndex));
+
+    // Decimal
+    int decimalIndex = getColumnIndex(partialRow, "decimal");
+    // Decimal with precision 5, scale 3 has a max of 99.999
+    partialRow.addDecimal(decimalIndex, new BigDecimal("99.998"));
+    assertTrue(partialRow.incrementColumn(decimalIndex));
+    assertEquals(new BigDecimal("99.999"), partialRow.getDecimal(decimalIndex));
+    assertFalse(partialRow.incrementColumn(decimalIndex));
+
+    // String
+    int stringIndex = getColumnIndex(partialRow, "string");
+    partialRow.addString(stringIndex, "hello");
+    assertTrue(partialRow.incrementColumn(stringIndex));
+    assertEquals("hello\0", partialRow.getString(stringIndex));
+
+    // Binary
+    int binaryIndex = getColumnIndex(partialRow, "binary-array");
+    partialRow.addBinary(binaryIndex, new byte[] { 0, 1, 2, 3, 4 });
+    assertTrue(partialRow.incrementColumn(binaryIndex));
+    assertArrayEquals(new byte[] { 0, 1, 2, 3, 4, 0 }, partialRow.getBinaryCopy(binaryIndex));
+  }
+
+  private int getColumnIndex(PartialRow partialRow, String columnName) {
+    return partialRow.getSchema().getColumnIndex(columnName);
   }
 
   private PartialRow getPartialRowWithAllTypes() {
     Schema schema = BaseKuduTest.getSchemaWithAllTypes();
     // Ensure we aren't missing any types
-    assertEquals(12, schema.getColumnCount());
+    assertEquals(13, schema.getColumnCount());
 
     PartialRow row = schema.newPartialRow();
     row.addByte("int8", (byte) 42);
@@ -249,6 +361,7 @@ public class TestPartialRow {
     ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] { 5, 6, 7, 8, 9 });
     row.addBinary("binary-bytebuffer", binaryBuffer);
     row.setNull("null");
+    row.addDecimal("decimal", BigDecimal.valueOf(12345, 3));
     return row;
   }
 
@@ -270,6 +383,7 @@ public class TestPartialRow {
       case FLOAT: return partialRow.getFloat(columnName);
       case DOUBLE: return partialRow.getDouble(columnName);
       case BOOL: return partialRow.getBoolean(columnName);
+      case DECIMAL: return partialRow.getDecimal(columnName);
       default:
         throw new UnsupportedOperationException();
     }
@@ -287,6 +401,7 @@ public class TestPartialRow {
       case FLOAT: return partialRow.getFloat(columnIndex);
       case DOUBLE: return partialRow.getDouble(columnIndex);
       case BOOL: return partialRow.getBoolean(columnIndex);
+      case DECIMAL: return partialRow.getDecimal(columnIndex);
       default:
         throw new UnsupportedOperationException();
     }
@@ -304,6 +419,7 @@ public class TestPartialRow {
       case FLOAT: partialRow.addFloat(columnName, 52.35F); break;
       case DOUBLE: partialRow.addDouble(columnName, 53.35); break;
       case BOOL: partialRow.addBoolean(columnName, true); break;
+      case DECIMAL: partialRow.addDecimal(columnName, BigDecimal.valueOf(12345, 3)); break;
       default:
         throw new UnsupportedOperationException();
     }
@@ -321,6 +437,7 @@ public class TestPartialRow {
       case FLOAT: partialRow.addFloat(columnIndex, 52.35F); break;
       case DOUBLE: partialRow.addDouble(columnIndex, 53.35); break;
       case BOOL: partialRow.addBoolean(columnIndex, true); break;
+      case DECIMAL: partialRow.addDecimal(columnIndex, BigDecimal.valueOf(12345, 3)); break;
       default:
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
index 223ddc4..5196eda 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.junit.BeforeClass;
@@ -61,6 +62,7 @@ public class TestRowResult extends BaseKuduTest {
     row.addBinary(9, bb);
     row.setNull(10);
     row.addLong(11, 11l);
+    row.addDecimal(12, BigDecimal.valueOf(12345, 3));
 
     KuduSession session = syncClient.newSession();
     session.apply(insert);
@@ -113,6 +115,9 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals(11, rr.getLong(11));
       assertEquals(11, rr.getLong(allTypesSchema.getColumnByIndex(11).getName()));
 
+      assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(12));
+      assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));
+
       // We test with the column name once since it's the same method for all types, unlike above.
       assertEquals(Type.INT8, rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
       assertEquals(Type.INT8, rr.getColumnType(0));
@@ -125,6 +130,7 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals(Type.STRING, rr.getColumnType(7));
       assertEquals(Type.BINARY, rr.getColumnType(8));
       assertEquals(Type.UNIXTIME_MICROS, rr.getColumnType(11));
+      assertEquals(Type.DECIMAL, rr.getColumnType(12));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
index d89800c..7682d6d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanPredicate.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
@@ -31,6 +32,7 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduPredicate.ComparisonOp;
+import org.apache.kudu.util.DecimalUtil;
 
 public class TestScanPredicate extends BaseKuduTest {
 
@@ -170,6 +172,36 @@ public class TestScanPredicate extends BaseKuduTest {
     );
   }
 
+  // Returns a vector of decimal(4, 2) numbers from -50.50 (inclusive) to 50.50
+  // (exclusive) (100 values) and boundary values.
+  private NavigableSet<BigDecimal> createDecimalValues() {
+    NavigableSet<BigDecimal> values = new TreeSet<>();
+    for (long i = -50; i < 50; i++) {
+      values.add(BigDecimal.valueOf(i * 100 + i, 2));
+    }
+
+    values.add(BigDecimal.valueOf(-9999, 2));
+    values.add(BigDecimal.valueOf(-9998, 2));
+    values.add(BigDecimal.valueOf(9998, 2));
+    values.add(BigDecimal.valueOf(9999, 2));
+
+    return values;
+  }
+
+  private List<BigDecimal> createDecimalTestValues() {
+    return ImmutableList.of(
+        BigDecimal.valueOf(-9999, 2),
+        BigDecimal.valueOf(-9998, 2),
+        BigDecimal.valueOf(5100, 2),
+        BigDecimal.valueOf(-5000, 2),
+        BigDecimal.valueOf(0, 2),
+        BigDecimal.valueOf(4900, 2),
+        BigDecimal.valueOf(5000, 2),
+        BigDecimal.valueOf(9998, 2),
+        BigDecimal.valueOf(9999, 2)
+    );
+  }
+
   private NavigableSet<String> createStringValues() {
     return ImmutableSortedSet.of("", "\0", "\0\0", "a", "a\0", "a\0a", "aa\0");
   }
@@ -520,6 +552,69 @@ public class TestScanPredicate extends BaseKuduTest {
   }
 
   @Test
+  public void testDecimalPredicates() throws Exception {
+    ColumnSchema key = new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).key(true).build();
+    ColumnSchema val = new ColumnSchema.ColumnSchemaBuilder("value", Type.DECIMAL)
+        .typeAttributes(DecimalUtil.typeAttributes(4, 2)).nullable(true).build();
+    Schema schema = new Schema(ImmutableList.of(key, val));
+
+    syncClient.createTable("decimal-table", schema, createTableOptions());
+    KuduTable table = syncClient.openTable("decimal-table");
+
+    NavigableSet<BigDecimal> values = createDecimalValues();
+    List<BigDecimal> testValues = createDecimalTestValues();
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    long i = 0;
+    for (BigDecimal value : values) {
+      Insert insert = table.newInsert();
+      insert.getRow().addLong("key", i++);
+      insert.getRow().addDecimal("value", value);
+      session.apply(insert);
+    }
+    Insert nullInsert = table.newInsert();
+    nullInsert.getRow().addLong("key", i++);
+    nullInsert.getRow().setNull("value");
+    session.apply(nullInsert);
+    session.flush();
+
+    ColumnSchema col = table.getSchema().getColumn("value");
+    Assert.assertEquals(values.size() + 1, countRows(table));
+
+    for (BigDecimal v : testValues) {
+      // value = v
+      KuduPredicate equal = KuduPredicate.newComparisonPredicate(col, ComparisonOp.EQUAL, v);
+      Assert.assertEquals(values.subSet(v, true, v, true).size(), countRows(table, equal));
+
+      // value >= v
+      KuduPredicate greaterEqual =
+          KuduPredicate.newComparisonPredicate(col, ComparisonOp.GREATER_EQUAL, v);
+      Assert.assertEquals(values.tailSet(v).size(), countRows(table, greaterEqual));
+
+      // value <= v
+      KuduPredicate lessEqual =
+          KuduPredicate.newComparisonPredicate(col, ComparisonOp.LESS_EQUAL, v);
+      Assert.assertEquals(values.headSet(v, true).size(), countRows(table, lessEqual));
+
+      // value > v
+      KuduPredicate greater =
+          KuduPredicate.newComparisonPredicate(col, ComparisonOp.GREATER, v);
+      Assert.assertEquals(values.tailSet(v, false).size(), countRows(table, greater));
+
+      // value < v
+      KuduPredicate less =
+          KuduPredicate.newComparisonPredicate(col, ComparisonOp.LESS, v);
+      Assert.assertEquals(values.headSet(v).size(), countRows(table, less));
+    }
+
+    KuduPredicate isNotNull = KuduPredicate.newIsNotNullPredicate(col);
+    Assert.assertEquals(values.size(), countRows(table, isNotNull));
+
+    KuduPredicate isNull = KuduPredicate.newIsNullPredicate(col);
+    Assert.assertEquals(1, countRows(table, isNull));
+  }
+
+  @Test
   public void testStringPredicates() throws Exception {
     Schema schema = createTableSchema(Type.STRING);
     syncClient.createTable("string-table", schema, createTableOptions());


[6/6] kudu git commit: KUDU-721: [Spark] Add DECIMAL type support

Posted by gr...@apache.org.
KUDU-721: [Spark] Add DECIMAL type support

Adds DECIMAL support to the Kudu Spark source and RDD.

Change-Id: Ia5f7a801778ed81b68949bbf8d7c08d1a13ed840
Reviewed-on: http://gerrit.cloudera.org:8080/9213
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cbd34fa8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cbd34fa8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cbd34fa8

Branch: refs/heads/master
Commit: cbd34fa850036b4e22e3ff5be92238677e99adb3
Parents: 4f34b69
Author: Grant Henke <gr...@gmail.com>
Authored: Sun Feb 4 21:40:39 2018 -0600
Committer: Grant Henke <gr...@gmail.com>
Committed: Tue Feb 13 22:20:22 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 10 ++--
 .../apache/kudu/spark/kudu/KuduContext.scala    | 54 ++++++++++++++------
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  1 +
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 15 +++++-
 .../kudu/spark/kudu/KuduContextTest.scala       |  7 ++-
 .../apache/kudu/spark/kudu/TestContext.scala    | 22 +++++++-
 6 files changed, 84 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd34fa8/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index c0b1b0a..7987bf8 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.spark.kudu
 
+import java.math.BigDecimal
 import java.net.InetAddress
 import java.sql.Timestamp
 
@@ -31,7 +32,7 @@ import org.apache.yetus.audience.InterfaceStability
 
 import org.apache.kudu.client.KuduPredicate.ComparisonOp
 import org.apache.kudu.client._
-import org.apache.kudu.{ColumnSchema, Type}
+import org.apache.kudu.{ColumnSchema, ColumnTypeAttributes, Type}
 
 /**
   * Data source for integration with Spark's [[DataFrame]] API.
@@ -183,7 +184,7 @@ class KuduRelation(private val tableName: String,
 
   def kuduColumnToSparkField: (ColumnSchema) => StructField = {
     columnSchema =>
-      val sparkType = kuduTypeToSparkType(columnSchema.getType)
+      val sparkType = kuduTypeToSparkType(columnSchema.getType, columnSchema.getTypeAttributes)
       new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
   }
 
@@ -271,6 +272,7 @@ class KuduRelation(private val tableName: String,
       case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: Array[Byte] => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: BigDecimal => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
     }
   }
 
@@ -327,9 +329,10 @@ private[spark] object KuduRelation {
     * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
     *
     * @param t the Kudu type
+    * @param a the Kudu type attributes
     * @return the corresponding Spark SQL type
     */
-  private def kuduTypeToSparkType(t: Type): DataType = t match {
+  private def kuduTypeToSparkType(t: Type, a: ColumnTypeAttributes): DataType = t match {
     case Type.BOOL => BooleanType
     case Type.INT8 => ByteType
     case Type.INT16 => ShortType
@@ -340,6 +343,7 @@ private[spark] object KuduRelation {
     case Type.DOUBLE => DoubleType
     case Type.STRING => StringType
     case Type.BINARY => BinaryType
+    case Type.DECIMAL => DecimalType(a.getPrecision, a.getScale)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd34fa8/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index a981f5e..ece4df0 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -26,9 +26,10 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.util.AccumulatorV2
 import org.apache.yetus.audience.InterfaceStability
@@ -161,17 +162,32 @@ class KuduContext(val kuduMaster: String,
     val kuduCols = new util.ArrayList[ColumnSchema]()
     // add the key columns first, in the order specified
     for (key <- keys) {
-      val f = schema.fields(schema.fieldIndex(key))
-      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).key(true).build())
+      val field = schema.fields(schema.fieldIndex(key))
+      val col = createColumn(field, isKey = true)
+      kuduCols.add(col)
     }
     // now add the non-key columns
-    for (f <- schema.fields.filter(field=> !keys.contains(field.name))) {
-      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).nullable(f.nullable).key(false).build())
+    for (field <- schema.fields.filter(field => !keys.contains(field.name))) {
+      val col = createColumn(field, isKey = false)
+      kuduCols.add(col)
     }
 
     syncClient.createTable(tableName, new Schema(kuduCols), options)
   }
 
+  private def createColumn(field: StructField, isKey: Boolean): ColumnSchema = {
+    val kt = kuduType(field.dataType)
+    val col = new ColumnSchema.ColumnSchemaBuilder(field.name, kt).key(isKey).nullable(field.nullable)
+    // Add ColumnTypeAttributesBuilder to DECIMAL columns
+    if (kt == Type.DECIMAL) {
+      val dt = field.dataType.asInstanceOf[DecimalType]
+      col.typeAttributes(
+        new ColumnTypeAttributesBuilder().precision(dt.precision).scale(dt.scale).build()
+      )
+    }
+    col.build()
+  }
+
   /** Map Spark SQL type to Kudu type */
   def kuduType(dt: DataType) : Type = dt match {
     case DataTypes.BinaryType => Type.BINARY
@@ -184,6 +200,7 @@ class KuduContext(val kuduMaster: String,
     case DataTypes.LongType => Type.INT64
     case DataTypes.FloatType => Type.FLOAT
     case DataTypes.DoubleType => Type.DOUBLE
+    case DecimalType() => Type.DECIMAL
     case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
   }
 
@@ -276,18 +293,21 @@ class KuduContext(val kuduMaster: String,
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {
             operation.getRow.setNull(kuduIdx)
-          } else schema.fields(sparkIdx).dataType match {
-            case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
-            case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
-            case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
-            case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
-            case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
-            case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
-            case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
-            case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
-            case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
-            case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
-            case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
+          } else {
+            schema.fields(sparkIdx).dataType match {
+              case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
+              case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
+              case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
+              case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
+              case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
+              case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
+              case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
+              case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
+              case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
+              case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
+              case DecimalType() => operation.getRow.addDecimal(kuduIdx, row.getDecimal(sparkIdx))
+              case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
+            }
           }
         }
         session.apply(operation)

http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd34fa8/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index daed3f0..5a4ed8b 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -123,6 +123,7 @@ private class RowIterator(private val scanner: KuduScanner,
       case Type.DOUBLE => rowResult.getDouble(i)
       case Type.STRING => rowResult.getString(i)
       case Type.BINARY => rowResult.getBinaryCopy(i)
+      case Type.DECIMAL => rowResult.getDecimal(i)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd34fa8/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 69d1f20..59997d2 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -249,7 +249,18 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter wi
                  sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""").count())
 
   }
-
+  test("table scan with projection and predicate decimal32") {
+    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
+      sqlContext.sql(s"""SELECT key, c11_decimal32 FROM $tableName where c11_decimal32 > 5""").count())
+  }
+  test("table scan with projection and predicate decimal64") {
+    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
+      sqlContext.sql(s"""SELECT key, c12_decimal64 FROM $tableName where c12_decimal64 > 5""").count())
+  }
+  test("table scan with projection and predicate decimal128") {
+    assertEquals(rows.count { case (key, i, s, ts) => i > 5},
+      sqlContext.sql(s"""SELECT key, c13_decimal128 FROM $tableName where c13_decimal128 > 5""").count())
+  }
   test("table scan with projection and predicate ") {
     assertEquals(rows.count { case (key, i, s, ts) => s != null && s > "5" },
       sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > "5"""").count())
@@ -474,7 +485,7 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter wi
     ))
 
     val dfDefaultSchema = sqlContext.read.options(kuduOptions).kudu
-    assertEquals(11, dfDefaultSchema.schema.fields.length)
+    assertEquals(14, dfDefaultSchema.schema.fields.length)
 
     val dfWithUserSchema = sqlContext.read.options(kuduOptions).schema(userSchema).kudu
     assertEquals(2, dfWithUserSchema.schema.fields.length)

http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd34fa8/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 8156365..8e12dcd 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -17,6 +17,7 @@
 package org.apache.kudu.spark.kudu
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.math.BigDecimal
 import java.sql.Timestamp
 
 import org.apache.spark.sql.functions.decode
@@ -60,7 +61,8 @@ class KuduContextTest extends FunSuite with TestContext with Matchers {
   test("Test basic kuduRDD") {
     val rows = insertRows(rowCount)
     val scanList = kuduContext.kuduRDD(ss.sparkContext, "test", Seq("key", "c1_i", "c2_s", "c3_double",
-        "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", "c9_unixtime_micros", "c10_byte"))
+        "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", "c9_unixtime_micros", "c10_byte",
+        "c11_decimal32", "c12_decimal64", "c13_decimal128"))
       .map(r => r.toSeq).collect()
     scanList.foreach(r => {
       val index = r.apply(0).asInstanceOf[Int]
@@ -77,6 +79,9 @@ class KuduContextTest extends FunSuite with TestContext with Matchers {
       assert(r.apply(9).asInstanceOf[Timestamp] ==
         KuduRelation.microsToTimestamp(rows.apply(index)._4))
       assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte)
+      assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
+      assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
+      assert(r.apply(13).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
     })
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd34fa8/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index 9ce0991..62b41cd 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.kudu.spark.kudu
 
+import java.math.BigDecimal
 import java.util.Date
 
 import scala.collection.JavaConverters._
@@ -26,10 +27,12 @@ import org.apache.spark.SparkConf
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
 import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder
 import org.apache.kudu.client.KuduClient.KuduClientBuilder
 import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder
 import org.apache.kudu.client.{CreateTableOptions, KuduClient, KuduTable, MiniKuduCluster}
 import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.util.DecimalUtil
 import org.apache.spark.sql.SparkSession
 
 trait TestContext extends BeforeAndAfterAll { self: Suite =>
@@ -54,8 +57,20 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
       new ColumnSchemaBuilder("c7_float", Type.FLOAT).build(),
       new ColumnSchemaBuilder("c8_binary", Type.BINARY).build(),
       new ColumnSchemaBuilder("c9_unixtime_micros", Type.UNIXTIME_MICROS).build(),
-      new ColumnSchemaBuilder("c10_byte", Type.INT8).build())
-    new Schema(columns)
+      new ColumnSchemaBuilder("c10_byte", Type.INT8).build(),
+      new ColumnSchemaBuilder("c11_decimal32", Type.DECIMAL)
+        .typeAttributes(
+          new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL32_PRECISION).build()
+        ).build(),
+      new ColumnSchemaBuilder("c12_decimal64", Type.DECIMAL)
+        .typeAttributes(
+          new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL64_PRECISION).build()
+        ).build(),
+      new ColumnSchemaBuilder("c13_decimal128", Type.DECIMAL)
+        .typeAttributes(
+          new ColumnTypeAttributesBuilder().precision(DecimalUtil.MAX_DECIMAL128_PRECISION).build()
+        ).build())
+      new Schema(columns)
   }
 
   val appID: String = new Date().toString + math.floor(math.random * 10E4).toLong.toString
@@ -113,6 +128,9 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
       val ts = System.currentTimeMillis() * 1000
       row.addLong(9, ts)
       row.addByte(10, i.toByte)
+      row.addDecimal(11, BigDecimal.valueOf(i))
+      row.addDecimal(12, BigDecimal.valueOf(i))
+      row.addDecimal(13, BigDecimal.valueOf(i))
 
       // Sprinkling some nulls so that queries see them.
       val s = if (i % 2 == 0) {


[5/6] kudu git commit: KUDU-721: [Java] Add DECIMAL column type support

Posted by gr...@apache.org.
KUDU-721: [Java] Add DECIMAL column type support

This patch adds basic support to the Java client to
create, read, and write tables with DECIMAL columns.

Change-Id: I6240e3cfe0d6328b68c50099d442ffeeab6c9fd9
Reviewed-on: http://gerrit.cloudera.org:8080/8882
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4f34b69d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4f34b69d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4f34b69d

Branch: refs/heads/master
Commit: 4f34b69d4969c5a63885067a131c4d85016a173e
Parents: 118e8f3
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Dec 18 23:24:40 2017 -0600
Committer: Grant Henke <gr...@gmail.com>
Committed: Tue Feb 13 22:20:03 2018 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/kudu/ColumnSchema.java |  48 ++++-
 .../org/apache/kudu/ColumnTypeAttributes.java   | 162 +++++++++++++++
 .../src/main/java/org/apache/kudu/Schema.java   |   4 +-
 .../src/main/java/org/apache/kudu/Type.java     |  66 ++++++-
 .../apache/kudu/client/AsyncKuduScanner.java    |   1 +
 .../main/java/org/apache/kudu/client/Bytes.java | 196 ++++++++++++++++++-
 .../kudu/client/ColumnRangePredicate.java       |  35 +++-
 .../java/org/apache/kudu/client/KeyEncoder.java |  34 +++-
 .../org/apache/kudu/client/KuduPredicate.java   | 102 +++++++++-
 .../org/apache/kudu/client/KuduScanToken.java   |   2 +-
 .../java/org/apache/kudu/client/Operation.java  |   4 +-
 .../java/org/apache/kudu/client/PartialRow.java | 127 +++++++++++-
 .../org/apache/kudu/client/ProtobufHelper.java  |  48 ++++-
 .../java/org/apache/kudu/client/RowResult.java  |  47 ++++-
 .../java/org/apache/kudu/util/DecimalUtil.java  | 152 ++++++++++++++
 .../org/apache/kudu/client/BaseKuduTest.java    |   6 +-
 .../java/org/apache/kudu/client/TestBytes.java  |  45 ++++-
 .../kudu/client/TestColumnRangePredicate.java   |  18 +-
 .../org/apache/kudu/client/TestKeyEncoding.java |  76 +++++--
 .../org/apache/kudu/client/TestKuduClient.java  |  60 ++++++
 .../apache/kudu/client/TestKuduPredicate.java   | 111 ++++++++++-
 .../org/apache/kudu/client/TestPartialRow.java  | 119 ++++++++++-
 .../org/apache/kudu/client/TestRowResult.java   |   6 +
 .../apache/kudu/client/TestScanPredicate.java   |  95 +++++++++
 24 files changed, 1502 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index cf0273a..17dec97 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -17,6 +17,8 @@
 
 package org.apache.kudu;
 
+import java.util.Objects;
+
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -39,6 +41,8 @@ public class ColumnSchema {
   private final int desiredBlockSize;
   private final Encoding encoding;
   private final CompressionAlgorithm compressionAlgorithm;
+  private final ColumnTypeAttributes typeAttributes;
+  private final int typeSize;
 
   /**
    * Specifies the encoding of data for a column on disk.
@@ -92,7 +96,7 @@ public class ColumnSchema {
 
   private ColumnSchema(String name, Type type, boolean key, boolean nullable,
                        Object defaultValue, int desiredBlockSize, Encoding encoding,
-                       CompressionAlgorithm compressionAlgorithm) {
+                       CompressionAlgorithm compressionAlgorithm, ColumnTypeAttributes typeAttributes) {
     this.name = name;
     this.type = type;
     this.key = key;
@@ -101,6 +105,8 @@ public class ColumnSchema {
     this.desiredBlockSize = desiredBlockSize;
     this.encoding = encoding;
     this.compressionAlgorithm = compressionAlgorithm;
+    this.typeAttributes = typeAttributes;
+    this.typeSize = type.getSize(typeAttributes);
   }
 
   /**
@@ -168,6 +174,21 @@ public class ColumnSchema {
     return compressionAlgorithm;
   }
 
+  /**
+   * Return the column type attributes for the column, or null if it is not known.
+   */
+  public ColumnTypeAttributes getTypeAttributes() {
+    return typeAttributes;
+  }
+
+  /**
+   * The size of this type in bytes on the wire.
+   * @return A size
+   */
+  public int getTypeSize() {
+    return typeSize;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -188,21 +209,21 @@ public class ColumnSchema {
     if (!type.equals(that.type)) {
       return false;
     }
+    if (!typeAttributes.equals(that.typeAttributes)) {
+      return false;
+    }
 
     return true;
   }
 
   @Override
   public int hashCode() {
-    int result = name.hashCode();
-    result = 31 * result + type.hashCode();
-    result = 31 * result + (key ? 1 : 0);
-    return result;
+    return Objects.hash(name, type, key, typeAttributes);
   }
 
   @Override
   public String toString() {
-    return "Column name: " + name + ", type: " + type.getName();
+    return "Column name: " + name + ", type: " + type.getName() + typeAttributes.toStringForType(type);
   }
 
   /**
@@ -219,6 +240,7 @@ public class ColumnSchema {
     private int blockSize = 0;
     private Encoding encoding = null;
     private CompressionAlgorithm compressionAlgorithm = null;
+    private ColumnTypeAttributes typeAttributes = null;
 
     /**
      * Constructor for the required parameters.
@@ -310,13 +332,25 @@ public class ColumnSchema {
     }
 
     /**
+     * Set the column type attributes for this column.
+     */
+    public ColumnSchemaBuilder typeAttributes(ColumnTypeAttributes typeAttributes) {
+      if (type != Type.DECIMAL && typeAttributes != null) {
+        throw new IllegalArgumentException(
+            "ColumnTypeAttributes are not used on " + type + " columns");
+      }
+      this.typeAttributes = typeAttributes;
+      return this;
+    }
+
+    /**
      * Builds a {@link ColumnSchema} using the passed parameters.
      * @return a new {@link ColumnSchema}
      */
     public ColumnSchema build() {
       return new ColumnSchema(name, type,
                               key, nullable, defaultValue,
-                              blockSize, encoding, compressionAlgorithm);
+                              blockSize, encoding, compressionAlgorithm, typeAttributes);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
new file mode 100644
index 0000000..07da4a4
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnTypeAttributes.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu;
+
+import java.util.Objects;
+
+/**
+ * Represents a Kudu Table column's type attributes.
+ */
+@org.apache.yetus.audience.InterfaceAudience.Public
+@org.apache.yetus.audience.InterfaceStability.Evolving
+public class ColumnTypeAttributes {
+
+  private final boolean hasPrecision;
+  private final int precision;
+
+  private final boolean hasScale;
+  private final int scale;
+
+  private ColumnTypeAttributes(boolean hasPrecision, int precision,
+                               boolean hasScale, int scale) {
+    this.hasPrecision = hasPrecision;
+    this.precision = precision;
+    this.hasScale = hasScale;
+    this.scale = scale;
+  }
+
+  /**
+   * Returns true if the precision is set;
+   */
+  public boolean hasPrecision() {
+    return hasPrecision;
+  }
+
+  /**
+   * Return the precision;
+   */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /**
+   * Returns true if the scale is set;
+   */
+  public boolean hasScale() {
+    return hasScale;
+  }
+
+  /**
+   * Return the scale;
+   */
+  public int getScale() {
+    return scale;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ColumnTypeAttributes that = (ColumnTypeAttributes) o;
+
+    if (hasPrecision != that.hasPrecision) {
+      return false;
+    }
+    if (precision != that.precision) {
+      return false;
+    }
+    if (hasScale != that.hasScale) {
+      return false;
+    }
+    if (scale != that.scale) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(hasPrecision, precision, hasScale, scale);
+  }
+
+  /**
+   * Return a string representation appropriate for `type`.
+   * This is meant to be postfixed to the name of a primitive type to describe
+   * the full type, e.g. decimal(10, 4).
+   * @param type the type.
+   * @return a postfix string.
+   */
+  public String toStringForType(Type type) {
+    if (type == Type.DECIMAL) {
+      return "(" + precision + ", " + scale + ")";
+    } else {
+      return "";
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "hasPrecision: " + hasPrecision + ", precision: " + precision +
+        ", hasScale: " + hasScale + ", scale: " + scale;
+  }
+
+  /**
+   * Builder for ColumnTypeAttributes.
+   */
+  @org.apache.yetus.audience.InterfaceAudience.Public
+  @org.apache.yetus.audience.InterfaceStability.Evolving
+  public static class ColumnTypeAttributesBuilder {
+
+    private boolean hasPrecision;
+    private int precision;
+    private boolean hasScale;
+    private int scale;
+
+    /**
+     * Set the precision. Only used for Decimal columns.
+     */
+    public ColumnTypeAttributesBuilder precision(int precision) {
+      this.hasPrecision = true;
+      this.precision = precision;
+      return this;
+    }
+
+    /**
+     * Set the scale. Only used for Decimal columns.
+     */
+    public ColumnTypeAttributesBuilder scale(int scale) {
+      this.hasScale = true;
+      this.scale = scale;
+      return this;
+    }
+
+    /**
+     * Builds a {@link ColumnTypeAttributes} using the passed parameters.
+     * @return a new {@link ColumnTypeAttributes}
+     */
+    public ColumnTypeAttributes build() {
+      return new ColumnTypeAttributes(hasPrecision, precision, hasScale, scale);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index f38bcfb..8e19f38 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -111,7 +111,7 @@ public class Schema {
 
       hasNulls |= column.isNullable();
       columnOffsets[index] = offset;
-      offset += column.getType().getSize();
+      offset += column.getTypeSize();
       if (this.columnsByName.put(column.getName(), index) != null) {
         throw new IllegalArgumentException(
             String.format("Column names must be unique: %s", columns));
@@ -167,7 +167,7 @@ public class Schema {
     int totalSize = 0;
     boolean hasNullables = false;
     for (ColumnSchema column : columns) {
-      totalSize += column.getType().getSize();
+      totalSize += column.getTypeSize();
       hasNullables |= column.isNullable();
     }
     if (hasNullables) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/Type.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index 123c577..4173c9b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -19,12 +19,19 @@ package org.apache.kudu;
 
 import static org.apache.kudu.Common.DataType;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import com.google.common.primitives.Shorts;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.util.DecimalUtil;
+
 /**
  * Describes all the types available to build table schemas.
  */
@@ -41,9 +48,10 @@ public enum Type {
   BOOL(DataType.BOOL, "bool"),
   FLOAT(DataType.FLOAT, "float"),
   DOUBLE(DataType.DOUBLE, "double"),
-  UNIXTIME_MICROS(DataType.UNIXTIME_MICROS, "unixtime_micros");
+  UNIXTIME_MICROS(DataType.UNIXTIME_MICROS, "unixtime_micros"),
+  DECIMAL(Arrays.asList(DataType.DECIMAL32, DataType.DECIMAL64, DataType.DECIMAL128), "decimal");
 
-  private final DataType dataType;
+  private final List<DataType> dataTypes;
   private final String name;
   private final int size;
 
@@ -53,17 +61,40 @@ public enum Type {
    * @param name string representation of the type
    */
   private Type(DataType dataType, String name) {
-    this.dataType = dataType;
+    this.dataTypes = Collections.singletonList(dataType);
+    this.name = name;
+    this.size = getTypeSize(dataType);
+  }
+
+  private Type(List<DataType> dataTypes, String name) {
+    this.dataTypes = dataTypes;
     this.name = name;
-    this.size = getTypeSize(this.dataType);
+    this.size = -1;
   }
 
   /**
    * Get the data type from the common's pb
    * @return A DataType
+   * @deprecated use {@link #getDataType(ColumnTypeAttributes)}
    */
   public DataType getDataType() {
-    return this.dataType;
+    if (this == DECIMAL) {
+      throw new IllegalStateException("Please use the newer getDataType(ColumnTypeAttributes) " +
+          "to support the Decimal data type");
+    }
+    return this.dataTypes.get(0);
+  }
+
+  /**
+   * Get the data type from the common's pb
+   * @param typeAttributes the additional attributes of the type.
+   * @return A DataType
+   */
+  public DataType getDataType(ColumnTypeAttributes typeAttributes) {
+    if (this == DECIMAL) {
+      return DecimalUtil.precisionToDataType(typeAttributes.getPrecision());
+    }
+    return this.dataTypes.get(0);
   }
 
   /**
@@ -77,14 +108,31 @@ public enum Type {
   /**
    * The size of this type on the wire
    * @return A size
+   * @deprecated use {@link #getSize(ColumnTypeAttributes)}
    */
   public int getSize() {
+    if (this == DECIMAL) {
+      throw new IllegalStateException("Please use the newer getSize(ColumnTypeAttributes) " +
+          "to support the Decimal data type");
+    }
+    return this.size;
+  }
+
+  /**
+   * The size of this type on the wire
+   * @param typeAttributes the additional attributes of the type.
+   * @return A size
+   */
+  public int getSize(ColumnTypeAttributes typeAttributes) {
+    if (this == DECIMAL) {
+      return DecimalUtil.precisionToSize(typeAttributes.getPrecision());
+    }
     return this.size;
   }
 
   @Override
   public String toString() {
-    return "Type: " + this.name + ", size: " + this.size;
+    return "Type: " + this.name;
   }
 
   /**
@@ -92,7 +140,7 @@ public enum Type {
    * @param type pb type
    * @return size in bytes
    */
-  static int getTypeSize(DataType type) {
+  private static int getTypeSize(DataType type) {
     switch (type) {
       case STRING:
       case BINARY:
@@ -131,6 +179,10 @@ public enum Type {
       case UNIXTIME_MICROS: return UNIXTIME_MICROS;
       case FLOAT: return FLOAT;
       case DOUBLE: return DOUBLE;
+      case DECIMAL32:
+      case DECIMAL64:
+      case DECIMAL128:
+        return DECIMAL;
       default:
         throw new IllegalArgumentException("The provided data type doesn't map" +
             " to know any known one: " + type.getDescriptorForType().getFullName());

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 5e7736c..9f5c137 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -303,6 +303,7 @@ public final class AsyncKuduScanner {
   private static ColumnSchema getStrippedColumnSchema(ColumnSchema columnToClone) {
     return new ColumnSchema.ColumnSchemaBuilder(columnToClone.getName(), columnToClone.getType())
         .nullable(columnToClone.isNullable())
+        .typeAttributes(columnToClone.getTypeAttributes())
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
index d5a5889..11b8849 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -43,6 +44,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.util.CharsetUtil;
 
+import org.apache.kudu.util.DecimalUtil;
 import org.apache.kudu.util.Slice;
 
 /**
@@ -57,6 +59,11 @@ public final class Bytes {
   // biginteger-java
   private static final BigInteger TWO_COMPL_REF = BigInteger.ONE.shiftLeft(64);
 
+  private static final BigInteger BIGINT32_MAX = BigInteger.valueOf(Integer.MAX_VALUE);
+  private static final BigInteger BIGINT32_MIN = BigInteger.valueOf(Integer.MIN_VALUE);
+  private static final BigInteger BIGINT64_MAX = BigInteger.valueOf(Long.MAX_VALUE);
+  private static final BigInteger BIGINT64_MIN = BigInteger.valueOf(Long.MIN_VALUE);
+
   private Bytes() {  // Can't instantiate.
   }
 
@@ -472,7 +479,7 @@ public final class Bytes {
    */
   public static BigInteger getUnsignedLong(final byte[] b, final int offset) {
     long l = getLong(b, offset);
-    BigInteger bi = new BigInteger(l + "");
+    BigInteger bi = BigInteger.valueOf(l);
     if (bi.compareTo(BigInteger.ZERO) < 0) {
       bi = bi.add(TWO_COMPL_REF);
     }
@@ -579,6 +586,89 @@ public final class Bytes {
   }
 
   /**
+   * Reads a little-endian 16-byte integer from the beginning of the given array.
+   * @param b The array to read from.
+   * @return A BigInteger.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static BigInteger getBigInteger(final byte[] b) {
+    return getBigInteger(b, 0);
+  }
+
+  /**
+   * Reads a little-endian 16-byte integer from an offset in the given array.
+   * @param b The array to read from.
+   * @param offset The offset in the array to start reading from.
+   * @return A BigInteger.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static BigInteger getBigInteger(final byte[] b, final int offset) {
+    // TODO: Support larger/smaller than 16 bytes (int128)
+    byte[] bytes = Arrays.copyOfRange(b, offset, offset + 16);
+    // BigInteger expects big-endian order.
+    reverseBytes(bytes);
+    return new BigInteger(bytes);
+  }
+
+  /**
+   * Writes a little-endian 16-byte BigInteger at the beginning of the given array.
+   * @param b The array to write to.
+   * @param n A BigInteger.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static void setBigInteger(final byte[] b, final BigInteger n) {
+    setBigInteger(b, n, 0);
+  }
+
+  /**
+   * Writes a little-endian 16-byte BigInteger at an offset in the given array.
+   * @param b The zeroed byte array to write to.
+   * @param n A BigInteger.
+   * @param offset The offset in the array to start writing at.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static void setBigInteger(final byte[] b, final BigInteger n, final int offset) {
+    byte[] bytes = n.toByteArray();
+    // TODO: Support larger/smaller than 16 bytes (int128)
+    // Guard against values that are too large.
+    if (bytes.length > 16) {
+      throw new IllegalArgumentException("Value is larger than the maximum 16 bytes: " + n);
+    }
+    // BigInteger is big-endian order.
+    reverseBytes(bytes);
+    System.arraycopy(bytes, 0, b, offset, bytes.length);
+    // If the value is negative trail with set bits.
+    if (n.compareTo(BigInteger.ZERO) < 0) {
+      Arrays.fill(b, offset + bytes.length, offset + 16, (byte) 0xff);
+    }
+  }
+
+  /**
+   * Creates a new byte array containing a little-endian 16-byte BigInteger.
+   * @param n A BigInteger.
+   * @return A new byte array containing the given value.
+   */
+  public static byte[] fromBigInteger(final BigInteger n) {
+    // TODO: Support larger/smaller than 16 bytes (int128)
+    final byte[] b = new byte[16];
+    setBigInteger(b, n);
+    return b;
+  }
+
+  /**
+   * Reverses the passed byte array in place.
+   * @param b The array to reverse.
+   */
+  private static void reverseBytes(final byte[] b) {
+    // Swaps the items until the mid-point is reached.
+    for(int i = 0; i < b.length / 2; i++) {
+      byte temp = b[i];
+      b[i] = b[b.length - i - 1];
+      b[b.length - i - 1] = temp;
+    }
+  }
+
+  /**
    * Reads a little-endian 4-byte float from the beginning of the given array.
    * @param b The array to read from.
    * @return a float
@@ -684,6 +774,110 @@ public final class Bytes {
     return b;
   }
 
+  /**
+   * Reads a decimal from the beginning of the given array.
+   * @param b The array to read from.
+   * @param precision The precision of the decimal value.
+   * @return A BigDecimal.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static BigDecimal getDecimal(final byte[] b, int precision, int scale) {
+    return getDecimal(b, 0, precision, scale);
+  }
+
+  /**
+   * Reads a decimal from the beginning of the given array.
+   * @param b The array to read from.
+   * @param offset The offset in the array to start reading from.
+   * @param precision The precision of the decimal value.
+   * @return A BigDecimal.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static BigDecimal getDecimal(final byte[] b, final int offset, int precision, int scale) {
+    int size = DecimalUtil.precisionToSize(precision);
+    switch (size) {
+      case  DecimalUtil.DECIMAL32_SIZE:
+        int intVal = getInt(b, offset);
+        return BigDecimal.valueOf(intVal, scale);
+      case DecimalUtil.DECIMAL64_SIZE:
+        long longVal = getLong(b, offset);
+        return BigDecimal.valueOf(longVal, scale);
+      case DecimalUtil.DECIMAL128_SIZE:
+         BigInteger int128Val = getBigInteger(b, offset);
+        return new BigDecimal(int128Val, scale);
+      default:
+        throw new IllegalArgumentException("Unsupported decimal type size: " + size);
+    }
+  }
+
+  /**
+   * Writes a BigDecimal at the beginning of the given array.
+   *
+   * @param b The array to write to.
+   * @param n A BigDecimal.
+   * @param precision The target precision of the decimal value.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static void setBigDecimal(final byte[] b,  final BigDecimal n, int precision) {
+    setBigDecimal(b, n, precision, 0);
+  }
+
+  /**
+   * Writes a BigDecimal at an offset in the given array.
+   * @param b The array to write to.
+   * @param n A BigDecimal.
+   * @param precision The target precision of the decimal value.
+   * @param offset The offset in the array to start writing at.
+   * @throws IndexOutOfBoundsException if the byte array is too small.
+   */
+  public static void setBigDecimal(final byte[] b, final BigDecimal n, int precision, final int offset) {
+    int size = DecimalUtil.precisionToSize(precision);
+    BigInteger bigInt = n.unscaledValue();
+    switch (size) {
+      case  DecimalUtil.DECIMAL32_SIZE:
+        // TODO: use n.unscaledValue().intValueExact() when we drop Java7 support.
+        if (bigInt.compareTo(BIGINT32_MIN) >= 0 && bigInt.compareTo(BIGINT32_MAX) <= 0) {
+          setInt(b, bigInt.intValue(), offset);
+        } else {
+          throw new ArithmeticException("BigInteger out of int range");
+        }
+        break;
+      case DecimalUtil.DECIMAL64_SIZE:
+        // TODO: use n.unscaledValue().intValueExact() when we drop Java7 support.
+        if (bigInt.compareTo(BIGINT64_MIN) >= 0 && bigInt.compareTo(BIGINT64_MAX) <= 0) {
+          setLong(b, bigInt.longValue(), offset);
+        } else {
+          throw new ArithmeticException("BigInteger out of int range");
+        }
+        break;
+      case DecimalUtil.DECIMAL128_SIZE:
+        setBigInteger(b, bigInt, offset);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported decimal type size: " + size);
+    }
+  }
+
+  /**
+   * Creates a new byte array containing a little-endian BigDecimal.
+   * @param n A BigDecimal.
+   * @param precision The target precision of the decimal value.
+   * @return A new byte array containing the given value.
+   */
+  public static byte[] fromBigDecimal(final BigDecimal n, int precision) {
+    int size = DecimalUtil.precisionToSize(precision);
+    switch (size) {
+      case  DecimalUtil.DECIMAL32_SIZE:
+        return fromInt(n.unscaledValue().intValue());
+      case DecimalUtil.DECIMAL64_SIZE:
+        return fromLong(n.unscaledValue().longValue());
+      case DecimalUtil.DECIMAL128_SIZE:
+        return fromBigInteger(n.unscaledValue());
+      default:
+        throw new IllegalArgumentException("Unsupported decimal type size: " + size);
+    }
+  }
+
   // CHECKSTYLE:OFF
   /** Transforms a string into an UTF-8 encoded byte array.  */
   public static byte[] UTF8(final String s) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
index b2fa774..6a9fbef 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnRangePredicate.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -26,8 +27,10 @@ import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Type;
 import org.apache.kudu.tserver.Tserver;
+import org.apache.kudu.util.DecimalUtil;
 
 /**
  * A range predicate on one of the columns in the underlying data.
@@ -76,7 +79,7 @@ public class ColumnRangePredicate {
     if (bound == null) {
       return null;
     }
-    switch (column.getType().getDataType()) {
+    switch (column.getType().getDataType(column.getTypeAttributes())) {
       case BOOL:
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getBoolean(bound));
       case INT8:
@@ -96,6 +99,12 @@ public class ColumnRangePredicate {
         return KuduPredicate.newComparisonPredicate(column, op, Bytes.getString(bound));
       case BINARY:
         return KuduPredicate.newComparisonPredicate(column, op, bound);
+      case DECIMAL32:
+      case DECIMAL64:
+      case DECIMAL128:
+        ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+        return KuduPredicate.newComparisonPredicate(column, op,
+            Bytes.getDecimal(bound, typeAttributes.getPrecision(), typeAttributes.getScale()));
       default:
         throw new IllegalStateException(String.format("unknown column type %s", column.getType()));
     }
@@ -206,6 +215,18 @@ public class ColumnRangePredicate {
   }
 
   /**
+   * Set a BigDecimal for the lower bound
+   * @param lowerBound value for the lower bound
+   */
+  public void setLowerBound(BigDecimal lowerBound) {
+    checkColumn(Type.DECIMAL);
+    int precision = column.getTypeAttributes().getPrecision();
+    int scale = column.getTypeAttributes().getScale();
+    BigDecimal coercedVal = DecimalUtil.coerce(lowerBound, precision, scale);
+    setLowerBoundInternal(Bytes.fromBigDecimal(coercedVal, precision));
+  }
+
+  /**
    * Set a boolean for the upper bound
    * @param upperBound value for the upper bound
    */
@@ -291,6 +312,18 @@ public class ColumnRangePredicate {
   }
 
   /**
+   * Set a BigDecimal for the upper bound
+   * @param upperBound value for the upper bound
+   */
+  public void setUpperBound(BigDecimal upperBound) {
+    checkColumn(Type.DECIMAL);
+    int precision = column.getTypeAttributes().getPrecision();
+    int scale = column.getTypeAttributes().getScale();
+    BigDecimal coercedVal = DecimalUtil.coerce(upperBound, precision, scale);
+    setUpperBoundInternal(Bytes.fromBigDecimal(coercedVal, precision));
+  }
+
+  /**
    * Get the column used by this predicate
    * @return the column
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
index e58876e..724a270 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KeyEncoder.java
@@ -17,6 +17,8 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
@@ -34,6 +36,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.util.ByteVec;
+import org.apache.kudu.util.DecimalUtil;
 import org.apache.kudu.util.Pair;
 
 /**
@@ -42,6 +45,8 @@ import org.apache.kudu.util.Pair;
 @InterfaceAudience.Private
 class KeyEncoder {
 
+  private static final BigInteger MIN_VALUE_128 = BigInteger.valueOf(-2).pow(127);
+
   /** Non-constructable utility class. */
   private KeyEncoder() {
   }
@@ -139,13 +144,12 @@ class KeyEncoder {
                                                     column.getName()));
     }
     final Type type = column.getType();
-
     if (type == Type.STRING || type == Type.BINARY) {
       encodeBinary(row.getVarLengthData().get(columnIdx), isLast, buf);
     } else {
       encodeSignedInt(row.getRowAlloc(),
                       schema.getColumnOffset(columnIdx),
-                      type.getSize(),
+                      column.getTypeSize(),
                       buf);
     }
   }
@@ -313,7 +317,8 @@ class KeyEncoder {
    */
   private static void decodeColumn(ByteBuffer buf, PartialRow row, int idx, boolean isLast) {
     Schema schema = row.getSchema();
-    switch (schema.getColumnByIndex(idx).getType()) {
+    ColumnSchema column = schema.getColumnByIndex(idx);
+    switch (column.getType()) {
       case INT8:
         row.addByte(idx, (byte) (buf.get() ^ Byte.MIN_VALUE));
         break;
@@ -337,6 +342,29 @@ class KeyEncoder {
         row.addStringUtf8(idx, binary);
         break;
       }
+      case DECIMAL: {
+        int scale = column.getTypeAttributes().getScale();
+        int size = column.getTypeSize();
+        switch (size) {
+          case  DecimalUtil.DECIMAL32_SIZE:
+            int intVal = buf.getInt() ^ Integer.MIN_VALUE;
+            row.addDecimal(idx, BigDecimal.valueOf(intVal, scale));
+            break;
+          case DecimalUtil.DECIMAL64_SIZE:
+            long longVal = buf.getLong() ^ Long.MIN_VALUE;
+            row.addDecimal(idx, BigDecimal.valueOf(longVal, scale));
+            break;
+          case DecimalUtil.DECIMAL128_SIZE:
+            byte[] bytes = new byte[size];
+            buf.get(bytes);
+            BigInteger bigIntVal = new BigInteger(bytes).xor(MIN_VALUE_128);
+            row.addDecimal(idx, new BigDecimal(bigIntVal, scale));
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported decimal type size: " + size);
+        }
+        break;
+      }
       default:
         throw new IllegalArgumentException(String.format(
             "The column type %s is not a valid key component type",

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
index b346664..19a73bd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
@@ -17,6 +17,8 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigInteger;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,9 +37,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.util.DecimalUtil;
 
 /**
  * A predicate which can be used to filter rows based on the value of a column.
@@ -227,6 +231,68 @@ public class KuduPredicate {
   }
 
   /**
+   * Creates a new comparison predicate on a Decimal column.
+   * @param column the column schema
+   * @param op the comparison operation
+   * @param value the value to compare against
+   */
+  public static KuduPredicate newComparisonPredicate(ColumnSchema column,
+                                                     ComparisonOp op,
+                                                     BigDecimal value) {
+    checkColumn(column, Type.DECIMAL);
+    ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+    int precision = typeAttributes.getPrecision();
+    int scale = typeAttributes.getScale();
+
+    BigDecimal minValue = DecimalUtil.minValue(precision, scale);
+    BigDecimal maxValue = DecimalUtil.maxValue(precision, scale);
+    Preconditions.checkArgument(value.compareTo(maxValue) <= 0 && value.compareTo(minValue) >= 0,
+        "Decimal value out of range for %s column: %s",
+        column.getType(), value);
+    BigDecimal smallestValue = DecimalUtil.smallestValue(scale);
+
+    if (op == ComparisonOp.LESS_EQUAL) {
+      if (value.equals(maxValue)) {
+        // If the value can't be incremented because it is at the top end of the
+        // range, then substitute the predicate with an IS NOT NULL predicate.
+        // This has the same effect as an inclusive upper bound on the maximum
+        // value. If the column is not nullable then the IS NOT NULL predicate
+        // is ignored.
+        return newIsNotNullPredicate(column);
+      }
+      value = value.add(smallestValue);
+      op = ComparisonOp.LESS;
+    } else if (op == ComparisonOp.GREATER) {
+      if (value == maxValue) {
+        return none(column);
+      }
+      value = value.add(smallestValue);
+      op = ComparisonOp.GREATER_EQUAL;
+    }
+
+    byte[] bytes = Bytes.fromBigDecimal(value, precision);
+
+    switch (op) {
+      case GREATER_EQUAL:
+        if (value.equals(minValue)) {
+          return newIsNotNullPredicate(column);
+        } else if (value.equals(maxValue)) {
+          return new KuduPredicate(PredicateType.EQUALITY, column, bytes, null);
+        }
+        return new KuduPredicate(PredicateType.RANGE, column, bytes, null);
+      case EQUAL:
+        return new KuduPredicate(PredicateType.EQUALITY, column, bytes, null);
+      case LESS:
+        if (value.equals(minValue)) {
+          return none(column);
+        }
+        return new KuduPredicate(PredicateType.RANGE, column, null, bytes);
+      default:
+        throw new RuntimeException("unknown comparison op");
+    }
+  }
+
+  /**
    * Creates a new comparison predicate on a float column.
    * @param column the column schema
    * @param op the comparison operation
@@ -449,6 +515,12 @@ public class KuduPredicate {
       for (T value : values) {
         vals.add(Bytes.fromDouble((Double) value));
       }
+    } else if (t instanceof BigDecimal) {
+        checkColumn(column, Type.DECIMAL);
+        for (T value : values) {
+          vals.add(Bytes.fromBigDecimal((BigDecimal) value,
+              column.getTypeAttributes().getPrecision()));
+        }
     } else if (t instanceof String) {
       checkColumn(column, Type.STRING);
       for (T value : values) {
@@ -647,7 +719,8 @@ public class KuduPredicate {
    */
   private static KuduPredicate buildInList(ColumnSchema column, Collection<byte[]> values) {
     // IN (true, false) predicates can be simplified to IS NOT NULL.
-    if (column.getType().getDataType() == Common.DataType.BOOL && values.size() > 1) {
+    if (column.getType().getDataType(column.getTypeAttributes()) ==
+        Common.DataType.BOOL && values.size() > 1) {
       return newIsNotNullPredicate(column);
     }
 
@@ -785,7 +858,7 @@ public class KuduPredicate {
    * @return the comparison of the serialized values based on the column type
    */
   private static int compare(ColumnSchema column, byte[] a, byte[] b) {
-    switch (column.getType().getDataType()) {
+    switch (column.getType().getDataType(column.getTypeAttributes())) {
       case BOOL:
         return Boolean.compare(Bytes.getBoolean(a), Bytes.getBoolean(b));
       case INT8:
@@ -793,9 +866,11 @@ public class KuduPredicate {
       case INT16:
         return Short.compare(Bytes.getShort(a), Bytes.getShort(b));
       case INT32:
+      case DECIMAL32:
         return Integer.compare(Bytes.getInt(a), Bytes.getInt(b));
       case INT64:
       case UNIXTIME_MICROS:
+      case DECIMAL64:
         return Long.compare(Bytes.getLong(a), Bytes.getLong(b));
       case FLOAT:
         return Float.compare(Bytes.getFloat(a), Bytes.getFloat(b));
@@ -804,6 +879,8 @@ public class KuduPredicate {
       case STRING:
       case BINARY:
         return UnsignedBytes.lexicographicalComparator().compare(a, b);
+      case DECIMAL128:
+        return Bytes.getBigInteger(a).compareTo(Bytes.getBigInteger(b));
       default:
         throw new IllegalStateException(String.format("unknown column type %s", column.getType()));
     }
@@ -816,7 +893,7 @@ public class KuduPredicate {
    * @return true if increment(a) == b
    */
   private boolean areConsecutive(byte[] a, byte[] b) {
-    switch (column.getType().getDataType()) {
+    switch (column.getType().getDataType(column.getTypeAttributes())) {
       case BOOL: return false;
       case INT8: {
         byte m = Bytes.getByte(a);
@@ -828,13 +905,15 @@ public class KuduPredicate {
         short n = Bytes.getShort(b);
         return m < n && m + 1 == n;
       }
-      case INT32: {
+      case INT32:
+      case DECIMAL32:{
         int m = Bytes.getInt(a);
         int n = Bytes.getInt(b);
         return m < n && m + 1 == n;
       }
       case INT64:
-      case UNIXTIME_MICROS: {
+      case UNIXTIME_MICROS:
+      case DECIMAL64:  {
         long m = Bytes.getLong(a);
         long n = Bytes.getLong(b);
         return m < n && m + 1 == n;
@@ -861,6 +940,11 @@ public class KuduPredicate {
         }
         return true;
       }
+      case DECIMAL128: {
+        BigInteger m = Bytes.getBigInteger(a);
+        BigInteger n = Bytes.getBigInteger(b);
+        return m.compareTo(n) < 0 && m.add(BigInteger.ONE).equals(n);
+      }
       default:
         throw new IllegalStateException(String.format("unknown column type %s", column.getType()));
     }
@@ -953,7 +1037,7 @@ public class KuduPredicate {
    * @return the text representation of the value
    */
   private String valueToString(byte[] value) {
-    switch (column.getType().getDataType()) {
+    switch (column.getType().getDataType(column.getTypeAttributes())) {
       case BOOL: return Boolean.toString(Bytes.getBoolean(value));
       case INT8: return Byte.toString(Bytes.getByte(value));
       case INT16: return Short.toString(Bytes.getShort(value));
@@ -971,6 +1055,12 @@ public class KuduPredicate {
         return sb.toString();
       }
       case BINARY: return Bytes.hex(value);
+      case DECIMAL32:
+      case DECIMAL64:
+      case DECIMAL128:
+       ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+       return Bytes.getDecimal(value, typeAttributes.getPrecision(),
+           typeAttributes.getScale()).toString();
       default:
         throw new IllegalStateException(String.format("unknown column type %s", column.getType()));
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index b85c37f..0f520f9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -168,7 +168,7 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
     for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) {
       int columnIdx = table.getSchema().getColumnIndex(column.getName());
       ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx);
-      if (column.getType() != schema.getType().getDataType()) {
+      if (column.getType() != schema.getType().getDataType(schema.getTypeAttributes())) {
         throw new IllegalStateException(String.format(
             "invalid type %s for column '%s' in scan token, expected: %s",
             column.getType().name(), column.getName(), schema.getType().name()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 4cd38f3..b6ca888 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -303,10 +303,10 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
             indirectWrittenBytes += bbSize;
           } else {
             // This is for cols other than strings
-            rows.put(rowData, currentRowOffset, col.getType().getSize());
+            rows.put(rowData, currentRowOffset, col.getTypeSize());
           }
         }
-        currentRowOffset += col.getType().getSize();
+        currentRowOffset += col.getTypeSize();
         colIdx++;
       }
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 4b1f965..10b8642 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,8 +32,10 @@ import org.apache.yetus.audience.InterfaceStability;
 import org.jboss.netty.util.CharsetUtil;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.util.DecimalUtil;
 import org.apache.kudu.util.StringUtil;
 
 /**
@@ -479,6 +482,69 @@ public class PartialRow {
   }
 
   /**
+   * Add a Decimal for the specified column.
+   * @param columnIndex the column's index in the schema
+   * @param val value to add
+   * @throws IllegalArgumentException if the value doesn't match the column's type
+   * @throws IllegalStateException if the row was already applied
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public void addDecimal(int columnIndex, BigDecimal val) {
+    checkNotFrozen();
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+    checkColumn(column, Type.DECIMAL);
+    BigDecimal coercedVal = DecimalUtil.coerce(val,typeAttributes.getPrecision(),
+        typeAttributes.getScale());
+    Bytes.setBigDecimal(rowAlloc, coercedVal, typeAttributes.getPrecision(),
+        getPositionInRowAllocAndSetBitSet(columnIndex));
+  }
+
+  /**
+   * Add a Decimal for the specified column.
+   *
+   * @param columnName Name of the column
+   * @param val value to add
+   * @throws IllegalArgumentException if the column doesn't exist
+   * or if the value doesn't match the column's type
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addDecimal(String columnName, BigDecimal val) {
+    addDecimal(schema.getColumnIndex(columnName), val);
+  }
+
+  /**
+   * Get the specified column's BigDecimal
+   *
+   * @param columnName name of the column to get data for
+   * @return a BigDecimal
+   * @throws IllegalArgumentException if the column doesn't exist,
+   * is null, is unset, or the type doesn't match the column's type
+   */
+  public BigDecimal getDecimal(String columnName) {
+    return getDecimal(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
+   * Get the specified column's Decimal.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a BigDecimal
+   * @throws IllegalArgumentException if the column is null, is unset,
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public BigDecimal getDecimal(int columnIndex) {
+    checkColumn(schema.getColumnByIndex(columnIndex), Type.DECIMAL);
+    checkColumnExists(schema.getColumnByIndex(columnIndex));
+    checkValue(columnIndex);
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+    return Bytes.getDecimal(rowAlloc, schema.getColumnOffset(columnIndex),
+        typeAttributes.getPrecision(), typeAttributes.getScale());
+  }
+
+  /**
    * Add a String for the specified column.
    * @param columnIndex the column's index in the schema
    * @param val value to add
@@ -878,6 +944,9 @@ public class PartialRow {
 
       ColumnSchema col = schema.getColumnByIndex(idx);
       sb.append(col.getType().getName());
+      if (col.getTypeAttributes() != null) {
+        sb.append(col.getTypeAttributes().toStringForType(col.getType()));
+      }
       sb.append(' ');
       sb.append(col.getName());
       sb.append('=');
@@ -992,6 +1061,11 @@ public class PartialRow {
       case DOUBLE:
         sb.append(Bytes.getDouble(rowAlloc, schema.getColumnOffset(idx)));
         return;
+      case DECIMAL:
+        ColumnTypeAttributes typeAttributes = col.getTypeAttributes();
+        sb.append(Bytes.getDecimal(rowAlloc, schema.getColumnOffset(idx),
+            typeAttributes.getPrecision(), typeAttributes.getScale()));
+        return;
       case BINARY:
       case STRING:
         ByteBuffer value = getVarLengthData().get(idx).duplicate();
@@ -1016,7 +1090,8 @@ public class PartialRow {
    * @param index the index of the column to set to the minimum
    */
   void setMin(int index) {
-    Type type = schema.getColumnByIndex(index).getType();
+    ColumnSchema column = schema.getColumnByIndex(index);
+    Type type = column.getType();
     switch (type) {
       case BOOL:
         addBoolean(index, false);
@@ -1040,6 +1115,10 @@ public class PartialRow {
       case DOUBLE:
         addDouble(index, -Double.MAX_VALUE);
         break;
+      case DECIMAL:
+        ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+        addDecimal(index,
+            DecimalUtil.minValue(typeAttributes.getPrecision(), typeAttributes.getScale()));
       case STRING:
         addStringUtf8(index, AsyncKuduClient.EMPTY_ARRAY);
         break;
@@ -1057,7 +1136,8 @@ public class PartialRow {
    * @param value the raw value
    */
   void setRaw(int index, byte[] value) {
-    Type type = schema.getColumnByIndex(index).getType();
+    ColumnSchema column = schema.getColumnByIndex(index);
+    Type type = column.getType();
     switch (type) {
       case BOOL:
       case INT8:
@@ -1066,8 +1146,9 @@ public class PartialRow {
       case INT64:
       case UNIXTIME_MICROS:
       case FLOAT:
-      case DOUBLE: {
-        Preconditions.checkArgument(value.length == type.getSize());
+      case DOUBLE:
+      case DECIMAL: {
+        Preconditions.checkArgument(value.length == column.getTypeSize());
         System.arraycopy(value, 0, rowAlloc,
             getPositionInRowAllocAndSetBitSet(index), value.length);
         break;
@@ -1091,7 +1172,8 @@ public class PartialRow {
    *         it is already the maximum value
    */
   boolean incrementColumn(int index) {
-    Type type = schema.getColumnByIndex(index).getType();
+    ColumnSchema column = schema.getColumnByIndex(index);
+    Type type = column.getType();
     Preconditions.checkState(isSet(index));
     int offset = schema.getColumnOffset(index);
     switch (type) {
@@ -1143,7 +1225,7 @@ public class PartialRow {
         return true;
       }
       case DOUBLE: {
-        double existing = Bytes.getFloat(rowAlloc, offset);
+        double existing = Bytes.getDouble(rowAlloc, offset);
         double incremented = Math.nextAfter(existing, Double.POSITIVE_INFINITY);
         if (existing == incremented) {
           return false;
@@ -1151,6 +1233,18 @@ public class PartialRow {
         Bytes.setDouble(rowAlloc, incremented, offset);
         return true;
       }
+      case DECIMAL: {
+        int precision = column.getTypeAttributes().getPrecision();
+        int scale = column.getTypeAttributes().getScale();
+        BigDecimal existing = Bytes.getDecimal(rowAlloc, offset, precision, scale);
+        BigDecimal max = DecimalUtil.maxValue(precision, scale);
+        if (existing.equals(max)) {
+          return false;
+        }
+        BigDecimal smallest = DecimalUtil.smallestValue(scale);
+        Bytes.setBigDecimal(rowAlloc, existing.add(smallest), precision, offset);
+        return true;
+      }
       case STRING:
       case BINARY: {
         ByteBuffer data = varLengthData.get(index);
@@ -1212,7 +1306,8 @@ public class PartialRow {
     Preconditions.checkArgument(a.isSet(index));
     Preconditions.checkArgument(b.isSet(index));
 
-    Type type = a.getSchema().getColumnByIndex(index).getType();
+    ColumnSchema column = a.getSchema().getColumnByIndex(index);
+    Type type = column.getType();
     int offset = a.getSchema().getColumnOffset(index);
 
     switch (type) {
@@ -1231,6 +1326,12 @@ public class PartialRow {
         return Bytes.getFloat(a.rowAlloc, offset) == Bytes.getFloat(b.rowAlloc, offset);
       case DOUBLE:
         return Bytes.getDouble(a.rowAlloc, offset) == Bytes.getDouble(b.rowAlloc, offset);
+      case DECIMAL:
+        ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+        int precision = typeAttributes.getPrecision();
+        int scale = typeAttributes.getScale();
+        return Bytes.getDecimal(a.rowAlloc, offset, precision, scale)
+            .equals(Bytes.getDecimal(b.rowAlloc, offset, precision, scale));
       case STRING:
       case BINARY: {
         ByteBuffer aData = a.varLengthData.get(index).duplicate();
@@ -1272,7 +1373,8 @@ public class PartialRow {
     Preconditions.checkArgument(lower.isSet(index));
     Preconditions.checkArgument(upper.isSet(index));
 
-    Type type = lower.getSchema().getColumnByIndex(index).getType();
+    ColumnSchema column = lower.getSchema().getColumnByIndex(index);
+    Type type = column.getType();
     int offset = lower.getSchema().getColumnOffset(index);
 
     switch (type) {
@@ -1306,6 +1408,15 @@ public class PartialRow {
                Math.nextAfter(val, Double.POSITIVE_INFINITY) ==
                    Bytes.getDouble(upper.rowAlloc, offset);
       }
+      case DECIMAL: {
+        ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+        int precision = typeAttributes.getPrecision();
+        int scale = typeAttributes.getScale();
+        BigDecimal val = Bytes.getDecimal(lower.rowAlloc, offset, precision, scale);
+        BigDecimal smallestVal = DecimalUtil.smallestValue(scale);
+        return val.add(smallestVal).equals(
+                Bytes.getDecimal(upper.rowAlloc, offset, precision, scale));
+      }
       case STRING:
       case BINARY: {
         // Check that b is 1 byte bigger than a, the extra byte is 0, and the other bytes are equal.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index c816e3d..b76194f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
@@ -31,9 +32,11 @@ import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Common;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.util.DecimalUtil;
 
 @InterfaceAudience.Private
 public class ProtobufHelper {
@@ -68,7 +71,7 @@ public class ProtobufHelper {
                                                  ColumnSchema column) {
     schemaBuilder
         .setName(column.getName())
-        .setType(column.getType().getDataType())
+        .setType(column.getType().getDataType(column.getTypeAttributes()))
         .setIsKey(column.isKey())
         .setIsNullable(column.isNullable())
         .setCfileBlockSize(column.getDesiredBlockSize());
@@ -82,13 +85,31 @@ public class ProtobufHelper {
       schemaBuilder.setReadDefaultValue(UnsafeByteOperations.unsafeWrap(
           objectToWireFormat(column, column.getDefaultValue())));
     }
+    if(column.getTypeAttributes() != null) {
+      schemaBuilder.setTypeAttributes(
+          columnTypeAttributesToPb(Common.ColumnTypeAttributesPB.newBuilder(), column));
+    }
     return schemaBuilder.build();
   }
 
+  public static Common.ColumnTypeAttributesPB columnTypeAttributesToPb(
+      Common.ColumnTypeAttributesPB.Builder builder, ColumnSchema column) {
+    ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+    if (typeAttributes.hasPrecision()) {
+      builder.setPrecision(typeAttributes.getPrecision());
+    }
+    if (typeAttributes.hasScale()) {
+      builder.setScale(typeAttributes.getScale());
+    }
+    return builder.build();
+  }
+
   public static ColumnSchema pbToColumnSchema(Common.ColumnSchemaPB pb) {
     Type type = Type.getTypeForDataType(pb.getType());
+    ColumnTypeAttributes typeAttributes = pb.hasTypeAttributes() ?
+        pbToColumnTypeAttributes(pb.getTypeAttributes()) : null;
     Object defaultValue = pb.hasWriteDefaultValue() ?
-        byteStringToObject(type, pb.getWriteDefaultValue()) : null;
+        byteStringToObject(type, typeAttributes, pb.getWriteDefaultValue()) : null;
     ColumnSchema.Encoding encoding = ColumnSchema.Encoding.valueOf(pb.getEncoding().name());
     ColumnSchema.CompressionAlgorithm compressionAlgorithm =
         ColumnSchema.CompressionAlgorithm.valueOf(pb.getCompression().name());
@@ -100,9 +121,22 @@ public class ProtobufHelper {
                            .encoding(encoding)
                            .compressionAlgorithm(compressionAlgorithm)
                            .desiredBlockSize(desiredBlockSize)
+                           .typeAttributes(typeAttributes)
                            .build();
   }
 
+  public static ColumnTypeAttributes pbToColumnTypeAttributes(Common.ColumnTypeAttributesPB pb) {
+    ColumnTypeAttributes.ColumnTypeAttributesBuilder builder =
+        new ColumnTypeAttributes.ColumnTypeAttributesBuilder();
+    if(pb.hasPrecision()) {
+      builder.precision(pb.getPrecision());
+    }
+    if(pb.hasScale()) {
+      builder.scale(pb.getScale());
+    }
+    return builder.build();
+  }
+
   public static Schema pbToSchema(Common.SchemaPB schema) {
     List<ColumnSchema> columns = new ArrayList<>(schema.getColumnsCount());
     List<Integer> columnIds = new ArrayList<>(schema.getColumnsCount());
@@ -205,13 +239,16 @@ public class ProtobufHelper {
         return Bytes.fromFloat((Float) value);
       case DOUBLE:
         return Bytes.fromDouble((Double) value);
+      case DECIMAL:
+        return Bytes.fromBigDecimal((BigDecimal) value, col.getTypeAttributes().getPrecision());
       default:
         throw new IllegalArgumentException("The column " + col.getName() + " is of type " + col
             .getType() + " which is unknown");
     }
   }
 
-  private static Object byteStringToObject(Type type, ByteString value) {
+  private static Object byteStringToObject(Type type, ColumnTypeAttributes typeAttributes,
+                                           ByteString value) {
     ByteBuffer buf = value.asReadOnlyByteBuffer();
     buf.order(ByteOrder.LITTLE_ENDIAN);
     switch (type) {
@@ -234,6 +271,9 @@ public class ProtobufHelper {
         return value.toStringUtf8();
       case BINARY:
         return value.toByteArray();
+      case DECIMAL:
+        return Bytes.getDecimal(value.toByteArray(),
+            typeAttributes.getPrecision(), typeAttributes.getScale());
       default:
         throw new IllegalArgumentException("This type is unknown: " + type);
     }
@@ -270,6 +310,8 @@ public class ProtobufHelper {
       bytes = Bytes.fromFloat((Float) value);
     } else if (value instanceof Double) {
       bytes = Bytes.fromDouble((Double) value);
+    } else if (value instanceof BigDecimal) {
+      bytes = Bytes.fromBigDecimal((BigDecimal) value, DecimalUtil.MAX_DECIMAL_PRECISION);
     } else {
       throw new IllegalArgumentException("The default value provided for " +
           "column " + colName + " is of class " + value.getClass().getName() +

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index a0ca2b5..12edbf4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.FieldPosition;
@@ -29,6 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.util.Slice;
@@ -89,7 +91,8 @@ public class RowResult {
     // Pre-compute the columns offsets in rowData for easier lookups later.
     // If the schema has nullables, we also add the offset for the null bitmap at the end.
     for (int i = 1; i < columnOffsetsSize; i++) {
-      int previousSize = schema.getColumnByIndex(i - 1).getType().getSize();
+      org.apache.kudu.ColumnSchema column = schema.getColumnByIndex(i - 1);
+      int previousSize = column.getTypeSize();
       columnOffsets[i] = previousSize + currentOffset;
       currentOffset += previousSize;
     }
@@ -325,6 +328,36 @@ public class RowResult {
   }
 
   /**
+   * Get the specified column's Decimal.
+   *
+   * @param columnName name of the column to get data for
+   * @return a BigDecimal
+   * @throws IllegalArgumentException if the column doesn't exist or is null
+   */
+  public BigDecimal getDecimal(String columnName) {
+    return getDecimal(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
+   * Get the specified column's Decimal.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a BigDecimal.
+   * @throws IllegalArgumentException if the column is null
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public BigDecimal getDecimal(int columnIndex) {
+    checkValidColumn(columnIndex);
+    checkNull(columnIndex);
+    checkType(columnIndex, Type.DECIMAL);
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+    return Bytes.getDecimal(this.rowData.getRawArray(),
+        this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex),
+            typeAttributes.getPrecision(), typeAttributes.getScale());
+  }
+
+  /**
    * Get the schema used for this scanner's column projection.
    * @return a column projection as a schema.
    */
@@ -561,8 +594,13 @@ public class RowResult {
       if (i != 0) {
         buf.append(", ");
       }
-      buf.append(col.getType().name());
-      buf.append(" ").append(col.getName()).append("=");
+      Type type = col.getType();
+      buf.append(type.name());
+      buf.append(" ").append(col.getName());
+      if (col.getTypeAttributes() != null) {
+        buf.append(col.getTypeAttributes().toStringForType(type));
+      }
+      buf.append("=");
       if (isNull(i)) {
         buf.append("NULL");
       } else {
@@ -594,6 +632,9 @@ public class RowResult {
           case DOUBLE:
             buf.append(getDouble(i));
             break;
+          case DECIMAL:
+            buf.append(getDecimal(i));
+            break;
           case BOOL:
             buf.append(getBoolean(i));
             break;

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
new file mode 100644
index 0000000..ae76481
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DecimalUtil.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
+import java.math.RoundingMode;
+
+import com.google.common.base.Strings;
+
+import org.apache.kudu.ColumnTypeAttributes;
+
+import static org.apache.kudu.Common.DataType;
+
+public class DecimalUtil {
+  public static final int MAX_DECIMAL32_PRECISION = 9;
+  public static final int MAX_UNSCALED_DECIMAL32 = 999999999;
+  public static final int MIN_UNSCALED_DECIMAL32 = -MAX_UNSCALED_DECIMAL32;
+  public static final int DECIMAL32_SIZE = 32 / Byte.SIZE;
+
+  public static final int MAX_DECIMAL64_PRECISION = 18;
+  public static final long MAX_UNSCALED_DECIMAL64 = 999999999999999999L;
+  public static final long MIN_UNSCALED_DECIMAL64 = -MAX_UNSCALED_DECIMAL64;
+  public static final int DECIMAL64_SIZE = 64 / Byte.SIZE;
+
+  public static final int MAX_DECIMAL128_PRECISION = 38;
+  public static final BigInteger MAX_UNSCALED_DECIMAL128 =
+      new BigInteger(Strings.repeat("9", MAX_DECIMAL128_PRECISION));
+  public static final BigInteger MIN_UNSCALED_DECIMAL128 = MAX_UNSCALED_DECIMAL128.negate();
+  public static final int DECIMAL128_SIZE = 128 / Byte.SIZE;
+
+  public static final int MAX_DECIMAL_PRECISION = MAX_DECIMAL128_PRECISION;
+
+  /**
+   * Given a precision, returns the size of the Decimal in Bytes.
+   * @return the size in Bytes.
+   */
+  public static int precisionToSize(int precision) {
+    if (precision <= MAX_DECIMAL32_PRECISION) {
+      return DECIMAL32_SIZE;
+    } else if (precision <= MAX_DECIMAL64_PRECISION) {
+      return DECIMAL64_SIZE;
+    } else if (precision <= MAX_DECIMAL128_PRECISION) {
+      return DECIMAL128_SIZE;
+    } else {
+      throw new IllegalArgumentException("Unsupported decimal type precision: " + precision);
+    }
+  }
+
+  /**
+   * Given a precision, returns the smallest unscaled data type.
+   * @return the smallest valid DataType.
+   */
+  public static DataType precisionToDataType(int precision) {
+    if (precision <= MAX_DECIMAL32_PRECISION) {
+      return DataType.DECIMAL32;
+    } else if (precision <= MAX_DECIMAL64_PRECISION) {
+      return DataType.DECIMAL64;
+    } else if (precision <= MAX_DECIMAL128_PRECISION) {
+      return DataType.DECIMAL128;
+    } else {
+      throw new IllegalArgumentException("Unsupported decimal type precision: " + precision);
+    }
+  }
+
+  /**
+   * Returns the maximum value of a Decimal give a precision and scale.
+   * @param precision the precision of the decimal.
+   * @param scale the scale of the decimal.
+   * @return the maximum decimal value.
+   */
+  public static BigDecimal maxValue(int precision, int scale) {
+    String maxPrecision = Strings.repeat("9", precision);
+    return new BigDecimal(new BigInteger(maxPrecision), scale);
+  }
+
+  /**
+   * Returns the minimum value of a Decimal give a precision and scale.
+   * @param precision the precision of the decimal.
+   * @param scale the scale of the decimal.
+   * @return the minimum decimal value.
+   */
+  public static BigDecimal minValue(int precision, int scale) {
+    return maxValue(precision, scale).negate();
+  }
+
+  /**
+   * Returns the smallest value of a Decimal give a precision and scale.
+   * This value can be useful for incrementing a Decimal.
+   * @param scale the scale of the decimal.
+   * @return the smallest decimal value.
+   */
+  public static BigDecimal smallestValue(int scale) {
+    return new BigDecimal(BigInteger.ONE, scale);
+  }
+
+  /**
+   * Attempts to coerce a big decimal to a target precision and scale and
+   * returns the result. Throws an {@link IllegalArgumentException} if the value
+   * can't be coerced without rounding or exceeding the targetPrecision.
+   *
+   * @param val the BigDecimal value to coerce.
+   * @param targetPrecision the target precision of the coerced value.
+   * @param targetScale the target scale of the coerced value.
+   * @return the coerced BigDecimal value.
+   */
+  public static BigDecimal coerce(BigDecimal val, int targetPrecision, int targetScale) {
+    if (val.scale() != targetScale) {
+      try {
+        val = val.setScale(targetScale, BigDecimal.ROUND_UNNECESSARY);
+      } catch (ArithmeticException ex) {
+        throw new IllegalArgumentException("Value scale " + val.scale() +
+            " can't be coerced to target scale " +  targetScale + ". ");
+      }
+    }
+    if (val.precision() > targetPrecision) {
+      throw new IllegalArgumentException("Value precision " + val.precision() +
+          " (after scale coercion) can't be coerced to target precision " +  targetPrecision + ". ");
+    }
+    return val;
+  }
+
+  /**
+   * Convenience method to create column type attributes for decimal columns.
+   * @param precision the precision.
+   * @param scale the scale.
+   * @return the column type attributes.
+   */
+  public static ColumnTypeAttributes typeAttributes(int precision, int scale) {
+    return new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+        .precision(precision)
+        .scale(scale)
+        .build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index c1f8564..56b6127 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -38,11 +38,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Common.HostPortPB;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.LocatedTablet.Replica;
 import org.apache.kudu.master.Master;
+import org.apache.kudu.util.DecimalUtil;
 
 public class BaseKuduTest {
 
@@ -237,7 +239,9 @@ public class BaseKuduTest {
             new ColumnSchema.ColumnSchemaBuilder("binary-array", Type.BINARY).build(),
             new ColumnSchema.ColumnSchemaBuilder("binary-bytebuffer", Type.BINARY).build(),
             new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
-            new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build());
+            new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build(),
+            new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL)
+                .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build());
 
     return new Schema(columns);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestBytes.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestBytes.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestBytes.java
index ced6144..6291cf0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestBytes.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestBytes.java
@@ -18,7 +18,12 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
 
+import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.math.MathContext;
+import java.math.RoundingMode;
+
+import org.apache.kudu.util.DecimalUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,7 +32,7 @@ public class TestBytes {
 
   @Test
   public void test() {
-    byte[] bytes = new byte[8];
+    byte[] bytes = new byte[16];
 
     // Boolean
     Bytes.setUnsignedByte(bytes, (short) 1);
@@ -94,6 +99,44 @@ public class TestBytes {
     double aDouble = 123.456;
     Bytes.setDouble(bytes, aDouble);
     assertEquals(aDouble, Bytes.getDouble(bytes), 0.001);
+
+    // DECIMAL (32 bits)
+    BigDecimal smallDecimal = new BigDecimal(BigInteger.valueOf(123456789), 0,
+        new MathContext(DecimalUtil.MAX_DECIMAL32_PRECISION, RoundingMode.UNNECESSARY));
+    Bytes.setBigDecimal(bytes, smallDecimal, DecimalUtil.MAX_DECIMAL32_PRECISION);
+    assertEquals(smallDecimal,
+        Bytes.getDecimal(bytes, 0, DecimalUtil.MAX_DECIMAL32_PRECISION, 0));
+    BigDecimal nSmallDecimal = new BigDecimal(BigInteger.valueOf(-123456789), 0,
+        new MathContext(DecimalUtil.MAX_DECIMAL32_PRECISION, RoundingMode.UNNECESSARY));
+    Bytes.setBigDecimal(bytes, nSmallDecimal, DecimalUtil.MAX_DECIMAL32_PRECISION);
+    assertEquals(nSmallDecimal,
+        Bytes.getDecimal(bytes, 0, DecimalUtil.MAX_DECIMAL32_PRECISION, 0));
+
+    // DECIMAL (64 bits)
+    BigDecimal mediumDecimal = new BigDecimal(BigInteger.valueOf(123456789L), 0,
+        new MathContext(DecimalUtil.MAX_DECIMAL64_PRECISION, RoundingMode.UNNECESSARY));
+    Bytes.setBigDecimal(bytes, mediumDecimal, DecimalUtil.MAX_DECIMAL64_PRECISION);
+    assertEquals(mediumDecimal,
+        Bytes.getDecimal(bytes, DecimalUtil.MAX_DECIMAL64_PRECISION, 0));
+    BigDecimal nMediumDecimal = new BigDecimal(BigInteger.valueOf(-123456789L), 0,
+        new MathContext(DecimalUtil.MAX_DECIMAL64_PRECISION, RoundingMode.UNNECESSARY));
+    Bytes.setBigDecimal(bytes, nMediumDecimal, DecimalUtil.MAX_DECIMAL64_PRECISION);
+    assertEquals(nMediumDecimal,
+        Bytes.getDecimal(bytes, DecimalUtil.MAX_DECIMAL64_PRECISION, 0));
+
+    // DECIMAL (128 bits)
+    BigDecimal largeDecimal =
+        new BigDecimal(new java.math.BigInteger("1234567891011121314151617181920212223"), 0,
+        new MathContext(DecimalUtil.MAX_DECIMAL128_PRECISION, RoundingMode.UNNECESSARY));
+    Bytes.setBigDecimal(bytes, largeDecimal, DecimalUtil.MAX_DECIMAL128_PRECISION);
+    assertEquals(largeDecimal,
+        Bytes.getDecimal(bytes, DecimalUtil.MAX_DECIMAL128_PRECISION, 0));
+    BigDecimal nLargeDecimal =
+        new BigDecimal(new java.math.BigInteger("-1234567891011121314151617181920212223"), 0,
+            new MathContext(DecimalUtil.MAX_DECIMAL128_PRECISION, RoundingMode.UNNECESSARY));
+    Bytes.setBigDecimal(bytes, nLargeDecimal, DecimalUtil.MAX_DECIMAL128_PRECISION);
+    assertEquals(nLargeDecimal,
+        Bytes.getDecimal(bytes, DecimalUtil.MAX_DECIMAL128_PRECISION, 0));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestColumnRangePredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestColumnRangePredicate.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestColumnRangePredicate.java
index e6022c1..979ce5d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestColumnRangePredicate.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestColumnRangePredicate.java
@@ -20,12 +20,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
+import java.math.BigDecimal;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Type;
 import org.apache.kudu.tserver.Tserver;
 
@@ -36,6 +38,11 @@ public class TestColumnRangePredicate {
     ColumnSchema col1 = new ColumnSchema.ColumnSchemaBuilder("col1", Type.INT32).build();
     ColumnSchema col2 = new ColumnSchema.ColumnSchemaBuilder("col2", Type.STRING).build();
 
+    ColumnSchema col3 = new ColumnSchema.ColumnSchemaBuilder("col3", Type.DECIMAL)
+        .typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+            .precision(6).scale(2).build()
+        ).build();
+
     ColumnRangePredicate pred1 = new ColumnRangePredicate(col1);
     pred1.setLowerBound(1);
 
@@ -46,7 +53,10 @@ public class TestColumnRangePredicate {
     pred3.setLowerBound("aaa");
     pred3.setUpperBound("bbb");
 
-    List<ColumnRangePredicate> preds = Lists.newArrayList(pred1, pred2, pred3);
+    ColumnRangePredicate pred4 = new ColumnRangePredicate(col3);
+    pred4.setLowerBound(BigDecimal.valueOf(12345, 2));
+
+    List<ColumnRangePredicate> preds = Lists.newArrayList(pred1, pred2, pred3, pred4);
 
     byte[] rawPreds = ColumnRangePredicate.toByteArray(preds);
 
@@ -57,7 +67,7 @@ public class TestColumnRangePredicate {
       fail("Couldn't decode: " + e.getMessage());
     }
 
-    assertEquals(3, decodedPreds.size());
+    assertEquals(4, decodedPreds.size());
 
     assertEquals(col1.getName(), decodedPreds.get(0).getColumn().getName());
     assertEquals(1, Bytes.getInt(decodedPreds.get(0).getLowerBound().toByteArray()));
@@ -70,5 +80,9 @@ public class TestColumnRangePredicate {
     assertEquals(col2.getName(), decodedPreds.get(2).getColumn().getName());
     assertEquals("aaa", Bytes.getString(decodedPreds.get(2).getLowerBound().toByteArray()));
     assertEquals("bbb", Bytes.getString(decodedPreds.get(2).getInclusiveUpperBound().toByteArray()));
+
+    assertEquals(col3.getName(), decodedPreds.get(3).getColumn().getName());
+    assertEquals(12345, Bytes.getInt(decodedPreds.get(3).getLowerBound().toByteArray()));
+    assertFalse(decodedPreds.get(0).hasInclusiveUpperBound());
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f34b69d/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 0ae3479..6122190 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -33,6 +34,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.client.PartitionSchema.RangeSchema;
+import org.apache.kudu.util.DecimalUtil;
 
 public class TestKeyEncoding extends BaseKuduTest {
 
@@ -164,6 +166,12 @@ public class TestKeyEncoding extends BaseKuduTest {
         new ColumnSchemaBuilder("int16", Type.INT16).key(true),
         new ColumnSchemaBuilder("int32", Type.INT32).key(true),
         new ColumnSchemaBuilder("int64", Type.INT64).key(true),
+        new ColumnSchemaBuilder("decimal32", Type.DECIMAL).key(true)
+            .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL32_PRECISION, 0)),
+        new ColumnSchemaBuilder("decimal64", Type.DECIMAL).key(true)
+            .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 0)),
+        new ColumnSchemaBuilder("decimal128", Type.DECIMAL).key(true)
+            .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 0)),
         new ColumnSchemaBuilder("string", Type.STRING).key(true),
         new ColumnSchemaBuilder("binary", Type.BINARY).key(true));
 
@@ -172,17 +180,26 @@ public class TestKeyEncoding extends BaseKuduTest {
     rowA.addShort("int16", Short.MIN_VALUE);
     rowA.addInt("int32", Integer.MIN_VALUE);
     rowA.addLong("int64", Long.MIN_VALUE);
+    // Note: The decimal value is not the minimum of the underlying int32, int64, int128 type so
+    // we don't use "minimum" values in the test.
+    rowA.addDecimal("decimal32", BigDecimal.valueOf(5));
+    rowA.addDecimal("decimal64", BigDecimal.valueOf(6));
+    rowA.addDecimal("decimal128", BigDecimal.valueOf(7));
     rowA.addString("string", "");
     rowA.addBinary("binary", "".getBytes(Charsets.UTF_8));
 
     byte[] rowAEncoded = rowA.encodePrimaryKey();
     assertBytesEquals(rowAEncoded,
-                      "\0"
-                    + "\0\0"
-                    + "\0\0\0\0"
-                    + "\0\0\0\0\0\0\0\0"
-                    + "\0\0"
-                    + "");
+                      new byte[] {
+                          0,
+                          0, 0,
+                          0, 0, 0, 0,
+                          0, 0, 0, 0, 0, 0, 0, 0,
+                          (byte) 0x80, 0, 0, 5,
+                          (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
+                          (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
+                          0, 0
+                      });
     assertEquals(rowA.stringifyRowKey(),
                  KeyEncoder.decodePrimaryKey(schema, rowAEncoded).stringifyRowKey());
 
@@ -191,6 +208,11 @@ public class TestKeyEncoding extends BaseKuduTest {
     rowB.addShort("int16", Short.MAX_VALUE);
     rowB.addInt("int32", Integer.MAX_VALUE);
     rowB.addLong("int64", Long.MAX_VALUE);
+    // Note: The decimal value is not the maximum of the underlying int32, int64, int128 type so
+    // we don't use "minimum" values in the test.
+    rowB.addDecimal("decimal32", BigDecimal.valueOf(5));
+    rowB.addDecimal("decimal64", BigDecimal.valueOf(6));
+    rowB.addDecimal("decimal128", BigDecimal.valueOf(7));
     rowB.addString("string", "abc\1\0def");
     rowB.addBinary("binary", "\0\1binary".getBytes(Charsets.UTF_8));
 
@@ -201,6 +223,9 @@ public class TestKeyEncoding extends BaseKuduTest {
                           -1, -1,
                           -1, -1, -1, -1,
                           -1, -1, -1, -1, -1, -1, -1, -1,
+                          (byte) 0x80, 0, 0, 5,
+                          (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
+                          (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
                           'a', 'b', 'c', 1, 0, 1, 'd', 'e', 'f', 0, 0,
                           0, 1, 'b', 'i', 'n', 'a', 'r', 'y',
                       });
@@ -212,6 +237,9 @@ public class TestKeyEncoding extends BaseKuduTest {
     rowC.addShort("int16", (short) 2);
     rowC.addInt("int32", 3);
     rowC.addLong("int64", 4);
+    rowC.addDecimal("decimal32", BigDecimal.valueOf(5));
+    rowC.addDecimal("decimal64", BigDecimal.valueOf(6));
+    rowC.addDecimal("decimal128", BigDecimal.valueOf(7));
     rowC.addString("string", "abc\n123");
     rowC.addBinary("binary", "\0\1\2\3\4\5".getBytes(Charsets.UTF_8));
 
@@ -222,6 +250,9 @@ public class TestKeyEncoding extends BaseKuduTest {
                           (byte) 0x80, 2,
                           (byte) 0x80, 0, 0, 3,
                           (byte) 0x80, 0, 0, 0, 0, 0, 0, 4,
+                          (byte) 0x80, 0, 0, 5,
+                          (byte) 0x80, 0, 0, 0, 0, 0, 0, 6,
+                          (byte) 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7,
                           'a', 'b', 'c', '\n', '1', '2', '3', 0, 0,
                           0, 1, 2, 3, 4, 5,
                       });
@@ -233,6 +264,9 @@ public class TestKeyEncoding extends BaseKuduTest {
     rowD.addShort("int16", (short) -2);
     rowD.addInt("int32", -3);
     rowD.addLong("int64", -4);
+    rowD.addDecimal("decimal32", BigDecimal.valueOf(-5));
+    rowD.addDecimal("decimal64", BigDecimal.valueOf(-6));
+    rowD.addDecimal("decimal128", BigDecimal.valueOf(-7));
     rowD.addString("string", "\0abc\n\1\1\0 123\1\0");
     rowD.addBinary("binary", "\0\1\2\3\4\5\0".getBytes(Charsets.UTF_8));
 
@@ -243,6 +277,9 @@ public class TestKeyEncoding extends BaseKuduTest {
                           (byte) 127, -2,
                           (byte) 127, -1, -1, -3,
                           (byte) 127, -1, -1, -1, -1, -1, -1, -4,
+                          (byte) 127, -1, -1, -5,
+                          (byte) 127, -1, -1, -1, -1, -1, -1, -6,
+                          (byte) 127, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -7,
                           0, 1, 'a', 'b', 'c', '\n', 1, 1, 0, 1, ' ', '1', '2', '3', 1, 0, 1, 0, 0,
                           0, 1, 2, 3, 4, 5, 0,
                       });
@@ -325,6 +362,12 @@ public class TestKeyEncoding extends BaseKuduTest {
         new ColumnSchemaBuilder("string", Type.STRING).key(true),
         new ColumnSchemaBuilder("binary", Type.BINARY).key(true),
         new ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).key(true),
+        new ColumnSchemaBuilder("decimal32", Type.DECIMAL).key(true)
+            .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL32_PRECISION, 0)),
+        new ColumnSchemaBuilder("decimal64", Type.DECIMAL).key(true)
+          .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 0)),
+        new ColumnSchemaBuilder("decimal128", Type.DECIMAL).key(true)
+          .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL128_PRECISION, 0)),
         new ColumnSchemaBuilder("bool", Type.BOOL),       // not primary key type
         new ColumnSchemaBuilder("float", Type.FLOAT),     // not primary key type
         new ColumnSchemaBuilder("double", Type.DOUBLE));  // not primary key type
@@ -342,9 +385,12 @@ public class TestKeyEncoding extends BaseKuduTest {
     row.addString(4, "foo");
     row.addBinary(5, "bar".getBytes(Charsets.UTF_8));
     row.addLong(6, 6l);
-    row.addBoolean(7, true);
-    row.addFloat(8, 8.8f);
-    row.addDouble(9, 9.9);
+    row.addDecimal(7, BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL32));
+    row.addDecimal(8, BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL64));
+    row.addDecimal(9, new BigDecimal(DecimalUtil.MAX_UNSCALED_DECIMAL128));
+    row.addBoolean(10, true);
+    row.addFloat(11, 8.8f);
+    row.addDouble(12, 9.9);
     session.apply(insert);
     session.close();
 
@@ -360,9 +406,15 @@ public class TestKeyEncoding extends BaseKuduTest {
       assertBytesEquals(rr.getBinaryCopy(4), "foo");
       assertBytesEquals(rr.getBinaryCopy(5), "bar");
       assertEquals(6l, rr.getLong(6));
-      assertTrue(rr.getBoolean(7));
-      assertEquals(8.8f, rr.getFloat(8), .001f);
-      assertEquals(9.9, rr.getDouble(9), .001);
+      assertTrue(BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL32)
+          .compareTo(rr.getDecimal(7)) == 0);
+      assertTrue(BigDecimal.valueOf(DecimalUtil.MAX_UNSCALED_DECIMAL64)
+          .compareTo(rr.getDecimal(8)) == 0);
+      assertTrue(new BigDecimal(DecimalUtil.MAX_UNSCALED_DECIMAL128)
+          .compareTo(rr.getDecimal(9)) == 0);
+      assertTrue(rr.getBoolean(10));
+      assertEquals(8.8f, rr.getFloat(11), .001f);
+      assertEquals(9.9, rr.getDouble(12), .001);
     }
   }
 }


[2/6] kudu git commit: [docs] Add scaling guide

Posted by gr...@apache.org.
[docs] Add scaling guide

This adds some more detailed information on how Kudu scales w.r.t
several resources and provides some background on the scale limits
and how to plan capacity for a Kudu deployment.

Change-Id: I38d8999addc41fe0b726342a27dbba199ddf7dd2
Reviewed-on: http://gerrit.cloudera.org:8080/8842
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c9e3bd84
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c9e3bd84
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c9e3bd84

Branch: refs/heads/master
Commit: c9e3bd846d6d0403c688a8e7d178f31966a3bfd3
Parents: 5121871
Author: Will Berkeley <wd...@apache.org>
Authored: Tue Dec 12 14:01:29 2017 -0800
Committer: Will Berkeley <wd...@gmail.com>
Committed: Tue Feb 13 21:08:07 2018 +0000

----------------------------------------------------------------------
 docs/scaling_guide.adoc | 182 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 182 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c9e3bd84/docs/scaling_guide.adoc
----------------------------------------------------------------------
diff --git a/docs/scaling_guide.adoc b/docs/scaling_guide.adoc
new file mode 100644
index 0000000..93c4334
--- /dev/null
+++ b/docs/scaling_guide.adoc
@@ -0,0 +1,182 @@
+[[scaling]]
+= Apache Kudu Scaling Guide
+
+:author: Kudu Team
+:imagesdir: ./images
+:icons: font
+:toc: left
+:toclevels: 2
+:doctype: book
+:backend: html5
+:sectlinks:
+:experimental:
+
+This document describes in detail how Kudu scales with respect to various system resources,
+including memory, file descriptors, and threads. See the
+link:known_issues.html#_scale[scaling limits] for the maximum recommended parameters of a Kudu
+cluster. They can be used to estimate roughly the number of servers required for a given quantity
+of data.
+
+WARNING: The recommendations and conclusions here are only approximations. Appropriate numbers
+depend on use case. There is no substitute for measurement and monitoring of resources used during a
+representative workload.
+
+== Terms
+
+We will use the following terms:
+
+* *hot replica*: A tablet replica that is continuously receiving writes. For example, in a time
+series use case, tablet replicas for the most recent range partition on a time column would be
+continuously receiving the latest data, and would be hot replicas.
+* *cold replica*: A tablet replica that is not hot, i.e. a replica that is not frequently receiving
+writes, for example, once every few minutes. A cold replica may be read from. For example, in a time
+series use case, tablet replicas for previous range partitions on a time column would not receive
+writes at all, or only occasionally receive late updates or additions, but may be constantly read.
+* *data on disk*: The total amount of data stored on a tablet server across all disks,
+post-replication, post-compression, and post-encoding.
+
+== Example Workload
+
+The sections below perform sample calculations using the following parameters:
+
+* 200 hot replicas per tablet server
+* 1600 cold replicas per tablet server
+* 8TB of data on disk per tablet server (about 4.5GB/replica)
+* 512MB block cache
+* 40 cores per server
+* limit of 32000 file descriptors per server
+* a read workload with 1 frequently-scanned table with 40 columns
+
+This workload resembles a time series use case, where the hot replicas correspond to the most recent
+range partition on time.
+
+[[memory]]
+== Memory
+
+The flag `--memory_limit_hard_bytes` determines the maximum amount of memory that a Kudu tablet
+server may use. The amount of memory used by a tablet server scales with data size, write workload,
+and read concurrency. The following table provides numbers that can be used to compute a rough
+estimate of memory usage.
+
+.Tablet Server Memory Usage
+|===
+| Type | Multiplier | Description
+
+| Memory required per TB of data on disk | 1.5GB per 1TB data on disk | Amount of memory per unit of data on disk required for
+basic operation of the tablet server.
+| Hot Replicas' MemRowSets and DeltaMemStores | minimum 128MB per hot replica | Minimum amount of data
+to flush per MemRowSet flush. For most use cases, updates should be rare compared to inserts, so the
+DeltaMemStores should be very small.
+| Scans | 256KB per column per core for read-heavy tables | Amount of memory used by scanners, and which
+will be constantly needed for tables which are constantly read.
+| Block Cache | Fixed by `--block_cache_capacity_mb` (default 512MB) | Amount of memory reserved for use by the
+block cache.
+|===
+
+Using this information for the example load gives the following breakdown of memory usage:
+
+.Example Tablet Server Memory Usage
+|===
+| Type | Amount
+
+| 8TB data on disk | 8TB * 1.5GB / 1TB = 12GB
+| 200 hot replicas | 200 * 128MB = 25.6GB
+| 1 40-column, frequently-scanned table | 40 * 40 * 256KB = 409.6MB
+| Block Cache | `--block_cache_capacity_mb=512` = 512MB
+| Expected memory usage | 38.5GB
+| Recommended hard limit | 52GB
+|===
+
+Using this as a rough estimate of Kudu's memory usage, select a memory limit so that the expected
+memory usage of Kudu is around 50-75% of the hard limit.
+
+=== Verifying if a Memory Limit is sufficient
+
+After configuring an appropriate memory limit with `--memory_limit_hard_bytes`, run a workload and
+monitor the Kudu tablet server process's RAM usage. The memory usage should stay around 50-75% of
+the hard limit, with occasional spikes above 75% but below 100%. If the tablet server runs above 75%
+consistently, the memory limit should be increased.
+
+Additionally, it's also useful to monitor the logs for memory rejections, which look like:
+
+----
+Service unavailable: Soft memory limit exceeded (at 96.35% of capacity)
+----
+
+and watch the memory rejections metrics:
+
+* `leader_memory_pressure_rejections`
+* `follower_memory_pressure_rejections`
+* `transaction_memory_pressure_rejections`
+
+Occasional rejections due to memory pressure are fine and act as backpressure to clients. Clients
+will transparently retry operations. However, no operations should time out.
+
+[[file_descriptors]]
+== File Descriptors
+
+Processes are allotted a maximum number of open file descriptors (also referred to as fds). If a
+tablet server attempts to open too many fds, it will typically crash with a message saying something
+like "too many open files". The following table summarizes the sources of file descriptor usage in a
+Kudu tablet server process:
+
+.Tablet Server File Descriptor Usage
+|===
+| Type | Multiplier | Description
+
+| File cache | Fixed by `--block_manager_max_open_files` (default 40% of process maximum) | Maximum allowed open fds reserved for use by
+the file cache.
+| Hot replicas | 2 per WAL segment, 1 per WAL index | Number of fds used by hot replicas. See below
+for more explanation.
+| Cold replicas | 3 per cold replica | Number of fds used per cold replica: 2 for the single WAL
+segment and 1 for the single WAL index.
+|===
+
+Every replica has at least one WAL segment and at least one WAL index, and should have the same
+number of segments and indices; however, the number of segments and indices can be greater for a
+replica if one of its peer replicas is falling behind. WAL segment and index fds are closed as WALs
+are garbage collected.
+
+Using this information for the example load gives the following breakdown of file descriptor usage,
+under the assumption that some replicas are lagging and using 10 WAL segments:
+
+.Example Tablet Server File Descriptor Usage
+|===
+| Type | Amount
+
+| file cache | 40% * 32000 fds = 12800 fds
+| 1600 cold replicas | 1600 cold replicas * 3 fds / cold replica = 4800 fds
+| 200 hot replicas | (2 / segment * 10 segments/hot replica * 200 hot replicas) + (1 / index * 10 indices / hot replica * 200 hot replicas) = 6000 fds
+| Total | 23600 fds
+|===
+
+So for this example, the tablet server process has about 32000 - 23600 = 8400 fds to spare.
+
+There is typically no downside to configuring a higher file descriptor limit if approaching the
+currently configured limit.
+
+[[threads]]
+== Threads
+
+Processes are allotted a maximum number of threads by the operating system, and this limit is
+typically difficult or impossible to change. Therefore, this section is more informational than
+advisory.
+
+If a Kudu tablet server's thread count exceeds the OS limit, it will crash, usually with a message
+in the logs like "pthread_create failed: Resource temporarily unavailable". If the system thread
+count limit is exceeded, other processes on the same node may also crash.
+
+The table below summarizes thread usage by a Kudu tablet server:
+
+.Tablet Server Thread Usage
+|===
+| Consumer | Multiplier
+
+| Hot replica | 5 threads per hot replica
+| Cold replica | 2 threads per cold replica
+| Replica at startup | 5 threads per replica
+|===
+
+As indicated in the table, all replicas may be considered "hot" when the tablet server starts, so,
+for our example load, thread usage should peak around 5 threads / replica * (200 hot replicas + 1600
+cold replicas) = 18000 threads at startup time.