You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/05/31 18:18:22 UTC

[3/3] kudu git commit: [java] Add Timestamp APIs to kudu-client

[java] Add Timestamp APIs to kudu-client

Adds support for java.sql.Timestamp to PartialRow,
RowResult, and KuduPredicate. This simplifies and
unfies the handling of Timestamps when reading
and writing from Kudu.

This functionality existied in the kudu-spark
integration, but has been moved into the kudu-client
and can be leveraged in other integrations going
forward.

Change-Id: I1fdd397691f12f279663838af9fa12ed27508fd1
Reviewed-on: http://gerrit.cloudera.org:8080/10502
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5f9a2f52
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5f9a2f52
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5f9a2f52

Branch: refs/heads/master
Commit: 5f9a2f523a58990af63b1b6eaef16dfc35eabddd
Parents: 4883eee
Author: Grant Henke <gr...@apache.org>
Authored: Thu May 24 10:51:55 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Thu May 31 15:54:01 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/KuduPredicate.java   | 18 +++-
 .../java/org/apache/kudu/client/PartialRow.java | 67 +++++++++++++-
 .../java/org/apache/kudu/client/RowResult.java  | 61 +++++++------
 .../org/apache/kudu/util/TimestampUtil.java     | 92 ++++++++++++++++++++
 .../org/apache/kudu/client/TestKuduClient.java  |  6 +-
 .../org/apache/kudu/client/TestPartialRow.java  | 13 +--
 .../org/apache/kudu/client/TestRowResult.java   |  7 +-
 .../org/apache/kudu/util/TestTimestampUtil.java | 75 ++++++++++++++++
 .../apache/kudu/spark/kudu/DefaultSource.scala  | 39 +--------
 .../apache/kudu/spark/kudu/KuduContext.scala    |  2 +-
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  2 +-
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 23 -----
 .../kudu/spark/kudu/KuduContextTest.scala       |  3 +-
 13 files changed, 304 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
index b0ff0b4..f332edb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,6 +36,7 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedBytes;
 import com.google.protobuf.ByteString;
