You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/07/15 18:35:58 UTC

[orc] branch master updated: ORC-189. Add timestamp with local timezone.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new a7255f3  ORC-189. Add timestamp with local timezone.
a7255f3 is described below

commit a7255f3669146e7697215e75720c74ca831b374c
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Thu Dec 13 12:51:05 2018 -0800

    ORC-189. Add timestamp with local timezone.
    
    Fixes #349
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 java/core/src/java/org/apache/orc/OrcUtils.java    |   6 +
 .../src/java/org/apache/orc/TypeDescription.java   |  32 +-
 .../org/apache/orc/TypeDescriptionPrettyPrint.java |   1 +
 .../org/apache/orc/impl/ColumnStatisticsImpl.java  |  39 +-
 .../apache/orc/impl/ConvertTreeReaderFactory.java  | 301 ++++++----
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |   3 +-
 .../java/org/apache/orc/impl/SchemaEvolution.java  |   1 +
 .../org/apache/orc/impl/SerializationUtils.java    |  70 ++-
 .../org/apache/orc/impl/TreeReaderFactory.java     |  75 +--
 .../java/org/apache/orc/impl/mask/MaskFactory.java |   1 +
 .../apache/orc/impl/reader/ReaderEncryption.java   |   8 +-
 .../orc/impl/writer/TimestampTreeWriter.java       |  57 +-
 .../org/apache/orc/impl/writer/TreeWriter.java     |   4 +-
 .../test/org/apache/orc/TestTypeDescription.java   |  10 +-
 .../src/test/org/apache/orc/TestVectorOrcFile.java | 104 +++-
 .../org/apache/orc/impl/TestSchemaEvolution.java   | 649 ++++++++++++++++++++-
 .../apache/orc/mapred/OrcMapredRecordReader.java   |   1 +
 .../apache/orc/mapred/OrcMapredRecordWriter.java   |   1 +
 .../src/java/org/apache/orc/mapred/OrcStruct.java  |   1 +
 .../src/java/org/apache/orc/tools/PrintData.java   |   1 +
 .../org/apache/orc/tools/convert/CsvReader.java    |   1 +
 .../org/apache/orc/tools/convert/JsonReader.java   |   1 +
 .../java/org/apache/orc/tools/json/HiveType.java   |   2 +-
 .../apache/orc/tools/json/JsonSchemaFinder.java    |   2 +
 .../java/org/apache/orc/tools/json/StringType.java |   4 +
 proto/orc_proto.proto                              |   1 +
 site/_docs/types.md                                |  18 +
 27 files changed, 1156 insertions(+), 238 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/OrcUtils.java b/java/core/src/java/org/apache/orc/OrcUtils.java
index 5811c59..85a4b83 100644
--- a/java/core/src/java/org/apache/orc/OrcUtils.java
+++ b/java/core/src/java/org/apache/orc/OrcUtils.java
@@ -157,6 +157,9 @@ public class OrcUtils {
     case TIMESTAMP:
       type.setKind(OrcProto.Type.Kind.TIMESTAMP);
       break;
+    case TIMESTAMP_INSTANT:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP_INSTANT);
+        break;
     case DATE:
       type.setKind(OrcProto.Type.Kind.DATE);
       break;
