You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2018/05/01 21:44:42 UTC

orc git commit: ORC-49. Create RLE-based encoding for short decimals in ORCv2.

Repository: orc
Updated Branches:
  refs/heads/master 9eda0e4d5 -> 45b9c5082


ORC-49. Create RLE-based encoding for short decimals in ORCv2.

Fixes #257
Closes #147

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/45b9c508
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/45b9c508
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/45b9c508

Branch: refs/heads/master
Commit: 45b9c508206864fe3147979653dcdf2baafbad29
Parents: 9eda0e4
Author: Owen O'Malley <om...@apache.org>
Authored: Thu Apr 26 22:13:26 2018 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue May 1 14:42:25 2018 -0700

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcFile.java  |  16 +-
 .../java/org/apache/orc/impl/OrcCodecPool.java  |   7 +
 .../src/java/org/apache/orc/impl/OutStream.java |   2 +-
 .../java/org/apache/orc/impl/ReaderImpl.java    |   3 +-
 .../org/apache/orc/impl/RecordReaderImpl.java   |  11 +-
 .../orc/impl/RunLengthIntegerReaderV2.java      |   6 +-
 .../org/apache/orc/impl/TreeReaderFactory.java  | 125 ++++
 .../java/org/apache/orc/impl/WriterImpl.java    |   9 +-
 .../org/apache/orc/impl/WriterInternal.java     |  36 ++
 .../orc/impl/writer/Decimal64TreeWriter.java    | 158 +++++
 .../org/apache/orc/impl/writer/TreeWriter.java  |  11 +-
 .../apache/orc/impl/writer/WriterImplV2.java    | 617 +++++++++++++++++++
 .../test/org/apache/orc/TestVectorOrcFile.java  | 184 ++++--
 site/specification/ORCv2.md                     |  17 +-
 14 files changed, 1124 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index c9262e9..ac0beff 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -36,6 +36,8 @@ import org.apache.orc.impl.MemoryManagerImpl;
 import org.apache.orc.impl.OrcTail;
 import org.apache.orc.impl.ReaderImpl;
 import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.WriterInternal;