+import org.apache.kudu.util.TimestampUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -298,6 +300,20 @@ public class KuduPredicate {
   }
 
   /**
+   * Creates a new comparison predicate on a timestamp column.
+   * @param column the column schema
+   * @param op the comparison operation
+   * @param value the value to compare against
+   */
+  public static KuduPredicate newComparisonPredicate(ColumnSchema column,
+                                                     ComparisonOp op,
+                                                     Timestamp value) {
+    checkColumn(column, Type.UNIXTIME_MICROS);
+    long micros = TimestampUtil.timestampToMicros(value);
+    return newComparisonPredicate(column, op, micros);
+  }
+
+  /**
    * Creates a new comparison predicate on a float column.
    * @param column the column schema
    * @param op the comparison operation
@@ -1079,7 +1095,7 @@ public class KuduPredicate {
       case INT16: return Short.toString(Bytes.getShort(value));
       case INT32: return Integer.toString(Bytes.getInt(value));
       case INT64: return Long.toString(Bytes.getLong(value));
-      case UNIXTIME_MICROS: return RowResult.timestampToString(Bytes.getLong(value));
+      case UNIXTIME_MICROS: return TimestampUtil.timestampToString(Bytes.getLong(value));
       case FLOAT: return Float.toString(Bytes.getFloat(value));
       case DOUBLE: return Double.toString(Bytes.getDouble(value));
       case STRING: {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 43cc27a..6984545 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -27,6 +28,7 @@ import java.util.ListIterator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.kudu.util.TimestampUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.jboss.netty.util.CharsetUtil;
@@ -545,6 +547,69 @@ public class PartialRow {
   }
 
   /**
+   * Add a Timestamp for the specified column.
+   *
+   * Note: Timestamp instances with nanosecond precision are truncated to microseconds.
+   *
+   * @param columnIndex the column's index in the schema
+   * @param val value to add
+   * @throws IllegalArgumentException if the value doesn't match the column's type
+   * @throws IllegalStateException if the row was already applied
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public void addTimestamp(int columnIndex, Timestamp val) {
+    checkNotFrozen();
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    checkColumn(column, Type.UNIXTIME_MICROS);
+    long micros = TimestampUtil.timestampToMicros(val);
+    Bytes.setLong(rowAlloc, micros, getPositionInRowAllocAndSetBitSet(columnIndex));
+  }
+
+  /**
+   * Add a Timestamp for the specified column.
+   *
+   * Note: Timestamp instances with nanosecond precision are truncated to microseconds.
+   *
+   * @param columnName Name of the column
+   * @param val value to add
+   * @throws IllegalArgumentException if the column doesn't exist
+   * or if the value doesn't match the column's type
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addTimestamp(String columnName, Timestamp val) {
+    addTimestamp(schema.getColumnIndex(columnName), val);
+  }
+
+  /**
+   * Get the specified column's Timestamp.
+   *
+   * @param columnName name of the column to get data for
+   * @return a Timestamp
+   * @throws IllegalArgumentException if the column doesn't exist,
+   * is null, is unset, or the type doesn't match the column's type
+   */
+  public Timestamp getTimestamp(String columnName) {
+    return getTimestamp(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
+   * Get the specified column's Timestamp.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a Timestamp
+   * @throws IllegalArgumentException if the column is null, is unset,
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public Timestamp getTimestamp(int columnIndex) {
+    checkColumn(schema.getColumnByIndex(columnIndex), Type.UNIXTIME_MICROS);
+    checkColumnExists(schema.getColumnByIndex(columnIndex));
+    checkValue(columnIndex);
+    long micros = Bytes.getLong(rowAlloc, schema.getColumnOffset(columnIndex));
+    return TimestampUtil.microsToTimestamp(micros);
+  }
+
+  /**
    * Add a String for the specified column.
    * @param columnIndex the column's index in the schema
    * @param val value to add
@@ -1053,7 +1118,7 @@ public class PartialRow {
         sb.append(Bytes.getLong(rowAlloc, schema.getColumnOffset(idx)));
         return;
       case UNIXTIME_MICROS:
-        sb.append(RowResult.timestampToString(
+        sb.append(TimestampUtil.timestampToString(
             Bytes.getLong(rowAlloc, schema.getColumnOffset(idx))));
         return;
       case FLOAT:

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index 8a08b59..ee8b20b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.FieldPosition;
 import java.text.SimpleDateFormat;
@@ -26,6 +27,7 @@ import java.util.BitSet;
 import java.util.Date;
 import java.util.TimeZone;
 
+import org.apache.kudu.util.TimestampUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -44,18 +46,6 @@ public class RowResult {
 
   private static final int INDEX_RESET_LOCATION = -1;
 
-  // Thread local DateFormat since they're not thread-safe.
-  private static final ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() {
-    @Override
-    protected DateFormat initialValue() {
-      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
-      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
-      return sdf;
-    }
-  };
-
-  private static final long MS_IN_S = 1000L;
-  private static final long US_IN_S = 1000L * 1000L;
   private int index = INDEX_RESET_LOCATION;
   private int offset;
   private BitSet nullsBitSet;
@@ -358,6 +348,38 @@ public class RowResult {
   }
 
   /**
+   * Get the specified column's Timestamp.
+   *
+   * @param columnName name of the column to get data for
+   * @return a Timestamp
+   * @throws IllegalArgumentException if the column doesn't exist,
+   * is null, is unset, or the type doesn't match the column's type
+   */
+  public Timestamp getTimestamp(String columnName) {
+    return getTimestamp(this.schema.getColumnIndex(columnName));
+  }
+
+  /**
+   * Get the specified column's Timestamp.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a Timestamp
+   * @throws IllegalArgumentException if the column is null, is unset,
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  public Timestamp getTimestamp(int columnIndex) {
+    checkValidColumn(columnIndex);
+    checkNull(columnIndex);
+    checkType(columnIndex, Type.UNIXTIME_MICROS);
+    ColumnSchema column = schema.getColumnByIndex(columnIndex);
+    long micros = Bytes.getLong(this.rowData.getRawArray(),
+        this.rowData.getRawOffset() +
+            getCurrentRowDataOffsetForColumn(columnIndex));
+    return TimestampUtil.microsToTimestamp(micros);
+  }
+
+  /**
    * Get the schema used for this scanner's column projection.
    * @return a column projection as a schema.
    */
@@ -569,19 +591,6 @@ public class RowResult {
   }
 
   /**
-   * Transforms a timestamp into a string, whose formatting and timezone is consistent
-   * across Kudu.
-   * @param timestamp the timestamp, in microseconds
-   * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
-   */
-  static String timestampToString(long timestamp) {
-    long tsMillis = timestamp / MS_IN_S;
-    long tsMicros = timestamp % US_IN_S;
-    String tsStr = DATE_FORMAT.get().format(new Date(tsMillis));
-    return String.format("%s.%06dZ", tsStr, tsMicros);
-  }
-
-  /**
    * Return the actual data from this row in a stringified key=value
    * form.
    */
@@ -616,7 +625,7 @@ public class RowResult {
             buf.append(getLong(i));
             break;
           case UNIXTIME_MICROS: {
-            buf.append(timestampToString(getLong(i)));
+            buf.append(TimestampUtil.timestampToString(getLong(i)));
           } break;
           case STRING:
             buf.append(getString(i));

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
new file mode 100644
index 0000000..f6ebd65
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+@InterfaceAudience.Private
+public class TimestampUtil {
+
+  // Thread local DateFormat since they're not thread-safe.
+  private static final ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+      return sdf;
+    }
+  };
+
+  /**
+   * Converts a {@link Timestamp} to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
+   *
+   * Note: Timestamp instances with nanosecond precision are truncated to microseconds.
+   *
+   * @param timestamp the timestamp to convert to microseconds
+   * @return the microseconds since the Unix epoch
+   */
+  public static long timestampToMicros(Timestamp timestamp) {
+    // Number of whole milliseconds since the Unix epoch, in microseconds.
+    long millis = timestamp.getTime() * 1000L;
+    // Sub millisecond time since the Unix epoch, in microseconds.
+    long micros = (timestamp.getNanos() % 1000000L) / 1000L;
+    if (micros >= 0) {
+      return millis + micros;
+    } else {
+      return millis + 1000000L + micros;
+    }
+  }
+
+  /**
+   * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z)
+   * to a {@link Timestamp}.
+   *
+   * @param micros the offset in microseconds since the Unix epoch
+   * @return the corresponding timestamp
+   */
+  public static Timestamp microsToTimestamp(long micros) {
+    long millis = micros / 1000L;
+    long nanos = (micros % 1000000L) * 1000L;
+    if (nanos < 0) {
+      millis -= 1L;
+      nanos += 1000000000L;
+    }
+    Timestamp timestamp = new Timestamp(millis);
+    timestamp.setNanos((int) nanos);
+    return timestamp;
+  }
+
+  /**
+   * Transforms a timestamp into a string, whose formatting and timezone is consistent
+   * across Kudu.
+   * @param micros the timestamp, in microseconds
+   * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
+   */
+  public static String timestampToString(long micros) {
+    long tsMillis = micros / 1000L;
+    long tsMicros = micros % 1000000L;
+    String tsStr = DATE_FORMAT.get().format(new Date(tsMillis));
+    return String.format("%s.%06dZ", tsStr, tsMicros);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 5b3e210..845b4a7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -21,7 +21,6 @@ import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER;
 import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL;
 import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS;
 import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
-import static org.apache.kudu.client.RowResult.timestampToString;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -47,6 +46,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.stumbleupon.async.Deferred;
 
+import org.apache.kudu.util.TimestampUtil;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -439,9 +439,9 @@ public class TestKuduClient extends BaseKuduTest {
     for (int i = 0; i < rowStrings.size(); i++) {
       StringBuilder expectedRow = new StringBuilder();
       expectedRow.append(String.format("UNIXTIME_MICROS key=%s, UNIXTIME_MICROS c1=",
-          timestampToString(timestamps.get(i))));
+          TimestampUtil.timestampToString(timestamps.get(i))));
       if (i % 2 == 1) {
-        expectedRow.append(timestampToString(timestamps.get(i)));
+        expectedRow.append(TimestampUtil.timestampToString(timestamps.get(i)));
       } else {
         expectedRow.append("NULL");
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
index 865a7a2..4ae7eaf 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 
 import org.junit.Test;
 
@@ -42,7 +43,7 @@ public class TestPartialRow {
     assertEquals(43, partialRow.getShort("int16"));
     assertEquals(44, partialRow.getInt("int32"));
     assertEquals(45, partialRow.getLong("int64"));
-    assertEquals(1234567890, partialRow.getLong("timestamp"));
+    assertEquals(new Timestamp(1234567890), partialRow.getTimestamp("timestamp"));
     assertEquals(52.35F, partialRow.getFloat("float"), 0.0f);
     assertEquals(53.35, partialRow.getDouble("double"), 0.0);
     assertEquals("fun with ütf\0", partialRow.getString("string"));
@@ -372,7 +373,7 @@ public class TestPartialRow {
     row.addShort("int16", (short) 43);
     row.addInt("int32", 44);
     row.addLong("int64", 45);
-    row.addLong("timestamp", 1234567890); // Fri, 13 Feb 2009 23:31:30 UTC
+    row.addTimestamp("timestamp", new Timestamp(1234567890));
     row.addBoolean("bool", true);
     row.addFloat("float", 52.35F);
     row.addDouble("double", 53.35);
@@ -397,7 +398,7 @@ public class TestPartialRow {
       case INT16: return partialRow.getShort(columnName);
       case INT32: return partialRow.getInt(columnName);
       case INT64: return partialRow.getLong(columnName);
-      case UNIXTIME_MICROS: return partialRow.getLong(columnName);
+      case UNIXTIME_MICROS: return partialRow.getTimestamp(columnName);
       case STRING: return partialRow.getString(columnName);
       case BINARY: return partialRow.getBinary(columnName);
       case FLOAT: return partialRow.getFloat(columnName);
@@ -415,7 +416,7 @@ public class TestPartialRow {
       case INT16: return partialRow.getShort(columnIndex);
       case INT32: return partialRow.getInt(columnIndex);
       case INT64: return partialRow.getLong(columnIndex);
-      case UNIXTIME_MICROS: return partialRow.getLong(columnIndex);
+      case UNIXTIME_MICROS: return partialRow.getTimestamp(columnIndex);
       case STRING: return partialRow.getString(columnIndex);
       case BINARY: return partialRow.getBinary(columnIndex);
       case FLOAT: return partialRow.getFloat(columnIndex);
@@ -433,7 +434,7 @@ public class TestPartialRow {
       case INT16: partialRow.addShort(columnName, (short) 43); break;
       case INT32: partialRow.addInt(columnName, 44); break;
       case INT64: partialRow.addLong(columnName, 45); break;
-      case UNIXTIME_MICROS: partialRow.addLong(columnName, 1234567890); break;
+      case UNIXTIME_MICROS: partialRow.addTimestamp(columnName, new Timestamp(1234567890)); break;
       case STRING: partialRow.addString(columnName, "fun with ütf\0"); break;
       case BINARY: partialRow.addBinary(columnName, new byte[] { 0, 1, 2, 3, 4 }); break;
       case FLOAT: partialRow.addFloat(columnName, 52.35F); break;
@@ -451,7 +452,7 @@ public class TestPartialRow {
       case INT16: partialRow.addShort(columnIndex, (short) 43); break;
       case INT32: partialRow.addInt(columnIndex, 44); break;
       case INT64: partialRow.addLong(columnIndex, 45); break;
-      case UNIXTIME_MICROS: partialRow.addLong(columnIndex, 1234567890); break;
+      case UNIXTIME_MICROS: partialRow.addTimestamp(columnIndex, new Timestamp(1234567890)); break;
       case STRING: partialRow.addString(columnIndex, "fun with ütf\0"); break;
       case BINARY: partialRow.addBinary(columnIndex, new byte[] { 0, 1, 2, 3, 4 }); break;
       case FLOAT: partialRow.addFloat(columnIndex, 52.35F); break;

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
index 4207bf6..35e248b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -62,7 +63,7 @@ public class TestRowResult extends BaseKuduTest {
     bb.position(7); // We're only inserting the bytebuffer part of the original array.
     row.addBinary(9, bb);
     row.setNull(10);
-    row.addLong(11, 11l);
+    row.addTimestamp(11, new Timestamp(11));
     row.addDecimal(12, BigDecimal.valueOf(12345, 3));
 
     KuduSession session = syncClient.newSession();
@@ -113,8 +114,8 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals(true, rr.isNull(10));
       assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(10).getName()));
 
-      assertEquals(11, rr.getLong(11));
-      assertEquals(11, rr.getLong(allTypesSchema.getColumnByIndex(11).getName()));
+      assertEquals(new Timestamp(11), rr.getTimestamp(11));
+      assertEquals(new Timestamp(11), rr.getTimestamp(allTypesSchema.getColumnByIndex(11).getName()));
 
       assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(12));
       assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java
new file mode 100644
index 0000000..c06ae06
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.util;
+
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTimestampUtil {
+
+  @Test
+  public void testTimestampConversion() throws Exception {
+    Timestamp epoch = new Timestamp(0);
+    assertEquals(0, TimestampUtil.timestampToMicros(epoch));
+    assertEquals(epoch, TimestampUtil.microsToTimestamp(0));
+
+    Timestamp t1 = new Timestamp(0);
+    t1.setNanos(123456000);
+    assertEquals(123456, TimestampUtil.timestampToMicros(t1));
+    assertEquals(t1, TimestampUtil.microsToTimestamp(123456));
+
+    SimpleDateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    iso8601.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    Timestamp t3 = new Timestamp(iso8601.parse("1923-12-01T00:44:36.876").getTime());
+    t3.setNanos(876544000);
+    assertEquals(-1454368523123456L, TimestampUtil.timestampToMicros(t3));
+    assertEquals(t3, TimestampUtil.microsToTimestamp(-1454368523123456L));
+  }
+
+  @Test
+  public void testNonZuluTimestampConversion() throws Exception {
+    SimpleDateFormat cst = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    cst.setTimeZone(TimeZone.getTimeZone("CST"));
+
+    String timeString = "2016-08-19T12:12:12.121";
+    Timestamp timestamp = new Timestamp(cst.parse(timeString).getTime());
+
+    long toMicros = TimestampUtil.timestampToMicros(timestamp);
+    Timestamp fromMicros = TimestampUtil.microsToTimestamp(toMicros);
+    String formattedCST = cst.format(fromMicros);
+
+    assertEquals(1471626732121000L, toMicros);
+    assertEquals(timestamp, fromMicros);
+    assertEquals(timeString, formattedCST);
+
+    SimpleDateFormat pst = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    pst.setTimeZone(TimeZone.getTimeZone("PST"));
+    String formattedPST = pst.format(fromMicros);
+    assertEquals("2016-08-19T10:12:12.121", formattedPST);
+
+    SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    utc.setTimeZone(TimeZone.getTimeZone("UTC"));
+    String formattedUTC = utc.format(fromMicros);
+    assertEquals("2016-08-19T17:12:12.121", formattedUTC);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 761fd79..acfebcc 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -275,7 +275,7 @@ class KuduRelation(private val tableName: String,
       case value: Short => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: Int => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: Long => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, timestampToMicros(value))
+      case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: Float => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
       case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
@@ -372,41 +372,4 @@ private[spark] object KuduRelation {
     case And(left, right) => supportsFilter(left) && supportsFilter(right)
     case _ => false
   }
-
-  /**
-    * Converts a [[Timestamp]] to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
-    *
-    * @param timestamp the timestamp to convert to microseconds
-    * @return the microseconds since the Unix epoch
-    */
-  def timestampToMicros(timestamp: Timestamp): Long = {
-    // Number of whole milliseconds since the Unix epoch, in microseconds.
-    val millis = timestamp.getTime * 1000
-    // Sub millisecond time since the Unix epoch, in microseconds.
-    val micros = (timestamp.getNanos % 1000000) / 1000
-    if (micros >= 0) {
-      millis + micros
-    } else {
-      millis + 1000000 + micros
-    }
-  }
-
-  /**
-    * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z) to a [[Timestamp]].
-    *
-    * @param micros the offset in microseconds since the Unix epoch
-    * @return the corresponding timestamp
-    */
-  def microsToTimestamp(micros: Long): Timestamp = {
-    var millis = micros / 1000
-    var nanos = (micros % 1000000) * 1000
-    if (nanos < 0) {
-      millis -= 1
-      nanos += 1000000000
-    }
-
-    val timestamp = new Timestamp(millis)
-    timestamp.setNanos(nanos.asInstanceOf[Int])
-    timestamp
-  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 4d9bb8e..867cf19 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -359,7 +359,7 @@ class KuduContext(val kuduMaster: String,
               case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
               case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
               case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
-              case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
+              case DataTypes.TimestampType => operation.getRow.addTimestamp(kuduIdx, row.getTimestamp(sparkIdx))
               case DecimalType() => operation.getRow.addDecimal(kuduIdx, row.getDecimal(sparkIdx))
               case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
             }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index ddfdf46..a92ec3e 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -126,7 +126,7 @@ private class RowIterator(private val scanner: KuduScanner,
       case Type.INT16 => rowResult.getShort(i)
       case Type.INT32 => rowResult.getInt(i)
       case Type.INT64 => rowResult.getLong(i)
-      case Type.UNIXTIME_MICROS => KuduRelation.microsToTimestamp(rowResult.getLong(i))
+      case Type.UNIXTIME_MICROS => rowResult.getTimestamp(i)
       case Type.FLOAT => rowResult.getFloat(i)
       case Type.DOUBLE => rowResult.getDouble(i)
       case Type.STRING => rowResult.getString(i)

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 91ebdce..19a1948 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -16,10 +16,6 @@
  */
 package org.apache.kudu.spark.kudu
 
-import java.sql.Timestamp
-import java.text.SimpleDateFormat
-import java.util.TimeZone
-
 import scala.collection.JavaConverters._
 import scala.collection.immutable.IndexedSeq
 import scala.util.control.NonFatal
@@ -39,25 +35,6 @@ import org.apache.kudu.{Schema, Type}
 @RunWith(classOf[JUnitRunner])
 class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter with Matchers {
 
-  test("timestamp conversion") {
-    val epoch = new Timestamp(0)
-    assertEquals(0, KuduRelation.timestampToMicros(epoch))
-    assertEquals(epoch, KuduRelation.microsToTimestamp(0))
-
-    val t1 = new Timestamp(0)
-    t1.setNanos(123456000)
-    assertEquals(123456, KuduRelation.timestampToMicros(t1))
-    assertEquals(t1, KuduRelation.microsToTimestamp(123456))
-
-    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
-    iso8601.setTimeZone(TimeZone.getTimeZone("UTC"))
-
-    val t3 = new Timestamp(iso8601.parse("1923-12-01T00:44:36.876").getTime)
-    t3.setNanos(876544000)
-    assertEquals(-1454368523123456L, KuduRelation.timestampToMicros(t3))
-    assertEquals(t3, KuduRelation.microsToTimestamp(-1454368523123456L))
-  }
-
   val rowCount = 10
   var sqlContext : SQLContext = _
   var rows : IndexedSeq[(Int, Int, String, Long)] = _

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 8e12dcd..ce1059b 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -20,6 +20,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
 import java.math.BigDecimal
 import java.sql.Timestamp
 
+import org.apache.kudu.util.TimestampUtil
 import org.apache.spark.sql.functions.decode
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -77,7 +78,7 @@ class KuduContextTest extends FunSuite with TestContext with Matchers {
       val binaryBytes = s"bytes ${rows.apply(index)._2}".getBytes().toSeq
       assert(r.apply(8).asInstanceOf[Array[Byte]].toSeq == binaryBytes)
       assert(r.apply(9).asInstanceOf[Timestamp] ==
-        KuduRelation.microsToTimestamp(rows.apply(index)._4))
+        TimestampUtil.microsToTimestamp(rows.apply(index)._4))
       assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte)
       assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
       assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))