@@ -305,6 +308,9 @@ public class OrcUtils {
       case TIMESTAMP:
         result = TypeDescription.createTimestamp();
         break;
+      case TIMESTAMP_INSTANT:
+        result = TypeDescription.createTimestampInstant();
+        break;
       case DATE:
         result = TypeDescription.createDate();
         break;
diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java
index 9b49ff0..cace19c 100644
--- a/java/core/src/java/org/apache/orc/TypeDescription.java
+++ b/java/core/src/java/org/apache/orc/TypeDescription.java
@@ -120,7 +120,8 @@ public class TypeDescription
     LIST("array", false),
     MAP("map", false),
     STRUCT("struct", false),
-    UNION("uniontype", false);
+    UNION("uniontype", false),
+    TIMESTAMP_INSTANT("timestamp with local time zone", false);
 
     Category(String name, boolean isPrimitive) {
       this.name = name;
@@ -179,6 +180,10 @@ public class TypeDescription
     return new TypeDescription(Category.TIMESTAMP);
   }
 
+  public static TypeDescription createTimestampInstant() {
+    return new TypeDescription(Category.TIMESTAMP_INSTANT);
+  }
+
   public static TypeDescription createBinary() {
     return new TypeDescription(Category.BINARY);
   }
@@ -211,18 +216,31 @@ public class TypeDescription
   }
 
   static Category parseCategory(StringPosition source) {
-    int start = source.position;
+    StringBuilder word = new StringBuilder();
+    boolean hadSpace = true;
     while (source.position < source.length) {
       char ch = source.value.charAt(source.position);
-      if (!Character.isLetter(ch)) {
+      if (Character.isLetter(ch)) {
+        word.append(Character.toLowerCase(ch));
+        hadSpace = false;
+      } else if (ch == ' ') {
+        if (!hadSpace) {
+          hadSpace = true;
+          word.append(ch);
+        }
+      } else {
         break;
       }
       source.position += 1;
     }
-    if (source.position != start) {
-      String word = source.value.substring(start, source.position).toLowerCase();
+    String catString = word.toString();
+    // if there were trailing spaces, remove them.
+    if (hadSpace) {
+      catString = catString.trim();
+    }
+    if (!catString.isEmpty()) {
       for (Category cat : Category.values()) {
-        if (cat.getName().equals(word)) {
+        if (cat.getName().equals(catString)) {
           return cat;
         }
       }
@@ -349,6 +367,7 @@ public class TypeDescription
       case SHORT:
       case STRING:
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         break;
       case CHAR:
       case VARCHAR:
@@ -650,6 +669,7 @@ public class TypeDescription
       case DATE:
         return new LongColumnVector(maxSize);
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return new TimestampColumnVector(maxSize);
       case FLOAT:
       case DOUBLE:
diff --git a/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java b/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java
index 0714224..8653287 100644
--- a/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java
+++ b/java/core/src/java/org/apache/orc/TypeDescriptionPrettyPrint.java
@@ -92,6 +92,7 @@ public class TypeDescriptionPrettyPrint {
       case SHORT:
       case STRING:
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         break;
 
       case DECIMAL:
diff --git a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
index 5791737..b73cafe 100644
--- a/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ColumnStatisticsImpl.java
@@ -1427,12 +1427,12 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
       StringBuilder buf = new StringBuilder(super.toString());
       if (getNumberOfValues() != 0) {
         buf.append(" min: ");
-        buf.append(minimum);
+        buf.append(getMinimum());
         buf.append(" max: ");
-        buf.append(maximum);
+        buf.append(getMaximum());
         if (hasSum) {
           buf.append(" sum: ");
-          buf.append(sum);
+          buf.append(getSum());
         }
       }
       return buf.toString();
@@ -1632,7 +1632,7 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
     }
   }
 
-  private static final class TimestampStatisticsImpl extends ColumnStatisticsImpl
+  private static class TimestampStatisticsImpl extends ColumnStatisticsImpl
       implements TimestampColumnStatistics {
     private Long minimum = null;
     private Long maximum = null;
@@ -1791,6 +1791,30 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
     }
   }
 
+  private static final class TimestampInstantStatisticsImpl extends TimestampStatisticsImpl {
+    TimestampInstantStatisticsImpl() {
+    }
+
+    TimestampInstantStatisticsImpl(OrcProto.ColumnStatistics stats) {
+      super(stats);
+    }
+
+    @Override
+    public void updateTimestamp(Timestamp value) {
+      updateTimestamp(value.getTime());
+    }
+
+    @Override
+    public Timestamp getMinimum() {
+      return getMinimumUTC();
+    }
+
+    @Override
+    public Timestamp getMaximum() {
+      return getMaximumUTC();
+    }
+  }
+
   protected long count = 0;
   private boolean hasNull = false;
   private long bytesOnDisk = 0;
@@ -1973,6 +1997,8 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
         return new DateStatisticsImpl();
       case TIMESTAMP:
         return new TimestampStatisticsImpl();
+      case TIMESTAMP_INSTANT:
+        return new TimestampInstantStatisticsImpl();
       case BINARY:
         return new BinaryStatisticsImpl();
       default:
@@ -2002,7 +2028,10 @@ public class ColumnStatisticsImpl implements ColumnStatistics {
     } else if (stats.hasDateStatistics()) {
       return new DateStatisticsImpl(stats);
     } else if (stats.hasTimestampStatistics()) {
-      return new TimestampStatisticsImpl(stats);
+      return schema == null ||
+                 schema.getCategory() == TypeDescription.Category.TIMESTAMP ?
+                 new TimestampStatisticsImpl(stats) :
+                 new TimestampInstantStatisticsImpl(stats);
     } else if(stats.hasBinaryStatistics()) {
       return new BinaryStatisticsImpl(stats);
     } else {
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index 7644203..a6c158b 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -20,8 +20,14 @@ package org.apache.orc.impl;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
-import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
 import java.util.EnumMap;
+import java.util.TimeZone;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -221,44 +227,6 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
     }
 
-    /**
-     * @param string
-     * @return the Timestamp parsed, or null if there was a parse error.
-     */
-    protected Timestamp parseTimestampFromString(String string) {
-      try {
-        Timestamp value = Timestamp.valueOf(string);
-        return value;
-      } catch (IllegalArgumentException e) {
-        return null;
-      }
-    }
-
-    /**
-     * @param string
-     * @return the Date parsed, or null if there was a parse error.
-     */
-    protected Date parseDateFromString(String string) {
-      try {
-        Date value = Date.valueOf(string);
-        return value;
-      } catch (IllegalArgumentException e) {
-        return null;
-      }
-    }
-
-    protected String stringFromBytesColumnVectorEntry(
-        BytesColumnVector bytesColVector, int elementNum) {
-      String string;
-
-      string = new String(
-          bytesColVector.vector[elementNum],
-          bytesColVector.start[elementNum], bytesColVector.length[elementNum],
-          StandardCharsets.UTF_8);
-
-      return string;
-    }
-
     private static final double MIN_LONG_AS_DOUBLE = -0x1p63;
     /*
      * We cannot store Long.MAX_VALUE as a double without losing precision. Instead, we store
@@ -588,7 +556,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
-      String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum);
+      String string = SerializationUtils.bytesVectorToString(bytesColVector, elementNum);
       long longValue = parseLongFromString(string);
       if (!getIsParseError()) {
         downCastAnyInteger(longColVector, elementNum, longValue, readerType);
@@ -620,8 +588,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     private LongColumnVector longColVector;
 
     AnyIntegerFromTimestampTreeReader(int columnId, TypeDescription readerType,
-        Context context) throws IOException {
-      super(columnId, new TimestampTreeReader(columnId, context));
+                                      Context context,
+                                      boolean instantType) throws IOException {
+        super(columnId, new TimestampTreeReader(columnId, context, instantType));
       this.readerType = readerType;
     }
 
@@ -733,7 +702,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
-      String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum);
+      String string = SerializationUtils.bytesVectorToString(bytesColVector, elementNum);
       double doubleValue = parseDoubleFromString(string);
       if (!getIsParseError()) {
         doubleColVector.vector[elementNum] = doubleValue;
@@ -763,8 +732,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     private TimestampColumnVector timestampColVector;
     private DoubleColumnVector doubleColVector;
 
-    DoubleFromTimestampTreeReader(int columnId, Context context) throws IOException {
-      super(columnId, new TimestampTreeReader(columnId, context));
+    DoubleFromTimestampTreeReader(int columnId, Context context,
+                                  boolean instantType) throws IOException {
+      super(columnId, new TimestampTreeReader(columnId, context, instantType));
     }
 
     @Override
@@ -903,7 +873,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
-      String string = stringFromBytesColumnVectorEntry(bytesColVector, elementNum);
+      String string = SerializationUtils.bytesVectorToString(bytesColVector, elementNum);
       HiveDecimal value = parseDecimalFromString(string);
       if (value != null) {
         // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds.
@@ -938,15 +908,16 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     private TimestampColumnVector timestampColVector;
     private ColumnVector decimalColVector;
 
-    DecimalFromTimestampTreeReader(int columnId, Context context) throws IOException {
-      super(columnId, new TimestampTreeReader(columnId, context));
+    DecimalFromTimestampTreeReader(int columnId, Context context,
+                                   boolean instantType) throws IOException {
+        super(columnId, new TimestampTreeReader(columnId, context, instantType));
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
-      double doubleValue = TimestampUtils.getDouble(
-          timestampColVector.asScratchTimestamp(elementNum));
-      HiveDecimal value = HiveDecimal.create(Double.toString(doubleValue));
+      long seconds = timestampColVector.time[elementNum] / 1000;
+      long nanos = timestampColVector.nanos[elementNum];
+      HiveDecimal value = HiveDecimal.create(String.format("%d.%09d", seconds, nanos));
       if (value != null) {
         // The DecimalColumnVector will enforce precision and scale and set the entry to null when out of bounds.
         if (decimalColVector instanceof Decimal64ColumnVector) {
@@ -954,9 +925,6 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
         } else {
           ((DecimalColumnVector) decimalColVector).set(elementNum, value);
         }
-      } else {
-        decimalColVector.noNulls = false;
-        decimalColVector.isNull[elementNum] = true;
       }
     }
 
@@ -1158,21 +1126,84 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
+  /**
+   * The format for converting from/to string/date.
+   * Eg. "2019-07-09"
+   */
+  static final DateTimeFormatter DATE_FORMAT =
+      new DateTimeFormatterBuilder().appendPattern("uuuu-MM-dd")
+          .toFormatter();
+
+  /**
+   * The format for converting from/to string/timestamp.
+   * Eg. "2019-07-09 13:11:00"
+   */
+  static final DateTimeFormatter TIMESTAMP_FORMAT =
+      new DateTimeFormatterBuilder()
+          .append(DATE_FORMAT)
+          .appendPattern(" HH:mm:ss[.S]")
+          .toFormatter();
+
+  /**
+   * The format for converting from/to string/timestamp with local time zone.
+   * Eg. "2019-07-09 13:11:00 America/Los_Angeles"
+   */
+  static final DateTimeFormatter INSTANT_TIMESTAMP_FORMAT =
+      new DateTimeFormatterBuilder()
+          .append(TIMESTAMP_FORMAT)
+          .appendPattern(" VV")
+          .toFormatter();
+
+  /**
+   * Create an Instant from an entry in a TimestampColumnVector.
+   * It assumes that vector.isRepeating and null values have been handled
+   * before we get called. Uses floorDiv so pre-epoch millis round down.
+   * @param vector the timestamp column vector
+   * @param element the element number
+   * @return a timestamp Instant
+   */
+  static Instant timestampToInstant(TimestampColumnVector vector, int element) {
+    return Instant.ofEpochSecond(Math.floorDiv(vector.time[element], 1000L),
+          vector.nanos[element]);
+  }
+
+  /**
+   * Convert a decimal to an Instant using seconds & nanos.
+   * @param vector the decimal column vector
+   * @param element the element number to use
+   * @return the timestamp instant
+   */
+  static Instant decimalToInstant(DecimalColumnVector vector, int element) {
+    // copy the value so that we can mutate it
+    HiveDecimalWritable value = new HiveDecimalWritable(vector.vector[element]);
+    long seconds = value.longValue();
+    value.mutateFractionPortion();
+    value.mutateScaleByPowerOfTen(9);
+    int nanos = (int) value.longValue();
+    return Instant.ofEpochSecond(seconds, nanos);
+  }
+
   public static class StringGroupFromTimestampTreeReader extends ConvertTreeReader {
     private final TypeDescription readerType;
+    private final ZoneId local;
+    private final DateTimeFormatter formatter;
     private TimestampColumnVector timestampColVector;
     private BytesColumnVector bytesColVector;
 
     StringGroupFromTimestampTreeReader(int columnId, TypeDescription readerType,
-        Context context) throws IOException {
-      super(columnId, new TimestampTreeReader(columnId, context));
+                                       Context context,
+                                       boolean instantType) throws IOException {
+      super(columnId, new TimestampTreeReader(columnId, context, instantType));
       this.readerType = readerType;
+      local = context.getUseUTCTimestamp() ? ZoneId.of("UTC")
+                  : ZoneId.systemDefault();
+      formatter = instantType ? INSTANT_TIMESTAMP_FORMAT : TIMESTAMP_FORMAT;
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
-      String string =
-          timestampColVector.asScratchTimestamp(elementNum).toString();
+      String string = timestampToInstant(timestampColVector, elementNum).atZone(local)
+                          .format(formatter);
       byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
       assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
     }
@@ -1319,17 +1350,24 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
   public static class TimestampFromAnyIntegerTreeReader extends ConvertTreeReader {
     private LongColumnVector longColVector;
     private TimestampColumnVector timestampColVector;
+    private final boolean useUtc;
+    private final TimeZone local;
 
     TimestampFromAnyIntegerTreeReader(int columnId, TypeDescription fileType,
-        Context context) throws IOException {
+                                      Context context,
+                                      boolean isInstant) throws IOException {
       super(columnId, createFromInteger(columnId, fileType, context));
+      this.useUtc = isInstant || context.getUseUTCTimestamp();
+      local = TimeZone.getDefault();
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) {
-      long longValue = longColVector.vector[elementNum];
-      // UNDONE: What does the boolean setting need to be?
-      timestampColVector.set(elementNum, new Timestamp(longValue));
+      long millis = longColVector.vector[elementNum] * 1000;
+      timestampColVector.time[elementNum] = useUtc
+          ? millis
+          : SerializationUtils.convertFromUtc(local, millis);
+      timestampColVector.nanos[elementNum] = 0;
     }
 
     @Override
@@ -1351,20 +1389,29 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
   public static class TimestampFromDoubleTreeReader extends ConvertTreeReader {
     private DoubleColumnVector doubleColVector;
     private TimestampColumnVector timestampColVector;
+    private final boolean useUtc;
+    private final TimeZone local;
 
     TimestampFromDoubleTreeReader(int columnId, TypeDescription fileType,
         TypeDescription readerType, Context context) throws IOException {
       super(columnId, fileType.getCategory() == Category.DOUBLE ?
                           new DoubleTreeReader(columnId) :
                           new FloatTreeReader(columnId));
+      useUtc = readerType.getCategory() == Category.TIMESTAMP_INSTANT ||
+                   context.getUseUTCTimestamp();
+      local = TimeZone.getDefault();
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) {
-      double doubleValue = doubleColVector.vector[elementNum];
-      Timestamp timestampValue = TimestampUtils.doubleToTimestamp(doubleValue);
-      // The TimestampColumnVector will set the entry to null when a null timestamp is passed in.
-      timestampColVector.set(elementNum, timestampValue);
+      double seconds = doubleColVector.vector[elementNum];
+      if (!useUtc) {
+        seconds = SerializationUtils.convertFromUtc(local, seconds);
+      }
+      long wholeSec = (long) Math.floor(seconds);
+      timestampColVector.time[elementNum] = wholeSec * 1000;
+      timestampColVector.nanos[elementNum] =
+          1_000_000 * (int) Math.round((seconds - wholeSec) * 1000);
     }
 
     @Override
@@ -1388,22 +1435,31 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     private final int scale;
     private DecimalColumnVector decimalColVector;
     private TimestampColumnVector timestampColVector;
+    private final boolean useUtc;
+    private final TimeZone local;
 
     TimestampFromDecimalTreeReader(int columnId, TypeDescription fileType,
-        Context context) throws IOException {
+                                   Context context,
+                                   boolean isInstant) throws IOException {
       super(columnId, new DecimalTreeReader(columnId, fileType.getPrecision(),
           fileType.getScale(), context));
       this.precision = fileType.getPrecision();
       this.scale = fileType.getScale();
+      useUtc = isInstant || context.getUseUTCTimestamp();
+      local = TimeZone.getDefault();
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) {
-      Timestamp timestampValue =
-            TimestampUtils.decimalToTimestamp(
-                decimalColVector.vector[elementNum].getHiveDecimal());
-      // The TimestampColumnVector will set the entry to null when a null timestamp is passed in.
-      timestampColVector.set(elementNum, timestampValue);
+      Instant t = decimalToInstant(decimalColVector, elementNum);
+      if (!useUtc) {
+        timestampColVector.time[elementNum] =
+            SerializationUtils.convertFromUtc(local, t.getEpochSecond() * 1000);
+        timestampColVector.nanos[elementNum] = t.getNano();
+      } else {
+        timestampColVector.time[elementNum] = t.getEpochSecond() * 1000;
+        timestampColVector.nanos[elementNum] = t.getNano();
+      }
     }
 
     @Override
@@ -1425,20 +1481,30 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
   public static class TimestampFromStringGroupTreeReader extends ConvertTreeReader {
     private BytesColumnVector bytesColVector;
     private TimestampColumnVector timestampColVector;
+    private final DateTimeFormatter formatter;
 
-    TimestampFromStringGroupTreeReader(int columnId, TypeDescription fileType, Context context)
+    TimestampFromStringGroupTreeReader(int columnId, TypeDescription fileType,
+                                       Context context, boolean isInstant)
         throws IOException {
       super(columnId, getStringGroupTreeReader(columnId, fileType, context));
+      if (isInstant) {
+        formatter = INSTANT_TIMESTAMP_FORMAT;
+      } else {
+        formatter = TIMESTAMP_FORMAT.withZone(context.getUseUTCTimestamp() ?
+                                                  ZoneId.of("UTC") :
+                                                  ZoneId.systemDefault());
+      }
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) throws IOException {
-      String stringValue =
-          stringFromBytesColumnVectorEntry(bytesColVector, elementNum);
-      Timestamp timestampValue = parseTimestampFromString(stringValue);
-      if (timestampValue != null) {
-        timestampColVector.set(elementNum, timestampValue);
-      } else {
+      String str = SerializationUtils.bytesVectorToString(bytesColVector,
+          elementNum);
+      try {
+        Instant instant = Instant.from(formatter.parse(str));
+        timestampColVector.time[elementNum] = instant.getEpochSecond() * 1000;
+        timestampColVector.nanos[elementNum] = instant.getNano();
+      } catch (DateTimeParseException exception) {
         timestampColVector.noNulls = false;
         timestampColVector.isNull[elementNum] = true;
       }
@@ -1463,17 +1529,24 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
   public static class TimestampFromDateTreeReader extends ConvertTreeReader {
     private LongColumnVector longColVector;
     private TimestampColumnVector timestampColVector;
+    private final boolean useUtc;
+    private final TimeZone local = TimeZone.getDefault();
 
-    TimestampFromDateTreeReader(int columnId, TypeDescription fileType,
+    TimestampFromDateTreeReader(int columnId, TypeDescription readerType,
         Context context) throws IOException {
       super(columnId, new DateTreeReader(columnId, context));
+      useUtc = readerType.getCategory() == Category.TIMESTAMP_INSTANT ||
+                   context.getUseUTCTimestamp();
     }
 
     @Override
     public void setConvertVectorElement(int elementNum) {
-      long millis =
-          DateWritable.daysToMillis((int) longColVector.vector[elementNum]);
-      timestampColVector.set(elementNum, new Timestamp(millis));
+      long days = longColVector.vector[elementNum];
+      long millis = days * 24 * 60 * 60 * 1000;
+      timestampColVector.time[elementNum] = useUtc ?
+                                                millis :
+                                                SerializationUtils.convertFromUtc(local, millis);
+      timestampColVector.nanos[elementNum] = 0;
     }
 
     @Override
@@ -1504,8 +1577,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void setConvertVectorElement(int elementNum) {
       String stringValue =
-          stringFromBytesColumnVectorEntry(bytesColVector, elementNum);
-      Date dateValue = parseDateFromString(stringValue);
+          SerializationUtils.bytesVectorToString(bytesColVector, elementNum);
+      Date dateValue = SerializationUtils.parseDateFromString(stringValue);
       if (dateValue != null) {
         longColVector.vector[elementNum] = DateWritable.dateToDays(dateValue);
       } else {
@@ -1533,17 +1606,22 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
   public static class DateFromTimestampTreeReader extends ConvertTreeReader {
     private TimestampColumnVector timestampColVector;
     private LongColumnVector longColVector;
+    private final ZoneId local;
 
-    DateFromTimestampTreeReader(int columnId, Context context) throws IOException {
-      super(columnId, new TimestampTreeReader(columnId, context));
+    DateFromTimestampTreeReader(int columnId, Context context,
+                                boolean instantType) throws IOException {
+      super(columnId, new TimestampTreeReader(columnId, context, instantType));
+      boolean useUtc = instantType || context.getUseUTCTimestamp();
+      local = useUtc ? ZoneId.of("UTC") : ZoneId.systemDefault();
     }
 
     @Override
-    public void setConvertVectorElement(int elementNum) {
-      Date dateValue =
-          DateWritable.timeToDate(TimestampUtils.millisToSeconds(
-              timestampColVector.asScratchTimestamp(elementNum).getTime()));
-      longColVector.vector[elementNum] = DateWritable.dateToDays(dateValue);
+    public void setConvertVectorElement(int elementNum) throws IOException {
+      // floorDiv, not '/': truncation would move pre-1970 values up a second
+      // (assumes nanos[] holds the non-negative sub-second part - confirm).
+      long seconds = Math.floorDiv(timestampColVector.time[elementNum], 1000L);
+      Instant instant = Instant.ofEpochSecond(seconds, timestampColVector.nanos[elementNum]);
+      longColVector.vector[elementNum] = LocalDate.from(instant.atZone(local)).toEpochDay();
+    }
 
     @Override
@@ -1598,7 +1676,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
         context);
 
     case TIMESTAMP:
-      return new TimestampFromAnyIntegerTreeReader(columnId, fileType, context);
+    case TIMESTAMP_INSTANT:
+      return new TimestampFromAnyIntegerTreeReader(columnId, fileType, context,
+          readerType.getCategory() == Category.TIMESTAMP_INSTANT);
 
     // Not currently supported conversion(s):
     case BINARY:
@@ -1644,6 +1724,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       return new StringGroupFromDoubleTreeReader(columnId, fileType, readerType, context);
 
     case TIMESTAMP:
+    case TIMESTAMP_INSTANT:
       return new TimestampFromDoubleTreeReader(columnId, fileType, readerType, context);
 
     // Not currently supported conversion(s):
@@ -1685,7 +1766,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       return new StringGroupFromDecimalTreeReader(columnId, fileType, readerType, context);
 
     case TIMESTAMP:
-      return new TimestampFromDecimalTreeReader(columnId, fileType, context);
+    case TIMESTAMP_INSTANT:
+      return new TimestampFromDecimalTreeReader(columnId, fileType, context,
+          readerType.getCategory() == Category.TIMESTAMP_INSTANT);
 
     case DECIMAL:
       return new DecimalFromDecimalTreeReader(columnId, fileType, readerType, context);
@@ -1740,7 +1823,9 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       return new BinaryTreeReader(columnId, context);
 
     case TIMESTAMP:
-      return new TimestampFromStringGroupTreeReader(columnId, fileType, context);
+    case TIMESTAMP_INSTANT:
+      return new TimestampFromStringGroupTreeReader(columnId, fileType, context,
+          readerType.getCategory() == Category.TIMESTAMP_INSTANT);
 
     case DATE:
       return new DateFromStringGroupTreeReader(columnId, fileType, context);
@@ -1758,9 +1843,10 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
   }
 
   private static TreeReader createTimestampConvertTreeReader(int columnId,
+                                                             TypeDescription fileType,
                                                              TypeDescription readerType,
                                                              Context context) throws IOException {
-
+    boolean isInstant = fileType.getCategory() == Category.TIMESTAMP_INSTANT;
     // CONVERT from TIMESTAMP to schema type.
     switch (readerType.getCategory()) {
 
@@ -1769,26 +1855,28 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     case SHORT:
     case INT:
     case LONG:
-      return new AnyIntegerFromTimestampTreeReader(columnId, readerType, context);
+      return new AnyIntegerFromTimestampTreeReader(columnId, readerType,
+          context, isInstant);
 
     case FLOAT:
     case DOUBLE:
-      return new DoubleFromTimestampTreeReader(columnId, context);
+      return new DoubleFromTimestampTreeReader(columnId, context, isInstant);
 
     case DECIMAL:
-      return new DecimalFromTimestampTreeReader(columnId, context);
+      return new DecimalFromTimestampTreeReader(columnId, context, isInstant);
 
     case STRING:
     case CHAR:
     case VARCHAR:
-      return new StringGroupFromTimestampTreeReader(columnId, readerType, context);
+      return new StringGroupFromTimestampTreeReader(columnId, readerType,
+          context, isInstant);
 
     case TIMESTAMP:
-      throw new IllegalArgumentException("No conversion of type " +
-          readerType.getCategory() + " to self needed");
+    case TIMESTAMP_INSTANT:
+      return new TimestampTreeReader(columnId, context, isInstant);
 
     case DATE:
-      return new DateFromTimestampTreeReader(columnId, context);
+      return new DateFromTimestampTreeReader(columnId, context, isInstant);
 
     // Not currently supported conversion(s):
     case BINARY:
@@ -1816,6 +1904,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       return new StringGroupFromDateTreeReader(columnId, readerType, context);
 
     case TIMESTAMP:
+    case TIMESTAMP_INSTANT:
       return new TimestampFromDateTreeReader(columnId, readerType, context);
 
     case DATE:
@@ -1868,6 +1957,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
     case LONG:
     case DOUBLE:
     case TIMESTAMP:
+    case TIMESTAMP_INSTANT:
     case DECIMAL:
     case STRUCT:
     case LIST:
@@ -2034,7 +2124,8 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       return createStringConvertTreeReader(columnId, fileType, readerType, context);
 
     case TIMESTAMP:
-      return createTimestampConvertTreeReader(columnId, readerType, context);
+    case TIMESTAMP_INSTANT:
+      return createTimestampConvertTreeReader(columnId, fileType, readerType, context);
 
     case DATE:
       return createDateConvertTreeReader(columnId, readerType, context);
@@ -2101,6 +2192,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       }
 
     case TIMESTAMP:
+    case TIMESTAMP_INSTANT:
       switch (readerType.getCategory()) {
       // Not currently supported conversion(s):
       case BINARY:
@@ -2137,6 +2229,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
       case LONG:
       case DOUBLE:
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
       case DECIMAL:
         return false;
       default:
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index d1311b9..fb15aa2 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -352,7 +352,8 @@ public class ReaderImpl implements Reader {
       List<OrcProto.ColumnStatistics> fileStats) {
     ColumnStatistics[] result = new ColumnStatistics[fileStats.size()];
     for(int i=0; i < result.length; ++i) {
-      result[i] = ColumnStatisticsImpl.deserialize(schema, fileStats.get(i));
+      TypeDescription subschema = schema == null ? null : schema.findSubtype(i);
+      result[i] = ColumnStatisticsImpl.deserialize(subschema, fileStats.get(i));
     }
     return result;
   }
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index 1d4cc67..d93c9bd 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -452,6 +452,7 @@ public class SchemaEvolution {
         case FLOAT:
         case STRING:
         case TIMESTAMP:
+        case TIMESTAMP_INSTANT:
         case BINARY:
         case DATE:
           // these are always a match
diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
index 1852e5e..06ba711 100644
--- a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
 
 package org.apache.orc.impl;
 
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
@@ -30,6 +31,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
 import java.util.TimeZone;
 
 public final class SerializationUtils {
@@ -1320,6 +1323,17 @@ public final class SerializationUtils {
     return (left ^ right) >= 0 || (left ^ (left - right)) >= 0;
   }
 
+  /**
+   * Convert a UTC time to a local timezone
+   * @param local the local timezone
+   * @param time the number of seconds since 1970
+   * @return the converted timestamp
+   */
+  public static double convertFromUtc(TimeZone local, double time) {
+    int offset = local.getOffset((long) (time*1000) - local.getRawOffset());
+    return time - offset / 1000.0;
+  }
+
   public static long convertFromUtc(TimeZone local, long time) {
     int offset = local.getOffset(time - local.getRawOffset());
     return time - offset;
@@ -1377,4 +1391,58 @@ public final class SerializationUtils {
     }
     return base;
   }
+
+  /**
+   * Find the relative offset when moving between timezones at a particular
+   * point in time.
+   *
+   * This is needed because ORC v0 and v1 write timestamps relative to the
+   * writer's local timezone. Therefore, when we read, we need to convert
+   * from the writer's timezone to the reader's timezone.
+   *
+   * @param writer the timezone we are moving from
+   * @param reader the timezone we are moving to
+   * @param millis the point in time
+   * @return the offset in milliseconds to add to millis
+   */
+  public static long convertBetweenTimezones(TimeZone writer, TimeZone reader,
+                                             long millis) {
+    final long writerOffset = writer.getOffset(millis);
+    final long readerOffset = reader.getOffset(millis);
+    long adjustedMillis = millis + writerOffset - readerOffset;
+    // If the timezone adjustment moves the millis across a DST boundary, we
+    // need to reevaluate the offsets.
+    long adjustedReader = reader.getOffset(adjustedMillis);
+    return writerOffset - adjustedReader;
+  }
+
+  /**
+   * Convert a bytes vector element into a String.
+   * @param vector the vector to use
+   * @param elementNum the element number to stringify
+   * @return a string or null if the value was null
+   */
+  public static String bytesVectorToString(BytesColumnVector vector,
+                                           int elementNum) {
+    if (vector.isRepeating) {
+      elementNum = 0;
+    }
+    return vector.noNulls || !vector.isNull[elementNum] ?
+               new String(vector.vector[elementNum], vector.start[elementNum],
+                   vector.length[elementNum], StandardCharsets.UTF_8) : null;
+  }
+
+  /**
+   * Parse a date from a string.
+   * @param string the date to parse (YYYY-MM-DD)
+   * @return the Date parsed, or null if there was a parse error.
+   */
+  public static Date parseDateFromString(String string) {
+    try {
+      Date value = Date.valueOf(string);
+      return value;
+    } catch (IllegalArgumentException e) {
+      return null;
+    }
+  }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index dae736a..712702a 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -893,33 +893,36 @@ public class TreeReaderFactory {
     private Map<String, Long> baseTimestampMap;
     protected long base_timestamp;
     private final TimeZone readerTimeZone;
+    private final boolean instantType;
     private TimeZone writerTimeZone;
     private boolean hasSameTZRules;
     private ThreadLocal<DateFormat> threadLocalDateFormat;
 
-    TimestampTreeReader(int columnId, Context context) throws IOException {
-      this(columnId, null, null, null, null, context);
+    TimestampTreeReader(int columnId, Context context,
+                        boolean instantType) throws IOException {
+      this(columnId, null, null, null, null, context, instantType);
     }
 
     protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
-        InStream nanosStream, OrcProto.ColumnEncoding encoding, Context context)
-        throws IOException {
+                                  InStream nanosStream,
+                                  OrcProto.ColumnEncoding encoding,
+                                  Context context,
+                                  boolean instantType) throws IOException {
       super(columnId, presentStream, context);
+      this.instantType = instantType;
       this.threadLocalDateFormat = new ThreadLocal<>();
       this.threadLocalDateFormat.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
       this.baseTimestampMap = new HashMap<>();
-      if (context.getUseUTCTimestamp()) {
+      if (instantType || context.getUseUTCTimestamp()) {
         this.readerTimeZone = TimeZone.getTimeZone("UTC");
       } else {
         this.readerTimeZone = TimeZone.getDefault();
       }
       if (context.getWriterTimezone() == null || context.getWriterTimezone().isEmpty()) {
-        this.writerTimeZone = readerTimeZone;
+        this.base_timestamp = getBaseTimestamp(readerTimeZone.getID());
       } else {
-        this.writerTimeZone = TimeZone.getTimeZone(context.getWriterTimezone());
+        this.base_timestamp = getBaseTimestamp(context.getWriterTimezone());
       }
-      this.hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
-      this.base_timestamp = getBaseTimestamp(readerTimeZone.getID());
       if (encoding != null) {
         checkEncoding(encoding);
 
@@ -930,7 +933,6 @@ public class TreeReaderFactory {
         if (nanosStream != null) {
           this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, context);
         }
-        base_timestamp = getBaseTimestamp(context.getWriterTimezone());
       }
     }
 
@@ -953,7 +955,9 @@ public class TreeReaderFactory {
       nanos = createIntegerReader(kind,
           planner.getStream(new StreamName(columnId,
               OrcProto.Stream.Kind.SECONDARY)), false, context);
-      base_timestamp = getBaseTimestamp(planner.getWriterTimezone());
+      if (!instantType) {
+        base_timestamp = getBaseTimestamp(planner.getWriterTimezone());
+      }
     }
 
     protected long getBaseTimestamp(String timeZoneId) throws IOException {
@@ -962,24 +966,28 @@ public class TreeReaderFactory {
         timeZoneId = readerTimeZone.getID();
       }
 
-      if (!baseTimestampMap.containsKey(timeZoneId)) {
+      if (writerTimeZone == null || !timeZoneId.equals(writerTimeZone.getID())) {
         writerTimeZone = TimeZone.getTimeZone(timeZoneId);
         hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
-        threadLocalDateFormat.get().setTimeZone(writerTimeZone);
-        try {
-          long epoch = threadLocalDateFormat.get()
-            .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
-              TimestampTreeWriter.MILLIS_PER_SECOND;
-          baseTimestampMap.put(timeZoneId, epoch);
-          return epoch;
-        } catch (ParseException e) {
-          throw new IOException("Unable to create base timestamp", e);
-        } finally {
-          threadLocalDateFormat.get().setTimeZone(readerTimeZone);
+        if (!baseTimestampMap.containsKey(timeZoneId)) {
+          threadLocalDateFormat.get().setTimeZone(writerTimeZone);
+          try {
+            long epoch = threadLocalDateFormat.get()
+                             .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+                             TimestampTreeWriter.MILLIS_PER_SECOND;
+            baseTimestampMap.put(timeZoneId, epoch);
+            return epoch;
+          } catch (ParseException e) {
+            throw new IOException("Unable to create base timestamp", e);
+          } finally {
+            threadLocalDateFormat.get().setTimeZone(readerTimeZone);
+          }
+        } else {
+          return baseTimestampMap.get(timeZoneId);
         }
       }
 
-      return baseTimestampMap.get(timeZoneId);
+      return base_timestamp;
     }
 
     @Override
@@ -1015,19 +1023,10 @@ public class TreeReaderFactory {
           // If reader and writer time zones have different rules, adjust the timezone difference
           // between reader and writer taking day light savings into account.
           if (!hasSameTZRules) {
-            offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
-          }
-          long adjustedMillis = millis + offset;
-          // Sometimes the reader timezone might have changed after adding the adjustedMillis.
-          // To account for that change, check for any difference in reader timezone after
-          // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time).
-          if (!hasSameTZRules &&
-              (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
-            long newOffset =
-                writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
-            adjustedMillis = millis + newOffset;
+            offset = SerializationUtils.convertBetweenTimezones(writerTimeZone,
+                readerTimeZone, millis);
           }
-          result.time[i] = adjustedMillis;
+          result.time[i] = millis + offset;
           result.nanos[i] = newNanos;
           if (result.isRepeating && i != 0 &&
               (result.time[0] != result.time[i] ||
@@ -2351,7 +2350,9 @@ public class TreeReaderFactory {
       case BINARY:
         return new BinaryTreeReader(fileType.getId(), context);
       case TIMESTAMP:
-        return new TimestampTreeReader(fileType.getId(), context);
+        return new TimestampTreeReader(fileType.getId(), context, false);
+      case TIMESTAMP_INSTANT:
+        return new TimestampTreeReader(fileType.getId(), context, true);
       case DATE:
         return new DateTreeReader(fileType.getId(), context);
       case DECIMAL:
diff --git a/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java b/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java
index e1be9bd..77e4411 100644
--- a/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/mask/MaskFactory.java
@@ -59,6 +59,7 @@ public abstract class MaskFactory {
       case VARCHAR:
         return buildStringMask(schema);
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return buildTimestampMask(schema);
       case DATE:
         return buildDateMask(schema);
diff --git a/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java b/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java
index fe54c49..c647c10 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/ReaderEncryption.java
@@ -40,8 +40,12 @@ public class ReaderEncryption {
   // A value of variants.length means no encryption
   private final ReaderEncryptionVariant[] columnVariants;
 
-  public ReaderEncryption() throws IOException {
-    this(null, null, null, null, null);
+  public ReaderEncryption() {
+    keyProvider = null;
+    keys = new ReaderEncryptionKey[0];
+    masks = new MaskDescriptionImpl[0];
+    variants = new ReaderEncryptionVariant[0];
+    columnVariants = null;
   }
 
   public ReaderEncryption(OrcProto.Footer footer,
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
index 3ba2dbe..00313b5 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
@@ -39,18 +39,19 @@ import java.util.function.Consumer;
 public class TimestampTreeWriter extends TreeWriterBase {
   public static final int MILLIS_PER_SECOND = 1000;
   public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+  private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
   private final IntegerWriter seconds;
   private final IntegerWriter nanos;
   private final boolean isDirectV2;
-  private boolean useUTCTimestamp;
+  private final boolean alwaysUTC;
   private final TimeZone localTimezone;
-  private final long baseEpochSecsLocalTz;
-  private final long baseEpochSecsUTC;
+  private final long epoch;
 
   public TimestampTreeWriter(TypeDescription schema,
                              WriterEncryptionVariant encryption,
-                             WriterContext writer) throws IOException {
+                             WriterContext writer,
+                             boolean instantType) throws IOException {
     super(schema, encryption, writer);
     this.isDirectV2 = isNewWriteFormat(writer);
     this.seconds = createIntegerWriter(writer.createStream(
@@ -62,22 +63,21 @@ public class TimestampTreeWriter extends TreeWriterBase {
     if (rowIndexPosition != null) {
       recordPosition(rowIndexPosition);
     }
-    this.useUTCTimestamp = writer.getUseUTCTimestamp();
+    this.alwaysUTC = instantType || writer.getUseUTCTimestamp();
     DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    this.localTimezone = TimeZone.getDefault();
-    dateFormat.setTimeZone(this.localTimezone);
     try {
-      this.baseEpochSecsLocalTz = dateFormat
-          .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
-          TimestampTreeWriter.MILLIS_PER_SECOND;
-    } catch (ParseException e) {
-      throw new IOException("Unable to create base timestamp tree writer", e);
-    }
-    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-    try {
-      this.baseEpochSecsUTC = dateFormat
-          .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
-          TimestampTreeWriter.MILLIS_PER_SECOND;
+      if (this.alwaysUTC) {
+        dateFormat.setTimeZone(UTC);
+        localTimezone = null;
+        epoch = dateFormat.parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+                       TimestampTreeWriter.MILLIS_PER_SECOND;
+
+      } else {
+        localTimezone = TimeZone.getDefault();
+        dateFormat.setTimeZone(localTimezone);
+        epoch = dateFormat.parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+                         TimestampTreeWriter.MILLIS_PER_SECOND;
+      }
     } catch (ParseException e) {
       throw new IOException("Unable to create base timestamp tree writer", e);
     }
@@ -86,11 +86,8 @@ public class TimestampTreeWriter extends TreeWriterBase {
   @Override
   OrcProto.ColumnEncoding.Builder getEncoding() {
     OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-    if (isDirectV2) {
-      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-    } else {
-      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-    }
+    result.setKind(isDirectV2 ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+                       : OrcProto.ColumnEncoding.Kind.DIRECT);
     return result;
   }
 
@@ -109,7 +106,7 @@ public class TimestampTreeWriter extends TreeWriterBase {
         if (millis < 0 && newNanos > 999_999) {
           millis -= MILLIS_PER_SECOND;
         }
-        long utc = vec.isUTC() ?
+        long utc = vec.isUTC() || alwaysUTC ?
             millis : SerializationUtils.convertToUtc(localTimezone, millis);
         indexStatistics.updateTimestamp(utc);
         if (createBloomFilter) {
@@ -120,7 +117,7 @@ public class TimestampTreeWriter extends TreeWriterBase {
         }
         final long nano = formatNanos(vec.nanos[0]);
         for (int i = 0; i < length; ++i) {
-          seconds.write(secs - (useUTCTimestamp ? baseEpochSecsUTC : baseEpochSecsLocalTz));
+          seconds.write(secs - epoch);
           nanos.write(nano);
         }
       }
@@ -135,13 +132,9 @@ public class TimestampTreeWriter extends TreeWriterBase {
           if (millis < 0 && newNanos > 999_999) {
             millis -= MILLIS_PER_SECOND;
           }
-          long utc = vec.isUTC() ?
+          long utc = vec.isUTC() || alwaysUTC ?
               millis : SerializationUtils.convertToUtc(localTimezone, millis);
-          if (useUTCTimestamp) {
-            seconds.write(secs - baseEpochSecsUTC);
-          } else {
-            seconds.write(secs - baseEpochSecsLocalTz);
-          }
+          seconds.write(secs - epoch);
           nanos.write(formatNanos(newNanos));
           indexStatistics.updateTimestamp(utc);
           if (createBloomFilter) {
@@ -163,7 +156,7 @@ public class TimestampTreeWriter extends TreeWriterBase {
     }
   }
 
-  private static long formatNanos(int nanos) {
+  static long formatNanos(int nanos) {
     if (nanos == 0) {
       return 0;
     } else if (nanos % 100 != 0) {
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
index 680cf8c..db81ae4 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
@@ -164,7 +164,9 @@ public interface TreeWriter {
       case BINARY:
         return new BinaryTreeWriter(schema, encryption, streamFactory);
       case TIMESTAMP:
-        return new TimestampTreeWriter(schema, encryption, streamFactory);
+        return new TimestampTreeWriter(schema, encryption, streamFactory, false);
+      case TIMESTAMP_INSTANT:
+        return new TimestampTreeWriter(schema, encryption, streamFactory, true);
       case DATE:
         return new DateTreeWriter(schema, encryption, streamFactory);
       case DECIMAL:
diff --git a/java/core/src/test/org/apache/orc/TestTypeDescription.java b/java/core/src/test/org/apache/orc/TestTypeDescription.java
index 6a48746..6428993 100644
--- a/java/core/src/test/org/apache/orc/TestTypeDescription.java
+++ b/java/core/src/test/org/apache/orc/TestTypeDescription.java
@@ -110,12 +110,14 @@ public class TestTypeDescription {
            .addField("u", TypeDescription.createUnion()
                .addUnionChild(TypeDescription.createTimestamp())
                .addUnionChild(TypeDescription.createVarchar()
-                   .withMaxLength(100))));
+                   .withMaxLength(100))))
+        .addField("tz", TypeDescription.createTimestampInstant())
+        .addField("ts", TypeDescription.createTimestamp());
     String expectedStr =
         "struct<b1:binary,b2:boolean,b3:tinyint,c:char(10),d1:date," +
             "d2:decimal(20,5),d3:double,fff:float,int:int,l:array<bigint>," +
             "map:map<smallint,string>,str:struct<u:uniontype<timestamp," +
-            "varchar(100)>>>";
+            "varchar(100)>>,tz:timestamp with local time zone,ts:timestamp>";
     assertEquals(expectedStr, expected.toString());
     TypeDescription actual = TypeDescription.fromString(expectedStr);
     assertEquals(expected, actual);
@@ -131,9 +133,9 @@ public class TestTypeDescription {
     assertEquals("MY_FIELD", type.getFieldNames().get(0));
     assertEquals(TypeDescription.Category.INT,
         type.getChildren().get(0).getCategory());
-    type = TypeDescription.fromString("UNIONTYPE<STRING>");
+    type = TypeDescription.fromString("UNIONTYPE<   TIMESTAMP WITH LOCAL   TIME ZONE    >");
     assertEquals(TypeDescription.Category.UNION, type.getCategory());
-    assertEquals(TypeDescription.Category.STRING,
+    assertEquals(TypeDescription.Category.TIMESTAMP_INSTANT,
         type.getChildren().get(0).getCategory());
   }
 
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 2eb2d23..69e1a40 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -82,6 +82,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.TimeZone;
 import java.util.function.IntFunction;
 
 import static junit.framework.TestCase.assertNotNull;
@@ -1472,7 +1473,7 @@ public class TestVectorOrcFile {
 
   private static void setUnion(VectorizedRowBatch batch, int rowId,
                                Timestamp ts, Integer tag, Integer i, String s,
-                               HiveDecimalWritable dec) {
+                               HiveDecimalWritable dec, Timestamp instant) {
     UnionColumnVector union = (UnionColumnVector) batch.cols[1];
     if (ts != null) {
       TimestampColumnVector timestampColVector = (TimestampColumnVector) batch.cols[0];
@@ -1510,6 +1511,12 @@ public class TestVectorOrcFile {
       batch.cols[2].isNull[rowId] = true;
       batch.cols[2].noNulls = false;
     }
+    if (instant == null) {
+      batch.cols[3].isNull[rowId] = true;
+      batch.cols[3].noNulls = false;
+    } else {
+      ((TimestampColumnVector) batch.cols[3]).set(rowId, instant);
+    }
   }
 
   /**
@@ -1676,9 +1683,14 @@ public class TestVectorOrcFile {
      */
   @Test
   public void testUnionAndTimestamp() throws Exception {
+    final TimeZone original = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
     TypeDescription schema = TypeDescription.fromString(
-        "struct<time:timestamp,union:uniontype<int,string>," +
-           "decimal:decimal(38,18)>");
+        "struct<time:timestamp," +
+               "union:uniontype<int,string>," +
+               "decimal:decimal(38,18)," +
+               "instant:timestamp with local time zone>"
+    );
     HiveDecimal maxValue = HiveDecimal.create("10000000000000000000");
     Writer writer = OrcFile.createWriter(testFilePath,
                                          OrcFile.writerOptions(conf)
@@ -1691,16 +1703,19 @@ public class TestVectorOrcFile {
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 6;
     setUnion(batch, 0, Timestamp.valueOf("2000-03-12 15:00:00"), 0, 42, null,
-             new HiveDecimalWritable("12345678.6547456"));
+             new HiveDecimalWritable("12345678.6547456"),
+             Timestamp.valueOf("2014-12-12 6:00:00"));
     setUnion(batch, 1, Timestamp.valueOf("2000-03-20 12:00:00.123456789"),
-        1, null, "hello", new HiveDecimalWritable("-5643.234"));
+        1, null, "hello", new HiveDecimalWritable("-5643.234"),
+        Timestamp.valueOf("1996-12-11 11:00:00"));
 
-    setUnion(batch, 2, null, null, null, null, null);
-    setUnion(batch, 3, null, 0, null, null, null);
-    setUnion(batch, 4, null, 1, null, null, null);
+    setUnion(batch, 2, null, null, null, null, null, null);
+    setUnion(batch, 3, null, 0, null, null, null, null);
+    setUnion(batch, 4, null, 1, null, null, null, null);
 
     setUnion(batch, 5, Timestamp.valueOf("1970-01-01 00:00:00"), 0, 200000,
-        null, new HiveDecimalWritable("10000000000000000000"));
+        null, new HiveDecimalWritable("10000000000000000000"),
+        Timestamp.valueOf("2011-07-01 09:00:00"));
     writer.addRowBatch(batch);
 
     batch.reset();
@@ -1711,10 +1726,10 @@ public class TestVectorOrcFile {
           HiveDecimal.create(new BigInteger(64, rand), rand.nextInt(18));
       if ((i & 1) == 0) {
         setUnion(batch, batch.size++, ts, 0, i*i, null,
-            new HiveDecimalWritable(dec));
+            new HiveDecimalWritable(dec), null);
       } else {
         setUnion(batch, batch.size++, ts, 1, null, Integer.toString(i*i),
-            new HiveDecimalWritable(dec));
+            new HiveDecimalWritable(dec), null);
       }
       if (maxValue.compareTo(dec) < 0) {
         maxValue = dec;
@@ -1729,42 +1744,77 @@ public class TestVectorOrcFile {
       batch.cols[c].setRepeating(true);
     }
     ((UnionColumnVector) batch.cols[1]).fields[0].isRepeating = true;
-    setUnion(batch, 0, null, 0, 1732050807, null, null);
+    setUnion(batch, 0, null, 0, 1732050807, null, null, null);
     for(int i=0; i < 5; ++i) {
       writer.addRowBatch(batch);
     }
 
     batch.reset();
     batch.size = 3;
-    setUnion(batch, 0, null, 0, 0, null, null);
-    setUnion(batch, 1, null, 0, 10, null, null);
-    setUnion(batch, 2, null, 0, 138, null, null);
+    setUnion(batch, 0, null, 0, 0, null, null, null);
+    setUnion(batch, 1, null, 0, 10, null, null, null);
+    setUnion(batch, 2, null, 0, 138, null, null, null);
     writer.addRowBatch(batch);
+    // check the stats on the writer side
+    ColumnStatistics[] stats = writer.getStatistics();
+    assertEquals("1996-12-11 11:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMinimum().toString());
+    assertEquals("1996-12-11 11:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMinimumUTC().toString());
+    assertEquals("2014-12-12 06:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMaximum().toString());
+    assertEquals("2014-12-12 06:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMaximumUTC().toString());
+
     writer.close();
+
+    TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"));
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
+    stats = reader.getStatistics();
+
+    // check the timestamp statistics
+    assertEquals("1970-01-01 00:00:00.0",
+        ((TimestampColumnStatistics) stats[1]).getMinimum().toString());
+    assertEquals("1969-12-31 19:00:00.0",
+        ((TimestampColumnStatistics) stats[1]).getMinimumUTC().toString());
+    assertEquals("2037-05-05 12:34:56.203",
+        ((TimestampColumnStatistics) stats[1]).getMaximum().toString());
+    assertEquals("2037-05-05 08:34:56.203",
+        ((TimestampColumnStatistics) stats[1]).getMaximumUTC().toString());
+
+    // check the instant statistics
+    assertEquals("1996-12-11 14:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMinimum().toString());
+    assertEquals("1996-12-11 14:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMinimumUTC().toString());
+    assertEquals("2014-12-12 09:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMaximum().toString());
+    assertEquals("2014-12-12 09:00:00.0",
+        ((TimestampColumnStatistics) stats[6]).getMaximumUTC().toString());
+
 
     schema = writer.getSchema();
-    assertEquals(5, schema.getMaximumId());
-    boolean[] expected = new boolean[] {false, false, false, false, false, false};
+    assertEquals(6, schema.getMaximumId());
+    boolean[] expected = new boolean[] {false, false, false, false, false, false, false};
     boolean[] included = OrcUtils.includeColumns("", schema);
     assertEquals(true, Arrays.equals(expected, included));
 
-    expected = new boolean[] {false, true, false, false, false, true};
+    expected = new boolean[] {false, true, false, false, false, true, false};
     included = OrcUtils.includeColumns("time,decimal", schema);
     assertEquals(true, Arrays.equals(expected, included));
 
-    expected = new boolean[] {false, false, true, true, true, false};
+    expected = new boolean[] {false, false, true, true, true, false, false};
     included = OrcUtils.includeColumns("union", schema);
     assertEquals(true, Arrays.equals(expected, included));
 
     Assert.assertEquals(false, reader.getMetadataKeys().iterator().hasNext());
     Assert.assertEquals(5077, reader.getNumberOfRows());
-    DecimalColumnStatistics stats =
+    DecimalColumnStatistics decStats =
         (DecimalColumnStatistics) reader.getStatistics()[5];
-    assertEquals(71, stats.getNumberOfValues());
-    assertEquals(HiveDecimal.create("-5643.234"), stats.getMinimum());
-    assertEquals(maxValue, stats.getMaximum());
+    assertEquals(71, decStats.getNumberOfValues());
+    assertEquals(HiveDecimal.create("-5643.234"), decStats.getMinimum());
+    assertEquals(maxValue, decStats.getMaximum());
     // TODO: fix this
 //    assertEquals(null,stats.getSum());
     int stripeCount = 0;
@@ -1798,18 +1848,22 @@ public class TestVectorOrcFile {
     LongColumnVector longs = (LongColumnVector) union.fields[0];
     BytesColumnVector strs = (BytesColumnVector) union.fields[1];
     DecimalColumnVector decs = (DecimalColumnVector) batch.cols[2];
+    TimestampColumnVector instant = (TimestampColumnVector) batch.cols[3];
 
-    assertEquals("struct<time:timestamp,union:uniontype<int,string>,decimal:decimal(38,18)>",
+    assertEquals("struct<time:timestamp,union:uniontype<int,string>,decimal:decimal(38,18)," +
+                     "instant:timestamp with local time zone>",
         schema.toString());
     assertEquals("2000-03-12 15:00:00.0", ts.asScratchTimestamp(0).toString());
     assertEquals(0, union.tags[0]);
     assertEquals(42, longs.vector[0]);
     assertEquals("12345678.6547456", decs.vector[0].toString());
+    assertEquals("2014-12-12 09:00:00.0", instant.asScratchTimestamp(0).toString());
 
     assertEquals("2000-03-20 12:00:00.123456789", ts.asScratchTimestamp(1).toString());
     assertEquals(1, union.tags[1]);
     assertEquals("hello", strs.toString(1));
     assertEquals("-5643.234", decs.vector[1].toString());
+    assertEquals("1996-12-11 14:00:00.0", instant.asScratchTimestamp(1).toString());
 
     assertEquals(false, ts.noNulls);
     assertEquals(false, union.noNulls);
@@ -1838,6 +1892,7 @@ public class TestVectorOrcFile {
     assertEquals(200000, longs.vector[5]);
     assertEquals(false, decs.isNull[5]);
     assertEquals("10000000000000000000", decs.vector[5].toString());
+    assertEquals("2011-07-01 12:00:00.0", instant.asScratchTimestamp(5).toString());
 
     rand = new Random(42);
     for(int i=1970; i < 2038; ++i) {
@@ -1896,6 +1951,7 @@ public class TestVectorOrcFile {
     assertEquals("hello", strs.toString(0));
     assertEquals(new HiveDecimalWritable(HiveDecimal.create("-5643.234")), decs.vector[0]);
     rows.close();
+    TimeZone.setDefault(original);
   }
 
   /**
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index a638e89..e07425d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -17,15 +17,23 @@
  */
 package org.apache.orc.impl;
 
-import static junit.framework.TestCase.assertSame;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.TimeZone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,7 +76,7 @@ public class TestSchemaEvolution {
     conf = new Configuration();
     options = new Reader.Options(conf);
     fs = FileSystem.getLocal(conf);
-    testFilePath = new Path(workDir, "TestOrcFile." +
+    testFilePath = new Path(workDir, "TestSchemaEvolution." +
         testCaseName.getMethodName() + ".orc");
     fs.delete(testFilePath, false);
   }
@@ -1569,21 +1577,6 @@ public class TestSchemaEvolution {
     assertFalse(fileInclude[3]);
   }
 
-  static void createStream(Map<StreamName, InStream> streams,
-                           int id,
-                           OrcProto.Stream.Kind kind,
-                           int... values) throws IOException {
-    StreamName name = new StreamName(id, kind);
-    BufferChunkList ranges = new BufferChunkList();
-    byte[] buffer = new byte[values.length];
-    for(int i=0; i < values.length; ++i) {
-      buffer[i] = (byte) values[i];
-    }
-    ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), 0));
-    streams.put(name, InStream.create(name.toString(), ranges.get(), 0,
-        values.length));
-  }
-
   static ByteBuffer createBuffer(int... values) {
     ByteBuffer result = ByteBuffer.allocate(values.length);
     for(int v: values) {
@@ -1657,4 +1650,620 @@ public class TestSchemaEvolution {
     assertEquals(3, evo.getFileType(3).getId());
     assertEquals(null, evo.getFileType(4));
   }
+
+  // These are helper methods that pull some of the common code into one
+  // place.
+
+  static String decimalTimestampToString(long centiseconds, ZoneId zone) {
+    long sec = centiseconds / 100;
+    int nano = (int) ((centiseconds % 100) * 10_000_000);
+    return timestampToString(sec, nano, zone);
+  }
+
+  static String doubleTimestampToString(double seconds, ZoneId zone) {
+    long sec = (long) seconds;
+    int nano = 1_000_000 * (int) Math.round((seconds - sec) * 1000);
+    return timestampToString(sec, nano, zone);
+  }
+
+  static String timestampToString(long seconds, int nanos, ZoneId zone) {
+    return timestampToString(Instant.ofEpochSecond(seconds, nanos), zone);
+  }
+
+  static String timestampToString(Instant time, ZoneId zone) {
+    return time.atZone(zone)
+               .format(ConvertTreeReaderFactory.INSTANT_TIMESTAMP_FORMAT);
+  }
+
+  static void writeTimestampDataFile(Path path,
+                                     Configuration conf,
+                                     ZoneId writerZone,
+                                     DateTimeFormatter formatter,
+                                     String[] values) throws IOException {
+    TimeZone oldDefault = TimeZone.getDefault();
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone(writerZone));
+      TypeDescription fileSchema =
+          TypeDescription.fromString("struct<t1:timestamp," +
+                                         "t2:timestamp with local time zone>");
+      Writer writer = OrcFile.createWriter(path,
+          OrcFile.writerOptions(conf).setSchema(fileSchema).stripeSize(10000));
+      VectorizedRowBatch batch = fileSchema.createRowBatch(1024);
+      TimestampColumnVector t1 = (TimestampColumnVector) batch.cols[0];
+      TimestampColumnVector t2 = (TimestampColumnVector) batch.cols[1];
+      for (int r = 0; r < values.length; ++r) {
+        int row = batch.size++;
+        Instant t = Instant.from(formatter.parse(values[r]));
+        t1.time[row] = t.getEpochSecond() * 1000;
+        t1.nanos[row] = t.getNano();
+        t2.time[row] = t1.time[row];
+        t2.nanos[row] = t1.nanos[row];
+        if (batch.size == 1024) {
+          writer.addRowBatch(batch);
+          batch.reset();
+        }
+      }
+      if (batch.size != 0) {
+        writer.addRowBatch(batch);
+      }
+      writer.close();
+    } finally {
+      TimeZone.setDefault(oldDefault);
+    }
+  }
+
+  /**
+   * Tests the various conversions from timestamp and timestamp with local
+   * timezone.
+   *
+   * It writes an ORC file with timestamp and timestamp with local time zone
+   * and then reads it back in with each of the relevant types.
+   *
+   * This test runs both with and without the useUtc flag.
+   *
+   * It uses Australia/Sydney and America/New_York because they both have
+   * DST and they move in opposite directions on different days. Thus, we
+   * end up with four sets of offsets.
+   *
+   * Picking the 27th of the month puts it around when DST changes.
+   */
+  @Test
+  public void testEvolutionFromTimestamp() throws Exception {
+    // The number of rows in the file that we test with.
+    final int VALUES = 1024;
+    // The different timezones that we'll use for this test.
+    final ZoneId UTC = ZoneId.of("UTC");
+    final ZoneId WRITER_ZONE = ZoneId.of("America/New_York");
+    final ZoneId READER_ZONE = ZoneId.of("Australia/Sydney");
+
+    final TimeZone oldDefault = TimeZone.getDefault();
+
+    // generate the timestamps to use
+    String[] timeStrings = new String[VALUES];
+    for(int r=0; r < timeStrings.length; ++r) {
+      timeStrings[r] = String.format("%04d-%02d-27 23:45:56.7",
+          2000 + (r / 12), (r % 12) + 1);
+    }
+
+    final DateTimeFormatter WRITER_FORMAT =
+        ConvertTreeReaderFactory.TIMESTAMP_FORMAT.withZone(WRITER_ZONE);
+
+    writeTimestampDataFile(testFilePath, conf, WRITER_ZONE, WRITER_FORMAT, timeStrings);
+
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone(READER_ZONE));
+      OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
+      Reader.Options rowOptions = new Reader.Options();
+
+      try (Reader reader = OrcFile.createReader(testFilePath, options)) {
+
+        // test conversion to long
+        TypeDescription readerSchema = TypeDescription.fromString("struct<t1:bigint,t2:bigint>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES);
+          LongColumnVector t1 = (LongColumnVector) batch.cols[0];
+          LongColumnVector t2 = (LongColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, (timeStrings[r] + " " +
+                                          READER_ZONE.getId()).replace(".7 ", ".0 "),
+                timestampToString(t1.vector[current], 0, READER_ZONE));
+            assertEquals("row " + r, (timeStrings[r] + " " +
+                                          WRITER_ZONE.getId()).replace(".7 ", ".0 "),
+                timestampToString(t2.vector[current], 0, WRITER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to decimal
+        readerSchema = TypeDescription.fromString("struct<t1:decimal(14,2),t2:decimal(14,2)>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+          Decimal64ColumnVector t1 = (Decimal64ColumnVector) batch.cols[0];
+          Decimal64ColumnVector t2 = (Decimal64ColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r] + " " + READER_ZONE.getId(),
+                decimalTimestampToString(t1.vector[current], READER_ZONE));
+            assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(),
+                decimalTimestampToString(t2.vector[current], WRITER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to double
+        readerSchema = TypeDescription.fromString("struct<t1:double,t2:double>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+          DoubleColumnVector t1 = (DoubleColumnVector) batch.cols[0];
+          DoubleColumnVector t2 = (DoubleColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r] + " " + READER_ZONE.getId(),
+                doubleTimestampToString(t1.vector[current], READER_ZONE));
+            assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(),
+                doubleTimestampToString(t2.vector[current], WRITER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to date
+        readerSchema = TypeDescription.fromString("struct<t1:date,t2:date>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+          LongColumnVector t1 = (LongColumnVector) batch.cols[0];
+          LongColumnVector t2 = (LongColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            String date = timeStrings[r].substring(0, 10);
+            assertEquals("row " + r, date,
+                ConvertTreeReaderFactory.DATE_FORMAT.format(
+                    LocalDate.ofEpochDay(t1.vector[current])));
+            // NYC -> Sydney moves forward a day for instant
+            assertEquals("row " + r, date.replace("-27", "-28"),
+                ConvertTreeReaderFactory.DATE_FORMAT.format(
+                    LocalDate.ofEpochDay(t2.vector[current])));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to string
+        readerSchema = TypeDescription.fromString("struct<t1:string,t2:string>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES);
+          BytesColumnVector bytesT1 = (BytesColumnVector) batch.cols[0];
+          BytesColumnVector bytesT2 = (BytesColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r], bytesT1.toString(current));
+            Instant t = Instant.from(WRITER_FORMAT.parse(timeStrings[r]));
+            assertEquals("row " + r,
+                timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])),
+                    READER_ZONE),
+                bytesT2.toString(current));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion between timestamps
+        readerSchema = TypeDescription.fromString("struct<t1:timestamp with local time zone,t2:timestamp>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES);
+          TimestampColumnVector timeT1 = (TimestampColumnVector) batch.cols[0];
+          TimestampColumnVector timeT2 = (TimestampColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r] + " " + READER_ZONE.getId(),
+                timestampToString(timeT1.time[current] / 1000, timeT1.nanos[current], READER_ZONE));
+            assertEquals("row " + r,
+                timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])), READER_ZONE),
+                timestampToString(timeT2.time[current] / 1000, timeT2.nanos[current], READER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+      }
+
+      // Now test using UTC as local
+      options.useUTCTimestamp(true);
+      try (Reader reader = OrcFile.createReader(testFilePath, options)) {
+        DateTimeFormatter UTC_FORMAT =
+            ConvertTreeReaderFactory.TIMESTAMP_FORMAT.withZone(UTC);
+
+        // test conversion to long in UTC
+        TypeDescription readerSchema =
+            TypeDescription.fromString("struct<t1:bigint,t2:bigint>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES);
+          LongColumnVector t1 = (LongColumnVector) batch.cols[0];
+          LongColumnVector t2 = (LongColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, (timeStrings[r] + " " +
+                                          UTC.getId()).replace(".7 ", ".0 "),
+                timestampToString(t1.vector[current], 0, UTC));
+            assertEquals("row " + r, (timeStrings[r] + " " +
+                                          WRITER_ZONE.getId()).replace(".7 ", ".0 "),
+                timestampToString(t2.vector[current], 0, WRITER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to decimal
+        readerSchema = TypeDescription.fromString("struct<t1:decimal(14,2),t2:decimal(14,2)>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+          Decimal64ColumnVector t1 = (Decimal64ColumnVector) batch.cols[0];
+          Decimal64ColumnVector t2 = (Decimal64ColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r] + " " + UTC.getId(),
+                decimalTimestampToString(t1.vector[current], UTC));
+            assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(),
+                decimalTimestampToString(t2.vector[current], WRITER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to double
+        readerSchema = TypeDescription.fromString("struct<t1:double,t2:double>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+          DoubleColumnVector t1 = (DoubleColumnVector) batch.cols[0];
+          DoubleColumnVector t2 = (DoubleColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r] + " " + UTC.getId(),
+                doubleTimestampToString(t1.vector[current], UTC));
+            assertEquals("row " + r, timeStrings[r] + " " + WRITER_ZONE.getId(),
+                doubleTimestampToString(t2.vector[current], WRITER_ZONE));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to date
+        readerSchema = TypeDescription.fromString("struct<t1:date,t2:date>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+          LongColumnVector t1 = (LongColumnVector) batch.cols[0];
+          LongColumnVector t2 = (LongColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            String date = timeStrings[r].substring(0, 10);
+            assertEquals("row " + r, date,
+                ConvertTreeReaderFactory.DATE_FORMAT.format(
+                    LocalDate.ofEpochDay(t1.vector[current])));
+            // NYC -> UTC still moves forward a day
+            assertEquals("row " + r, date.replace("-27", "-28"),
+                ConvertTreeReaderFactory.DATE_FORMAT.format(
+                    LocalDate.ofEpochDay(t2.vector[current])));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion to string in UTC
+        readerSchema = TypeDescription.fromString("struct<t1:string,t2:string>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES);
+          BytesColumnVector bytesT1 = (BytesColumnVector) batch.cols[0];
+          BytesColumnVector bytesT2 = (BytesColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r], bytesT1.toString(current));
+            assertEquals("row " + r,
+                timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])),
+                    UTC),
+                bytesT2.toString(current));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+
+        // test conversion between timestamps in UTC
+        readerSchema = TypeDescription.fromString("struct<t1:timestamp with local time zone,t2:timestamp>");
+        try (RecordReader rows = reader.rows(rowOptions.schema(readerSchema))) {
+          VectorizedRowBatch batch = readerSchema.createRowBatch(VALUES);
+          TimestampColumnVector timeT1 = (TimestampColumnVector) batch.cols[0];
+          TimestampColumnVector timeT2 = (TimestampColumnVector) batch.cols[1];
+          int current = 0;
+          for (int r = 0; r < VALUES; ++r) {
+            if (current == batch.size) {
+              assertEquals("row " + r, true, rows.nextBatch(batch));
+              current = 0;
+            }
+            assertEquals("row " + r, timeStrings[r] + " UTC",
+                timestampToString(timeT1.time[current] / 1000, timeT1.nanos[current], UTC));
+            assertEquals("row " + r,
+                timestampToString(Instant.from(WRITER_FORMAT.parse(timeStrings[r])), UTC),
+                timestampToString(timeT2.time[current] / 1000, timeT2.nanos[current], UTC));
+            current += 1;
+          }
+          assertEquals(false, rows.nextBatch(batch));
+        }
+      }
+    } finally {
+      TimeZone.setDefault(oldDefault);
+    }
+  }
+
+  static void writeEvolutionToTimestamp(Path path,
+                                        Configuration conf,
+                                        ZoneId writerZone,
+                                        String[] values) throws IOException {
+    TypeDescription fileSchema =
+        TypeDescription.fromString("struct<l1:bigint,l2:bigint," +
+                                       "d1:decimal(14,2),d2:decimal(14,2)," +
+                                       "dbl1:double,dbl2:double," +
+                                       "dt1:date,dt2:date," +
+                                       "s1:string,s2:string>");
+    ZoneId UTC = ZoneId.of("UTC");
+    DateTimeFormatter WRITER_FORMAT = ConvertTreeReaderFactory.TIMESTAMP_FORMAT
+                                          .withZone(writerZone);
+    DateTimeFormatter UTC_FORMAT = ConvertTreeReaderFactory.TIMESTAMP_FORMAT
+                                          .withZone(UTC);
+    DateTimeFormatter UTC_DATE = ConvertTreeReaderFactory.DATE_FORMAT
+                                     .withZone(UTC);
+    Writer writer = OrcFile.createWriter(path,
+        OrcFile.writerOptions(conf).setSchema(fileSchema).stripeSize(10000));
+    VectorizedRowBatch batch = fileSchema.createRowBatchV2();
+    int batchSize = batch.getMaxSize();
+    LongColumnVector l1 = (LongColumnVector) batch.cols[0];
+    LongColumnVector l2 = (LongColumnVector) batch.cols[1];
+    Decimal64ColumnVector d1 = (Decimal64ColumnVector) batch.cols[2];
+    Decimal64ColumnVector d2 = (Decimal64ColumnVector) batch.cols[3];
+    DoubleColumnVector dbl1 = (DoubleColumnVector) batch.cols[4];
+    DoubleColumnVector dbl2 = (DoubleColumnVector) batch.cols[5];
+    LongColumnVector dt1 = (LongColumnVector) batch.cols[6];
+    LongColumnVector dt2 = (LongColumnVector) batch.cols[7];
+    BytesColumnVector s1 = (BytesColumnVector) batch.cols[8];
+    BytesColumnVector s2 = (BytesColumnVector) batch.cols[9];
+    for (int r = 0; r < values.length; ++r) {
+      int row = batch.size++;
+      Instant utcTime = Instant.from(UTC_FORMAT.parse(values[r]));
+      Instant writerTime = Instant.from(WRITER_FORMAT.parse(values[r]));
+      l1.vector[row] =  utcTime.getEpochSecond();
+      l2.vector[row] =  writerTime.getEpochSecond();
+      // balance out the 2 digits of scale
+      d1.vector[row] = utcTime.toEpochMilli() / 10;
+      d2.vector[row] = writerTime.toEpochMilli() / 10;
+      // convert to double
+      dbl1.vector[row] = utcTime.toEpochMilli() / 1000.0;
+      dbl2.vector[row] = writerTime.toEpochMilli() / 1000.0;
+      // convert to date
+      dt1.vector[row] = UTC_DATE.parse(values[r].substring(0, 10))
+                            .getLong(ChronoField.EPOCH_DAY);
+      dt2.vector[row] = dt1.vector[row];
+      // set the strings
+      s1.setVal(row, values[r].getBytes(StandardCharsets.UTF_8));
+      String withZone = values[r] + " " + writerZone.getId();
+      s2.setVal(row, withZone.getBytes(StandardCharsets.UTF_8));
+
+      if (batch.size == batchSize) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+
+  /**
+   * Tests the various conversions to timestamp.
+   *
+   * It writes an ORC file with two each of longs, decimals, doubles, dates,
+   * and strings and then reads it back with the types converted to timestamp
+   * and timestamp with local time zone.
+   *
+   * This test is run both with and without setting the useUtc flag.
+   *
+   * It uses Australia/Sydney and America/New_York because they both have
+   * DST and they move in opposite directions on different days. Thus, we
+   * end up with four sets of offsets.
+   */
+  @Test
+  public void testEvolutionToTimestamp() throws Exception {
+    // The number of rows in the file that we test with.
+    final int VALUES = 1024;
+    // The different timezones that we'll use for this test.
+    final ZoneId WRITER_ZONE = ZoneId.of("America/New_York");
+    final ZoneId READER_ZONE = ZoneId.of("Australia/Sydney");
+
+    final TimeZone oldDefault = TimeZone.getDefault();
+    final ZoneId UTC = ZoneId.of("UTC");
+
+    // generate the timestamps to use
+    String[] timeStrings = new String[VALUES];
+    for(int r=0; r < timeStrings.length; ++r) {
+      timeStrings[r] = String.format("%04d-%02d-27 12:34:56.1",
+          1960 + (r / 12), (r % 12) + 1);
+    }
+
+    writeEvolutionToTimestamp(testFilePath, conf, WRITER_ZONE, timeStrings);
+
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone(READER_ZONE));
+
+      // read each column back as timestamp or timestamp with local time zone
+      TypeDescription readerSchema = TypeDescription.fromString(
+          "struct<l1:timestamp," +
+              "l2:timestamp with local time zone," +
+              "d1:timestamp," +
+              "d2:timestamp with local time zone," +
+              "dbl1:timestamp," +
+              "dbl2:timestamp with local time zone," +
+              "dt1:timestamp," +
+              "dt2:timestamp with local time zone," +
+              "s1:timestamp," +
+              "s2:timestamp with local time zone>");
+      VectorizedRowBatch batch = readerSchema.createRowBatchV2();
+      TimestampColumnVector l1 = (TimestampColumnVector) batch.cols[0];
+      TimestampColumnVector l2 = (TimestampColumnVector) batch.cols[1];
+      TimestampColumnVector d1 = (TimestampColumnVector) batch.cols[2];
+      TimestampColumnVector d2 = (TimestampColumnVector) batch.cols[3];
+      TimestampColumnVector dbl1 = (TimestampColumnVector) batch.cols[4];
+      TimestampColumnVector dbl2 = (TimestampColumnVector) batch.cols[5];
+      TimestampColumnVector dt1 = (TimestampColumnVector) batch.cols[6];
+      TimestampColumnVector dt2 = (TimestampColumnVector) batch.cols[7];
+      TimestampColumnVector s1 = (TimestampColumnVector) batch.cols[8];
+      TimestampColumnVector s2 = (TimestampColumnVector) batch.cols[9];
+      OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
+      Reader.Options rowOptions = new Reader.Options().schema(readerSchema);
+
+      try (Reader reader = OrcFile.createReader(testFilePath, options);
+           RecordReader rows = reader.rows(rowOptions)) {
+        int current = 0;
+        for (int r = 0; r < VALUES; ++r) {
+          if (current == batch.size) {
+            assertEquals("row " + r, true, rows.nextBatch(batch));
+            current = 0;
+          }
+
+          String expected1 = timeStrings[r] + " " + READER_ZONE.getId();
+          String expected2 = timeStrings[r] + " " + WRITER_ZONE.getId();
+          String midnight = timeStrings[r].substring(0, 10) + " 00:00:00.0";
+          String expectedDate1 = midnight + " " + READER_ZONE.getId();
+          String expectedDate2 = midnight + " " + UTC.getId();
+
+          assertEquals("row " + r, expected1.replace(".1 ", ".0 "),
+              timestampToString(l1.time[current] / 1000, l1.nanos[current], READER_ZONE));
+
+          assertEquals("row " + r, expected2.replace(".1 ", ".0 "),
+              timestampToString(l2.time[current] / 1000, l2.nanos[current], WRITER_ZONE));
+
+          assertEquals("row " + r, expected1,
+              timestampToString(d1.time[current] / 1000, d1.nanos[current], READER_ZONE));
+
+          assertEquals("row " + r, expected2,
+              timestampToString(d2.time[current] / 1000, d2.nanos[current], WRITER_ZONE));
+
+          assertEquals("row " + r, expected1,
+              timestampToString(dbl1.time[current] / 1000, dbl1.nanos[current], READER_ZONE));
+
+          assertEquals("row " + r, expected2,
+              timestampToString(dbl2.time[current] / 1000, dbl2.nanos[current], WRITER_ZONE));
+
+          assertEquals("row " + r, expectedDate1,
+              timestampToString(dt1.time[current] / 1000, dt1.nanos[current], READER_ZONE));
+
+          assertEquals("row " + r, expectedDate2,
+              timestampToString(dt2.time[current] / 1000, dt2.nanos[current], UTC));
+
+          assertEquals("row " + r, expected1,
+              timestampToString(s1.time[current] / 1000, s1.nanos[current], READER_ZONE));
+
+          assertEquals("row " + r, expected2,
+              timestampToString(s2.time[current] / 1000, s2.nanos[current], WRITER_ZONE));
+          current += 1;
+        }
+        assertEquals(false, rows.nextBatch(batch));
+      }
+
+      // try the tests with useUtc set on
+      options.useUTCTimestamp(true);
+      try (Reader reader = OrcFile.createReader(testFilePath, options);
+           RecordReader rows = reader.rows(rowOptions)) {
+        int current = 0;
+        for (int r = 0; r < VALUES; ++r) {
+          if (current == batch.size) {
+            assertEquals("row " + r, true, rows.nextBatch(batch));
+            current = 0;
+          }
+
+          String expected1 = timeStrings[r] + " " + UTC.getId();
+          String expected2 = timeStrings[r] + " " + WRITER_ZONE.getId();
+          String midnight = timeStrings[r].substring(0, 10) + " 00:00:00.0";
+          String expectedDate = midnight + " " + UTC.getId();
+
+          assertEquals("row " + r, expected1.replace(".1 ", ".0 "),
+              timestampToString(l1.time[current] / 1000, l1.nanos[current], UTC));
+
+          assertEquals("row " + r, expected2.replace(".1 ", ".0 "),
+              timestampToString(l2.time[current] / 1000, l2.nanos[current], WRITER_ZONE));
+
+          assertEquals("row " + r, expected1,
+              timestampToString(d1.time[current] / 1000, d1.nanos[current], UTC));
+
+          assertEquals("row " + r, expected2,
+              timestampToString(d2.time[current] / 1000, d2.nanos[current], WRITER_ZONE));
+
+          assertEquals("row " + r, expected1,
+              timestampToString(dbl1.time[current] / 1000, dbl1.nanos[current], UTC));
+
+          assertEquals("row " + r, expected2,
+              timestampToString(dbl2.time[current] / 1000, dbl2.nanos[current], WRITER_ZONE));
+
+          assertEquals("row " + r, expectedDate,
+              timestampToString(dt1.time[current] / 1000, dt1.nanos[current], UTC));
+
+          assertEquals("row " + r, expectedDate,
+              timestampToString(dt2.time[current] / 1000, dt2.nanos[current], UTC));
+
+          assertEquals("row " + r, expected1,
+              timestampToString(s1.time[current] / 1000, s1.nanos[current], UTC));
+
+          assertEquals("row " + r, expected2,
+              timestampToString(s2.time[current] / 1000, s2.nanos[current], WRITER_ZONE));
+          current += 1;
+        }
+        assertEquals(false, rows.nextBatch(batch));
+      }
+    } finally {
+      TimeZone.setDefault(oldDefault);
+    }
+  }
 }
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
index 4c3c0d3..ea49788 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -543,6 +543,7 @@ public class OrcMapredRecordReader<V extends WritableComparable>
       case DATE:
         return nextDate(vector, row, previous);
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return nextTimestamp(vector, row, previous);
       case STRUCT:
         return nextStruct(vector, row, schema, previous);
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
index 59f89f7..eaf1ce2 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
@@ -221,6 +221,7 @@ public class OrcMapredRecordWriter<V extends Writable>
           setLongValue(vector, row, ((DateWritable) value).getDays());
           break;
         case TIMESTAMP:
+        case TIMESTAMP_INSTANT:
           ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value);
           break;
         case DECIMAL:
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
index 51e0d60..d24d3a7 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
@@ -189,6 +189,7 @@ public final class OrcStruct implements WritableComparable<OrcStruct> {
       case DATE:
         return new DateWritable();
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return new OrcTimestamp();
       case DECIMAL:
         return new HiveDecimalWritable();
diff --git a/java/tools/src/java/org/apache/orc/tools/PrintData.java b/java/tools/src/java/org/apache/orc/tools/PrintData.java
index 5d74a21..419d13c 100644
--- a/java/tools/src/java/org/apache/orc/tools/PrintData.java
+++ b/java/tools/src/java/org/apache/orc/tools/PrintData.java
@@ -151,6 +151,7 @@ public class PrintData {
               (int) ((LongColumnVector) vector).vector[row]).toString());
           break;
         case TIMESTAMP:
+        case TIMESTAMP_INSTANT:
           writer.value(((TimestampColumnVector) vector)
               .asScratchTimestamp(row).toString());
           break;
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
index ce32336..62c458d 100644
--- a/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
+++ b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
@@ -315,6 +315,7 @@ public class CsvReader implements RecordReader {
       case VARCHAR:
         return new BytesConverter(startOffset);
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return new TimestampConverter(startOffset);
       case STRUCT:
         return new StructConverter(startOffset, schema);
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
index c020d11..1c058fa 100644
--- a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
+++ b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
@@ -276,6 +276,7 @@ public class JsonReader implements RecordReader {
       case DECIMAL:
         return new DecimalColumnConverter();
       case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
         return new TimestampColumnConverter();
       case BINARY:
         return new BinaryColumnConverter();
diff --git a/java/tools/src/java/org/apache/orc/tools/json/HiveType.java b/java/tools/src/java/org/apache/orc/tools/json/HiveType.java
index 6222aca..58c4a3e 100644
--- a/java/tools/src/java/org/apache/orc/tools/json/HiveType.java
+++ b/java/tools/src/java/org/apache/orc/tools/json/HiveType.java
@@ -31,7 +31,7 @@ abstract class HiveType {
     NULL(0),
     BOOLEAN(1),
     BYTE(1), SHORT(2), INT(3), LONG(4), DECIMAL(5), FLOAT(6), DOUBLE(7),
-    BINARY(1), DATE(1), TIMESTAMP(1), STRING(2),
+    BINARY(1), DATE(1), TIMESTAMP(1), TIMESTAMP_INSTANT(1), STRING(2),
     STRUCT(1, false),
     LIST(1, false),
     UNION(8, false);
diff --git a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
index 8b53ee1..dac75d7 100644
--- a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
+++ b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
@@ -286,6 +286,8 @@ public class JsonSchemaFinder {
         return new StringType(HiveType.Kind.STRING);
       case TIMESTAMP:
         return new StringType(HiveType.Kind.TIMESTAMP);
+      case TIMESTAMP_INSTANT:
+        return new StringType(HiveType.Kind.TIMESTAMP_INSTANT);
       case DATE:
         return new StringType(HiveType.Kind.DATE);
       case BINARY:
diff --git a/java/tools/src/java/org/apache/orc/tools/json/StringType.java b/java/tools/src/java/org/apache/orc/tools/json/StringType.java
index 32cb73d..3d9a97c 100644
--- a/java/tools/src/java/org/apache/orc/tools/json/StringType.java
+++ b/java/tools/src/java/org/apache/orc/tools/json/StringType.java
@@ -38,6 +38,8 @@ class StringType extends HiveType {
         return "string";
       case TIMESTAMP:
         return "timestamp";
+      case TIMESTAMP_INSTANT:
+        return "timestamp with local time zone";
       case DATE:
         return "date";
       default:
@@ -67,6 +69,8 @@ class StringType extends HiveType {
         return TypeDescription.createString();
       case TIMESTAMP:
         return TypeDescription.createTimestamp();
+      case TIMESTAMP_INSTANT:
+        return TypeDescription.createTimestampInstant();
       case DATE:
         return TypeDescription.createDate();
       default:
diff --git a/proto/orc_proto.proto b/proto/orc_proto.proto
index a2f4c24..337854b 100644
--- a/proto/orc_proto.proto
+++ b/proto/orc_proto.proto
@@ -212,6 +212,7 @@ message Type {
     DATE = 15;
     VARCHAR = 16;
     CHAR = 17;
+    TIMESTAMP_INSTANT = 18;
   }
   optional Kind kind = 1;
   repeated uint32 subtypes = 2 [packed=true];
diff --git a/site/_docs/types.md b/site/_docs/types.md
index a14adf4..dbe843b 100644
--- a/site/_docs/types.md
+++ b/site/_docs/types.md
@@ -29,6 +29,7 @@ ORC provides a rich set of scalar and compound types:
   * binary
 * Date/time
   * timestamp
+  * timestamp with local time zone
   * date
 * Compound types
   * struct
@@ -62,3 +63,20 @@ create table Foobar (
 
 ![ORC column structure](/img/TreeWriters.png)
 
+# Timestamps
+
+ORC includes two different forms of timestamps from the SQL world:
+
+* **Timestamp** is a date and time without a time zone, which does not change based on the time zone of the reader.
+* **Timestamp with local time zone** is a fixed instant in time, whose displayed value does change based on the time zone of the reader.
+
+Unless your application uses UTC consistently, **timestamp with
+local time zone** is strongly preferred over **timestamp** for most
+use cases. When users say an event is at 10:00, it is always in
+reference to a certain time zone and means a point in time, rather than
+10:00 in an arbitrary time zone.
+
+| Type        | Value in America/Los_Angeles | Value in America/New_York |
+| ----------- | ---------------------------- | ------------------------- |
+| **timestamp** | 2014-12-12 6:00:00           | 2014-12-12 6:00:00        |
+| **timestamp with local time zone** | 2014-12-12 6:00:00 | 2014-12-12 9:00:00 |
\ No newline at end of file