You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/05/31 18:18:22 UTC
[3/3] kudu git commit: [java] Add Timestamp APIs to kudu-client
[java] Add Timestamp APIs to kudu-client
Adds support for java.sql.Timestamp to PartialRow,
RowResult, and KuduPredicate. This simplifies and
unifies the handling of Timestamps when reading
from and writing to Kudu.
This functionality existed in the kudu-spark
integration, but has been moved into the kudu-client
and can be leveraged in other integrations going
forward.
Change-Id: I1fdd397691f12f279663838af9fa12ed27508fd1
Reviewed-on: http://gerrit.cloudera.org:8080/10502
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5f9a2f52
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5f9a2f52
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5f9a2f52
Branch: refs/heads/master
Commit: 5f9a2f523a58990af63b1b6eaef16dfc35eabddd
Parents: 4883eee
Author: Grant Henke <gr...@apache.org>
Authored: Thu May 24 10:51:55 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Thu May 31 15:54:01 2018 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/KuduPredicate.java | 18 +++-
.../java/org/apache/kudu/client/PartialRow.java | 67 +++++++++++++-
.../java/org/apache/kudu/client/RowResult.java | 61 +++++++------
.../org/apache/kudu/util/TimestampUtil.java | 92 ++++++++++++++++++++
.../org/apache/kudu/client/TestKuduClient.java | 6 +-
.../org/apache/kudu/client/TestPartialRow.java | 13 +--
.../org/apache/kudu/client/TestRowResult.java | 7 +-
.../org/apache/kudu/util/TestTimestampUtil.java | 75 ++++++++++++++++
.../apache/kudu/spark/kudu/DefaultSource.scala | 39 +--------
.../apache/kudu/spark/kudu/KuduContext.scala | 2 +-
.../org/apache/kudu/spark/kudu/KuduRDD.scala | 2 +-
.../kudu/spark/kudu/DefaultSourceTest.scala | 23 -----
.../kudu/spark/kudu/KuduContextTest.scala | 3 +-
13 files changed, 304 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
index b0ff0b4..f332edb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.math.BigDecimal;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -35,6 +36,7 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedBytes;
import com.google.protobuf.ByteString;
+import org.apache.kudu.util.TimestampUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -298,6 +300,20 @@ public class KuduPredicate {
}
/**
+ * Creates a new comparison predicate on a timestamp column.
+ * @param column the column schema
+ * @param op the comparison operation
+ * @param value the value to compare against
+ */
+ public static KuduPredicate newComparisonPredicate(ColumnSchema column,
+ ComparisonOp op,
+ Timestamp value) {
+ checkColumn(column, Type.UNIXTIME_MICROS);
+ long micros = TimestampUtil.timestampToMicros(value);
+ return newComparisonPredicate(column, op, micros);
+ }
+
+ /**
* Creates a new comparison predicate on a float column.
* @param column the column schema
* @param op the comparison operation
@@ -1079,7 +1095,7 @@ public class KuduPredicate {
case INT16: return Short.toString(Bytes.getShort(value));
case INT32: return Integer.toString(Bytes.getInt(value));
case INT64: return Long.toString(Bytes.getLong(value));
- case UNIXTIME_MICROS: return RowResult.timestampToString(Bytes.getLong(value));
+ case UNIXTIME_MICROS: return TimestampUtil.timestampToString(Bytes.getLong(value));
case FLOAT: return Float.toString(Bytes.getFloat(value));
case DOUBLE: return Double.toString(Bytes.getDouble(value));
case STRING: {
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 43cc27a..6984545 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -27,6 +28,7 @@ import java.util.ListIterator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.kudu.util.TimestampUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.jboss.netty.util.CharsetUtil;
@@ -545,6 +547,69 @@ public class PartialRow {
}
/**
+ * Add a Timestamp for the specified column.
+ *
+ * Note: Timestamp instances with nanosecond precision are truncated to microseconds.
+ *
+ * @param columnIndex the column's index in the schema
+ * @param val value to add
+ * @throws IllegalArgumentException if the value doesn't match the column's type
+ * @throws IllegalStateException if the row was already applied
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ public void addTimestamp(int columnIndex, Timestamp val) {
+ checkNotFrozen();
+ ColumnSchema column = schema.getColumnByIndex(columnIndex);
+ checkColumn(column, Type.UNIXTIME_MICROS);
+ long micros = TimestampUtil.timestampToMicros(val);
+ Bytes.setLong(rowAlloc, micros, getPositionInRowAllocAndSetBitSet(columnIndex));
+ }
+
+ /**
+ * Add a Timestamp for the specified column.
+ *
+ * Note: Timestamp instances with nanosecond precision are truncated to microseconds.
+ *
+ * @param columnName Name of the column
+ * @param val value to add
+ * @throws IllegalArgumentException if the column doesn't exist
+ * or if the value doesn't match the column's type
+ * @throws IllegalStateException if the row was already applied
+ */
+ public void addTimestamp(String columnName, Timestamp val) {
+ addTimestamp(schema.getColumnIndex(columnName), val);
+ }
+
+ /**
+ * Get the specified column's Timestamp.
+ *
+ * @param columnName name of the column to get data for
+ * @return a Timestamp
+ * @throws IllegalArgumentException if the column doesn't exist,
+ * is null, is unset, or the type doesn't match the column's type
+ */
+ public Timestamp getTimestamp(String columnName) {
+ return getTimestamp(this.schema.getColumnIndex(columnName));
+ }
+
+ /**
+ * Get the specified column's Timestamp.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a Timestamp
+ * @throws IllegalArgumentException if the column is null, is unset,
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ public Timestamp getTimestamp(int columnIndex) {
+ checkColumn(schema.getColumnByIndex(columnIndex), Type.UNIXTIME_MICROS);
+ checkColumnExists(schema.getColumnByIndex(columnIndex));
+ checkValue(columnIndex);
+ long micros = Bytes.getLong(rowAlloc, schema.getColumnOffset(columnIndex));
+ return TimestampUtil.microsToTimestamp(micros);
+ }
+
+ /**
* Add a String for the specified column.
* @param columnIndex the column's index in the schema
* @param val value to add
@@ -1053,7 +1118,7 @@ public class PartialRow {
sb.append(Bytes.getLong(rowAlloc, schema.getColumnOffset(idx)));
return;
case UNIXTIME_MICROS:
- sb.append(RowResult.timestampToString(
+ sb.append(TimestampUtil.timestampToString(
Bytes.getLong(rowAlloc, schema.getColumnOffset(idx))));
return;
case FLOAT:
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index 8a08b59..ee8b20b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.FieldPosition;
import java.text.SimpleDateFormat;
@@ -26,6 +27,7 @@ import java.util.BitSet;
import java.util.Date;
import java.util.TimeZone;
+import org.apache.kudu.util.TimestampUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -44,18 +46,6 @@ public class RowResult {
private static final int INDEX_RESET_LOCATION = -1;
- // Thread local DateFormat since they're not thread-safe.
- private static final ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() {
- @Override
- protected DateFormat initialValue() {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
- return sdf;
- }
- };
-
- private static final long MS_IN_S = 1000L;
- private static final long US_IN_S = 1000L * 1000L;
private int index = INDEX_RESET_LOCATION;
private int offset;
private BitSet nullsBitSet;
@@ -358,6 +348,38 @@ public class RowResult {
}
/**
+ * Get the specified column's Timestamp.
+ *
+ * @param columnName name of the column to get data for
+ * @return a Timestamp
+ * @throws IllegalArgumentException if the column doesn't exist,
+ * is null, is unset, or the type doesn't match the column's type
+ */
+ public Timestamp getTimestamp(String columnName) {
+ return getTimestamp(this.schema.getColumnIndex(columnName));
+ }
+
+ /**
+ * Get the specified column's Timestamp.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a Timestamp
+ * @throws IllegalArgumentException if the column is null, is unset,
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ public Timestamp getTimestamp(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.UNIXTIME_MICROS);
+ ColumnSchema column = schema.getColumnByIndex(columnIndex);
+ long micros = Bytes.getLong(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() +
+ getCurrentRowDataOffsetForColumn(columnIndex));
+ return TimestampUtil.microsToTimestamp(micros);
+ }
+
+ /**
* Get the schema used for this scanner's column projection.
* @return a column projection as a schema.
*/
@@ -569,19 +591,6 @@ public class RowResult {
}
/**
- * Transforms a timestamp into a string, whose formatting and timezone is consistent
- * across Kudu.
- * @param timestamp the timestamp, in microseconds
- * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
- */
- static String timestampToString(long timestamp) {
- long tsMillis = timestamp / MS_IN_S;
- long tsMicros = timestamp % US_IN_S;
- String tsStr = DATE_FORMAT.get().format(new Date(tsMillis));
- return String.format("%s.%06dZ", tsStr, tsMicros);
- }
-
- /**
* Return the actual data from this row in a stringified key=value
* form.
*/
@@ -616,7 +625,7 @@ public class RowResult {
buf.append(getLong(i));
break;
case UNIXTIME_MICROS: {
- buf.append(timestampToString(getLong(i)));
+ buf.append(TimestampUtil.timestampToString(getLong(i)));
} break;
case STRING:
buf.append(getString(i));
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
new file mode 100644
index 0000000..f6ebd65
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+@InterfaceAudience.Private
+public class TimestampUtil {
+
+ // Thread local DateFormat since they're not thread-safe.
+ private static final ThreadLocal<DateFormat> DATE_FORMAT = new ThreadLocal<DateFormat>() {
+ @Override
+ protected DateFormat initialValue() {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return sdf;
+ }
+ };
+
+ /**
+ * Converts a {@link Timestamp} to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
+ *
+ * Note: Timestamp instances with nanosecond precision are truncated to microseconds.
+ *
+ * @param timestamp the timestamp to convert to microseconds
+ * @return the microseconds since the Unix epoch
+ */
+ public static long timestampToMicros(Timestamp timestamp) {
+ // Number of whole milliseconds since the Unix epoch, in microseconds.
+ long millis = timestamp.getTime() * 1000L;
+ // Sub millisecond time since the Unix epoch, in microseconds.
+ long micros = (timestamp.getNanos() % 1000000L) / 1000L;
+ if (micros >= 0) {
+ return millis + micros;
+ } else {
+ return millis + 1000000L + micros;
+ }
+ }
+
+ /**
+ * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z)
+ * to a {@link Timestamp}.
+ *
+ * @param micros the offset in microseconds since the Unix epoch
+ * @return the corresponding timestamp
+ */
+ public static Timestamp microsToTimestamp(long micros) {
+ long millis = micros / 1000L;
+ long nanos = (micros % 1000000L) * 1000L;
+ if (nanos < 0) {
+ millis -= 1L;
+ nanos += 1000000000L;
+ }
+ Timestamp timestamp = new Timestamp(millis);
+ timestamp.setNanos((int) nanos);
+ return timestamp;
+ }
+
+ /**
+ * Transforms a timestamp into a string, whose formatting and timezone is consistent
+ * across Kudu.
+ * @param micros the timestamp, in microseconds
+ * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
+ */
+ public static String timestampToString(long micros) {
+ long tsMillis = micros / 1000L;
+ long tsMicros = micros % 1000000L;
+ String tsStr = DATE_FORMAT.get().format(new Date(tsMillis));
+ return String.format("%s.%06dZ", tsStr, tsMicros);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 5b3e210..845b4a7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -21,7 +21,6 @@ import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.GREATER_EQUAL;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS;
import static org.apache.kudu.client.KuduPredicate.ComparisonOp.LESS_EQUAL;
-import static org.apache.kudu.client.RowResult.timestampToString;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -47,6 +46,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.stumbleupon.async.Deferred;
+import org.apache.kudu.util.TimestampUtil;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -439,9 +439,9 @@ public class TestKuduClient extends BaseKuduTest {
for (int i = 0; i < rowStrings.size(); i++) {
StringBuilder expectedRow = new StringBuilder();
expectedRow.append(String.format("UNIXTIME_MICROS key=%s, UNIXTIME_MICROS c1=",
- timestampToString(timestamps.get(i))));
+ TimestampUtil.timestampToString(timestamps.get(i))));
if (i % 2 == 1) {
- expectedRow.append(timestampToString(timestamps.get(i)));
+ expectedRow.append(TimestampUtil.timestampToString(timestamps.get(i)));
} else {
expectedRow.append("NULL");
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
index 865a7a2..4ae7eaf 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartialRow.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
import org.junit.Test;
@@ -42,7 +43,7 @@ public class TestPartialRow {
assertEquals(43, partialRow.getShort("int16"));
assertEquals(44, partialRow.getInt("int32"));
assertEquals(45, partialRow.getLong("int64"));
- assertEquals(1234567890, partialRow.getLong("timestamp"));
+ assertEquals(new Timestamp(1234567890), partialRow.getTimestamp("timestamp"));
assertEquals(52.35F, partialRow.getFloat("float"), 0.0f);
assertEquals(53.35, partialRow.getDouble("double"), 0.0);
assertEquals("fun with ütf\0", partialRow.getString("string"));
@@ -372,7 +373,7 @@ public class TestPartialRow {
row.addShort("int16", (short) 43);
row.addInt("int32", 44);
row.addLong("int64", 45);
- row.addLong("timestamp", 1234567890); // Fri, 13 Feb 2009 23:31:30 UTC
+ row.addTimestamp("timestamp", new Timestamp(1234567890));
row.addBoolean("bool", true);
row.addFloat("float", 52.35F);
row.addDouble("double", 53.35);
@@ -397,7 +398,7 @@ public class TestPartialRow {
case INT16: return partialRow.getShort(columnName);
case INT32: return partialRow.getInt(columnName);
case INT64: return partialRow.getLong(columnName);
- case UNIXTIME_MICROS: return partialRow.getLong(columnName);
+ case UNIXTIME_MICROS: return partialRow.getTimestamp(columnName);
case STRING: return partialRow.getString(columnName);
case BINARY: return partialRow.getBinary(columnName);
case FLOAT: return partialRow.getFloat(columnName);
@@ -415,7 +416,7 @@ public class TestPartialRow {
case INT16: return partialRow.getShort(columnIndex);
case INT32: return partialRow.getInt(columnIndex);
case INT64: return partialRow.getLong(columnIndex);
- case UNIXTIME_MICROS: return partialRow.getLong(columnIndex);
+ case UNIXTIME_MICROS: return partialRow.getTimestamp(columnIndex);
case STRING: return partialRow.getString(columnIndex);
case BINARY: return partialRow.getBinary(columnIndex);
case FLOAT: return partialRow.getFloat(columnIndex);
@@ -433,7 +434,7 @@ public class TestPartialRow {
case INT16: partialRow.addShort(columnName, (short) 43); break;
case INT32: partialRow.addInt(columnName, 44); break;
case INT64: partialRow.addLong(columnName, 45); break;
- case UNIXTIME_MICROS: partialRow.addLong(columnName, 1234567890); break;
+ case UNIXTIME_MICROS: partialRow.addTimestamp(columnName, new Timestamp(1234567890)); break;
case STRING: partialRow.addString(columnName, "fun with ütf\0"); break;
case BINARY: partialRow.addBinary(columnName, new byte[] { 0, 1, 2, 3, 4 }); break;
case FLOAT: partialRow.addFloat(columnName, 52.35F); break;
@@ -451,7 +452,7 @@ public class TestPartialRow {
case INT16: partialRow.addShort(columnIndex, (short) 43); break;
case INT32: partialRow.addInt(columnIndex, 44); break;
case INT64: partialRow.addLong(columnIndex, 45); break;
- case UNIXTIME_MICROS: partialRow.addLong(columnIndex, 1234567890); break;
+ case UNIXTIME_MICROS: partialRow.addTimestamp(columnIndex, new Timestamp(1234567890)); break;
case STRING: partialRow.addString(columnIndex, "fun with ütf\0"); break;
case BINARY: partialRow.addBinary(columnIndex, new byte[] { 0, 1, 2, 3, 4 }); break;
case FLOAT: partialRow.addFloat(columnIndex, 52.35F); break;
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
index 4207bf6..35e248b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -62,7 +63,7 @@ public class TestRowResult extends BaseKuduTest {
bb.position(7); // We're only inserting the bytebuffer part of the original array.
row.addBinary(9, bb);
row.setNull(10);
- row.addLong(11, 11l);
+ row.addTimestamp(11, new Timestamp(11));
row.addDecimal(12, BigDecimal.valueOf(12345, 3));
KuduSession session = syncClient.newSession();
@@ -113,8 +114,8 @@ public class TestRowResult extends BaseKuduTest {
assertEquals(true, rr.isNull(10));
assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(10).getName()));
- assertEquals(11, rr.getLong(11));
- assertEquals(11, rr.getLong(allTypesSchema.getColumnByIndex(11).getName()));
+ assertEquals(new Timestamp(11), rr.getTimestamp(11));
+ assertEquals(new Timestamp(11), rr.getTimestamp(allTypesSchema.getColumnByIndex(11).getName()));
assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(12));
assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java
new file mode 100644
index 0000000..c06ae06
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestTimestampUtil.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.util;
+
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTimestampUtil {
+
+ @Test
+ public void testTimestampConversion() throws Exception {
+ Timestamp epoch = new Timestamp(0);
+ assertEquals(0, TimestampUtil.timestampToMicros(epoch));
+ assertEquals(epoch, TimestampUtil.microsToTimestamp(0));
+
+ Timestamp t1 = new Timestamp(0);
+ t1.setNanos(123456000);
+ assertEquals(123456, TimestampUtil.timestampToMicros(t1));
+ assertEquals(t1, TimestampUtil.microsToTimestamp(123456));
+
+ SimpleDateFormat iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ iso8601.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ Timestamp t3 = new Timestamp(iso8601.parse("1923-12-01T00:44:36.876").getTime());
+ t3.setNanos(876544000);
+ assertEquals(-1454368523123456L, TimestampUtil.timestampToMicros(t3));
+ assertEquals(t3, TimestampUtil.microsToTimestamp(-1454368523123456L));
+ }
+
+ @Test
+ public void testNonZuluTimestampConversion() throws Exception {
+ SimpleDateFormat cst = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ cst.setTimeZone(TimeZone.getTimeZone("CST"));
+
+ String timeString = "2016-08-19T12:12:12.121";
+ Timestamp timestamp = new Timestamp(cst.parse(timeString).getTime());
+
+ long toMicros = TimestampUtil.timestampToMicros(timestamp);
+ Timestamp fromMicros = TimestampUtil.microsToTimestamp(toMicros);
+ String formattedCST = cst.format(fromMicros);
+
+ assertEquals(1471626732121000L, toMicros);
+ assertEquals(timestamp, fromMicros);
+ assertEquals(timeString, formattedCST);
+
+ SimpleDateFormat pst = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ pst.setTimeZone(TimeZone.getTimeZone("PST"));
+ String formattedPST = pst.format(fromMicros);
+ assertEquals("2016-08-19T10:12:12.121", formattedPST);
+
+ SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ utc.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String formattedUTC = utc.format(fromMicros);
+ assertEquals("2016-08-19T17:12:12.121", formattedUTC);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 761fd79..acfebcc 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -275,7 +275,7 @@ class KuduRelation(private val tableName: String,
case value: Short => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
case value: Int => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
case value: Long => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
- case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, timestampToMicros(value))
+ case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
case value: Float => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
@@ -372,41 +372,4 @@ private[spark] object KuduRelation {
case And(left, right) => supportsFilter(left) && supportsFilter(right)
case _ => false
}
-
- /**
- * Converts a [[Timestamp]] to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
- *
- * @param timestamp the timestamp to convert to microseconds
- * @return the microseconds since the Unix epoch
- */
- def timestampToMicros(timestamp: Timestamp): Long = {
- // Number of whole milliseconds since the Unix epoch, in microseconds.
- val millis = timestamp.getTime * 1000
- // Sub millisecond time since the Unix epoch, in microseconds.
- val micros = (timestamp.getNanos % 1000000) / 1000
- if (micros >= 0) {
- millis + micros
- } else {
- millis + 1000000 + micros
- }
- }
-
- /**
- * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z) to a [[Timestamp]].
- *
- * @param micros the offset in microseconds since the Unix epoch
- * @return the corresponding timestamp
- */
- def microsToTimestamp(micros: Long): Timestamp = {
- var millis = micros / 1000
- var nanos = (micros % 1000000) * 1000
- if (nanos < 0) {
- millis -= 1
- nanos += 1000000000
- }
-
- val timestamp = new Timestamp(millis)
- timestamp.setNanos(nanos.asInstanceOf[Int])
- timestamp
- }
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 4d9bb8e..867cf19 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -359,7 +359,7 @@ class KuduContext(val kuduMaster: String,
case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
- case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
+ case DataTypes.TimestampType => operation.getRow.addTimestamp(kuduIdx, row.getTimestamp(sparkIdx))
case DecimalType() => operation.getRow.addDecimal(kuduIdx, row.getDecimal(sparkIdx))
case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index ddfdf46..a92ec3e 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -126,7 +126,7 @@ private class RowIterator(private val scanner: KuduScanner,
case Type.INT16 => rowResult.getShort(i)
case Type.INT32 => rowResult.getInt(i)
case Type.INT64 => rowResult.getLong(i)
- case Type.UNIXTIME_MICROS => KuduRelation.microsToTimestamp(rowResult.getLong(i))
+ case Type.UNIXTIME_MICROS => rowResult.getTimestamp(i)
case Type.FLOAT => rowResult.getFloat(i)
case Type.DOUBLE => rowResult.getDouble(i)
case Type.STRING => rowResult.getString(i)
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 91ebdce..19a1948 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -16,10 +16,6 @@
*/
package org.apache.kudu.spark.kudu
-import java.sql.Timestamp
-import java.text.SimpleDateFormat
-import java.util.TimeZone
-
import scala.collection.JavaConverters._
import scala.collection.immutable.IndexedSeq
import scala.util.control.NonFatal
@@ -39,25 +35,6 @@ import org.apache.kudu.{Schema, Type}
@RunWith(classOf[JUnitRunner])
class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter with Matchers {
- test("timestamp conversion") {
- val epoch = new Timestamp(0)
- assertEquals(0, KuduRelation.timestampToMicros(epoch))
- assertEquals(epoch, KuduRelation.microsToTimestamp(0))
-
- val t1 = new Timestamp(0)
- t1.setNanos(123456000)
- assertEquals(123456, KuduRelation.timestampToMicros(t1))
- assertEquals(t1, KuduRelation.microsToTimestamp(123456))
-
- val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
- iso8601.setTimeZone(TimeZone.getTimeZone("UTC"))
-
- val t3 = new Timestamp(iso8601.parse("1923-12-01T00:44:36.876").getTime)
- t3.setNanos(876544000)
- assertEquals(-1454368523123456L, KuduRelation.timestampToMicros(t3))
- assertEquals(t3, KuduRelation.microsToTimestamp(-1454368523123456L))
- }
-
val rowCount = 10
var sqlContext : SQLContext = _
var rows : IndexedSeq[(Int, Int, String, Long)] = _
http://git-wip-us.apache.org/repos/asf/kudu/blob/5f9a2f52/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index 8e12dcd..ce1059b 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -20,6 +20,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import java.math.BigDecimal
import java.sql.Timestamp
+import org.apache.kudu.util.TimestampUtil
import org.apache.spark.sql.functions.decode
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -77,7 +78,7 @@ class KuduContextTest extends FunSuite with TestContext with Matchers {
val binaryBytes = s"bytes ${rows.apply(index)._2}".getBytes().toSeq
assert(r.apply(8).asInstanceOf[Array[Byte]].toSeq == binaryBytes)
assert(r.apply(9).asInstanceOf[Timestamp] ==
- KuduRelation.microsToTimestamp(rows.apply(index)._4))
+ TimestampUtil.microsToTimestamp(rows.apply(index)._4))
assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte)
assert(r.apply(11).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))
assert(r.apply(12).asInstanceOf[BigDecimal] == BigDecimal.valueOf(rows.apply(index)._2))