You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/30 19:27:57 UTC
svn commit: r1508537 - in /hive/trunk:
hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
ql/src/java/org/apache/hadoop/hive/ql/udf/
serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/
serde/src/java/org/apache/hadoop...
Author: hashutosh
Date: Tue Jul 30 17:27:57 2013
New Revision: 1508537
URL: http://svn.apache.org/r1508537
Log:
HIVE-4525 : Support timestamps earlier than 1970 and later than 2038 (Mikhail Bautin via Ashutosh Chauhan)
Added:
hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/
hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java
Modified:
hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Tue Jul 30 17:27:57 2013
@@ -75,7 +75,7 @@ public class RevisionManagerFactory {
* Internally used by endpoint implementation to instantiate from different configuration setting.
* @param className
* @param conf
- * @return
+ * @return the opened revision manager
* @throws IOException
*/
static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java Tue Jul 30 17:27:57 2013
@@ -184,7 +184,7 @@ public class UDFToInteger extends UDF {
if (i == null) {
return null;
} else {
- intWritable.set(i.getSeconds());
+ intWritable.set((int) i.getSeconds());
return intWritable;
}
}
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java Tue Jul 30 17:27:57 2013
@@ -376,7 +376,7 @@ public class BinarySortableSerDe extends
case TIMESTAMP:
TimestampWritable t = (reuse == null ? new TimestampWritable() :
(TimestampWritable) reuse);
- byte[] bytes = new byte[8];
+ byte[] bytes = new byte[TimestampWritable.BINARY_SORTABLE_LENGTH];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = buffer.read(invert);
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java Tue Jul 30 17:27:57 2013
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,8 +58,17 @@ public class TimestampWritable implement
static final public byte[] nullBytes = {0x0, 0x0, 0x0, 0x0};
- private static final int NO_DECIMAL_MASK = 0x7FFFFFFF;
- private static final int HAS_DECIMAL_MASK = 0x80000000;
+ private static final int DECIMAL_OR_SECOND_VINT_FLAG = 0x80000000;
+ private static final int LOWEST_31_BITS_OF_SEC_MASK = 0x7fffffff;
+
+ private static final long SEVEN_BYTE_LONG_SIGN_FLIP = 0xff80L << 48;
+
+ private static final BigDecimal BILLION_BIG_DECIMAL = BigDecimal.valueOf(1000000000);
+
+ /** The maximum number of bytes required for a TimestampWritable */
+ public static final int MAX_BYTES = 13;
+
+ public static final int BINARY_SORTABLE_LENGTH = 11;
private static final ThreadLocal<DateFormat> threadLocalDateFormat =
new ThreadLocal<DateFormat>() {
@@ -82,16 +90,12 @@ public class TimestampWritable implement
/* Allow use of external byte[] for efficiency */
private byte[] currentBytes;
- private final byte[] internalBytes = new byte[9];
+ private final byte[] internalBytes = new byte[MAX_BYTES];
private byte[] externalBytes;
private int offset;
- /* Reused to read VInts */
- static private final VInt vInt = new VInt();
-
/* Constructors */
public TimestampWritable() {
- Arrays.fill(internalBytes, (byte) 0x0);
bytesEmpty = false;
currentBytes = internalBytes;
offset = 0;
@@ -156,11 +160,14 @@ public class TimestampWritable implement
*
* @return seconds corresponding to this TimestampWritable
*/
- public int getSeconds() {
- if (bytesEmpty) {
- return (int) (timestamp.getTime() / 1000);
+ public long getSeconds() {
+ if (!timestampEmpty) {
+ return millisToSeconds(timestamp.getTime());
+ } else if (!bytesEmpty) {
+ return TimestampWritable.getSeconds(currentBytes, offset);
+ } else {
+ throw new IllegalStateException("Both timestamp and bytes are empty");
}
- return TimestampWritable.getSeconds(currentBytes, offset);
}
/**
@@ -170,26 +177,33 @@ public class TimestampWritable implement
public int getNanos() {
if (!timestampEmpty) {
return timestamp.getNanos();
+ } else if (!bytesEmpty) {
+ return hasDecimalOrSecondVInt() ?
+ TimestampWritable.getNanos(currentBytes, offset + 4) : 0;
+ } else {
+ throw new IllegalStateException("Both timestamp and bytes are empty");
}
-
- return hasDecimal() ? TimestampWritable.getNanos(currentBytes, offset+4) : 0;
}
/**
- *
- * @return length of serialized TimestampWritable data
+ * @return length of serialized TimestampWritable data. As a side effect, populates the internal
+ * byte array if empty.
*/
- private int getTotalLength() {
- return 4 + getDecimalLength();
+ int getTotalLength() {
+ checkBytes();
+ return getTotalLength(currentBytes, offset);
}
- /**
- *
- * @return number of bytes the variable length decimal takes up
- */
- private int getDecimalLength() {
- checkBytes();
- return hasDecimal() ? WritableUtils.decodeVIntSize(currentBytes[offset+4]) : 0;
+ public static int getTotalLength(byte[] bytes, int offset) {
+ int len = 4;
+ if (hasDecimalOrSecondVInt(bytes[offset])) {
+ int firstVIntLen = WritableUtils.decodeVIntSize(bytes[offset + 4]);
+ len += firstVIntLen;
+ if (hasSecondVInt(bytes[offset + 4])) {
+ len += WritableUtils.decodeVIntSize(bytes[offset + 4 + firstVIntLen]);
+ }
+ }
+ return len;
}
public Timestamp getTimestamp() {
@@ -215,33 +229,45 @@ public class TimestampWritable implement
/**
* @return byte[] representation of TimestampWritable that is binary
- * sortable (4 byte seconds, 4 bytes for nanoseconds)
+ * sortable (7 bytes for seconds, 4 bytes for nanoseconds)
*/
public byte[] getBinarySortable() {
- byte[] b = new byte[8];
+ byte[] b = new byte[BINARY_SORTABLE_LENGTH];
int nanos = getNanos();
- int seconds = HAS_DECIMAL_MASK | getSeconds();
- intToBytes(seconds, b, 0);
- intToBytes(nanos, b, 4);
+ // We flip the highest-order bit of the seven-byte representation of seconds to make negative
+ // values come before positive ones.
+ long seconds = getSeconds() ^ SEVEN_BYTE_LONG_SIGN_FLIP;
+ sevenByteLongToBytes(seconds, b, 0);
+ intToBytes(nanos, b, 7);
return b;
}
/**
* Given a byte[] that has binary sortable data, initialize the internal
* structures to hold that data
- * @param bytes
- * @param offset
+ * @param bytes the byte array that holds the binary sortable representation
+ * @param binSortOffset offset of the binary-sortable representation within the buffer.
*/
- public void setBinarySortable(byte[] bytes, int offset) {
- int seconds = bytesToInt(bytes, offset);
- int nanos = bytesToInt(bytes, offset+4);
- if (nanos == 0) {
- seconds &= NO_DECIMAL_MASK;
+ public void setBinarySortable(byte[] bytes, int binSortOffset) {
+ // Flip the sign bit (and unused bits of the high-order byte) of the seven-byte long back.
+ long seconds = readSevenByteLong(bytes, binSortOffset) ^ SEVEN_BYTE_LONG_SIGN_FLIP;
+ int nanos = bytesToInt(bytes, binSortOffset + 7);
+ int firstInt = (int) seconds;
+ boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE;
+ if (nanos != 0 || hasSecondVInt) {
+ firstInt |= DECIMAL_OR_SECOND_VINT_FLAG;
} else {
- seconds |= HAS_DECIMAL_MASK;
+ firstInt &= LOWEST_31_BITS_OF_SEC_MASK;
}
- intToBytes(seconds, internalBytes, 0);
- setNanosBytes(nanos, internalBytes, 4);
+
+ intToBytes(firstInt, internalBytes, 0);
+ setNanosBytes(nanos, internalBytes, 4, hasSecondVInt);
+ if (hasSecondVInt) {
+ LazyBinaryUtils.writeVLongToByteArray(internalBytes,
+ 4 + WritableUtils.decodeVIntSize(internalBytes[4]),
+ seconds >> 31);
+ }
+
currentBytes = internalBytes;
this.offset = 0;
}
@@ -268,7 +294,7 @@ public class TimestampWritable implement
public double getDouble() {
double seconds, nanos;
if (bytesEmpty) {
- seconds = timestamp.getTime() / 1000;
+ seconds = millisToSeconds(timestamp.getTime());
nanos = timestamp.getNanos();
} else {
seconds = getSeconds();
@@ -281,10 +307,31 @@ public class TimestampWritable implement
public void readFields(DataInput in) throws IOException {
in.readFully(internalBytes, 0, 4);
- if (TimestampWritable.hasDecimal(internalBytes[0])) {
+ if (TimestampWritable.hasDecimalOrSecondVInt(internalBytes[0])) {
in.readFully(internalBytes, 4, 1);
int len = (byte) WritableUtils.decodeVIntSize(internalBytes[4]);
- in.readFully(internalBytes, 5, len-1);
+ if (len > 1) {
+ in.readFully(internalBytes, 5, len-1);
+ }
+
+ long vlong = LazyBinaryUtils.readVLongFromByteArray(internalBytes, 4);
+ if (vlong < -1000000000 || vlong > 999999999) {
+ throw new IOException(
+ "Invalid first vint value (encoded nanoseconds) of a TimestampWritable: " + vlong +
+ ", expected to be between -1000000000 and 999999999.");
+ // Note that -1000000000 is a valid value corresponding to a nanosecond timestamp
+ // of 999999999, because if the second VInt is present, we use the value
+ // (-reversedNanoseconds - 1) as the first VInt.
+ }
+ if (vlong < 0) {
+ // This indicates there is a second VInt containing the additional bits of the seconds
+ // field.
+ in.readFully(internalBytes, 4 + len, 1);
+ int secondVIntLen = (byte) WritableUtils.decodeVIntSize(internalBytes[4 + len]);
+ if (secondVIntLen > 1) {
+ in.readFully(internalBytes, 5 + len, secondVIntLen - 1);
+ }
+ }
}
currentBytes = internalBytes;
this.offset = 0;
@@ -301,8 +348,8 @@ public class TimestampWritable implement
public int compareTo(TimestampWritable t) {
checkBytes();
- int s1 = this.getSeconds();
- int s2 = t.getSeconds();
+ long s1 = this.getSeconds();
+ long s2 = t.getSeconds();
if (s1 == s2) {
int n1 = this.getNanos();
int n2 = t.getNanos();
@@ -311,7 +358,7 @@ public class TimestampWritable implement
}
return n1 - n2;
} else {
- return s1 - s2;
+ return s1 < s2 ? -1 : 1;
}
}
@@ -342,7 +389,7 @@ public class TimestampWritable implement
@Override
public int hashCode() {
long seconds = getSeconds();
- seconds <<= 32;
+ seconds <<= 30; // the nanosecond part fits in 30 bits
seconds |= getNanos();
return (int) ((seconds >>> 32) ^ seconds);
}
@@ -362,13 +409,30 @@ public class TimestampWritable implement
* @param offset
* @return the number of seconds
*/
- public static int getSeconds(byte[] bytes, int offset) {
- return NO_DECIMAL_MASK & bytesToInt(bytes, offset);
+ public static long getSeconds(byte[] bytes, int offset) {
+ int lowest31BitsOfSecondsAndFlag = bytesToInt(bytes, offset);
+ if (lowest31BitsOfSecondsAndFlag >= 0 || // the "has decimal or second VInt" flag is not set
+ !hasSecondVInt(bytes[offset + 4])) {
+ // The entire seconds field is stored in the first 4 bytes.
+ return lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK;
+ }
+
+ // We compose the seconds field from two parts. The lowest 31 bits come from the first four
+ // bytes. The higher-order bits come from the second VInt that follows the nanos field.
+ return ((long) (lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK)) |
+ (LazyBinaryUtils.readVLongFromByteArray(bytes,
+ offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4])) << 31);
}
public static int getNanos(byte[] bytes, int offset) {
+ VInt vInt = LazyBinaryUtils.threadLocalVInt.get();
LazyBinaryUtils.readVInt(bytes, offset, vInt);
int val = vInt.value;
+ if (val < 0) {
+ // This means there is a second VInt present that specifies additional bits of the timestamp.
+ // The reversed nanoseconds value is still encoded in this VInt.
+ val = -val - 1;
+ }
int len = (int) Math.floor(Math.log10(val)) + 1;
// Reverse the value
@@ -387,40 +451,33 @@ public class TimestampWritable implement
}
/**
- * Writes a Timestamp's serialized value to byte array b at
- * @param t
- * @param b
+ * Writes a Timestamp's serialized value to byte array b at the given offset
+ * @param t timestamp to convert to bytes
+ * @param b destination byte array
+ * @param offset destination offset in the byte array
*/
public static void convertTimestampToBytes(Timestamp t, byte[] b,
int offset) {
- if (b.length < 9) {
- LOG.error("byte array too short");
- }
long millis = t.getTime();
int nanos = t.getNanos();
- boolean hasDecimal = nanos != 0 && setNanosBytes(nanos, b, offset+4);
- setSecondsBytes(millis, b, offset, hasDecimal);
- }
-
- /**
- * Given an integer representing seconds, write its serialized
- * value to the byte array b at offset
- * @param millis
- * @param b
- * @param offset
- * @param hasDecimal
- */
- private static void setSecondsBytes(long millis, byte[] b, int offset, boolean hasDecimal) {
- int seconds = (int) (millis / 1000);
-
- if (!hasDecimal) {
- seconds &= NO_DECIMAL_MASK;
+ long seconds = millisToSeconds(millis);
+ boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE;
+ boolean hasDecimal = setNanosBytes(nanos, b, offset+4, hasSecondVInt);
+
+ int firstInt = (int) seconds;
+ if (hasDecimal || hasSecondVInt) {
+ firstInt |= DECIMAL_OR_SECOND_VINT_FLAG;
} else {
- seconds |= HAS_DECIMAL_MASK;
+ firstInt &= LOWEST_31_BITS_OF_SEC_MASK;
}
+ intToBytes(firstInt, b, offset);
- intToBytes(seconds, b, offset);
+ if (hasSecondVInt) {
+ LazyBinaryUtils.writeVLongToByteArray(b,
+ offset + 4 + WritableUtils.decodeVIntSize(b[offset + 4]),
+ seconds >> 31);
+ }
}
/**
@@ -432,7 +489,7 @@ public class TimestampWritable implement
* @param offset
* @return
*/
- private static boolean setNanosBytes(int nanos, byte[] b, int offset) {
+ private static boolean setNanosBytes(int nanos, byte[] b, int offset, boolean hasSecondVInt) {
int decimal = 0;
if (nanos != 0) {
int counter = 0;
@@ -444,7 +501,11 @@ public class TimestampWritable implement
}
}
- LazyBinaryUtils.writeVLongToByteArray(b, offset, decimal);
+ if (hasSecondVInt || decimal != 0) {
+ // We use the sign of the reversed-nanoseconds field to indicate that there is a second VInt
+ // present.
+ LazyBinaryUtils.writeVLongToByteArray(b, offset, hasSecondVInt ? (-decimal - 1) : decimal);
+ }
return decimal != 0;
}
@@ -458,11 +519,14 @@ public class TimestampWritable implement
}
public static Timestamp decimalToTimestamp(HiveDecimal d) {
- BigDecimal seconds = new BigDecimal(d.longValue());
- long millis = d.bigDecimalValue().multiply(new BigDecimal(1000)).longValue();
- int nanos = d.bigDecimalValue().subtract(seconds).multiply(new BigDecimal(1000000000)).intValue();
-
- Timestamp t = new Timestamp(millis);
+ BigDecimal nanoInstant = d.bigDecimalValue().multiply(BILLION_BIG_DECIMAL);
+ int nanos = nanoInstant.remainder(BILLION_BIG_DECIMAL).intValue();
+ if (nanos < 0) {
+ nanos += 1000000000;
+ }
+ long seconds =
+ nanoInstant.subtract(new BigDecimal(nanos)).divide(BILLION_BIG_DECIMAL).longValue();
+ Timestamp t = new Timestamp(seconds * 1000);
t.setNanos(nanos);
return t;
@@ -480,6 +544,10 @@ public class TimestampWritable implement
// Convert to millis
long millis = seconds * 1000;
+ if (nanos < 0) {
+ millis -= 1000;
+ nanos += 1000000000;
+ }
Timestamp t = new Timestamp(millis);
// Set remaining fractional portion to nanos
@@ -488,10 +556,19 @@ public class TimestampWritable implement
}
public static void setTimestamp(Timestamp t, byte[] bytes, int offset) {
- boolean hasDecimal = hasDecimal(bytes[offset]);
- t.setTime(((long) TimestampWritable.getSeconds(bytes, offset)) * 1000);
- if (hasDecimal) {
- t.setNanos(TimestampWritable.getNanos(bytes, offset+4));
+ boolean hasDecimalOrSecondVInt = hasDecimalOrSecondVInt(bytes[offset]);
+ long seconds = (long) TimestampWritable.getSeconds(bytes, offset);
+ int nanos = 0;
+ if (hasDecimalOrSecondVInt) {
+ nanos = TimestampWritable.getNanos(bytes, offset + 4);
+ if (hasSecondVInt(bytes[offset + 4])) {
+ seconds += LazyBinaryUtils.readVLongFromByteArray(bytes,
+ offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4]));
+ }
+ }
+ t.setTime(seconds * 1000);
+ if (nanos != 0) {
+ t.setNanos(nanos);
}
}
@@ -501,17 +578,22 @@ public class TimestampWritable implement
return t;
}
- public boolean hasDecimal() {
- return hasDecimal(currentBytes[offset]);
+ private static boolean hasDecimalOrSecondVInt(byte b) {
+ return (b >> 7) != 0;
}
- /**
- *
- * @param b first byte in an encoded TimestampWritable
- * @return true if it has a decimal portion, false otherwise
- */
- public static boolean hasDecimal(byte b) {
- return (b >> 7) != 0;
+ private static boolean hasSecondVInt(byte b) {
+ return WritableUtils.isNegativeVInt(b);
+ }
+
+ private final boolean hasDecimalOrSecondVInt() {
+ return hasDecimalOrSecondVInt(currentBytes[offset]);
+ }
+
+ public final boolean hasDecimal() {
+ return hasDecimalOrSecondVInt() || currentBytes[offset + 4] != -1;
+ // If the first byte of the VInt is -1, the VInt itself is -1, indicating that there is a
+ // second VInt but the nanoseconds field is actually 0.
}
/**
@@ -528,6 +610,20 @@ public class TimestampWritable implement
}
/**
+ * Writes <code>value</code> into <code>dest</code> at <code>offset</code> as a seven-byte
+ * serialized long number.
+ */
+ static void sevenByteLongToBytes(long value, byte[] dest, int offset) {
+ dest[offset] = (byte) ((value >> 48) & 0xFF);
+ dest[offset+1] = (byte) ((value >> 40) & 0xFF);
+ dest[offset+2] = (byte) ((value >> 32) & 0xFF);
+ dest[offset+3] = (byte) ((value >> 24) & 0xFF);
+ dest[offset+4] = (byte) ((value >> 16) & 0xFF);
+ dest[offset+5] = (byte) ((value >> 8) & 0xFF);
+ dest[offset+6] = (byte) (value & 0xFF);
+ }
+
+ /**
*
* @param bytes
* @param offset
@@ -540,4 +636,27 @@ public class TimestampWritable implement
| ((0xFF & bytes[offset+2]) << 8)
| (0xFF & bytes[offset+3]);
}
+
+ static long readSevenByteLong(byte[] bytes, int offset) {
+ // We need to shift everything 8 bits left and then shift back to populate the sign field.
+ return (((0xFFL & bytes[offset]) << 56)
+ | ((0xFFL & bytes[offset+1]) << 48)
+ | ((0xFFL & bytes[offset+2]) << 40)
+ | ((0xFFL & bytes[offset+3]) << 32)
+ | ((0xFFL & bytes[offset+4]) << 24)
+ | ((0xFFL & bytes[offset+5]) << 16)
+ | ((0xFFL & bytes[offset+6]) << 8)) >> 8;
+ }
+
+ /**
+ * Rounds the number of milliseconds relative to the epoch down to the nearest whole number of
+ * seconds. 500 would round to 0, -500 would round to -1.
+ */
+ static long millisToSeconds(long millis) {
+ if (millis >= 0) {
+ return millis / 1000;
+ } else {
+ return (millis - 999) / 1000;
+ }
+ }
}
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java Tue Jul 30 17:27:57 2013
@@ -202,10 +202,7 @@ public final class LazyBinaryUtils {
break;
case TIMESTAMP:
recordInfo.elementOffset = 0;
- recordInfo.elementSize = 4;
- if(TimestampWritable.hasDecimal(bytes[offset])) {
- recordInfo.elementSize += (byte) WritableUtils.decodeVIntSize(bytes[offset+4]);
- }
+ recordInfo.elementSize = TimestampWritable.getTotalLength(bytes, offset);
break;
case DECIMAL:
// using vint instead of 4 bytes
@@ -285,6 +282,13 @@ public final class LazyBinaryUtils {
public byte length;
};
+ public static final ThreadLocal<VInt> threadLocalVInt = new ThreadLocal<VInt>() {
+ @Override
+ protected VInt initialValue() {
+ return new VInt();
+ }
+ };
+
/**
* Reads a zero-compressed encoded int from a byte array and returns it.
*
@@ -324,6 +328,28 @@ public final class LazyBinaryUtils {
}
/**
+ * Read a zero-compressed encoded long from a byte array.
+ *
+ * @param bytes the byte array
+ * @param offset the offset in the byte array where the VLong is stored
+ * @return the long
+ */
+ public static long readVLongFromByteArray(final byte[] bytes, int offset) {
+ byte firstByte = bytes[offset++];
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len-1; idx++) {
+ byte b = bytes[offset++];
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
+ }
+
+ /**
* Write a zero-compressed encoded long to a byte array.
*
* @param bytes
Added: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java?rev=1508537&view=auto
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java (added)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java Tue Jul 30 17:27:57 2013
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestTimestampWritable extends TestCase {
+
+ private static DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ private static final int HAS_DECIMAL_MASK = 0x80000000;
+
+ private static final long MAX_ADDITIONAL_SECONDS_BITS = 0x418937;
+
+ private static long MIN_FOUR_DIGIT_YEAR_MILLIS = parseToMillis("0001-01-01 00:00:00");
+ private static long MAX_FOUR_DIGIT_YEAR_MILLIS = parseToMillis("9999-01-01 00:00:00");
+
+ private static int BILLION = 1000 * 1000 * 1000;
+
+ private static long getSeconds(Timestamp ts) {
+ // To compute seconds, we first subtract the milliseconds stored in the nanos field of the
+ // Timestamp from the result of getTime().
+ long seconds = (ts.getTime() - ts.getNanos() / 1000000) / 1000;
+
+ // It should also be possible to calculate this based on ts.getTime() only.
+ assertEquals(seconds, TimestampWritable.millisToSeconds(ts.getTime()));
+
+ return seconds;
+ }
+
+ private static long parseToMillis(String s) {
+ try {
+ return DATE_FORMAT.parse(s).getTime();
+ } catch (ParseException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ protected void setUp() {
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+ }
+
+ private static String normalizeTimestampStr(String timestampStr) {
+ if (timestampStr.endsWith(".0")) {
+ return timestampStr.substring(0, timestampStr.length() - 2);
+ }
+ return timestampStr;
+ }
+
+ private static void assertTSWEquals(TimestampWritable expected, TimestampWritable actual) {
+ assertEquals(normalizeTimestampStr(expected.toString()),
+ normalizeTimestampStr(actual.toString()));
+ assertEquals(expected, actual);
+ assertEquals(expected.getTimestamp(), actual.getTimestamp());
+ }
+
+ private static TimestampWritable deserializeFromBytes(byte[] tsBytes) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(tsBytes);
+ DataInputStream dis = new DataInputStream(bais);
+ TimestampWritable deserTSW = new TimestampWritable();
+ deserTSW.readFields(dis);
+ return deserTSW;
+ }
+
+ private static int reverseNanos(int nanos) {
+ if (nanos == 0) {
+ return 0;
+ }
+ if (nanos < 0 || nanos >= 1000 * 1000 * 1000) {
+ throw new IllegalArgumentException("Invalid nanosecond value: " + nanos);
+ }
+
+ int x = nanos;
+ StringBuilder reversed = new StringBuilder();
+ while (x != 0) {
+ reversed.append((char)('0' + x % 10));
+ x /= 10;
+ }
+
+ int result = Integer.parseInt(reversed.toString());
+ while (nanos < 100 * 1000 * 1000) {
+ result *= 10;
+ nanos *= 10;
+ }
+ return result;
+ }
+
+ private static byte[] serializeToBytes(Writable w) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ w.write(dos);
+ return baos.toByteArray();
+ }
+
+ private static List<Byte> toList(byte[] a) {
+ List<Byte> list = new ArrayList<Byte>(a.length);
+ for (byte b : a) {
+ list.add(b);
+ }
+ return list;
+ }
+
+ /**
+ * Pad the given byte array with the given number of bytes in the beginning. The padding bytes
+ * deterministically depend on the passed data.
+ */
+ private static byte[] padBytes(byte[] bytes, int count) {
+ byte[] result = new byte[bytes.length + count];
+ for (int i = 0; i < count; ++i) {
+ // Fill the prefix bytes with deterministic data based on the actual meaningful data.
+ result[i] = (byte) (bytes[i % bytes.length] * 37 + 19);
+ }
+ System.arraycopy(bytes, 0, result, count, bytes.length);
+ return result;
+ }
+
+ private static TimestampWritable serializeDeserializeAndCheckTimestamp(Timestamp ts)
+ throws IOException {
+ TimestampWritable tsw = new TimestampWritable(ts);
+ assertEquals(ts, tsw.getTimestamp());
+
+ byte[] tsBytes = serializeToBytes(tsw);
+ TimestampWritable deserTSW = deserializeFromBytes(tsBytes);
+ assertTSWEquals(tsw, deserTSW);
+ assertEquals(ts, deserTSW.getTimestamp());
+ assertEquals(tsBytes.length, tsw.getTotalLength());
+
+ // Also convert to/from binary-sortable representation.
+ int binarySortableOffset = Math.abs(tsw.hashCode()) % 10;
+ byte[] binarySortableBytes = padBytes(tsw.getBinarySortable(), binarySortableOffset);
+ TimestampWritable fromBinSort = new TimestampWritable();
+ fromBinSort.setBinarySortable(binarySortableBytes, binarySortableOffset);
+ assertTSWEquals(tsw, fromBinSort);
+
+ long timeSeconds = ts.getTime() / 1000;
+ if (0 <= timeSeconds && timeSeconds <= Integer.MAX_VALUE) {
+ assertEquals(new Timestamp(timeSeconds * 1000),
+ fromIntAndVInts((int) timeSeconds, 0).getTimestamp());
+
+ int nanos = reverseNanos(ts.getNanos());
+ assertEquals(ts,
+ fromIntAndVInts((int) timeSeconds | (nanos != 0 ? HAS_DECIMAL_MASK : 0),
+ nanos).getTimestamp());
+ }
+
+ assertEquals(ts.getNanos(), tsw.getNanos());
+ assertEquals(getSeconds(ts), tsw.getSeconds());
+
+ // Test various set methods and copy constructors.
+ {
+ TimestampWritable tsSet1 = new TimestampWritable();
+ // make the offset non-zero to keep things interesting.
+ int offset = Math.abs(ts.hashCode() % 32);
+ byte[] shiftedBytes = padBytes(tsBytes, offset);
+ tsSet1.set(shiftedBytes, offset);
+ assertTSWEquals(tsw, tsSet1);
+
+ TimestampWritable tswShiftedBytes = new TimestampWritable(shiftedBytes, offset);
+ assertTSWEquals(tsw, tswShiftedBytes);
+ assertTSWEquals(tsw, deserializeFromBytes(serializeToBytes(tswShiftedBytes)));
+ }
+
+ {
+ TimestampWritable tsSet2 = new TimestampWritable();
+ tsSet2.set(ts);
+ assertTSWEquals(tsw, tsSet2);
+ }
+
+ {
+ TimestampWritable tsSet3 = new TimestampWritable();
+ tsSet3.set(tsw);
+ assertTSWEquals(tsw, tsSet3);
+ }
+
+ {
+ TimestampWritable tsSet4 = new TimestampWritable();
+ tsSet4.set(deserTSW);
+ assertTSWEquals(tsw, tsSet4);
+ }
+
+ double expectedDbl = getSeconds(ts) + 1e-9d * ts.getNanos();
+ assertTrue(Math.abs(tsw.getDouble() - expectedDbl) < 1e-10d);
+
+ return deserTSW;
+ }
+
+ private static int randomNanos(Random rand, int decimalDigits) {
+ // Only keep the most significant decimalDigits digits.
+ int nanos = rand.nextInt(BILLION);
+ return nanos - nanos % (int) Math.pow(10, 9 - decimalDigits);
+ }
+
+ private static int randomNanos(Random rand) {
+ return randomNanos(rand, rand.nextInt(10));
+ }
+
+ private static void checkTimestampWithAndWithoutNanos(Timestamp ts, int nanos)
+ throws IOException {
+ serializeDeserializeAndCheckTimestamp(ts);
+
+ ts.setNanos(nanos);
+ assertEquals(serializeDeserializeAndCheckTimestamp(ts).getNanos(), nanos);
+ }
+
+ private static TimestampWritable fromIntAndVInts(int i, long... vints) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeInt(i);
+ if ((i & HAS_DECIMAL_MASK) != 0) {
+ for (long vi : vints) {
+ WritableUtils.writeVLong(dos, vi);
+ }
+ }
+ byte[] bytes = baos.toByteArray();
+ TimestampWritable tsw = deserializeFromBytes(bytes);
+ assertEquals(toList(bytes), toList(serializeToBytes(tsw)));
+ return tsw;
+ }
+
+ /**
+ * Pins reverseNanos against fixed examples. In each pair the expected value is the input,
+ * written as nine zero-padded decimal digits, read back to front (21 -> 120000000).
+ */
+ public void testReverseNanos() {
+ // Each row is { input, expected reverseNanos(input) }, checked in order.
+ int[][] inputAndExpected = {
+ {0, 0},
+ {21, 120000000},
+ {1230, 32100000},
+ {500000000, 5},
+ {123456789, 987654321},
+ {876543210, 12345678},
+ };
+ for (int[] row : inputAndExpected) {
+ assertEquals(row[1], reverseNanos(row[0]));
+ }
+ }
+
+ /**
+ * Test serializing and deserializing timestamps whose whole-second value since the UNIX epoch
+ * fits in a non-negative int: rand.nextInt(Integer.MAX_VALUE) yields 0 to 2147483646.
+ */
+ public void testTimestampsWithinPositiveIntRange() throws IOException {
+ Random rand = new Random(294722773L); // fixed seed: the same sequence on every run
+ for (int i = 0; i < 10000; ++i) {
+ long millis = ((long) rand.nextInt(Integer.MAX_VALUE)) * 1000; // widen before scaling to ms
+ checkTimestampWithAndWithoutNanos(new Timestamp(millis), randomNanos(rand));
+ }
+ }
+
+ /**
+ * Returns {@code minMillis} plus the span (maxMillis - minMillis) scaled by one
+ * rand.nextDouble() draw and truncated to a long.
+ */
+ private static long randomMillis(long minMillis, long maxMillis, Random rand) {
+ final long span = maxMillis - minMillis;
+ final double scaledOffset = span * rand.nextDouble();
+ return minMillis + (long) scaledOffset;
+ }
+
+ /**
+ * Test timestamps that don't necessarily fit between 1970 and 2038, drawn between
+ * MIN_FOUR_DIGIT_YEAR_MILLIS and MAX_FOUR_DIGIT_YEAR_MILLIS. Depends on HIVE-4525 being fixed.
+ */
+ public void testTimestampsOutsidePositiveIntRange() throws IOException {
+ Random rand = new Random(789149717L); // fixed seed: the same sequence on every run
+ for (int i = 0; i < 10000; ++i) {
+ long millis = randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand);
+ checkTimestampWithAndWithoutNanos(new Timestamp(millis), randomNanos(rand));
+ }
+ }
+
+ public void testTimestampsInFullRange() throws IOException { // round-trips arbitrary long millis
+ Random rand = new Random(2904974913L); // fixed seed: the same sequence on every run
+ for (int i = 0; i < 10000; ++i) {
+ checkTimestampWithAndWithoutNanos(new Timestamp(rand.nextLong()), randomNanos(rand));
+ }
+ }
+
+ public void testToFromDouble() { // timestamp -> double -> timestamp, 0 to 4 fractional digits
+ Random rand = new Random(294729777L); // fixed seed: the same sequence on every run
+ for (int nanosPrecision = 0; nanosPrecision <= 4; ++nanosPrecision) {
+ for (int i = 0; i < 10000; ++i) {
+ long millis = randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand);
+ Timestamp ts = new Timestamp(millis);
+ int nanos = randomNanos(rand, nanosPrecision);
+ ts.setNanos(nanos); // overwrites the fractional second derived from millis
+ TimestampWritable tsw = new TimestampWritable(ts);
+ double asDouble = tsw.getDouble();
+ int recoveredNanos = // fractional part rounded to nanosPrecision digits, scaled back to nanos
+ (int) (Math.round((asDouble - Math.floor(asDouble)) * Math.pow(10, nanosPrecision)) *
+ Math.pow(10, 9 - nanosPrecision));
+ assertEquals(String.format("Invalid nanosecond part recovered from %f", asDouble),
+ nanos, recoveredNanos);
+ assertEquals(ts, TimestampWritable.doubleToTimestamp(asDouble));
+ // decimalToTimestamp should be consistent with doubleToTimestamp for this level of
+ // precision.
+ assertEquals(ts, TimestampWritable.decimalToTimestamp(
+ new HiveDecimal(BigDecimal.valueOf(asDouble))));
+ }
+ }
+ }
+
+ /**
+ * Builds a HiveDecimal equal to getSeconds(ts) plus ts.getNanos() / BILLION, with both parts
+ * combined in BigDecimal arithmetic.
+ */
+ private static HiveDecimal timestampToDecimal(Timestamp ts) {
+ BigDecimal wholeSeconds = new BigDecimal(getSeconds(ts));
+ BigDecimal fractionOfSecond = new BigDecimal(ts.getNanos()).divide(new BigDecimal(BILLION));
+ return new HiveDecimal(wholeSeconds.add(fractionOfSecond));
+ }
+
+ public void testDecimalToTimestampRandomly() { // timestamp -> decimal -> timestamp round trip
+ Random rand = new Random(294729777L); // fixed seed: the same sequence on every run
+ for (int i = 0; i < 10000; ++i) {
+ Timestamp ts = new Timestamp(
+ randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand));
+ ts.setNanos(randomNanos(rand, 9)); // full precision
+ assertEquals(ts, TimestampWritable.decimalToTimestamp(timestampToDecimal(ts)));
+ }
+ }
+
+ public void testDecimalToTimestampCornerCases() { // pre-1970 date with tiny and near-1s nanos
+ Timestamp ts = new Timestamp(parseToMillis("1969-03-04 05:44:33"));
+ assertEquals(0, ts.getTime() % 1000); // the parsed value must carry no sub-second part
+ for (int nanos : new int[] { 100000, 900000, 999100000, 999900000 }) {
+ ts.setNanos(nanos); // the same Timestamp instance is reused for every case
+ HiveDecimal d = timestampToDecimal(ts);
+ assertEquals(ts, TimestampWritable.decimalToTimestamp(d));
+ assertEquals(ts, TimestampWritable.doubleToTimestamp(d.bigDecimalValue().doubleValue()));
+ }
+ }
+
+ public void testSerializationFormatDirectly() throws IOException { // hand-built bytes -> text
+ assertEquals("1970-01-01 00:00:00", fromIntAndVInts(0).toString());
+ assertEquals("1970-01-01 00:00:01", fromIntAndVInts(1).toString());
+ assertEquals("1970-01-01 00:05:00", fromIntAndVInts(300).toString());
+ assertEquals("1970-01-01 02:00:00", fromIntAndVInts(7200).toString());
+ assertEquals("2000-01-02 03:04:05", fromIntAndVInts(946782245).toString());
+
+ // This won't have a decimal part because the HAS_DECIMAL_MASK bit is not set.
+ assertEquals("2000-01-02 03:04:05", fromIntAndVInts(946782245, 3210).toString());
+
+ assertEquals("2000-01-02 03:04:05.0123", // VInt 3210 corresponds to the fraction .0123 here
+ fromIntAndVInts(946782245 | HAS_DECIMAL_MASK, 3210).toString());
+
+ assertEquals("2038-01-19 03:14:07", fromIntAndVInts(Integer.MAX_VALUE).toString());
+ assertEquals("2038-01-19 03:14:07.012345678",
+ fromIntAndVInts(Integer.MAX_VALUE | HAS_DECIMAL_MASK, // this is really just -1
+ 876543210).toString());
+
+ // Timestamps with a second VInt storing additional bits of the seconds field.
+ long seconds = 253392390415L; // low 31 bits go in the int, seconds >> 31 in the second VInt
+ assertEquals("9999-09-08 07:06:55", // first VInt -1L pairs with a result that has no fraction
+ fromIntAndVInts((int) (seconds & 0x7fffffff) | (1 << 31), -1L, seconds >> 31).toString());
+ assertEquals("9999-09-08 07:06:55.0123",
+ fromIntAndVInts((int) (seconds & 0x7fffffff) | (1 << 31),
+ -3210 - 1, seconds >> 31).toString()); // -3210 - 1 where the two-field form above used 3210
+ }
+
+ /**
+ * Checks the VInt sizes that bound the length of a serialized timestamp, and that
+ * MAX_ADDITIONAL_SECONDS_BITS is the largest value of the extra seconds bits whose
+ * millisecond equivalent still fits below Long.MAX_VALUE.
+ */
+ public void testMaxSize() {
+ // This many bytes are necessary to store the reversed nanoseconds.
+ assertEquals(5, WritableUtils.getVIntSize(999999999));
+ assertEquals(5, WritableUtils.getVIntSize(-2 - 999999999));
+
+ // Bytes necessary to store the extra bits of the seconds field when storing a timestamp
+ // before 1970 or after 2038.
+ assertEquals(3, WritableUtils.getVIntSize(Short.MAX_VALUE));
+ assertEquals(3, WritableUtils.getVIntSize(Short.MIN_VALUE));
+
+ // Test that MAX_ADDITIONAL_SECONDS_BITS is really the maximum value of the
+ // additional bits (beyond 31 bits) of the seconds-since-epoch part of timestamp.
+ // Both bounds are evaluated in double arithmetic: a long product that overflowed would
+ // wrap around and still compare as less than Long.MAX_VALUE, making the check vacuous.
+ assertTrue((((double) MAX_ADDITIONAL_SECONDS_BITS) * (1L << 31)) * 1000 < Long.MAX_VALUE);
+ assertTrue((((double) MAX_ADDITIONAL_SECONDS_BITS + 1) * (1L << 31)) * 1000 >
+ Long.MAX_VALUE);
+
+ // This is how many bytes we need to store those additional bits as a VInt.
+ assertEquals(4, WritableUtils.getVIntSize(MAX_ADDITIONAL_SECONDS_BITS));
+
+ // Therefore, the maximum total size of a serialized timestamp is 4 + 5 + 4 = 13.
+ }
+
+ /**
+ * Pins TimestampWritable.millisToSeconds against a table of inputs on both sides of zero.
+ * The negative rows expect rounding toward negative infinity (for example -1 ms gives -1 s).
+ */
+ public void testMillisToSeconds() {
+ // Each row is { millis, expected seconds }, checked in order.
+ int[][] millisAndSeconds = {
+ {0, 0},
+ {-1, -1},
+ {-999, -1},
+ {-1000, -1},
+ {-1001, -2},
+ {-1999, -2},
+ {-2000, -2},
+ {-2001, -3},
+ {-99000, -99},
+ {-99001, -100},
+ {-100000, -100},
+ {1500, 1},
+ {19999, 19},
+ {20000, 20},
+ };
+ for (int[] row : millisAndSeconds) {
+ assertEquals(row[1], TimestampWritable.millisToSeconds(row[0]));
+ }
+ }
+
+ /**
+ * Asserts that both arrays have the same length, then compares them byte by byte, treating
+ * each byte as unsigned. Returns the difference at the first mismatch, or 0 if none exists.
+ */
+ private static int compareEqualLengthByteArrays(byte[] a, byte[] b) {
+ assertEquals(a.length, b.length);
+ int pos = 0;
+ // Skip the common prefix.
+ while (pos < a.length && a[pos] == b[pos]) {
+ ++pos;
+ }
+ if (pos == a.length) {
+ return 0;
+ }
+ return (a[pos] & 0xff) - (b[pos] & 0xff);
+ }
+
+ /** Collapses a comparison result to -1, 0 or 1 according to its sign. */
+ private static int normalizeComparisonResult(int result) {
+ return Integer.signum(result);
+ }
+
+ public void testBinarySortable() { // getBinarySortable() byte order must agree with compareTo()
+ Random rand = new Random(5972977L); // fixed seed: the same sequence on every run
+ List<TimestampWritable> tswList = new ArrayList<TimestampWritable>();
+ for (int i = 0; i < 50; ++i) {
+ Timestamp ts = new Timestamp(rand.nextLong());
+ ts.setNanos(randomNanos(rand));
+ tswList.add(new TimestampWritable(ts));
+ }
+ for (TimestampWritable tsw1 : tswList) { // every ordered pair is checked, self-pairs included
+ byte[] bs1 = tsw1.getBinarySortable();
+ for (TimestampWritable tsw2 : tswList) {
+ byte[] bs2 = tsw2.getBinarySortable();
+ int binaryComparisonResult = // unsigned byte-wise comparison, reduced to -1, 0 or 1
+ normalizeComparisonResult(compareEqualLengthByteArrays(bs1, bs2));
+ int comparisonResult = normalizeComparisonResult(tsw1.compareTo(tsw2));
+ if (binaryComparisonResult != comparisonResult) {
+ throw new AssertionError("TimestampWritables " + tsw1 + " and " + tsw2 + " compare as " +
+ comparisonResult + " using compareTo but as " + binaryComparisonResult + " using " +
+ "getBinarySortable");
+ }
+ }
+ }
+ }
+
+}