You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/30 19:27:57 UTC

svn commit: r1508537 - in /hive/trunk: hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ ql/src/java/org/apache/hadoop/hive/ql/udf/ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/ serde/src/java/org/apache/hadoop...

Author: hashutosh
Date: Tue Jul 30 17:27:57 2013
New Revision: 1508537

URL: http://svn.apache.org/r1508537
Log:
HIVE-4525 : Support timestamps earlier than 1970 and later than 2038 (Mikhail Bautin via Ashutosh Chauhan)

Added:
    hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/
    hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java
Modified:
    hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Tue Jul 30 17:27:57 2013
@@ -75,7 +75,7 @@ public class RevisionManagerFactory {
      * Internally used by endpoint implementation to instantiate from different configuration setting.
      * @param className
      * @param conf
-     * @return
+     * @return the opened revision manager
      * @throws IOException
      */
     static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java Tue Jul 30 17:27:57 2013
@@ -184,7 +184,7 @@ public class UDFToInteger extends UDF {
     if (i == null) {
       return null;
     } else {
-      intWritable.set(i.getSeconds());
+      intWritable.set((int) i.getSeconds());
       return intWritable;
     }
   }

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java Tue Jul 30 17:27:57 2013
@@ -376,7 +376,7 @@ public class BinarySortableSerDe extends
       case TIMESTAMP:
         TimestampWritable t = (reuse == null ? new TimestampWritable() :
             (TimestampWritable) reuse);
-        byte[] bytes = new byte[8];
+        byte[] bytes = new byte[TimestampWritable.BINARY_SORTABLE_LENGTH];
 
         for (int i = 0; i < bytes.length; i++) {
           bytes[i] = buffer.read(invert);

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java Tue Jul 30 17:27:57 2013
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,8 +58,17 @@ public class TimestampWritable implement
 
   static final public byte[] nullBytes = {0x0, 0x0, 0x0, 0x0};
 
-  private static final int NO_DECIMAL_MASK = 0x7FFFFFFF;
-  private static final int HAS_DECIMAL_MASK = 0x80000000;
+  private static final int DECIMAL_OR_SECOND_VINT_FLAG = 0x80000000;
+  private static final int LOWEST_31_BITS_OF_SEC_MASK = 0x7fffffff;
+
+  private static final long SEVEN_BYTE_LONG_SIGN_FLIP = 0xff80L << 48;
+
+  private static final BigDecimal BILLION_BIG_DECIMAL = BigDecimal.valueOf(1000000000);
+
+  /** The maximum number of bytes required for a TimestampWritable */
+  public static final int MAX_BYTES = 13;
+
+  public static final int BINARY_SORTABLE_LENGTH = 11;
 
   private static final ThreadLocal<DateFormat> threadLocalDateFormat =
       new ThreadLocal<DateFormat>() {
@@ -82,16 +90,12 @@ public class TimestampWritable implement
 
   /* Allow use of external byte[] for efficiency */
   private byte[] currentBytes;
-  private final byte[] internalBytes = new byte[9];
+  private final byte[] internalBytes = new byte[MAX_BYTES];
   private byte[] externalBytes;
   private int offset;
 
-  /* Reused to read VInts */
-  static private final VInt vInt = new VInt();
-
   /* Constructors */
   public TimestampWritable() {
-    Arrays.fill(internalBytes, (byte) 0x0);
     bytesEmpty = false;
     currentBytes = internalBytes;
     offset = 0;
@@ -156,11 +160,14 @@ public class TimestampWritable implement
    *
    * @return seconds corresponding to this TimestampWritable
    */
-  public int getSeconds() {
-    if (bytesEmpty) {
-      return (int) (timestamp.getTime() / 1000);
+  public long getSeconds() {
+    if (!timestampEmpty) {
+      return millisToSeconds(timestamp.getTime());
+    } else if (!bytesEmpty) {
+      return TimestampWritable.getSeconds(currentBytes, offset);
+    } else {
+      throw new IllegalStateException("Both timestamp and bytes are empty");
     }
-    return TimestampWritable.getSeconds(currentBytes, offset);
   }
 
   /**
@@ -170,26 +177,33 @@ public class TimestampWritable implement
   public int getNanos() {
     if (!timestampEmpty) {
       return timestamp.getNanos();
+    } else if (!bytesEmpty) {
+      return hasDecimalOrSecondVInt() ?
+          TimestampWritable.getNanos(currentBytes, offset + 4) : 0;
+    } else {
+      throw new IllegalStateException("Both timestamp and bytes are empty");
     }
-
-    return hasDecimal() ? TimestampWritable.getNanos(currentBytes, offset+4) : 0;
   }
 
   /**
-   *
-   * @return length of serialized TimestampWritable data
+   * @return length of serialized TimestampWritable data. As a side effect, populates the internal
+   *         byte array if empty.
    */
-  private int getTotalLength() {
-    return 4 + getDecimalLength();
+  int getTotalLength() {
+    checkBytes();
+    return getTotalLength(currentBytes, offset);
   }
 
-  /**
-   *
-   * @return number of bytes the variable length decimal takes up
-   */
-  private int getDecimalLength() {
-    checkBytes();
-    return hasDecimal() ? WritableUtils.decodeVIntSize(currentBytes[offset+4]) : 0;
+  public static int getTotalLength(byte[] bytes, int offset) {
+    int len = 4;
+    if (hasDecimalOrSecondVInt(bytes[offset])) {
+      int firstVIntLen = WritableUtils.decodeVIntSize(bytes[offset + 4]);
+      len += firstVIntLen;
+      if (hasSecondVInt(bytes[offset + 4])) {
+        len += WritableUtils.decodeVIntSize(bytes[offset + 4 + firstVIntLen]);
+      }
+    }
+    return len;
   }
 
   public Timestamp getTimestamp() {
@@ -215,33 +229,45 @@ public class TimestampWritable implement
 
   /**
    * @return byte[] representation of TimestampWritable that is binary
-   * sortable (4 byte seconds, 4 bytes for nanoseconds)
+   * sortable (7 bytes for seconds, 4 bytes for nanoseconds)
    */
   public byte[] getBinarySortable() {
-    byte[] b = new byte[8];
+    byte[] b = new byte[BINARY_SORTABLE_LENGTH];
     int nanos = getNanos();
-    int seconds = HAS_DECIMAL_MASK | getSeconds();
-    intToBytes(seconds, b, 0);
-    intToBytes(nanos, b, 4);
+    // We flip the highest-order bit of the seven-byte representation of seconds to make negative
+    // values come before positive ones.
+    long seconds = getSeconds() ^ SEVEN_BYTE_LONG_SIGN_FLIP;
+    sevenByteLongToBytes(seconds, b, 0);
+    intToBytes(nanos, b, 7);
     return b;
   }
 
   /**
    * Given a byte[] that has binary sortable data, initialize the internal
    * structures to hold that data
-   * @param bytes
-   * @param offset
+   * @param bytes the byte array that holds the binary sortable representation
+   * @param binSortOffset offset of the binary-sortable representation within the buffer.
    */
-  public void setBinarySortable(byte[] bytes, int offset) {
-    int seconds = bytesToInt(bytes, offset);
-    int nanos = bytesToInt(bytes, offset+4);
-    if (nanos == 0) {
-      seconds &= NO_DECIMAL_MASK;
+  public void setBinarySortable(byte[] bytes, int binSortOffset) {
+    // Flip the sign bit (and unused bits of the high-order byte) of the seven-byte long back.
+    long seconds = readSevenByteLong(bytes, binSortOffset) ^ SEVEN_BYTE_LONG_SIGN_FLIP;
+    int nanos = bytesToInt(bytes, binSortOffset + 7);
+    int firstInt = (int) seconds;
+    boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE;
+    if (nanos != 0 || hasSecondVInt) {
+      firstInt |= DECIMAL_OR_SECOND_VINT_FLAG;
     } else {
-      seconds |= HAS_DECIMAL_MASK;
+      firstInt &= LOWEST_31_BITS_OF_SEC_MASK;
     }
-    intToBytes(seconds, internalBytes, 0);
-    setNanosBytes(nanos, internalBytes, 4);
+
+    intToBytes(firstInt, internalBytes, 0);
+    setNanosBytes(nanos, internalBytes, 4, hasSecondVInt);
+    if (hasSecondVInt) {
+      LazyBinaryUtils.writeVLongToByteArray(internalBytes,
+          4 + WritableUtils.decodeVIntSize(internalBytes[4]),
+          seconds >> 31);
+    }
+
     currentBytes = internalBytes;
     this.offset = 0;
   }
@@ -268,7 +294,7 @@ public class TimestampWritable implement
   public double getDouble() {
     double seconds, nanos;
     if (bytesEmpty) {
-      seconds = timestamp.getTime() / 1000;
+      seconds = millisToSeconds(timestamp.getTime());
       nanos = timestamp.getNanos();
     } else {
       seconds = getSeconds();
@@ -281,10 +307,31 @@ public class TimestampWritable implement
 
   public void readFields(DataInput in) throws IOException {
     in.readFully(internalBytes, 0, 4);
-    if (TimestampWritable.hasDecimal(internalBytes[0])) {
+    if (TimestampWritable.hasDecimalOrSecondVInt(internalBytes[0])) {
       in.readFully(internalBytes, 4, 1);
       int len = (byte) WritableUtils.decodeVIntSize(internalBytes[4]);
-      in.readFully(internalBytes, 5, len-1);
+      if (len > 1) {
+        in.readFully(internalBytes, 5, len-1);
+      }
+
+      long vlong = LazyBinaryUtils.readVLongFromByteArray(internalBytes, 4);
+      if (vlong < -1000000000 || vlong > 999999999) {
+        throw new IOException(
+            "Invalid first vint value (encoded nanoseconds) of a TimestampWritable: " + vlong +
+            ", expected to be between -1000000000 and 999999999.");
+        // Note that -1000000000 is a valid value corresponding to a nanosecond timestamp
+        // of 999999999, because if the second VInt is present, we use the value
+        // (-reversedNanoseconds - 1) as the second VInt.
+      }
+      if (vlong < 0) {
+        // This indicates there is a second VInt containing the additional bits of the seconds
+        // field.
+        in.readFully(internalBytes, 4 + len, 1);
+        int secondVIntLen = (byte) WritableUtils.decodeVIntSize(internalBytes[4 + len]);
+        if (secondVIntLen > 1) {
+          in.readFully(internalBytes, 5 + len, secondVIntLen - 1);
+        }
+      }
     }
     currentBytes = internalBytes;
     this.offset = 0;
@@ -301,8 +348,8 @@ public class TimestampWritable implement
 
   public int compareTo(TimestampWritable t) {
     checkBytes();
-    int s1 = this.getSeconds();
-    int s2 = t.getSeconds();
+    long s1 = this.getSeconds();
+    long s2 = t.getSeconds();
     if (s1 == s2) {
       int n1 = this.getNanos();
       int n2 = t.getNanos();
@@ -311,7 +358,7 @@ public class TimestampWritable implement
       }
       return n1 - n2;
     } else {
-      return s1 - s2;
+      return s1 < s2 ? -1 : 1;
     }
   }
 
@@ -342,7 +389,7 @@ public class TimestampWritable implement
   @Override
   public int hashCode() {
     long seconds = getSeconds();
-    seconds <<= 32;
+    seconds <<= 30;  // the nanosecond part fits in 30 bits
     seconds |= getNanos();
     return (int) ((seconds >>> 32) ^ seconds);
   }
@@ -362,13 +409,30 @@ public class TimestampWritable implement
    * @param offset
    * @return the number of seconds
    */
-  public static int getSeconds(byte[] bytes, int offset) {
-    return NO_DECIMAL_MASK & bytesToInt(bytes, offset);
+  public static long getSeconds(byte[] bytes, int offset) {
+    int lowest31BitsOfSecondsAndFlag = bytesToInt(bytes, offset);
+    if (lowest31BitsOfSecondsAndFlag >= 0 ||  // the "has decimal or second VInt" flag is not set
+        !hasSecondVInt(bytes[offset + 4])) {
+      // The entire seconds field is stored in the first 4 bytes.
+      return lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK;
+    }
+
+    // We compose the seconds field from two parts. The lowest 31 bits come from the first four
+    // bytes. The higher-order bits come from the second VInt that follows the nanos field.
+    return ((long) (lowest31BitsOfSecondsAndFlag & LOWEST_31_BITS_OF_SEC_MASK)) |
+           (LazyBinaryUtils.readVLongFromByteArray(bytes,
+               offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4])) << 31);
   }
 
   public static int getNanos(byte[] bytes, int offset) {
+    VInt vInt = LazyBinaryUtils.threadLocalVInt.get();
     LazyBinaryUtils.readVInt(bytes, offset, vInt);
     int val = vInt.value;
+    if (val < 0) {
+      // This means there is a second VInt present that specifies additional bits of the timestamp.
+      // The reversed nanoseconds value is still encoded in this VInt.
+      val = -val - 1;
+    }
     int len = (int) Math.floor(Math.log10(val)) + 1;
 
     // Reverse the value
@@ -387,40 +451,33 @@ public class TimestampWritable implement
   }
 
   /**
-   * Writes a Timestamp's serialized value to byte array b at
-   * @param t
-   * @param b
+   * Writes a Timestamp's serialized value to byte array b at the given offset
+   * @param timestamp to convert to bytes
+   * @param b destination byte array
+   * @param offset destination offset in the byte array
    */
   public static void convertTimestampToBytes(Timestamp t, byte[] b,
       int offset) {
-    if (b.length < 9) {
-      LOG.error("byte array too short");
-    }
     long millis = t.getTime();
     int nanos = t.getNanos();
 
-    boolean hasDecimal = nanos != 0 && setNanosBytes(nanos, b, offset+4);
-    setSecondsBytes(millis, b, offset, hasDecimal);
-  }
-
-  /**
-   * Given an integer representing seconds, write its serialized
-   * value to the byte array b at offset
-   * @param millis
-   * @param b
-   * @param offset
-   * @param hasDecimal
-   */
-  private static void setSecondsBytes(long millis, byte[] b, int offset, boolean hasDecimal) {
-    int seconds = (int) (millis / 1000);
-
-    if (!hasDecimal) {
-      seconds &= NO_DECIMAL_MASK;
+    long seconds = millisToSeconds(millis);
+    boolean hasSecondVInt = seconds < 0 || seconds > Integer.MAX_VALUE;
+    boolean hasDecimal = setNanosBytes(nanos, b, offset+4, hasSecondVInt);
+
+    int firstInt = (int) seconds;
+    if (hasDecimal || hasSecondVInt) {
+      firstInt |= DECIMAL_OR_SECOND_VINT_FLAG;
     } else {
-      seconds |= HAS_DECIMAL_MASK;
+      firstInt &= LOWEST_31_BITS_OF_SEC_MASK;
     }
+    intToBytes(firstInt, b, offset);
 
-    intToBytes(seconds, b, offset);
+    if (hasSecondVInt) {
+      LazyBinaryUtils.writeVLongToByteArray(b,
+          offset + 4 + WritableUtils.decodeVIntSize(b[offset + 4]),
+          seconds >> 31);
+    }
   }
 
   /**
@@ -432,7 +489,7 @@ public class TimestampWritable implement
    * @param offset
    * @return
    */
-  private static boolean setNanosBytes(int nanos, byte[] b, int offset) {
+  private static boolean setNanosBytes(int nanos, byte[] b, int offset, boolean hasSecondVInt) {
     int decimal = 0;
     if (nanos != 0) {
       int counter = 0;
@@ -444,7 +501,11 @@ public class TimestampWritable implement
       }
     }
 
-    LazyBinaryUtils.writeVLongToByteArray(b, offset, decimal);
+    if (hasSecondVInt || decimal != 0) {
+      // We use the sign of the reversed-nanoseconds field to indicate that there is a second VInt
+      // present.
+      LazyBinaryUtils.writeVLongToByteArray(b, offset, hasSecondVInt ? (-decimal - 1) : decimal);
+    }
     return decimal != 0;
   }
 
@@ -458,11 +519,14 @@ public class TimestampWritable implement
   }
 
   public static Timestamp decimalToTimestamp(HiveDecimal d) {
-    BigDecimal seconds = new BigDecimal(d.longValue());
-    long millis = d.bigDecimalValue().multiply(new BigDecimal(1000)).longValue();
-    int nanos = d.bigDecimalValue().subtract(seconds).multiply(new BigDecimal(1000000000)).intValue();
-
-    Timestamp t = new Timestamp(millis);
+    BigDecimal nanoInstant = d.bigDecimalValue().multiply(BILLION_BIG_DECIMAL);
+    int nanos = nanoInstant.remainder(BILLION_BIG_DECIMAL).intValue();
+    if (nanos < 0) {
+      nanos += 1000000000;
+    }
+    long seconds =
+      nanoInstant.subtract(new BigDecimal(nanos)).divide(BILLION_BIG_DECIMAL).longValue();
+    Timestamp t = new Timestamp(seconds * 1000);
     t.setNanos(nanos);
 
     return t;
@@ -480,6 +544,10 @@ public class TimestampWritable implement
 
     // Convert to millis
     long millis = seconds * 1000;
+    if (nanos < 0) {
+      millis -= 1000;
+      nanos += 1000000000;
+    }
     Timestamp t = new Timestamp(millis);
 
     // Set remaining fractional portion to nanos
@@ -488,10 +556,19 @@ public class TimestampWritable implement
   }
 
   public static void setTimestamp(Timestamp t, byte[] bytes, int offset) {
-    boolean hasDecimal = hasDecimal(bytes[offset]);
-    t.setTime(((long) TimestampWritable.getSeconds(bytes, offset)) * 1000);
-    if (hasDecimal) {
-      t.setNanos(TimestampWritable.getNanos(bytes, offset+4));
+    boolean hasDecimalOrSecondVInt = hasDecimalOrSecondVInt(bytes[offset]);
+    long seconds = (long) TimestampWritable.getSeconds(bytes, offset);
+    int nanos = 0;
+    if (hasDecimalOrSecondVInt) {
+      nanos = TimestampWritable.getNanos(bytes, offset + 4);
+      if (hasSecondVInt(bytes[offset + 4])) {
+        seconds += LazyBinaryUtils.readVLongFromByteArray(bytes,
+            offset + 4 + WritableUtils.decodeVIntSize(bytes[offset + 4]));
+      }
+    }
+    t.setTime(seconds * 1000);
+    if (nanos != 0) {
+      t.setNanos(nanos);
     }
   }
 
@@ -501,17 +578,22 @@ public class TimestampWritable implement
     return t;
   }
 
-  public boolean hasDecimal() {
-    return hasDecimal(currentBytes[offset]);
+  private static boolean hasDecimalOrSecondVInt(byte b) {
+    return (b >> 7) != 0;
   }
 
-  /**
-   *
-   * @param b first byte in an encoded TimestampWritable
-   * @return true if it has a decimal portion, false otherwise
-   */
-  public static boolean hasDecimal(byte b) {
-    return (b >> 7) != 0;
+  private static boolean hasSecondVInt(byte b) {
+    return WritableUtils.isNegativeVInt(b);
+  }
+
+  private final boolean hasDecimalOrSecondVInt() {
+    return hasDecimalOrSecondVInt(currentBytes[offset]);
+  }
+
+  public final boolean hasDecimal() {
+    return hasDecimalOrSecondVInt() || currentBytes[offset + 4] != -1;
+    // If the first byte of the VInt is -1, the VInt itself is -1, indicating that there is a
+    // second VInt but the nanoseconds field is actually 0.
   }
 
   /**
@@ -528,6 +610,20 @@ public class TimestampWritable implement
   }
 
   /**
+   * Writes <code>value</code> into <code>dest</code> at <code>offset</code> as a seven-byte
+   * serialized long number.
+   */
+  static void sevenByteLongToBytes(long value, byte[] dest, int offset) {
+    dest[offset] = (byte) ((value >> 48) & 0xFF);
+    dest[offset+1] = (byte) ((value >> 40) & 0xFF);
+    dest[offset+2] = (byte) ((value >> 32) & 0xFF);
+    dest[offset+3] = (byte) ((value >> 24) & 0xFF);
+    dest[offset+4] = (byte) ((value >> 16) & 0xFF);
+    dest[offset+5] = (byte) ((value >> 8) & 0xFF);
+    dest[offset+6] = (byte) (value & 0xFF);
+  }
+
+  /**
    *
    * @param bytes
    * @param offset
@@ -540,4 +636,27 @@ public class TimestampWritable implement
         | ((0xFF & bytes[offset+2]) << 8)
         | (0xFF & bytes[offset+3]);
   }
+
+  static long readSevenByteLong(byte[] bytes, int offset) {
+    // We need to shift everything 8 bits left and then shift back to populate the sign field.
+    return (((0xFFL & bytes[offset]) << 56)
+        | ((0xFFL & bytes[offset+1]) << 48)
+        | ((0xFFL & bytes[offset+2]) << 40)
+        | ((0xFFL & bytes[offset+3]) << 32)
+        | ((0xFFL & bytes[offset+4]) << 24)
+        | ((0xFFL & bytes[offset+5]) << 16)
+        | ((0xFFL & bytes[offset+6]) << 8)) >> 8;
+  }
+
+  /**
+   * Rounds the number of milliseconds relative to the epoch down to the nearest whole number of
+   * seconds. 500 would round to 0, -500 would round to -1.
+   */
+  static long millisToSeconds(long millis) {
+    if (millis >= 0) {
+      return millis / 1000;
+    } else {
+      return (millis - 999) / 1000;
+    }
+  }
 }

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java?rev=1508537&r1=1508536&r2=1508537&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java Tue Jul 30 17:27:57 2013
@@ -202,10 +202,7 @@ public final class LazyBinaryUtils {
         break;
       case TIMESTAMP:
         recordInfo.elementOffset = 0;
-        recordInfo.elementSize = 4;
-        if(TimestampWritable.hasDecimal(bytes[offset])) {
-          recordInfo.elementSize += (byte) WritableUtils.decodeVIntSize(bytes[offset+4]);
-        }
+        recordInfo.elementSize = TimestampWritable.getTotalLength(bytes, offset);
         break;
       case DECIMAL:
         // using vint instead of 4 bytes
@@ -285,6 +282,13 @@ public final class LazyBinaryUtils {
     public byte length;
   };
 
+  public static final ThreadLocal<VInt> threadLocalVInt = new ThreadLocal<VInt>() {
+    @Override
+    protected VInt initialValue() {
+      return new VInt();
+    }
+  };
+
   /**
    * Reads a zero-compressed encoded int from a byte array and returns it.
    *
@@ -324,6 +328,28 @@ public final class LazyBinaryUtils {
   }
 
   /**
+   * Read a zero-compressed encoded long from a byte array.
+   *
+   * @param bytes the byte array
+   * @param offset the offset in the byte array where the VLong is stored
+   * @return the long
+   */
+  public static long readVLongFromByteArray(final byte[] bytes, int offset) {
+    byte firstByte = bytes[offset++];
+    int len = WritableUtils.decodeVIntSize(firstByte);
+    if (len == 1) {
+      return firstByte;
+    }
+    long i = 0;
+    for (int idx = 0; idx < len-1; idx++) {
+      byte b = bytes[offset++];
+      i = i << 8;
+      i = i | (b & 0xFF);
+    }
+    return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
+  }
+
+  /**
    * Write a zero-compressed encoded long to a byte array.
    *
    * @param bytes

Added: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java?rev=1508537&view=auto
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java (added)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/io/TestTimestampWritable.java Tue Jul 30 17:27:57 2013
@@ -0,0 +1,454 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestTimestampWritable extends TestCase {
+
+  private static DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  private static final int HAS_DECIMAL_MASK = 0x80000000;
+
+  private static final long MAX_ADDITIONAL_SECONDS_BITS = 0x418937;
+
+  private static long MIN_FOUR_DIGIT_YEAR_MILLIS = parseToMillis("0001-01-01 00:00:00");
+  private static long MAX_FOUR_DIGIT_YEAR_MILLIS = parseToMillis("9999-01-01 00:00:00");
+
+  private static int BILLION = 1000 * 1000 * 1000;
+
+  private static long getSeconds(Timestamp ts) {
+    // To compute seconds, we first subtract the milliseconds stored in the nanos field of the
+    // Timestamp from the result of getTime().
+    long seconds = (ts.getTime() - ts.getNanos() / 1000000) / 1000;
+
+    // It should also be possible to calculate this based on ts.getTime() only.
+    assertEquals(seconds, TimestampWritable.millisToSeconds(ts.getTime()));
+
+    return seconds;
+  }
+
+  private static long parseToMillis(String s) {
+    try {
+      return DATE_FORMAT.parse(s).getTime();
+    } catch (ParseException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  protected void setUp() {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+  }
+
+  private static String normalizeTimestampStr(String timestampStr) {
+    if (timestampStr.endsWith(".0")) {
+      return timestampStr.substring(0, timestampStr.length() - 2);
+    }
+    return timestampStr;
+  }
+
+  private static void assertTSWEquals(TimestampWritable expected, TimestampWritable actual) {
+    assertEquals(normalizeTimestampStr(expected.toString()),
+                 normalizeTimestampStr(actual.toString()));
+    assertEquals(expected, actual);
+    assertEquals(expected.getTimestamp(), actual.getTimestamp());
+  }
+
+  private static TimestampWritable deserializeFromBytes(byte[] tsBytes) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(tsBytes);
+    DataInputStream dis = new DataInputStream(bais);
+    TimestampWritable deserTSW = new TimestampWritable();
+    deserTSW.readFields(dis);
+    return deserTSW;
+  }
+
+  private static int reverseNanos(int nanos) {
+    if (nanos == 0) {
+      return 0;
+    }
+    if (nanos < 0 || nanos >= 1000 * 1000 * 1000) {
+      throw new IllegalArgumentException("Invalid nanosecond value: " + nanos);
+    }
+
+    int x = nanos;
+    StringBuilder reversed = new StringBuilder();
+    while (x != 0) {
+      reversed.append((char)('0' + x % 10));
+      x /= 10;
+    }
+
+    int result = Integer.parseInt(reversed.toString());
+    while (nanos < 100 * 1000 * 1000) {
+      result *= 10;
+      nanos *= 10;
+    }
+    return result;
+  }
+
+  private static byte[] serializeToBytes(Writable w) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    w.write(dos);
+    return baos.toByteArray();
+  }
+
+  private static List<Byte> toList(byte[] a) {
+    List<Byte> list = new ArrayList<Byte>(a.length);
+    for (byte b : a) {
+      list.add(b);
+    }
+    return list;
+  }
+
+  /**
+   * Pad the given byte array with the given number of bytes in the beginning. The padding bytes
+   * deterministically depend on the passed data.
+   */
+  private static byte[] padBytes(byte[] bytes, int count) {
+    byte[] result = new byte[bytes.length + count];
+    for (int i = 0; i < count; ++i) {
+      // Fill the prefix bytes with deterministic data based on the actual meaningful data.
+      result[i] = (byte) (bytes[i % bytes.length] * 37 + 19);
+    }
+    System.arraycopy(bytes, 0, result, count, bytes.length);
+    return result;
+  }
+
+  private static TimestampWritable serializeDeserializeAndCheckTimestamp(Timestamp ts)
+      throws IOException {
+    TimestampWritable tsw = new TimestampWritable(ts);
+    assertEquals(ts, tsw.getTimestamp());
+
+    byte[] tsBytes = serializeToBytes(tsw);
+    TimestampWritable deserTSW = deserializeFromBytes(tsBytes);
+    assertTSWEquals(tsw, deserTSW);
+    assertEquals(ts, deserTSW.getTimestamp());
+    assertEquals(tsBytes.length, tsw.getTotalLength());
+
+    // Also convert to/from binary-sortable representation.
+    int binarySortableOffset = Math.abs(tsw.hashCode()) % 10;
+    byte[] binarySortableBytes = padBytes(tsw.getBinarySortable(), binarySortableOffset);
+    TimestampWritable fromBinSort = new TimestampWritable();
+    fromBinSort.setBinarySortable(binarySortableBytes, binarySortableOffset);
+    assertTSWEquals(tsw, fromBinSort);
+
+    long timeSeconds = ts.getTime() / 1000;
+    if (0 <= timeSeconds && timeSeconds <= Integer.MAX_VALUE) {
+      assertEquals(new Timestamp(timeSeconds * 1000),
+        fromIntAndVInts((int) timeSeconds, 0).getTimestamp());
+
+      int nanos = reverseNanos(ts.getNanos());
+      assertEquals(ts,
+        fromIntAndVInts((int) timeSeconds | (nanos != 0 ? HAS_DECIMAL_MASK : 0),
+          nanos).getTimestamp());
+    }
+
+    assertEquals(ts.getNanos(), tsw.getNanos());
+    assertEquals(getSeconds(ts), tsw.getSeconds());
+
+    // Test various set methods and copy constructors.
+    {
+      TimestampWritable tsSet1 = new TimestampWritable();
+      // make the offset non-zero to keep things interesting.
+      int offset = Math.abs(ts.hashCode() % 32);
+      byte[] shiftedBytes = padBytes(tsBytes, offset);
+      tsSet1.set(shiftedBytes, offset);
+      assertTSWEquals(tsw, tsSet1);
+
+      TimestampWritable tswShiftedBytes = new TimestampWritable(shiftedBytes, offset);
+      assertTSWEquals(tsw, tswShiftedBytes);
+      assertTSWEquals(tsw, deserializeFromBytes(serializeToBytes(tswShiftedBytes)));
+    }
+
+    {
+      TimestampWritable tsSet2 = new TimestampWritable();
+      tsSet2.set(ts);
+      assertTSWEquals(tsw, tsSet2);
+    }
+
+    {
+      TimestampWritable tsSet3 = new TimestampWritable();
+      tsSet3.set(tsw);
+      assertTSWEquals(tsw, tsSet3);
+    }
+
+    {
+      TimestampWritable tsSet4 = new TimestampWritable();
+      tsSet4.set(deserTSW);
+      assertTSWEquals(tsw, tsSet4);
+    }
+
+    double expectedDbl = getSeconds(ts) + 1e-9d * ts.getNanos();
+    assertTrue(Math.abs(tsw.getDouble() - expectedDbl) < 1e-10d);
+
+    return deserTSW;
+  }
+
+  private static int randomNanos(Random rand, int decimalDigits) {
+    // Only keep the most significant decimalDigits digits.
+    int nanos = rand.nextInt(BILLION);
+    return nanos - nanos % (int) Math.pow(10, 9 - decimalDigits);
+  }
+
+  private static int randomNanos(Random rand) {
+    return randomNanos(rand, rand.nextInt(10));
+  }
+
+  private static void checkTimestampWithAndWithoutNanos(Timestamp ts, int nanos)
+      throws IOException {
+    serializeDeserializeAndCheckTimestamp(ts);
+
+    ts.setNanos(nanos);
+    assertEquals(serializeDeserializeAndCheckTimestamp(ts).getNanos(), nanos);
+  }
+
+  private static TimestampWritable fromIntAndVInts(int i, long... vints) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    dos.writeInt(i);
+    if ((i & HAS_DECIMAL_MASK) != 0) {
+      for (long vi : vints) {
+        WritableUtils.writeVLong(dos, vi);
+      }
+    }
+    byte[] bytes = baos.toByteArray();
+    TimestampWritable tsw = deserializeFromBytes(bytes);
+    assertEquals(toList(bytes), toList(serializeToBytes(tsw)));
+    return tsw;
+  }
+
+  public void testReverseNanos() {
+    assertEquals(0, reverseNanos(0));
+    assertEquals(120000000, reverseNanos(21));
+    assertEquals(32100000, reverseNanos(1230));
+    assertEquals(5, reverseNanos(500000000));
+    assertEquals(987654321, reverseNanos(123456789));
+    assertEquals(12345678, reverseNanos(876543210));
+  }
+
+  /**
+   * Test serializing and deserializing timestamps that can be represented by a number of seconds
+   * from 0 to 2147483647 since the UNIX epoch.
+   */
+  public void testTimestampsWithinPositiveIntRange() throws IOException {
+    Random rand = new Random(294722773L);
+    for (int i = 0; i < 10000; ++i) {
+      long millis = ((long) rand.nextInt(Integer.MAX_VALUE)) * 1000;
+      checkTimestampWithAndWithoutNanos(new Timestamp(millis), randomNanos(rand));
+    }
+  }
+
+  private static long randomMillis(long minMillis, long maxMillis, Random rand) {
+    return minMillis + (long) ((maxMillis - minMillis) * rand.nextDouble());
+  }
+
+  /**
+   * Test timestamps that don't necessarily fit between 1970 and 2038. This depends on HIVE-4525
+   * being fixed.
+   */
+  public void testTimestampsOutsidePositiveIntRange() throws IOException {
+    Random rand = new Random(789149717L);
+    for (int i = 0; i < 10000; ++i) {
+      long millis = randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand);
+      checkTimestampWithAndWithoutNanos(new Timestamp(millis), randomNanos(rand));
+    }
+  }
+
+  public void testTimestampsInFullRange() throws IOException {
+    Random rand = new Random(2904974913L);
+    for (int i = 0; i < 10000; ++i) {
+      checkTimestampWithAndWithoutNanos(new Timestamp(rand.nextLong()), randomNanos(rand));
+    }
+  }
+
+  public void testToFromDouble() {
+    Random rand = new Random(294729777L);
+    for (int nanosPrecision = 0; nanosPrecision <= 4; ++nanosPrecision) {
+      for (int i = 0; i < 10000; ++i) {
+        long millis = randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand);
+        Timestamp ts = new Timestamp(millis);
+        int nanos = randomNanos(rand, nanosPrecision);
+        ts.setNanos(nanos);
+        TimestampWritable tsw = new TimestampWritable(ts);
+        double asDouble = tsw.getDouble();
+        int recoveredNanos =
+          (int) (Math.round((asDouble - Math.floor(asDouble)) * Math.pow(10, nanosPrecision)) *
+            Math.pow(10, 9 - nanosPrecision));
+        assertEquals(String.format("Invalid nanosecond part recovered from %f", asDouble),
+          nanos, recoveredNanos);
+        assertEquals(ts, TimestampWritable.doubleToTimestamp(asDouble));
+        // decimalToTimestamp should be consistent with doubleToTimestamp for this level of
+        // precision.
+        assertEquals(ts, TimestampWritable.decimalToTimestamp(
+            new HiveDecimal(BigDecimal.valueOf(asDouble))));
+      }
+    }
+  }
+
+  private static HiveDecimal timestampToDecimal(Timestamp ts) {
+    BigDecimal d = new BigDecimal(getSeconds(ts));
+    d = d.add(new BigDecimal(ts.getNanos()).divide(new BigDecimal(BILLION)));
+    return new HiveDecimal(d);
+  }
+
+  public void testDecimalToTimestampRandomly() {
+    Random rand = new Random(294729777L);
+    for (int i = 0; i < 10000; ++i) {
+      Timestamp ts = new Timestamp(
+          randomMillis(MIN_FOUR_DIGIT_YEAR_MILLIS, MAX_FOUR_DIGIT_YEAR_MILLIS, rand));
+      ts.setNanos(randomNanos(rand, 9));  // full precision
+      assertEquals(ts, TimestampWritable.decimalToTimestamp(timestampToDecimal(ts)));
+    }
+  }
+
+  public void testDecimalToTimestampCornerCases() {
+    Timestamp ts = new Timestamp(parseToMillis("1969-03-04 05:44:33"));
+    assertEquals(0, ts.getTime() % 1000);
+    for (int nanos : new int[] { 100000, 900000, 999100000, 999900000 }) {
+      ts.setNanos(nanos);
+      HiveDecimal d = timestampToDecimal(ts);
+      assertEquals(ts, TimestampWritable.decimalToTimestamp(d));
+      assertEquals(ts, TimestampWritable.doubleToTimestamp(d.bigDecimalValue().doubleValue()));
+    }
+  }
+
+  public void testSerializationFormatDirectly() throws IOException {
+    assertEquals("1970-01-01 00:00:00", fromIntAndVInts(0).toString());
+    assertEquals("1970-01-01 00:00:01", fromIntAndVInts(1).toString());
+    assertEquals("1970-01-01 00:05:00", fromIntAndVInts(300).toString());
+    assertEquals("1970-01-01 02:00:00", fromIntAndVInts(7200).toString());
+    assertEquals("2000-01-02 03:04:05", fromIntAndVInts(946782245).toString());
+
+    // This won't have a decimal part because the HAS_DECIMAL_MASK bit is not set.
+    assertEquals("2000-01-02 03:04:05", fromIntAndVInts(946782245, 3210).toString());
+
+    assertEquals("2000-01-02 03:04:05.0123",
+      fromIntAndVInts(946782245 | HAS_DECIMAL_MASK, 3210).toString());
+
+    assertEquals("2038-01-19 03:14:07", fromIntAndVInts(Integer.MAX_VALUE).toString());
+    assertEquals("2038-01-19 03:14:07.012345678",
+      fromIntAndVInts(Integer.MAX_VALUE | HAS_DECIMAL_MASK,  // this is really just -1
+        876543210).toString());
+
+    // Timestamps with a second VInt storing additional bits of the seconds field.
+    long seconds = 253392390415L;
+    assertEquals("9999-09-08 07:06:55",
+      fromIntAndVInts((int) (seconds & 0x7fffffff) | (1 << 31), -1L, seconds >> 31).toString());
+    assertEquals("9999-09-08 07:06:55.0123",
+      fromIntAndVInts((int) (seconds & 0x7fffffff) | (1 << 31),
+                      -3210 - 1, seconds >> 31).toString());
+  }
+
+  public void testMaxSize() {
+    // This many bytes are necessary to store the reversed nanoseconds.
+    assertEquals(5, WritableUtils.getVIntSize(999999999));
+    assertEquals(5, WritableUtils.getVIntSize(-2 - 999999999));
+
+    // Bytes necessary to store extra bits of the second timestamp if storing a timestamp
+    // before 1970 or after 2038.
+    assertEquals(3, WritableUtils.getVIntSize(Short.MAX_VALUE));
+    assertEquals(3, WritableUtils.getVIntSize(Short.MIN_VALUE));
+
+    // Test that MAX_ADDITIONAL_SECONDS_BITS is really the maximum value of the
+    // additional bits (beyond 31 bits) of the seconds-since-epoch part of timestamp.
+    assertTrue((((long) MAX_ADDITIONAL_SECONDS_BITS) << 31) * 1000 < Long.MAX_VALUE);
+    assertTrue((((double) MAX_ADDITIONAL_SECONDS_BITS + 1) * (1L << 31)) * 1000 >
+      Long.MAX_VALUE);
+
+    // This is how many bytes we need to store those additonal bits as a VInt.
+    assertEquals(4, WritableUtils.getVIntSize(MAX_ADDITIONAL_SECONDS_BITS));
+
+    // Therefore, the maximum total size of a serialized timestamp is 4 + 5 + 4 = 13.
+  }
+
+  public void testMillisToSeconds() {
+    assertEquals(0, TimestampWritable.millisToSeconds(0));
+    assertEquals(-1, TimestampWritable.millisToSeconds(-1));
+    assertEquals(-1, TimestampWritable.millisToSeconds(-999));
+    assertEquals(-1, TimestampWritable.millisToSeconds(-1000));
+    assertEquals(-2, TimestampWritable.millisToSeconds(-1001));
+    assertEquals(-2, TimestampWritable.millisToSeconds(-1999));
+    assertEquals(-2, TimestampWritable.millisToSeconds(-2000));
+    assertEquals(-3, TimestampWritable.millisToSeconds(-2001));
+    assertEquals(-99, TimestampWritable.millisToSeconds(-99000));
+    assertEquals(-100, TimestampWritable.millisToSeconds(-99001));
+    assertEquals(-100, TimestampWritable.millisToSeconds(-100000));
+    assertEquals(1, TimestampWritable.millisToSeconds(1500));
+    assertEquals(19, TimestampWritable.millisToSeconds(19999));
+    assertEquals(20, TimestampWritable.millisToSeconds(20000));
+  }
+
+  private static int compareEqualLengthByteArrays(byte[] a, byte[] b) {
+    assertEquals(a.length, b.length);
+    for (int i = 0; i < a.length; ++i) {
+      if (a[i] != b[i]) {
+        return (a[i] & 0xff) - (b[i] & 0xff);
+      }
+    }
+    return 0;
+  }
+
+  private static int normalizeComparisonResult(int result) {
+    return result < 0 ? -1 : (result > 0 ? 1 : 0);
+  }
+
+  public void testBinarySortable() {
+    Random rand = new Random(5972977L);
+    List<TimestampWritable> tswList = new ArrayList<TimestampWritable>();
+    for (int i = 0; i < 50; ++i) {
+      Timestamp ts = new Timestamp(rand.nextLong());
+      ts.setNanos(randomNanos(rand));
+      tswList.add(new TimestampWritable(ts));
+    }
+    for (TimestampWritable tsw1 : tswList) {
+      byte[] bs1 = tsw1.getBinarySortable();
+      for (TimestampWritable tsw2 : tswList) {
+        byte[] bs2 = tsw2.getBinarySortable();
+        int binaryComparisonResult =
+          normalizeComparisonResult(compareEqualLengthByteArrays(bs1, bs2));
+        int comparisonResult = normalizeComparisonResult(tsw1.compareTo(tsw2));
+        if (binaryComparisonResult != comparisonResult) {
+          throw new AssertionError("TimestampWritables " + tsw1 + " and " + tsw2 + " compare as " +
+            comparisonResult + " using compareTo but as " + binaryComparisonResult + " using " +
+            "getBinarySortable");
+        }
+      }
+    }
+  }
+
+}