+import org.apache.orc.impl.writer.WriterImplV2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -810,8 +812,16 @@ public class OrcFile {
                                     ) throws IOException {
     FileSystem fs = opts.getFileSystem() == null ?
         path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
-
-    return new WriterImpl(fs, path, opts);
+    switch (opts.getVersion()) {
+      case V_0_11:
+      case V_0_12:
+        return new WriterImpl(fs, path, opts);
+      case UNSTABLE_PRE_2_0:
+        return new WriterImplV2(fs, path, opts);
+      default:
+        throw new IllegalArgumentException("Unknown version " +
+            opts.getVersion());
+    }
   }
 
   /**
@@ -964,7 +974,7 @@ public class OrcFile {
           mergeMetadata(userMetadata, reader);
           if (bufferSize < reader.getCompressionSize()) {
             bufferSize = reader.getCompressionSize();
-            ((WriterImpl) output).increaseCompressionSize(bufferSize);
+            ((WriterInternal) output).increaseCompressionSize(bufferSize);
           }
         }
         List<OrcProto.StripeStatistics> statList =

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
index 24ee618..1c4b66f 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -100,6 +100,13 @@ public final class OrcCodecPool {
     }
   }
 
+  /**
+   * Clear the codec pool. Mostly used for testing.
+   */
+  public static void clear() {
+    POOL.clear();
+  }
+
   private OrcCodecPool() {
     // prevent instantiation
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index a6009ab..d6302d5 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -108,7 +108,7 @@ public class OutStream extends PositionedOutputStream {
    * @param bufferSize The ORC compression buffer size being checked.
    * @throws IllegalArgumentException If bufferSize value exceeds threshold.
    */
-  static void assertBufferSizeValid(int bufferSize) throws IllegalArgumentException {
+  public static void assertBufferSizeValid(int bufferSize) throws IllegalArgumentException {
     if (bufferSize >= (1 << 23)) {
       throw new IllegalArgumentException("Illegal value of ORC compression buffer size: " + bufferSize);
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 0daa0ed..e8d6be9 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -224,7 +224,8 @@ public class ReaderImpl implements Reader {
     return deserializeStats(fileStats);
   }
 
-  static ColumnStatistics[] deserializeStats(List<OrcProto.ColumnStatistics> fileStats){
+  public static ColumnStatistics[] deserializeStats(
+      List<OrcProto.ColumnStatistics> fileStats) {
     ColumnStatistics[] result = new ColumnStatistics[fileStats.size()];
     for(int i=0; i < result.length; ++i) {
       result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 83eb039..53cc761 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -255,10 +255,13 @@ public class RecordReaderImpl implements RecordReader {
       skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
     }
 
-    TreeReaderFactory.ReaderContext readerContext = new TreeReaderFactory.ReaderContext()
-      .setSchemaEvolution(evolution)
-      .skipCorrupt(skipCorrupt);
-    reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(), readerContext);
+    TreeReaderFactory.ReaderContext readerContext =
+        new TreeReaderFactory.ReaderContext()
+          .setSchemaEvolution(evolution)
+          .skipCorrupt(skipCorrupt)
+          .fileFormat(fileReader.getFileVersion());
+    reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
+        readerContext);
 
     this.fileIncluded = evolution.getFileIncluded();
     indexes = new OrcProto.RowIndex[types.size()];

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index 610d9b5..1a94aad 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -363,9 +363,13 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
   public void nextVector(ColumnVector previous,
                          long[] data,
                          int previousLen) throws IOException {
+    // if all nulls, just return
+    if (previous.isRepeating && !previous.noNulls && previous.isNull[0]) {
+      return;
+    }
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
-      if (!previous.isNull[i]) {
+      if (previous.noNulls || !previous.isNull[i]) {
         data[i] = next();
       } else {
         // The default value of null for int type in vectorized

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 652067d..ccae522 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.OrcProto;
 import org.apache.orc.impl.writer.TimestampTreeWriter;
@@ -57,12 +58,15 @@ public class TreeReaderFactory {
     boolean isSkipCorrupt();
 
     String getWriterTimezone();
+
+    OrcFile.Version getFileFormat();
   }
 
   public static class ReaderContext implements Context {
     private SchemaEvolution evolution;
     private boolean skipCorrupt = false;
     private String writerTimezone;
+    private OrcFile.Version fileFormat;
 
     public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
       this.evolution = evolution;
@@ -79,6 +83,11 @@ public class TreeReaderFactory {
       return this;
     }
 
+    public ReaderContext fileFormat(OrcFile.Version version) {
+      this.fileFormat = version;
+      return this;
+    }
+
     @Override
     public SchemaEvolution getSchemaEvolution() {
       return evolution;
@@ -93,6 +102,11 @@ public class TreeReaderFactory {
     public String getWriterTimezone() {
       return writerTimezone;
     }
+
+    @Override
+    public OrcFile.Version getFileFormat() {
+      return fileFormat;
+    }
   }
 
   public abstract static class TreeReader {
@@ -1259,6 +1273,107 @@ public class TreeReaderFactory {
     }
   }
 
+  public static class Decimal64TreeReader extends TreeReader {
+    protected final int precision;
+    protected final int scale;
+    protected final boolean skipCorrupt;
+    protected RunLengthIntegerReaderV2 valueReader;
+
+    Decimal64TreeReader(int columnId,
+                      int precision,
+                      int scale,
+                      Context context) throws IOException {
+      this(columnId, null, null, null, precision, scale, context);
+    }
+
+    protected Decimal64TreeReader(int columnId,
+                                InStream present,
+                                InStream valueStream,
+                                OrcProto.ColumnEncoding encoding,
+                                int precision,
+                                int scale,
+                                Context context) throws IOException {
+      super(columnId, present, context);
+      this.precision = precision;
+      this.scale = scale;
+      valueReader = new RunLengthIntegerReaderV2(valueStream, true,
+          context.isSkipCorrupt());
+      skipCorrupt = context.isSkipCorrupt();
+    }
+
+    @Override
+    void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT)) {
+        throw new IOException("Unknown encoding " + encoding + " in column " +
+            columnId);
+      }
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     OrcProto.StripeFooter stripeFooter
+    ) throws IOException {
+      super.startStripe(streams, stripeFooter);
+      InStream stream = streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA));
+      valueReader = new RunLengthIntegerReaderV2(stream, true, skipCorrupt);
+    }
+
+    @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      super.seek(index);
+      valueReader.seek(index);
+    }
+
+    private void nextVector(DecimalColumnVector result,
+                            final int batchSize) throws IOException {
+      if (result.noNulls) {
+        for (int r=0; r < batchSize; ++r) {
+          result.vector[r].setFromLongAndScale(valueReader.next(), scale);
+        }
+      } else if (!result.isRepeating || !result.isNull[0]) {
+        for (int r=0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.vector[r].setFromLongAndScale(valueReader.next(), scale);
+          }
+        }
+      }
+      result.precision = (short) precision;
+      result.scale = (short) scale;
+    }
+
+    private void nextVector(Decimal64ColumnVector result,
+                            final int batchSize) throws IOException {
+      valueReader.nextVector(result, result.vector, batchSize);
+      result.precision = (short) precision;
+      result.scale = (short) scale;
+    }
+
+    @Override
+    public void nextVector(ColumnVector result,
+                           boolean[] isNull,
+                           final int batchSize) throws IOException {
+      // Read present/isNull stream
+      super.nextVector(result, isNull, batchSize);
+      if (result instanceof Decimal64ColumnVector) {
+        nextVector((Decimal64ColumnVector) result, batchSize);
+      } else {
+        nextVector((DecimalColumnVector) result, batchSize);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      valueReader.skip(items);
+    }
+  }
+
   /**
    * A tree reader that will read string columns. At the start of the
    * stripe, it creates an internal reader based on whether a direct or
@@ -1352,6 +1467,8 @@ public class TreeReaderFactory {
         LongColumnVector scratchlcv,
         BytesColumnVector result, final int batchSize) throws IOException {
       // Read lengths
+      scratchlcv.isRepeating = result.isRepeating;
+      scratchlcv.noNulls = result.noNulls;
       scratchlcv.isNull = result.isNull;  // Notice we are replacing the isNull vector here...
       lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
       int totalLength = 0;
@@ -1645,6 +1762,8 @@ public class TreeReaderFactory {
         }
 
         // Read string offsets
+        scratchlcv.isRepeating = result.isRepeating;
+        scratchlcv.noNulls = result.noNulls;
         scratchlcv.isNull = result.isNull;
         scratchlcv.ensureSize((int) batchSize, false);
         reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
@@ -2199,6 +2318,7 @@ public class TreeReaderFactory {
   public static TreeReader createTreeReader(TypeDescription readerType,
                                             Context context
                                             ) throws IOException {
+    OrcFile.Version version = context.getFileFormat();
     final SchemaEvolution evolution = context.getSchemaEvolution();
     TypeDescription fileType = evolution.getFileType(readerType);
     if (fileType == null || !evolution.includeReaderColumn(readerType.getId())){
@@ -2241,6 +2361,11 @@ public class TreeReaderFactory {
       case DATE:
         return new DateTreeReader(fileType.getId(), context);
       case DECIMAL:
+        if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
+            fileType.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION){
+          return new Decimal64TreeReader(fileType.getId(), fileType.getPrecision(),
+              fileType.getScale(), context);
+        }
         return new DecimalTreeReader(fileType.getId(), fileType.getPrecision(),
             fileType.getScale(), context);
       case STRUCT:

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 0ddd00a..c3656d4 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -72,7 +72,7 @@ import com.google.protobuf.ByteString;
  * to be confined to a single thread as well.
  *
  */
-public class WriterImpl implements Writer, MemoryManager.Callback {
+public class WriterImpl implements WriterInternal, MemoryManager.Callback {
 
   private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
 
@@ -193,12 +193,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     return estBufferSize > bs ? bs : estBufferSize;
   }
 
-  /**
-   * Increase the buffer size for this writer.
-   * This function is internal only and should only be called by the
-   * ORC file merger.
-   * @param newSize the new buffer size.
-   */
+  @Override
   public void increaseCompressionSize(int newSize) {
     if (newSize > bufferSize) {
       bufferSize = newSize;

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/WriterInternal.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterInternal.java b/java/core/src/java/org/apache/orc/impl/WriterInternal.java
new file mode 100644
index 0000000..29f5cf5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/WriterInternal.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.Writer;
+
+/**
+ * The ORC internal API to the writer.
+ */
+public interface WriterInternal extends Writer {
+
+  /**
+   * Increase the buffer size for this writer.
+   * This function is internal only and should only be called by the
+   * ORC file merger.
+   * @param newSize the new buffer size.
+   */
+  void increaseCompressionSize(int newSize);
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
new file mode 100644
index 0000000..b79e3b2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthIntegerWriterV2;
+
+import java.io.IOException;
+
+/**
+ * Writer for short decimals in ORCv2.
+ */
+public class Decimal64TreeWriter extends TreeWriterBase {
+  private final RunLengthIntegerWriterV2 valueWriter;
+  private final int scale;
+
+  public Decimal64TreeWriter(int columnId,
+                             TypeDescription schema,
+                             WriterContext writer,
+                             boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream stream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    // Use RLEv2 until we have the new RLEv3.
+    valueWriter = new RunLengthIntegerWriterV2(stream, true, true);
+    scale = schema.getScale();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  private void writeBatch(DecimalColumnVector vector, int offset,
+                         int length) throws IOException {
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        HiveDecimalWritable value = vector.vector[0];
+        long lg = value.serialize64(scale);
+        indexStatistics.updateDecimal(value);
+        if (createBloomFilter) {
+          bloomFilterUtf8.addLong(lg);
+        }
+        for (int i = 0; i < length; ++i) {
+          valueWriter.write(lg);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vector.noNulls || !vector.isNull[i + offset]) {
+          HiveDecimalWritable value = vector.vector[i + offset];
+          long lg = value.serialize64(scale);
+          valueWriter.write(lg);
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            bloomFilterUtf8.addLong(lg);
+          }
+        }
+      }
+    }
+  }
+
+  private void writeBatch(Decimal64ColumnVector vector, int offset,
+                          int length) throws IOException {
+    assert(scale == vector.scale);
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        HiveDecimalWritable value = vector.getScratchWritable();
+        long lg = vector.vector[0];
+        value.setFromLongAndScale(lg, vector.scale);
+        indexStatistics.updateDecimal(value);
+        if (createBloomFilter) {
+          bloomFilterUtf8.addLong(lg);
+        }
+        for (int i = 0; i < length; ++i) {
+          valueWriter.write(lg);
+        }
+      }
+    } else {
+      HiveDecimalWritable value = vector.getScratchWritable();
+      for (int i = 0; i < length; ++i) {
+        if (vector.noNulls || !vector.isNull[i + offset]) {
+          long lg = vector.vector[i + offset];
+          valueWriter.write(lg);
+          value.setFromLongAndScale(lg, vector.scale);
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            bloomFilterUtf8.addLong(lg);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    if (vector instanceof Decimal64ColumnVector) {
+      writeBatch((Decimal64ColumnVector) vector, offset, length);
+    } else {
+      writeBatch((DecimalColumnVector) vector, offset, length);
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    valueWriter.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + valueWriter.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() * JavaDataModel.get().primitive2();
+  }
+
+  @Override
+  public void flushStreams() throws IOException {
+    super.flushStreams();
+    valueWriter.flush();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
index b1a6bec..bfa403e 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl.writer;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.TypeDescription;
 
@@ -45,7 +46,7 @@ public interface TreeWriter {
   long getRawDataSize();
 
   /**
-   * Write a VectorizedRowBath to the file. This is called by the WriterImpl
+   * Write a VectorizedRowBatch to the file. This is called by the WriterImplV2
    * at the top level.
    * @param batch the list of all of the columns
    * @param offset the first row from the batch to write
@@ -101,10 +102,11 @@ public interface TreeWriter {
    */
   void writeFileStatistics(OrcProto.Footer.Builder footer);
 
-  public class Factory {
+  class Factory {
     public static TreeWriter create(TypeDescription schema,
                                     WriterContext streamFactory,
                                     boolean nullable) throws IOException {
+      OrcFile.Version version = streamFactory.getVersion();
       switch (schema.getCategory()) {
         case BOOLEAN:
           return new BooleanTreeWriter(schema.getId(),
@@ -142,6 +144,11 @@ public interface TreeWriter {
           return new DateTreeWriter(schema.getId(),
               schema, streamFactory, nullable);
         case DECIMAL:
+          if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
+              schema.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION) {
+            return new Decimal64TreeWriter(schema.getId(),
+                schema, streamFactory, nullable);
+          }
           return new DecimalTreeWriter(schema.getId(),
               schema, streamFactory, nullable);
         case STRUCT:

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
new file mode 100644
index 0000000..ab4fc58
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PhysicalFsWriter;
+import org.apache.orc.impl.ReaderImpl;
+import org.apache.orc.impl.StreamName;
+import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.WriterInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+/**
+ * An ORCv2 file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized like most Stream objects, so the creation of
+ * an OrcFile and all access to a single instance have to be from a single
+ * thread.
+ *
+ * There are no known cases where these happen between different threads today.
+ *
+ * Caveat: the MemoryManager is created when the WriterOptions are created, so
+ * that has to be confined to a single thread as well.
+ *
+ */
+public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WriterImplV2.class);
+
+  private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+  private final Path path;
+  private long adjustedStripeSize;
+  private final int rowIndexStride;
+  private final CompressionKind compress;
+  private int bufferSize;
+  private final TypeDescription schema;
+  private final PhysicalWriter physicalWriter;
+  private final OrcFile.WriterVersion writerVersion;
+
+  private long rowCount = 0;
+  private long rowsInStripe = 0;
+  private long rawDataSize = 0;
+  private int rowsInIndex = 0;
+  private long lastFlushOffset = 0;
+  private int stripesAtLastFlush = -1;
+  private final List<OrcProto.StripeInformation> stripes =
+    new ArrayList<>();
+  private final OrcProto.Metadata.Builder fileMetadata =
+      OrcProto.Metadata.newBuilder();
+  private final Map<String, ByteString> userMetadata =
+    new TreeMap<>();
+  private final TreeWriter treeWriter;
+  private final boolean buildIndex;
+  private final MemoryManager memoryManager;
+  private final OrcFile.Version version;
+  private final Configuration conf;
+  private final OrcFile.WriterCallback callback;
+  private final OrcFile.WriterContext callbackContext;
+  private final OrcFile.EncodingStrategy encodingStrategy;
+  private final OrcFile.CompressionStrategy compressionStrategy;
+  private final boolean[] bloomFilterColumns;
+  private final double bloomFilterFpp;
+  private final OrcFile.BloomFilterVersion bloomFilterVersion;
+  private final boolean writeTimeZone;
+
+  public WriterImplV2(FileSystem fs,
+                      Path path,
+                      OrcFile.WriterOptions opts) throws IOException {
+    this.path = path;
+    this.conf = opts.getConfiguration();
+    this.callback = opts.getCallback();
+    this.schema = opts.getSchema();
+    this.writerVersion = opts.getWriterVersion();
+    bloomFilterVersion = opts.getBloomFilterVersion();
+    if (callback != null) {
+      callbackContext = new OrcFile.WriterContext(){
+
+        @Override
+        public Writer getWriter() {
+          return WriterImplV2.this;
+        }
+      };
+    } else {
+      callbackContext = null;
+    }
+    writeTimeZone = hasTimestamp(schema);
+    this.adjustedStripeSize = opts.getStripeSize();
+    this.version = opts.getVersion();
+    this.encodingStrategy = opts.getEncodingStrategy();
+    this.compressionStrategy = opts.getCompressionStrategy();
+    this.compress = opts.getCompress();
+    this.rowIndexStride = opts.getRowIndexStride();
+    this.memoryManager = opts.getMemoryManager();
+    buildIndex = rowIndexStride > 0;
+    int numColumns = schema.getMaximumId() + 1;
+    if (opts.isEnforceBufferSize()) {
+      OutStream.assertBufferSizeValid(opts.getBufferSize());
+      this.bufferSize = opts.getBufferSize();
+    } else {
+      this.bufferSize = WriterImpl.getEstimatedBufferSize(adjustedStripeSize,
+          numColumns, opts.getBufferSize());
+    }
+    if (version == OrcFile.Version.FUTURE) {
+      throw new IllegalArgumentException("Can not write in a unknown version.");
+    } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) {
+      LOG.warn("ORC files written in " + version.getName() + " will not be" +
+          " readable by other versions of the software. It is only for" +
+          " developer testing.");
+    }
+    if (version == OrcFile.Version.V_0_11) {
+      /* do not write bloom filters for ORC v11 */
+      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
+    } else {
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
+    }
+    this.bloomFilterFpp = opts.getBloomFilterFpp();
+    this.physicalWriter = opts.getPhysicalWriter() == null ?
+        new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
+    physicalWriter.writeHeader();
+    treeWriter = TreeWriter.Factory.create(schema, new StreamFactory(), false);
+    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+      throw new IllegalArgumentException("Row stride must be at least " +
+          MIN_ROW_INDEX_STRIDE);
+    }
+
+    // ensure that we are able to handle callbacks before we register ourselves
+    memoryManager.addWriter(path, opts.getStripeSize(), this);
+    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+        " compression: {} bufferSize: {}", path, adjustedStripeSize, opts.getBlockSize(),
+        compress, bufferSize);
+  }
+
+  @Override
+  public boolean checkMemory(double newScale) throws IOException {
+    long limit = Math.round(adjustedStripeSize * newScale);
+    long size = treeWriter.estimateMemory();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ORC writer " + physicalWriter + " size = " + size +
+          " limit = " + limit);
+    }
+    if (size > limit) {
+      flushStripe();
+      return true;
+    }
+    return false;
+  }
+
+
+  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
+    // TODO: modify() may create a new codec here. We want to end() it when the stream is closed,
+    //       but at this point there's no close() for the stream.
+    CompressionCodec result = physicalWriter.getCompressionCodec();
+    if (result != null) {
+      switch (kind) {
+        case BLOOM_FILTER:
+        case DATA:
+        case DICTIONARY_DATA:
+        case BLOOM_FILTER_UTF8:
+          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
+            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+                CompressionCodec.Modifier.TEXT));
+          } else {
+            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+                CompressionCodec.Modifier.TEXT));
+          }
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+              CompressionCodec.Modifier.BINARY));
+          break;
+        default:
+          LOG.info("Missing ORC compression modifiers for " + kind);
+          break;
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void increaseCompressionSize(int newSize) {
+    if (newSize > bufferSize) {
+      bufferSize = newSize;
+    }
+  }
+
+  /**
+   * Interface from the Writer to the TreeWriters. This limits the visibility
+   * that the TreeWriters have into the Writer.
+   */
+  private class StreamFactory implements WriterContext {
+    /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return The output outStream that the section needs to be written to.
+     */
+    public OutStream createStream(int column,
+                                  OrcProto.Stream.Kind kind
+                                  ) throws IOException {
+      final StreamName name = new StreamName(column, kind);
+      CompressionCodec codec = getCustomizedCodec(kind);
+
+      return new OutStream(physicalWriter.toString(), bufferSize, codec,
+          physicalWriter.createDataStream(name));
+    }
+
+    /**
+     * Get the stride rate of the row index.
+     */
+    public int getRowIndexStride() {
+      return rowIndexStride;
+    }
+
+    /**
+     * Should be building the row index.
+     * @return true if we are building the index
+     */
+    public boolean buildIndex() {
+      return buildIndex;
+    }
+
+    /**
+     * Is the ORC file compressed?
+     * @return are the streams compressed
+     */
+    public boolean isCompressed() {
+      return physicalWriter.getCompressionCodec() != null;
+    }
+
+    /**
+     * Get the encoding strategy to use.
+     * @return encoding strategy
+     */
+    public OrcFile.EncodingStrategy getEncodingStrategy() {
+      return encodingStrategy;
+    }
+
+    /**
+     * Get the bloom filter columns
+     * @return bloom filter columns
+     */
+    public boolean[] getBloomFilterColumns() {
+      return bloomFilterColumns;
+    }
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    public double getBloomFilterFPP() {
+      return bloomFilterFpp;
+    }
+
+    /**
+     * Get the writer's configuration.
+     * @return configuration
+     */
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Get the version of the file to write.
+     */
+    public OrcFile.Version getVersion() {
+      return version;
+    }
+
+    /**
+     * Get the PhysicalWriter.
+     *
+     * @return the file's physical writer.
+     */
+    @Override
+    public PhysicalWriter getPhysicalWriter() {
+      return physicalWriter;
+    }
+
+    public OrcFile.BloomFilterVersion getBloomFilterVersion() {
+      return bloomFilterVersion;
+    }
+
+    public void writeIndex(StreamName name,
+                           OrcProto.RowIndex.Builder index) throws IOException {
+      physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
+    }
+
+    public void writeBloomFilter(StreamName name,
+                                 OrcProto.BloomFilterIndex.Builder bloom
+                                 ) throws IOException {
+      physicalWriter.writeBloomFilter(name, bloom,
+          getCustomizedCodec(name.getKind()));
+    }
+  }
+
+
+  private static void writeTypes(OrcProto.Footer.Builder builder,
+                                 TypeDescription schema) {
+    builder.addAllTypes(OrcUtils.getOrcTypes(schema));
+  }
+
+  private void createRowIndexEntry() throws IOException {
+    treeWriter.createRowIndexEntry();
+    rowsInIndex = 0;
+  }
+
+  private void flushStripe() throws IOException {
+    if (buildIndex && rowsInIndex != 0) {
+      createRowIndexEntry();
+    }
+    if (rowsInStripe != 0) {
+      if (callback != null) {
+        callback.preStripeWrite(callbackContext);
+      }
+      // finalize the data for the stripe
+      int requiredIndexEntries = rowIndexStride == 0 ? 0 :
+          (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+      OrcProto.StripeFooter.Builder builder =
+          OrcProto.StripeFooter.newBuilder();
+      if (writeTimeZone) {
+        builder.setWriterTimezone(TimeZone.getDefault().getID());
+      }
+      OrcProto.StripeStatistics.Builder stats =
+          OrcProto.StripeStatistics.newBuilder();
+
+      treeWriter.flushStreams();
+      treeWriter.writeStripe(builder, stats, requiredIndexEntries);
+
+      OrcProto.StripeInformation.Builder dirEntry =
+          OrcProto.StripeInformation.newBuilder()
+              .setNumberOfRows(rowsInStripe);
+      physicalWriter.finalizeStripe(builder, dirEntry);
+
+      fileMetadata.addStripeStats(stats.build());
+      stripes.add(dirEntry.build());
+      rowCount += rowsInStripe;
+      rowsInStripe = 0;
+    }
+  }
+
+  private long computeRawDataSize() {
+    return treeWriter.getRawDataSize();
+  }
+
+  private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+    switch (kind) {
+      case NONE: return OrcProto.CompressionKind.NONE;
+      case ZLIB: return OrcProto.CompressionKind.ZLIB;
+      case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+      case LZO: return OrcProto.CompressionKind.LZO;
+      case LZ4: return OrcProto.CompressionKind.LZ4;
+      default:
+        throw new IllegalArgumentException("Unknown compression " + kind);
+    }
+  }
+
+  private void writeFileStatistics(OrcProto.Footer.Builder builder,
+                                   TreeWriter writer) throws IOException {
+    writer.writeFileStatistics(builder);
+  }
+
+  private void writeMetadata() throws IOException {
+    physicalWriter.writeFileMetadata(fileMetadata);
+  }
+
+  private long writePostScript() throws IOException {
+    OrcProto.PostScript.Builder builder =
+        OrcProto.PostScript.newBuilder()
+            .setCompression(writeCompressionKind(compress))
+            .setMagic(OrcFile.MAGIC)
+            .addVersion(version.getMajor())
+            .addVersion(version.getMinor())
+            .setWriterVersion(writerVersion.getId());
+    if (compress != CompressionKind.NONE) {
+      builder.setCompressionBlockSize(bufferSize);
+    }
+    return physicalWriter.writePostScript(builder);
+  }
+
+  private long writeFooter() throws IOException {
+    writeMetadata();
+    OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+    builder.setNumberOfRows(rowCount);
+    builder.setRowIndexStride(rowIndexStride);
+    rawDataSize = computeRawDataSize();
+    // serialize the types
+    writeTypes(builder, schema);
+    // add the stripe information
+    for(OrcProto.StripeInformation stripe: stripes) {
+      builder.addStripes(stripe);
+    }
+    // add the column statistics
+    writeFileStatistics(builder, treeWriter);
+    // add all of the user metadata
+    for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
+      builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
+        .setName(entry.getKey()).setValue(entry.getValue()));
+    }
+    builder.setWriter(OrcFile.WriterImplementation.ORC_JAVA.getId());
+    physicalWriter.writeFileFooter(builder);
+    return writePostScript();
+  }
+
+  @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
+  @Override
+  public void addUserMetadata(String name, ByteBuffer value) {
+    userMetadata.put(name, ByteString.copyFrom(value));
+  }
+
+  @Override
+  public void addRowBatch(VectorizedRowBatch batch) throws IOException {
+    if (buildIndex) {
+      // Batch the writes up to the rowIndexStride so that we can get the
+      // right size indexes.
+      int posn = 0;
+      while (posn < batch.size) {
+        int chunkSize = Math.min(batch.size - posn,
+            rowIndexStride - rowsInIndex);
+        treeWriter.writeRootBatch(batch, posn, chunkSize);
+        posn += chunkSize;
+        rowsInIndex += chunkSize;
+        rowsInStripe += chunkSize;
+        if (rowsInIndex >= rowIndexStride) {
+          createRowIndexEntry();
+        }
+      }
+    } else {
+      rowsInStripe += batch.size;
+      treeWriter.writeRootBatch(batch, 0, batch.size);
+    }
+    memoryManager.addedRow(batch.size);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (callback != null) {
+      callback.preFooterWrite(callbackContext);
+    }
+    // remove us from the memory manager so that we don't get any callbacks
+    memoryManager.removeWriter(path);
+    // actually close the file
+    flushStripe();
+    lastFlushOffset = writeFooter();
+    physicalWriter.close();
+  }
+
+  /**
+   * Raw data size will be computed when writing the file footer. Hence the raw
+   * data size value will be available only after closing the writer.
+   */
+  @Override
+  public long getRawDataSize() {
+    return rawDataSize;
+  }
+
+  /**
+   * Row count gets updated when flushing the stripes. To get an accurate row
+   * count, call this method after the writer is closed.
+   */
+  @Override
+  public long getNumberOfRows() {
+    return rowCount;
+  }
+
+  @Override
+  public long writeIntermediateFooter() throws IOException {
+    // flush any buffered rows
+    flushStripe();
+    // write a footer
+    if (stripesAtLastFlush != stripes.size()) {
+      if (callback != null) {
+        callback.preFooterWrite(callbackContext);
+      }
+      lastFlushOffset = writeFooter();
+      stripesAtLastFlush = stripes.size();
+      physicalWriter.flush();
+    }
+    return lastFlushOffset;
+  }
+
+  static void checkArgument(boolean expression, String message) {
+    if (!expression) {
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  @Override
+  public void appendStripe(byte[] stripe, int offset, int length,
+      StripeInformation stripeInfo,
+      OrcProto.StripeStatistics stripeStatistics) throws IOException {
+    checkArgument(stripe != null, "Stripe must not be null");
+    checkArgument(length <= stripe.length,
+        "Specified length must not be greater specified array length");
+    checkArgument(stripeInfo != null, "Stripe information must not be null");
+    checkArgument(stripeStatistics != null,
+        "Stripe statistics must not be null");
+
+    rowsInStripe = stripeInfo.getNumberOfRows();
+    // update stripe information
+    OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
+        .newBuilder()
+        .setNumberOfRows(rowsInStripe)
+        .setIndexLength(stripeInfo.getIndexLength())
+        .setDataLength(stripeInfo.getDataLength())
+        .setFooterLength(stripeInfo.getFooterLength());
+    physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length),
+        dirEntry);
+
+    // since we have already written the stripe, just update stripe statistics
+    treeWriter.updateFileStatistics(stripeStatistics);
+    fileMetadata.addStripeStats(stripeStatistics);
+
+    stripes.add(dirEntry.build());
+
+    // reset it after writing the stripe
+    rowCount += rowsInStripe;
+    rowsInStripe = 0;
+  }
+
+  @Override
+  public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
+    if (userMetadata != null) {
+      for (OrcProto.UserMetadataItem item : userMetadata) {
+        this.userMetadata.put(item.getName(), item.getValue());
+      }
+    }
+  }
+
+  @Override
+  public ColumnStatistics[] getStatistics()
+      throws IOException {
+    // Generate the stats
+    OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+
+    // add the column statistics
+    writeFileStatistics(builder, treeWriter);
+    return ReaderImpl.deserializeStats(builder.getStatisticsList());
+  }
+
+  public CompressionCodec getCompressionCodec() {
+    return physicalWriter.getCompressionCodec();
+  }
+
+  private static boolean hasTimestamp(TypeDescription schema) {
+    if (schema.getCategory() == TypeDescription.Category.TIMESTAMP) {
+      return true;
+    }
+    List<TypeDescription> children = schema.getChildren();
+    if (children != null) {
+      for (TypeDescription child : children) {
+        if (hasTimestamp(child)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 81d248c..424eb96 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -21,6 +21,7 @@ package org.apache.orc;
 import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.orc.impl.OrcCodecPool;
 
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.WriterImpl;
 
 import org.apache.orc.OrcFile.WriterOptions;
@@ -54,10 +55,13 @@ import org.apache.orc.impl.DataReaderProperties;
 import org.apache.orc.impl.OrcIndex;
 import org.apache.orc.impl.RecordReaderImpl;
 import org.apache.orc.impl.RecordReaderUtils;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -70,6 +74,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -82,8 +87,26 @@ import static org.junit.Assert.*;
 /**
  * Tests for the vectorized reader and writer for ORC files.
  */
+@RunWith(Parameterized.class)
 public class TestVectorOrcFile {
 
+  @Parameterized.Parameter
+  public OrcFile.Version fileFormat;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getParameters() {
+    OrcFile.Version[] params = new OrcFile.Version[]{
+        OrcFile.Version.V_0_11,
+        OrcFile.Version.V_0_12,
+        OrcFile.Version.UNSTABLE_PRE_2_0};
+
+    List<Object[]> result = new ArrayList<>();
+    for(OrcFile.Version v: params) {
+      result.add(new Object[]{v});
+    }
+    return result;
+  }
+
   public static String getFileFromClasspath(String name) {
     URL url = ClassLoader.getSystemResource(name);
     if (url == null) {
@@ -177,12 +200,14 @@ public class TestVectorOrcFile {
     conf = new Configuration();
     fs = FileSystem.getLocal(conf);
     testFilePath = new Path(workDir, "TestVectorOrcFile." +
-        testCaseName.getMethodName() + ".orc");
+        testCaseName.getMethodName().replaceFirst("\\[[0-9]+\\]", "")
+        + "." + fileFormat.getName() + ".orc");
     fs.delete(testFilePath, false);
   }
 
   @Test
   public void testReadFormat_0_11() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
     Path oldFilePath =
         new Path(getFileFromClasspath("orc-file-11-format.orc"));
     Reader reader = OrcFile.createReader(oldFilePath,
@@ -351,7 +376,7 @@ public class TestVectorOrcFile {
     TypeDescription schema = TypeDescription.createTimestamp();
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
-            .bufferSize(10000).version(org.apache.orc.OrcFile.Version.V_0_11));
+            .bufferSize(10000).version(fileFormat));
     List<Timestamp> tslist = Lists.newArrayList();
     tslist.add(Timestamp.valueOf("2037-01-01 00:00:00.000999"));
     tslist.add(Timestamp.valueOf("2003-01-01 00:00:00.000000222"));
@@ -407,7 +432,8 @@ public class TestVectorOrcFile {
                                          OrcFile.writerOptions(conf)
                                          .setSchema(schema)
                                          .stripeSize(100000)
-                                         .bufferSize(10000));
+                                         .bufferSize(10000)
+                                         .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 4;
     BytesColumnVector field1 = (BytesColumnVector) batch.cols[0];
@@ -456,7 +482,9 @@ public class TestVectorOrcFile {
     assertEquals("bar", ((StringColumnStatistics) stats[2]).getMinimum());
     assertEquals("hi", ((StringColumnStatistics) stats[2]).getMaximum());
     assertEquals(8, ((StringColumnStatistics) stats[2]).getSum());
-    assertEquals("count: 3 hasNull: true bytesOnDisk: 22 min: bar max: hi sum: 8",
+    assertEquals("count: 3 hasNull: true bytesOnDisk: " +
+            (fileFormat == OrcFile.Version.V_0_11 ? "30" : "22") +
+            " min: bar max: hi sum: 8",
         stats[2].toString());
 
     // check the inspectors
@@ -494,7 +522,8 @@ public class TestVectorOrcFile {
       .addField("dec1", TypeDescription.createDecimal());
 
     Writer writer = OrcFile.createWriter(testFilePath,
-      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000).bufferSize(10000));
+      OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+          .bufferSize(10000).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 4;
     DecimalColumnVector field1 = (DecimalColumnVector) batch.cols[0];
@@ -527,7 +556,8 @@ public class TestVectorOrcFile {
         OrcFile.writerOptions(conf)
             .setSchema(schema)
             .stripeSize(100000)
-            .bufferSize(10000));
+            .bufferSize(10000)
+            .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1000;
     LongColumnVector field1 = (LongColumnVector) batch.cols[0];
@@ -980,7 +1010,8 @@ public class TestVectorOrcFile {
         OrcFile.writerOptions(conf)
             .setSchema(schema)
             .stripeSize(100000)
-            .bufferSize(10000));
+            .bufferSize(10000)
+            .version(fileFormat));
     assertEmptyStats(writer.getStatistics());
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 2;
@@ -1050,8 +1081,9 @@ public class TestVectorOrcFile {
     assertEquals(1024, ((IntegerColumnStatistics) stats[3]).getMinimum());
     assertEquals(true, ((IntegerColumnStatistics) stats[3]).isSumDefined());
     assertEquals(3072, ((IntegerColumnStatistics) stats[3]).getSum());
-    assertEquals("count: 2 hasNull: false bytesOnDisk: 9 min: 1024 max: 2048 sum: 3072",
-        stats[3].toString());
+    assertEquals("count: 2 hasNull: false bytesOnDisk: " +
+        (fileFormat == OrcFile.Version.V_0_11 ? "8" : "9") +
+            " min: 1024 max: 2048 sum: 3072", stats[3].toString());
 
     StripeStatistics ss = reader.getStripeStatistics().get(0);
     assertEquals(2, ss.getColumnStatistics()[0].getNumberOfValues());
@@ -1065,7 +1097,9 @@ public class TestVectorOrcFile {
     assertEquals("count: 2 hasNull: false bytesOnDisk: 15 min: -15.0 max: -5.0 sum: -20.0",
         stats[7].toString());
 
-    assertEquals("count: 2 hasNull: false bytesOnDisk: 14 min: bye max: hi sum: 5", stats[9].toString());
+    assertEquals("count: 2 hasNull: false bytesOnDisk: " +
+        (fileFormat == OrcFile.Version.V_0_11 ? "20" : "14") +
+        " min: bye max: hi sum: 5", stats[9].toString());
 
     // check the schema
     TypeDescription readerSchema = reader.getSchema();
@@ -1194,7 +1228,8 @@ public class TestVectorOrcFile {
                                          .stripeSize(1000)
                                          .compress(CompressionKind.NONE)
                                          .bufferSize(100)
-                                         .rowIndexStride(1000));
+                                         .rowIndexStride(1000)
+                                         .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     Random r1 = new Random(1);
     Random r2 = new Random(2);
@@ -1289,7 +1324,8 @@ public class TestVectorOrcFile {
                                          .setSchema(schema)
                                          .stripeSize(1000)
                                          .compress(CompressionKind.NONE)
-                                         .bufferSize(100));
+                                         .bufferSize(100)
+                                         .version(fileFormat));
     writer.close();
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
@@ -1311,7 +1347,8 @@ public class TestVectorOrcFile {
             .setSchema(schema)
             .stripeSize(1000)
             .compress(CompressionKind.NONE)
-            .bufferSize(100));
+            .bufferSize(100)
+            .version(fileFormat));
     writer.addUserMetadata("my.meta", byteBuf(1, 2, 3, 4, 5, 6, 7, -1, -2, 127,
                                               -128));
     writer.addUserMetadata("clobber", byteBuf(1, 2, 3));
@@ -1370,7 +1407,8 @@ public class TestVectorOrcFile {
             .setSchema(schema)
             .stripeSize(100000)
             .bufferSize(10000)
-            .blockPadding(false));
+            .blockPadding(false)
+            .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1000;
     TimestampColumnVector timestampColVector = (TimestampColumnVector) batch.cols[0];
@@ -1488,7 +1526,8 @@ public class TestVectorOrcFile {
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf)
             .setSchema(schema)
-            .compress(CompressionKind.NONE));
+            .compress(CompressionKind.NONE)
+            .version(fileFormat));
     Decimal64ColumnVector cv = (Decimal64ColumnVector) batch.cols[0];
     cv.precision = 18;
     cv.scale = 3;
@@ -1503,6 +1542,13 @@ public class TestVectorOrcFile {
 
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals("count: 19 hasNull: false", reader.getStatistics()[0].toString());
+    // the size of the column in the different formats
+    int size = (fileFormat == OrcFile.Version.V_0_11 ? 89 :
+      fileFormat == OrcFile.Version.V_0_12 ? 90 : 154);
+    assertEquals("count: 19 hasNull: false bytesOnDisk: " + size +
+            " min: -2 max: 100000000000000 sum: 111111111111109.111",
+        reader.getStatistics()[1].toString());
     RecordReader rows = reader.rows();
     batch = schema.createRowBatchV2();
     cv = (Decimal64ColumnVector) batch.cols[0];
@@ -1557,7 +1603,8 @@ public class TestVectorOrcFile {
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf)
             .setSchema(schema)
-            .compress(CompressionKind.NONE));
+            .compress(CompressionKind.NONE)
+            .version(fileFormat));
     DecimalColumnVector cv = (DecimalColumnVector) batch.cols[0];
     cv.precision = 18;
     cv.scale = 3;
@@ -1574,6 +1621,14 @@ public class TestVectorOrcFile {
     // test with new batch
     Reader reader = OrcFile.createReader(testFilePath,
         OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals("count: 19 hasNull: false", reader.getStatistics()[0].toString());
+    // the size of the column in the different formats
+    int size = (fileFormat == OrcFile.Version.V_0_11 ? 63 :
+        fileFormat == OrcFile.Version.V_0_12 ? 65 : 154);
+    assertEquals("count: 19 hasNull: false bytesOnDisk: " + size +
+            " min: -2 max: 10000000000000 sum: 11111111111109.1111",
+        reader.getStatistics()[1].toString());
+
     RecordReader rows = reader.rows();
     batch = schema.createRowBatchV2();
     Decimal64ColumnVector newCv = (Decimal64ColumnVector) batch.cols[0];
@@ -1640,7 +1695,8 @@ public class TestVectorOrcFile {
                                          .stripeSize(1000)
                                          .compress(CompressionKind.NONE)
                                          .bufferSize(100)
-                                         .blockPadding(false));
+                                         .blockPadding(false)
+                                         .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 6;
     setUnion(batch, 0, Timestamp.valueOf("2000-03-12 15:00:00"), 0, 42, null,
@@ -1863,7 +1919,8 @@ public class TestVectorOrcFile {
                                          .setSchema(schema)
                                          .stripeSize(1000)
                                          .compress(CompressionKind.SNAPPY)
-                                         .bufferSize(100));
+                                         .bufferSize(100)
+                                         .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     Random rand;
     writeRandomIntBytesBatches(writer, batch, 10, 1000);
@@ -1902,7 +1959,8 @@ public class TestVectorOrcFile {
             .setSchema(schema)
             .stripeSize(10000)
             .compress(CompressionKind.LZO)
-            .bufferSize(1000));
+            .bufferSize(1000)
+            .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     Random rand = new Random(69);
     batch.size = 1000;
@@ -1951,7 +2009,8 @@ public class TestVectorOrcFile {
             .setSchema(schema)
             .stripeSize(10000)
             .compress(CompressionKind.LZ4)
-            .bufferSize(1000));
+            .bufferSize(1000)
+            .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     Random rand = new Random(3);
     batch.size = 1000;
@@ -1993,10 +2052,11 @@ public class TestVectorOrcFile {
    */
   @Test
   public void testCodecPool() throws Exception {
+    OrcCodecPool.clear();
     TypeDescription schema = createInnerSchema();
     VectorizedRowBatch batch = schema.createRowBatch();
     WriterOptions opts = OrcFile.writerOptions(conf)
-        .setSchema(schema).stripeSize(1000).bufferSize(100);
+        .setSchema(schema).stripeSize(1000).bufferSize(100).version(fileFormat);
 
     CompressionCodec snappyCodec, zlibCodec;
     snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
@@ -2032,12 +2092,17 @@ public class TestVectorOrcFile {
     assertTrue(snappyCodec == codec || snappyCodec2 == codec);
   }
 
-  private CompressionCodec writeBatchesAndGetCodec(int count, int size, WriterOptions opts,
-      VectorizedRowBatch batch) throws IOException {
+  private CompressionCodec writeBatchesAndGetCodec(int count,
+                                                   int size,
+                                                   WriterOptions opts,
+                                                   VectorizedRowBatch batch
+                                                   ) throws IOException {
     fs.delete(testFilePath, false);
-    Writer writer = OrcFile.createWriter(testFilePath, opts);
+    PhysicalWriter physical = new PhysicalFsWriter(fs, testFilePath, opts);
+    CompressionCodec codec = physical.getCompressionCodec();
+    Writer writer = OrcFile.createWriter(testFilePath,
+        opts.physicalWriter(physical));
     writeRandomIntBytesBatches(writer, batch, count, size);
-    CompressionCodec codec = ((WriterImpl)writer).getCompressionCodec();
     writer.close();
     return codec;
   }
@@ -2086,7 +2151,8 @@ public class TestVectorOrcFile {
                                          .stripeSize(5000)
                                          .compress(CompressionKind.SNAPPY)
                                          .bufferSize(1000)
-                                         .rowIndexStride(0));
+                                         .rowIndexStride(0)
+                                         .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     Random rand = new Random(24);
     batch.size = 5;
@@ -2313,6 +2379,8 @@ public class TestVectorOrcFile {
 
   @Test
   public void testMemoryManagementV11() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
+
     TypeDescription schema = createInnerSchema();
     MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
     Writer writer = OrcFile.createWriter(testFilePath,
@@ -2323,7 +2391,7 @@ public class TestVectorOrcFile {
             .bufferSize(100)
             .rowIndexStride(0)
             .memory(memory)
-            .version(OrcFile.Version.V_0_11));
+            .version(fileFormat));
     assertEquals(testFilePath, memory.path);
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1;
@@ -2349,6 +2417,7 @@ public class TestVectorOrcFile {
 
   @Test
   public void testMemoryManagementV12() throws Exception {
+    Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11);
     TypeDescription schema = createInnerSchema();
     MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
     Writer writer = OrcFile.createWriter(testFilePath,
@@ -2359,7 +2428,7 @@ public class TestVectorOrcFile {
                                          .bufferSize(100)
                                          .rowIndexStride(0)
                                          .memory(memory)
-                                         .version(OrcFile.Version.V_0_12));
+                                         .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     assertEquals(testFilePath, memory.path);
     batch.size = 1;
@@ -2395,7 +2464,8 @@ public class TestVectorOrcFile {
             .stripeSize(400000L)
             .compress(CompressionKind.NONE)
             .bufferSize(500)
-            .rowIndexStride(1000));
+            .rowIndexStride(1000)
+            .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.ensureSize(3500);
     batch.size = 3500;
@@ -2520,7 +2590,8 @@ public class TestVectorOrcFile {
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf)
             .setSchema(schema)
-            .rowIndexStride(1000));
+            .rowIndexStride(1000)
+            .version(fileFormat));
 
     // write 1024 repeating nulls
     batch.size = 1024;
@@ -2812,8 +2883,7 @@ public class TestVectorOrcFile {
         .addField("char", TypeDescription.createChar().withMaxLength(10))
         .addField("varchar", TypeDescription.createVarchar().withMaxLength(10));
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf)
-            .setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 4;
     for(int c=0; c < batch.cols.length; ++c) {
@@ -2858,12 +2928,14 @@ public class TestVectorOrcFile {
    */
   @Test
   public void testNonDictionaryRepeatingString() throws Exception {
+    Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11);
     TypeDescription schema = TypeDescription.createStruct()
         .addField("str", TypeDescription.createString());
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf)
             .setSchema(schema)
-            .rowIndexStride(1000));
+            .rowIndexStride(1000)
+            .version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1024;
     for(int r=0; r < batch.size; ++r) {
@@ -2901,7 +2973,7 @@ public class TestVectorOrcFile {
         .addField("struct", TypeDescription.createStruct()
             .addField("inner", TypeDescription.createLong()));
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf).setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1024;
     StructColumnVector outer = (StructColumnVector) batch.cols[0];
@@ -2945,7 +3017,7 @@ public class TestVectorOrcFile {
             .addUnionChild(TypeDescription.createInt())
             .addUnionChild(TypeDescription.createLong()));
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf).setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1024;
     UnionColumnVector outer = (UnionColumnVector) batch.cols[0];
@@ -3020,7 +3092,7 @@ public class TestVectorOrcFile {
         .addField("list",
             TypeDescription.createList(TypeDescription.createLong()));
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf).setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1024;
     ListColumnVector list = (ListColumnVector) batch.cols[0];
@@ -3094,7 +3166,7 @@ public class TestVectorOrcFile {
             TypeDescription.createMap(TypeDescription.createLong(),
                 TypeDescription.createLong()));
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf).setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 1024;
     MapColumnVector map = (MapColumnVector) batch.cols[0];
@@ -3167,7 +3239,7 @@ public class TestVectorOrcFile {
             "struct<list1:array<string>," +
                     "list2:array<binary>>");
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf).setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 2;
     ListColumnVector list1 = (ListColumnVector) batch.cols[0];
@@ -3199,6 +3271,8 @@ public class TestVectorOrcFile {
 
   @Test
   public void testWriterVersion() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
+
     // test writer implementation serialization
     assertEquals(OrcFile.WriterImplementation.ORC_JAVA,
         OrcFile.WriterImplementation.from(0));
@@ -3242,6 +3316,7 @@ public class TestVectorOrcFile {
 
   @Test(expected=IllegalArgumentException.class)
   public void testBadPrestoVersion() {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
     OrcFile.WriterVersion.from(OrcFile.WriterImplementation.PRESTO, 0);
   }
 
@@ -3251,6 +3326,7 @@ public class TestVectorOrcFile {
    */
   @Test
   public void testFileVersion() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
     assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(null));
     assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(new ArrayList<Integer>()));
     assertEquals(OrcFile.Version.V_0_11,
@@ -3263,6 +3339,7 @@ public class TestVectorOrcFile {
 
   @Test
   public void testMergeUnderstood() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
     Path p = new Path("test.orc");
     Reader futureVersion = Mockito.mock(Reader.class);
     Mockito.when(futureVersion.getFileVersion()).thenReturn(OrcFile.Version.FUTURE);
@@ -3288,11 +3365,14 @@ public class TestVectorOrcFile {
 
   @Test
   public void testMerge() throws Exception {
-    Path input1 = new Path(workDir, "TestVectorOrcFile.testMerge1.orc");
+    Path input1 = new Path(workDir, "TestVectorOrcFile.testMerge1-" +
+        fileFormat.getName() + ".orc");
     fs.delete(input1, false);
-    Path input2 = new Path(workDir, "TestVectorOrcFile.testMerge2.orc");
+    Path input2 = new Path(workDir, "TestVectorOrcFile.testMerge2-" +
+        fileFormat.getName() + ".orc");
     fs.delete(input2, false);
-    Path input3 = new Path(workDir, "TestVectorOrcFile.testMerge3.orc");
+    Path input3 = new Path(workDir, "TestVectorOrcFile.testMerge3-" +
+        fileFormat.getName() + ".orc");
     fs.delete(input3, false);
     TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
     // change all of the options away from default to find anything we
@@ -3303,7 +3383,7 @@ public class TestVectorOrcFile {
         .enforceBufferSize()
         .bufferSize(20*1024)
         .rowIndexStride(1000)
-        .version(OrcFile.Version.V_0_11)
+        .version(fileFormat)
         .writerVersion(OrcFile.WriterVersion.HIVE_8732);
 
     Writer writer = OrcFile.createWriter(input1, opts);
@@ -3344,7 +3424,8 @@ public class TestVectorOrcFile {
     writer.addUserMetadata("d", fromString("bat"));
     writer.close();
 
-    Path output1 = new Path(workDir, "TestVectorOrcFile.testMerge.out1.orc");
+    Path output1 = new Path(workDir, "TestVectorOrcFile.testMerge.out1-" +
+        fileFormat.getName() + ".orc");
     fs.delete(output1, false);
     List<Path> paths = OrcFile.mergeFiles(output1,
         OrcFile.writerOptions(conf), Arrays.asList(input1, input2, input3));
@@ -3354,7 +3435,7 @@ public class TestVectorOrcFile {
     assertEquals(CompressionKind.LZO, reader.getCompressionKind());
     assertEquals(30 * 1024, reader.getCompressionSize());
     assertEquals(1000, reader.getRowIndexStride());
-    assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+    assertEquals(fileFormat, reader.getFileVersion());
     assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
     assertEquals(3, reader.getStripes().size());
     assertEquals(4, reader.getMetadataKeys().size());
@@ -3364,7 +3445,8 @@ public class TestVectorOrcFile {
     assertEquals(fromString("bat"), reader.getMetadataValue("d"));
 
     TypeDescription schema4 = TypeDescription.fromString("struct<a:int>");
-    Path input4 = new Path(workDir, "TestVectorOrcFile.testMerge4.orc");
+    Path input4 = new Path(workDir, "TestVectorOrcFile.testMerge4-" +
+        fileFormat.getName() + ".orc");
     fs.delete(input4, false);
     opts.setSchema(schema4);
     writer = OrcFile.createWriter(input4, opts);
@@ -3376,7 +3458,8 @@ public class TestVectorOrcFile {
     writer.addRowBatch(batch);
     writer.close();
 
-    Path input5 = new Path(workDir, "TestVectorOrcFile.testMerge5.orc");
+    Path input5 = new Path(workDir, "TestVectorOrcFile.testMerge5-" +
+        fileFormat.getName() + ".orc");
     fs.delete(input5, false);
     opts.setSchema(schema)
         .compress(CompressionKind.NONE)
@@ -3391,7 +3474,8 @@ public class TestVectorOrcFile {
     writer.addRowBatch(batch);
     writer.close();
 
-    Path output2 = new Path(workDir, "TestVectorOrcFile.testMerge.out2.orc");
+    Path output2 = new Path(workDir, "TestVectorOrcFile.testMerge.out2-" +
+        fileFormat.getName() + ".orc");
     fs.delete(output2, false);
     paths = OrcFile.mergeFiles(output2, OrcFile.writerOptions(conf),
         Arrays.asList(input3, input4, input1, input5));
@@ -3401,7 +3485,7 @@ public class TestVectorOrcFile {
     assertEquals(CompressionKind.LZO, reader.getCompressionKind());
     assertEquals(20 * 1024, reader.getCompressionSize());
     assertEquals(1000, reader.getRowIndexStride());
-    assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+    assertEquals(fileFormat, reader.getFileVersion());
     assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
     assertEquals(2, reader.getStripes().size());
     assertEquals(4, reader.getMetadataKeys().size());
@@ -3416,6 +3500,7 @@ public class TestVectorOrcFile {
 
   @Test
   public void testZeroByteOrcFile() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
     Path zeroFile = new Path(exampleDir, "zero.orc");
     Reader reader = OrcFile.createReader(zeroFile, OrcFile.readerOptions(conf));
     assertEquals(0, reader.getNumberOfRows());
@@ -3437,6 +3522,7 @@ public class TestVectorOrcFile {
 
   @Test
   public void testFutureOrcFile() throws Exception {
+    Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
     Path zeroFile = new Path(exampleDir, "version1999.orc");
     try {
       Reader reader = OrcFile.createReader(zeroFile, OrcFile.readerOptions(conf));
@@ -3456,7 +3542,7 @@ public class TestVectorOrcFile {
         TypeDescription.fromString("struct<list1:array<double>," +
             "list2:array<float>>");
     Writer writer = OrcFile.createWriter(testFilePath,
-        OrcFile.writerOptions(conf).setSchema(schema));
+        OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
     VectorizedRowBatch batch = schema.createRowBatch();
     batch.size = 2;
     ListColumnVector list1 = (ListColumnVector) batch.cols[0];

http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/site/specification/ORCv2.md
----------------------------------------------------------------------
diff --git a/site/specification/ORCv2.md b/site/specification/ORCv2.md
index b71b69b..74fc974 100644
--- a/site/specification/ORCv2.md
+++ b/site/specification/ORCv2.md
@@ -10,7 +10,7 @@ developers on the project.
 
 The list of things that we plan to change:
 
-* Create a decimal representation with fixed scale using rle.
+* Move decimal encoding to RLEv3 and remove variable length encoding.
 * Create a better float/double encoding that splits mantissa and
   exponent.
 * Create a dictionary encoding for float, double, and decimal.
@@ -821,23 +821,20 @@ DIRECT_V2     | PRESENT         | Yes      | Boolean RLE
 
 ## Decimal Columns
 
-Decimal was introduced in Hive 0.11 with infinite precision (the total
-number of digits). In Hive 0.13, the definition was change to limit
-the precision to a maximum of 38 digits, which conveniently uses 127
-bits plus a sign bit. The current encoding of decimal columns stores
-the integer representation of the value as an unbounded length zigzag
-encoded base 128 varint. The scale is stored in the SECONDARY stream
-as an signed integer.
+Since Hive 0.13, all decimals have had fixed precision and scale.
+The goal is to use RLEv3 for the value and use the fixed scale from
+the type. As an interim solution, we are using RLE v2 for short decimals
+(precision <= 18) and the old encoding for long decimals.
 
 Encoding      | Stream Kind     | Optional | Contents
 :------------ | :-------------- | :------- | :-------
 DIRECT        | PRESENT         | Yes      | Boolean RLE
-              | DATA            | No       | Unbounded base 128 varints
-              | SECONDARY       | No       | Unsigned Integer RLE v1
+              | DATA            | No       | Signed Integer RLE v2
 DIRECT_V2     | PRESENT         | Yes      | Boolean RLE
               | DATA            | No       | Unbounded base 128 varints
               | SECONDARY       | No       | Unsigned Integer RLE v2
 
+
 ## Date Columns
 
 Date data is encoded with a PRESENT stream, a DATA stream that records