You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:22:59 UTC

[21/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module, by making shims for the row by row reader. (omalley reviewed by prasanth_j)

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/test/org/apache/orc/TestNewIntegerEncoding.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/TestNewIntegerEncoding.java b/orc/src/test/org/apache/orc/TestNewIntegerEncoding.java
new file mode 100644
index 0000000..526dd81
--- /dev/null
+++ b/orc/src/test/org/apache/orc/TestNewIntegerEncoding.java
@@ -0,0 +1,1373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+@RunWith(value = Parameterized.class)
+public class TestNewIntegerEncoding {
+
+  /** Encoding strategy for the current parameterized run. */
+  private OrcFile.EncodingStrategy encodingStrategy;
+
+  /**
+   * Creates a test instance bound to one encoding strategy.
+   *
+   * @param es the strategy stored for use by the tests of this instance
+   */
+  public TestNewIntegerEncoding(OrcFile.EncodingStrategy es) {
+    encodingStrategy = es;
+  }
+
+  /**
+   * Supplies the constructor arguments for the parameterized runs.
+   *
+   * @return two single-element argument arrays, the first holding
+   *         {@code COMPRESSION} and the second {@code SPEED}
+   */
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[] compressionRun = { OrcFile.EncodingStrategy.COMPRESSION };
+    Object[] speedRun = { OrcFile.EncodingStrategy.SPEED };
+    return Arrays.asList(compressionRun, speedRun);
+  }
+
+  /** Simple holder for a single timestamp value. */
+  public static class TSRow {
+    // The wrapped timestamp, stored exactly as handed to the constructor.
+    Timestamp ts;
+
+    /**
+     * @param ts the timestamp to hold
+     */
+    public TSRow(Timestamp ts) {
+      this.ts = ts;
+    }
+  }
+
+  public static TypeDescription getRowSchema() {
+    return TypeDescription.createStruct()
+        .addField("int1", TypeDescription.createInt())
+        .addField("long1", TypeDescription.createLong());
+  }
+
+  /**
+   * Appends one (int, long) row at the current end of the batch and bumps
+   * {@code batch.size}.
+   *
+   * @param batch batch whose first two columns are cast to LongColumnVector
+   * @param int1  value stored in column 0
+   * @param long1 value stored in column 1
+   */
+  public static void appendRow(VectorizedRowBatch batch,
+                               int int1, long long1) {
+    final int slot = batch.size;
+    batch.size = slot + 1;
+    LongColumnVector firstColumn = (LongColumnVector) batch.cols[0];
+    firstColumn.vector[slot] = int1;
+    LongColumnVector secondColumn = (LongColumnVector) batch.cols[1];
+    secondColumn.vector[slot] = long1;
+  }
+
+  /**
+   * Appends one long value to column 0 at the current end of the batch and
+   * bumps {@code batch.size}.
+   *
+   * @param batch batch whose first column is cast to LongColumnVector
+   * @param long1 value to store
+   */
+  public static void appendLong(VectorizedRowBatch batch,
+                                long long1) {
+    final int slot = batch.size;
+    batch.size = slot + 1;
+    LongColumnVector column = (LongColumnVector) batch.cols[0];
+    column.vector[slot] = long1;
+  }
+
+  // Scratch directory for generated files: the "test.tmp.dir" system property,
+  // or target/test/tmp (joined with the platform separator) when it is unset.
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target"
+      + File.separator + "test" + File.separator + "tmp"));
+
+  // The three fields below are (re)assigned by openFileSystem() before each test.
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  // Provides the running test's method name, used to build testFilePath.
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile."
+        + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  @Test
+  public void testBasicRow() throws Exception {
+    TypeDescription schema= getRowSchema();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(100000)
+                                         .compress(CompressionKind.NONE)
+                                         .bufferSize(10000)
+                                         .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    appendRow(batch, 111, 1111L);
+    appendRow(batch, 111, 1111L);
+    appendRow(batch, 111, 1111L);
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    while (rows.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(111, ((LongColumnVector) batch.cols[0]).vector[r]);
+        assertEquals(1111, ((LongColumnVector) batch.cols[1]).vector[r]);
+      }
+    }
+  }
+
+  /**
+   * Round-trips a short sequence mixing repeated values, ascending and
+   * descending runs and one larger value (2000) through a writer that is
+   * pinned to file version {@code V_0_11}.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testBasicOld() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+    long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+        7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+        2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+        9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+        1, 1, 1, 1 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .compress(CompressionKind.NONE)
+                                         .version(OrcFile.Version.V_0_11)
+                                         .bufferSize(10000)
+                                         .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips the same mixed sequence as {@link #testBasicOld()}, but
+   * without pinning a file version on the writer.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testBasicNew() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6,
+        7, 8, 9, 10, 1, 1, 1, 1, 1, 1, 10, 9, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1,
+        2, 5, 1, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1, 9, 2, 6, 3, 7, 1,
+        9, 2, 6, 3, 7, 1, 9, 2, 6, 2000, 2, 1, 1, 1, 1, 1, 3, 7, 1, 9, 2, 6, 1,
+        1, 1, 1, 1 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+  
+  /**
+   * Round-trips five negative values that increase by shrinking steps
+   * (-500, -400, -350, -325, -310).
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testBasicDelta1() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { -500, -400, -350, -325, -310 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips five strictly decreasing negative values
+   * (-500, -600, -650, -675, -710).
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testBasicDelta2() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { -500, -600, -650, -675, -710 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips five strictly decreasing positive values
+   * (500, 400, 350, 325, 310).
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testBasicDelta3() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 500, 400, 350, 325, 310 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips five strictly increasing positive values
+   * (500, 600, 650, 675, 710).
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testBasicDelta4() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 500, 600, 650, 675, 710 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips four values whose last step (from 2911390882471569739 down to
+   * -9181829309989854913) has a difference that does not fit in a signed
+   * 64-bit long.
+   *
+   * NOTE(review): unlike most tests in this class the writer here does not set
+   * {@code encodingStrategy(...)}; confirm whether that is intentional.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testDeltaOverflow() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[]{4513343538618202719L, 4513343538618202711L,
+        2911390882471569739L,
+        -9181829309989854913L};
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+            .compress(CompressionKind.NONE).bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile
+        .createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a descending sequence that starts at {@code Long.MAX_VALUE}
+   * and ends at {@code Long.MIN_VALUE}, so the final step's difference does
+   * not fit in a signed 64-bit long.
+   *
+   * NOTE(review): unlike most tests in this class the writer here does not set
+   * {@code encodingStrategy(...)}; confirm whether that is intentional.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testDeltaOverflow2() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[]{Long.MAX_VALUE, 4513343538618202711L,
+        2911390882471569739L,
+        Long.MIN_VALUE};
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+            .compress(CompressionKind.NONE).bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile
+        .createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips an ascending sequence whose last step goes from -2 to
+   * {@code Long.MAX_VALUE}, a difference that does not fit in a signed
+   * 64-bit long.
+   *
+   * NOTE(review): unlike most tests in this class the writer here does not set
+   * {@code encodingStrategy(...)}; confirm whether that is intentional.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testDeltaOverflow3() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[]{-4513343538618202711L, -2911390882471569739L, -2,
+        Long.MAX_VALUE};
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+            .compress(CompressionKind.NONE).bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile
+        .createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a single value, {@code Integer.MIN_VALUE} widened to long.
+   *
+   * NOTE(review): unlike the sibling min/max tests this writer does not call
+   * {@code compress(CompressionKind.NONE)}; confirm whether that is
+   * intentional.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testIntegerMin() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    List<Long> input = Lists.newArrayList();
+    input.add((long) Integer.MIN_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a single value, {@code Integer.MAX_VALUE} widened to long.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testIntegerMax() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    List<Long> input = Lists.newArrayList();
+    input.add((long) Integer.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a single value, {@code Long.MIN_VALUE}.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testLongMin() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    List<Long> input = Lists.newArrayList();
+    input.add(Long.MIN_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a single value, {@code Long.MAX_VALUE}.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testLongMax() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    List<Long> input = Lists.newArrayList();
+    input.add(Long.MAX_VALUE);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips 100000 random int values (widened to long) written as one
+   * batch. The random seed is reported in every assertion message so a
+   * failing data set can be regenerated.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testRandomInt() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    // Fresh data on every run, but with a seed that can be replayed.
+    long seed = System.nanoTime();
+    String seedNote = "random seed " + seed;
+    Random rand = new Random(seed);
+    List<Long> input = Lists.newArrayList();
+    for (int i = 0; i < 100000; i++) {
+      input.add((long) rand.nextInt());
+    }
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    // The write batch is sized to hold the whole input at once.
+    VectorizedRowBatch batch = schema.createRowBatch(100000);
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(seedNote, input.get(idx++).longValue(),
+              values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(seedNote, input.size(), idx);
+  }
+
+  /**
+   * Round-trips 100000 random long values written as one batch. The random
+   * seed is reported in every assertion message so a failing data set can be
+   * regenerated.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testRandomLong() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    // Fresh data on every run, but with a seed that can be replayed.
+    long seed = System.nanoTime();
+    String seedNote = "random seed " + seed;
+    Random rand = new Random(seed);
+    List<Long> input = Lists.newArrayList();
+    for (int i = 0; i < 100000; i++) {
+      input.add(rand.nextLong());
+    }
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    // The write batch is sized to hold the whole input at once.
+    VectorizedRowBatch batch = schema.createRowBatch(100000);
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(seedNote, input.get(idx++).longValue(),
+              values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(seedNote, input.size(), idx);
+  }
+
+  /**
+   * Round-trips a sequence of mostly small positive values with a few large
+   * outliers (up to 25501) and a single negative entry (-13).
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testPatchedBaseNegativeMin() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -13, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a sequence of mostly small positive values with a few large
+   * outliers (up to 25501) and a single negative entry (-1).
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testPatchedBaseNegativeMin2() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, -1, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a sequence of mostly small positive values with a few large
+   * outliers (up to 25501) whose smallest entry is a single 0.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testPatchedBaseNegativeMin3() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 20, 2, 3, 2, 1, 3, 17, 71, 35, 2, 1, 139, 2, 2,
+        3, 1783, 475, 2, 1, 1, 3, 1, 3, 2, 32, 1, 2, 3, 1, 8, 30, 1, 3, 414, 1,
+        1, 135, 3, 3, 1, 414, 2, 1, 2, 2, 594, 2, 5, 6, 4, 11, 1, 2, 2, 1, 1,
+        52, 4, 1, 2, 7, 1, 17, 334, 1, 2, 1, 2, 2, 6, 1, 266, 1, 2, 217, 2, 6,
+        2, 13, 2, 2, 1, 2, 3, 5, 1, 2, 1, 7244, 11813, 1, 33, 2, 0, 1, 2, 3,
+        13, 1, 92, 3, 13, 5, 14, 9, 141, 12, 6, 15, 25, 1, 1, 1, 46, 2, 1, 1,
+        141, 3, 1, 1, 1, 1, 2, 1, 4, 34, 5, 78, 8, 1, 2, 2, 1, 9, 10, 2, 1, 4,
+        13, 1, 5, 4, 4, 19, 5, 1, 1, 1, 68, 33, 399, 1, 1885, 25, 5, 2, 4, 1,
+        1, 2, 16, 1, 2966, 3, 1, 1, 25501, 1, 1, 1, 66, 1, 3, 8, 131, 14, 5, 1,
+        2, 2, 1, 1, 8, 1, 1, 2, 1, 5, 9, 2, 3, 112, 13, 2, 2, 1, 5, 10, 3, 1,
+        1, 13, 2, 3, 4, 1, 3, 1, 1, 2, 1, 1, 2, 4, 2, 207, 1, 1, 2, 4, 3, 3, 2,
+        2, 16 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips a short sequence of positive values that are all at most 15
+   * except for a single 33.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testPatchedBaseNegativeMin4() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    long[] inp = new long[] { 13, 13, 11, 8, 13, 10, 10, 11, 11, 14, 11, 7, 13,
+        12, 12, 11, 15, 12, 12, 9, 8, 10, 13, 11, 8, 6, 5, 6, 11, 7, 15, 10, 7,
+        6, 8, 7, 9, 9, 11, 33, 11, 3, 7, 4, 6, 10, 14, 12, 5, 14, 7, 6 };
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (long value : inp) {
+      appendLong(batch, value);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(input.get(idx++).longValue(), values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(input.size(), idx);
+  }
+
+  /**
+   * Round-trips 5120 random values drawn from [0, 100) in which the entry at
+   * index 0 is replaced by 20000. The random seed is reported in every
+   * assertion message so a failing data set can be regenerated.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testPatchedBaseAt0() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    // Fresh data on every run, but with a seed that can be replayed.
+    long seed = System.nanoTime();
+    String seedNote = "random seed " + seed;
+    Random rand = new Random(seed);
+    List<Long> input = Lists.newArrayList();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(0, 20000L);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    // The write batch is sized to hold the whole input at once.
+    VectorizedRowBatch batch = schema.createRowBatch(5120);
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(seedNote, input.get(idx++).longValue(),
+              values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(seedNote, input.size(), idx);
+  }
+
+  /**
+   * Round-trips 5120 random values drawn from [0, 100) in which the entry at
+   * index 1 is replaced by 20000. The random seed is reported in every
+   * assertion message so a failing data set can be regenerated.
+   *
+   * @throws Exception if writing or reading the ORC file fails
+   */
+  @Test
+  public void testPatchedBaseAt1() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    // Fresh data on every run, but with a seed that can be replayed.
+    long seed = System.nanoTime();
+    String seedNote = "random seed " + seed;
+    Random rand = new Random(seed);
+    List<Long> input = Lists.newArrayList();
+    for (int i = 0; i < 5120; i++) {
+      input.add((long) rand.nextInt(100));
+    }
+    input.set(1, 20000L);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    // The write batch is sized to hold the whole input at once.
+    VectorizedRowBatch batch = schema.createRowBatch(5120);
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    try {
+      // Read into a fresh batch: the write batch still holds the expected
+      // values, so reusing it could hide a reader that fails to fill it.
+      batch = reader.getSchema().createRowBatch();
+      while (rows.nextBatch(batch)) {
+        LongColumnVector values = (LongColumnVector) batch.cols[0];
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(seedNote, input.get(idx++).longValue(),
+              values.vector[r]);
+        }
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      rows.close();
+    }
+    // An empty or truncated read would otherwise pass the loop vacuously.
+    assertEquals(seedNote, input.size(), idx);
+  }
+
+  /**
+   * Round-trips 5120 random longs drawn from [0, 100) with the outlier 20000
+   * placed at index 255, comparing each value read back with the input at the
+   * same position. The writer options do not set a compression kind here.
+   */
+  @Test
+  public void testPatchedBaseAt255() throws Exception {
+    final int total = 5120;
+    TypeDescription longType = TypeDescription.createLong();
+
+    Random generator = new Random();
+    List<Long> source = Lists.newArrayList();
+    for (int n = total; n > 0; --n) {
+      source.add((long) generator.nextInt(100));
+    }
+    source.set(255, 20000L);
+
+    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
+        .setSchema(longType)
+        .stripeSize(100000)
+        .bufferSize(10000)
+        .encodingStrategy(encodingStrategy);
+    Writer out = OrcFile.createWriter(testFilePath, opts);
+    VectorizedRowBatch outBatch = longType.createRowBatch(total);
+    for (Long value : source) {
+      appendLong(outBatch, value);
+    }
+    out.addRowBatch(outBatch);
+    out.close();
+
+    Reader in = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = in.rows();
+    VectorizedRowBatch inBatch = in.getSchema().createRowBatch();
+    int cursor = 0;
+    while (rows.nextBatch(inBatch)) {
+      LongColumnVector longs = (LongColumnVector) inBatch.cols[0];
+      for (int r = 0; r < inBatch.size; ++r, ++cursor) {
+        assertEquals(source.get(cursor).longValue(), longs.vector[r]);
+      }
+    }
+  }
+
+  /**
+   * Round-trips 5120 random longs drawn from [0, 100) with the outlier 20000
+   * placed at index 256, comparing each value read back with the input at the
+   * same position.
+   */
+  @Test
+  public void testPatchedBaseAt256() throws Exception {
+    final int total = 5120;
+    final int outlierIndex = 256;
+    TypeDescription longType = TypeDescription.createLong();
+
+    Random generator = new Random();
+    List<Long> source = Lists.newArrayList();
+    for (int n = 0; n != total; ++n) {
+      long small = generator.nextInt(100);
+      source.add(small);
+    }
+    source.set(outlierIndex, 20000L);
+
+    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
+        .setSchema(longType)
+        .stripeSize(100000)
+        .bufferSize(10000)
+        .encodingStrategy(encodingStrategy);
+    Writer out = OrcFile.createWriter(testFilePath, opts);
+    VectorizedRowBatch outBatch = longType.createRowBatch(total);
+    for (Long value : source) {
+      appendLong(outBatch, value);
+    }
+    out.addRowBatch(outBatch);
+    out.close();
+
+    OrcFile.ReaderOptions readOpts = OrcFile.readerOptions(conf).filesystem(fs);
+    Reader in = OrcFile.createReader(testFilePath, readOpts);
+    RecordReader rows = in.rows();
+    VectorizedRowBatch inBatch = in.getSchema().createRowBatch();
+    int cursor = 0;
+    while (rows.nextBatch(inBatch)) {
+      LongColumnVector longs = (LongColumnVector) inBatch.cols[0];
+      int r = 0;
+      while (r < inBatch.size) {
+        assertEquals(source.get(cursor++).longValue(), longs.vector[r]);
+        ++r;
+      }
+    }
+  }
+
+  /**
+   * Round-trips 5120 random longs drawn from [0, 100) with the outlier 20000
+   * placed at index 510, comparing each value read back with the input at the
+   * same position.
+   */
+  @Test
+  public void testPatchedBase510() throws Exception {
+    final int rowCount = 5120;
+    TypeDescription longSchema = TypeDescription.createLong();
+
+    Random random = new Random();
+    List<Long> values = Lists.newArrayList();
+    while (values.size() < rowCount) {
+      values.add((long) random.nextInt(100));
+    }
+    values.set(510, 20000L);
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(longSchema)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch writeBatch = longSchema.createRowBatch(rowCount);
+    for (int i = 0; i < values.size(); ++i) {
+      appendLong(writeBatch, values.get(i));
+    }
+    writer.addRowBatch(writeBatch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(
+        testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    VectorizedRowBatch readBatch = reader.getSchema().createRowBatch();
+    int next = 0;
+    while (rows.nextBatch(readBatch)) {
+      LongColumnVector column = (LongColumnVector) readBatch.cols[0];
+      for (int r = 0; r < readBatch.size; ++r) {
+        long expected = values.get(next++).longValue();
+        long actual = column.vector[r];
+        assertEquals(expected, actual);
+      }
+    }
+  }
+
+  /**
+   * Round-trips 5120 random longs drawn from [0, 100) with the outlier 20000
+   * placed at index 511, comparing each value read back with the input at the
+   * same position.
+   */
+  @Test
+  public void testPatchedBase511() throws Exception {
+    final int rowCount = 5120;
+    final long outlier = 20000L;
+    TypeDescription longSchema = TypeDescription.createLong();
+
+    List<Long> values = Lists.newArrayList();
+    Random random = new Random();
+    int produced = 0;
+    do {
+      values.add((long) random.nextInt(100));
+      produced++;
+    } while (produced < rowCount);
+    values.set(511, outlier);
+
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+        .setSchema(longSchema)
+        .stripeSize(100000)
+        .bufferSize(10000)
+        .encodingStrategy(encodingStrategy);
+    Writer writer = OrcFile.createWriter(testFilePath, writerOptions);
+    VectorizedRowBatch writeBatch = longSchema.createRowBatch(rowCount);
+    for (Long value : values) {
+      appendLong(writeBatch, value);
+    }
+    writer.addRowBatch(writeBatch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    VectorizedRowBatch readBatch = reader.getSchema().createRowBatch();
+    int next = 0;
+    while (rows.nextBatch(readBatch)) {
+      LongColumnVector column = (LongColumnVector) readBatch.cols[0];
+      for (int r = 0; r < readBatch.size; ++r) {
+        assertEquals(values.get(next).longValue(), column.vector[r]);
+        next++;
+      }
+    }
+  }
+
+  /**
+   * Round-trips 5120 random longs drawn from [0, 60) in which index 511 holds
+   * {@code Long.MAX_VALUE}, comparing each value read back with the input at
+   * the same position.
+   */
+  @Test
+  public void testPatchedBaseMax1() throws Exception {
+    final int total = 5120;
+    TypeDescription longType = TypeDescription.createLong();
+
+    Random generator = new Random();
+    List<Long> source = Lists.newArrayList();
+    for (int n = total; n > 0; --n) {
+      source.add((long) generator.nextInt(60));
+    }
+    source.set(511, Long.MAX_VALUE);
+
+    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
+        .setSchema(longType)
+        .stripeSize(100000)
+        .bufferSize(10000)
+        .encodingStrategy(encodingStrategy);
+    Writer out = OrcFile.createWriter(testFilePath, opts);
+    VectorizedRowBatch outBatch = longType.createRowBatch(total);
+    for (int i = 0; i < source.size(); ++i) {
+      appendLong(outBatch, source.get(i));
+    }
+    out.addRowBatch(outBatch);
+    out.close();
+
+    Reader in = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = in.rows();
+    VectorizedRowBatch inBatch = in.getSchema().createRowBatch();
+    int cursor = 0;
+    while (rows.nextBatch(inBatch)) {
+      LongColumnVector longs = (LongColumnVector) inBatch.cols[0];
+      for (int r = 0; r < inBatch.size; ++r, ++cursor) {
+        assertEquals(source.get(cursor).longValue(), longs.vector[r]);
+      }
+    }
+  }
+
+  /**
+   * Round-trips 5120 random longs drawn from [0, 60) in which indexes 128,
+   * 256 and 511 hold {@code Long.MAX_VALUE}, comparing each value read back
+   * with the input at the same position.
+   */
+  @Test
+  public void testPatchedBaseMax2() throws Exception {
+    final int total = 5120;
+    TypeDescription longType = TypeDescription.createLong();
+
+    Random generator = new Random();
+    List<Long> source = Lists.newArrayList();
+    while (source.size() < total) {
+      source.add((long) generator.nextInt(60));
+    }
+    // overwrite three positions with the largest possible long
+    int[] maxPositions = {128, 256, 511};
+    for (int position : maxPositions) {
+      source.set(position, Long.MAX_VALUE);
+    }
+
+    Writer out = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(longType)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch outBatch = longType.createRowBatch(total);
+    for (Long value : source) {
+      appendLong(outBatch, value);
+    }
+    out.addRowBatch(outBatch);
+    out.close();
+
+    OrcFile.ReaderOptions readOpts = OrcFile.readerOptions(conf).filesystem(fs);
+    Reader in = OrcFile.createReader(testFilePath, readOpts);
+    RecordReader rows = in.rows();
+    VectorizedRowBatch inBatch = in.getSchema().createRowBatch();
+    int cursor = 0;
+    while (rows.nextBatch(inBatch)) {
+      LongColumnVector longs = (LongColumnVector) inBatch.cols[0];
+      for (int r = 0; r < inBatch.size; ++r) {
+        long expected = source.get(cursor++).longValue();
+        assertEquals(expected, longs.vector[r]);
+      }
+    }
+  }
+
+  /**
+   * Round-trips a fixed list of 20 longs of widely varying magnitude, one of
+   * which is {@code Long.MAX_VALUE}, and compares each value read back with
+   * the input at the same position.
+   */
+  @Test
+  public void testPatchedBaseMax3() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    List<Long> input = Lists.newArrayList();
+    input.add(371946367L);
+    input.add(11963367L);
+    input.add(68639400007L);
+    input.add(100233367L);
+    input.add(6367L);
+    input.add(10026367L);
+    input.add(3670000L);
+    input.add(3602367L);
+    input.add(4719226367L);
+    input.add(7196367L);
+    input.add(444442L);
+    input.add(210267L);
+    input.add(21033L);
+    input.add(160267L);
+    input.add(400267L);
+    input.add(23634347L);
+    input.add(16027L);
+    input.add(46026367L);
+    input.add(Long.MAX_VALUE);
+    input.add(33333L);
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for (Long l : input) {
+      appendLong(batch, l);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    // Read into a fresh batch built from the file's schema, like the other
+    // tests in this class, rather than reusing the still-populated
+    // writer-side batch.
+    batch = reader.getSchema().createRowBatch();
+    int idx = 0;
+    while (rows.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(input.get(idx++).longValue(),
+            ((LongColumnVector) batch.cols[0]).vector[r]);
+      }
+    }
+  }
+
+  /**
+   * Round-trips a fixed 20-value pattern of large longs repeated 25 times and
+   * followed by a single {@code Long.MAX_VALUE}, comparing each value read
+   * back with the input at the same position.
+   */
+  @Test
+  public void testPatchedBaseMax4() throws Exception {
+    TypeDescription longType = TypeDescription.createLong();
+
+    // one repetition of the input pattern
+    long[] pattern = {
+        371292224226367L,
+        119622332222267L,
+        686329400222007L,
+        100233333222367L,
+        636272333322222L,
+        10202633223267L,
+        36700222022230L,
+        36023226224227L,
+        47192226364427L,
+        71963622222447L,
+        22244444222222L,
+        21220263327442L,
+        21032233332232L,
+        16026322232227L,
+        40022262272212L,
+        23634342227222L,
+        16022222222227L,
+        46026362222227L,
+        46026362222227L,
+        33322222222323L,
+    };
+    List<Long> source = Lists.newArrayList();
+    for (int repeat = 0; repeat < 25; repeat++) {
+      for (int p = 0; p < pattern.length; p++) {
+        source.add(pattern[p]);
+      }
+    }
+    source.add(Long.MAX_VALUE);
+
+    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
+        .setSchema(longType)
+        .stripeSize(100000)
+        .bufferSize(10000)
+        .encodingStrategy(encodingStrategy);
+    Writer out = OrcFile.createWriter(testFilePath, opts);
+    VectorizedRowBatch outBatch = longType.createRowBatch();
+    for (Long value : source) {
+      appendLong(outBatch, value);
+    }
+    out.addRowBatch(outBatch);
+    out.close();
+
+    Reader in = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = in.rows();
+    VectorizedRowBatch inBatch = in.getSchema().createRowBatch();
+    int cursor = 0;
+    while (rows.nextBatch(inBatch)) {
+      LongColumnVector longs = (LongColumnVector) inBatch.cols[0];
+      for (int r = 0; r < inBatch.size; ++r, ++cursor) {
+        assertEquals(source.get(cursor).longValue(), longs.vector[r]);
+      }
+    }
+  }
+
+  @Test
+  public void testPatchedBaseTimestamp() throws Exception {
+    TypeDescription schema = TypeDescription.createStruct()
+        .addField("ts", TypeDescription.createTimestamp());
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+
+    List<Timestamp> tslist = Lists.newArrayList();
+    tslist.add(Timestamp.valueOf("2099-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2003-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("1999-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("1995-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2002-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2010-03-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("2005-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2006-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2003-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("1996-08-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("1998-11-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("2008-10-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("1993-08-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("2008-01-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("2007-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2004-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2008-10-02 00:00:00"));
+    tslist.add(Timestamp.valueOf("2003-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2004-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2008-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2005-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("1994-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2006-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2004-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2001-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2000-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2000-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2002-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2006-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2011-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2002-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2005-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("1974-01-01 00:00:00"));
+    int idx = 0;
+    for (Timestamp ts : tslist) {
+      ((TimestampColumnVector) batch.cols[0]).set(idx, ts);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    idx = 0;
+    while (rows.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(tslist.get(idx++),
+            ((TimestampColumnVector) batch.cols[0]).asScratchTimestamp(r));
+      }
+    }
+  }
+
+  @Test
+  public void testDirectLargeNegatives() throws Exception {
+    TypeDescription schema = TypeDescription.createLong();
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(100000)
+            .bufferSize(10000)
+            .encodingStrategy(encodingStrategy));
+    VectorizedRowBatch batch = schema.createRowBatch();
+
+    appendLong(batch, -7486502418706614742L);
+    appendLong(batch, 0L);
+    appendLong(batch, 1L);
+    appendLong(batch, 1L);
+    appendLong(batch, -5535739865598783616L);
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(5, batch.size);
+    assertEquals(-7486502418706614742L,
+        ((LongColumnVector) batch.cols[0]).vector[0]);
+    assertEquals(0L,
+        ((LongColumnVector) batch.cols[0]).vector[1]);
+    assertEquals(1L,
+        ((LongColumnVector) batch.cols[0]).vector[2]);
+    assertEquals(1L,
+        ((LongColumnVector) batch.cols[0]).vector[3]);
+    assertEquals(-5535739865598783616L,
+        ((LongColumnVector) batch.cols[0]).vector[4]);
+    assertEquals(false, rows.nextBatch(batch));
+  }
+
+  /**
+   * Writes 100000 random int-ranged longs to a file using the V_0_11 file
+   * version, seeks the reader to row 55555 and compares every value from
+   * there on with the input at the same position.
+   */
+  @Test
+  public void testSeek() throws Exception {
+    final int rowCount = 100000;
+    TypeDescription longSchema = TypeDescription.createLong();
+
+    Random random = new Random();
+    List<Long> values = Lists.newArrayList();
+    while (values.size() < rowCount) {
+      values.add((long) random.nextInt());
+    }
+
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+        .setSchema(longSchema)
+        .compress(CompressionKind.NONE)
+        .stripeSize(100000)
+        .bufferSize(10000)
+        .version(OrcFile.Version.V_0_11)
+        .encodingStrategy(encodingStrategy);
+    Writer writer = OrcFile.createWriter(testFilePath, writerOptions);
+    VectorizedRowBatch writeBatch = longSchema.createRowBatch(rowCount);
+    for (int i = 0; i < values.size(); ++i) {
+      appendLong(writeBatch, values.get(i));
+    }
+    writer.addRowBatch(writeBatch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    VectorizedRowBatch readBatch = reader.getSchema().createRowBatch();
+    // start verification in the middle of the file
+    int position = 55555;
+    rows.seekToRow(position);
+    while (rows.nextBatch(readBatch)) {
+      LongColumnVector column = (LongColumnVector) readBatch.cols[0];
+      for (int r = 0; r < readBatch.size; ++r) {
+        long expected = values.get(position++).longValue();
+        assertEquals(expected, column.vector[r]);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/test/org/apache/orc/TestOrcNullOptimization.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/TestOrcNullOptimization.java b/orc/src/test/org/apache/orc/TestOrcNullOptimization.java
new file mode 100644
index 0000000..0b605c9
--- /dev/null
+++ b/orc/src/test/org/apache/orc/TestOrcNullOptimization.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.orc.impl.RecordReaderImpl;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Lists;
+
+public class TestOrcNullOptimization {
+
+  /**
+   * Builds the schema shared by the tests in this class:
+   * {@code struct<a:int,b:string,c:boolean,d:array<struct<z:int>>>}.
+   */
+  TypeDescription createMyStruct() {
+    // element type of the list column "d"
+    TypeDescription listElement = TypeDescription.createStruct()
+        .addField("z", TypeDescription.createInt());
+    TypeDescription listOfStructs = TypeDescription.createList(listElement);
+
+    return TypeDescription.createStruct()
+        .addField("a", TypeDescription.createInt())
+        .addField("b", TypeDescription.createString())
+        .addField("c", TypeDescription.createBoolean())
+        .addField("d", listOfStructs);
+  }
+
+  /**
+   * Appends one row to {@code batch}, first flushing the batch to
+   * {@code writer} and resetting it when it is already full.
+   *
+   * @param writer the writer that receives a full batch
+   * @param batch the batch to append to; its {@code size} is incremented
+   * @param a value for column 0, or null to mark the cell as null
+   * @param b value for column 1, or null to mark the cell as null
+   * @param c value for column 2 (stored as 1 or 0), or null to mark the cell
+   *          as null
+   * @param d elements of the list in column 3, or a null array to mark the
+   *          cell as null
+   * @throws IOException if flushing a full batch to the writer fails
+   */
+  void addRow(Writer writer, VectorizedRowBatch batch,
+              Integer a, String b, Boolean c,
+              Integer... d) throws IOException {
+    if (batch.size == batch.getMaxSize()) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+    int row = batch.size++;
+    LongColumnVector aColumn = (LongColumnVector) batch.cols[0];
+    BytesColumnVector bColumn = (BytesColumnVector) batch.cols[1];
+    LongColumnVector cColumn = (LongColumnVector) batch.cols[2];
+    ListColumnVector dColumn = (ListColumnVector) batch.cols[3];
+    StructColumnVector dStruct = (StructColumnVector) dColumn.child;
+    LongColumnVector dInt = (LongColumnVector) dStruct.fields[0];
+    if (a == null) {
+      aColumn.noNulls = false;
+      aColumn.isNull[row] = true;
+    } else {
+      aColumn.vector[row] = a;
+    }
+    if (b == null) {
+      bColumn.noNulls = false;
+      bColumn.isNull[row] = true;
+    } else {
+      // encode explicitly as UTF-8 so the bytes written do not depend on the
+      // platform default charset
+      bColumn.setVal(row, b.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+    if (c == null) {
+      cColumn.noNulls = false;
+      cColumn.isNull[row] = true;
+    } else {
+      cColumn.vector[row] = c ? 1 : 0;
+    }
+    if (d == null) {
+      dColumn.noNulls = false;
+      dColumn.isNull[row] = true;
+    } else {
+      // the list for this row occupies the next d.length slots of the child
+      // vector, starting at the current child count
+      dColumn.offsets[row] = dColumn.childCount;
+      dColumn.lengths[row] = d.length;
+      dColumn.childCount += d.length;
+      for(int e=0; e < d.length; ++e) {
+        dInt.vector[(int) dColumn.offsets[row] + e] = d[e];
+      }
+    }
+  }
+
+  // Directory for test files: the "test.tmp.dir" system property when set,
+  // otherwise target/test/tmp.
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+
+  // Per-test state, assigned in openFileSystem() before each test.
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  // Supplies the running test's method name, which openFileSystem() uses to
+  // build the name of the test file.
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcNullOptimization." +
+        testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
+
+  /**
+   * Writes 20000 rows whose first and last rows have null "a" and "b"
+   * columns, then checks the file statistics, which stripe footers mention a
+   * PRESENT stream, and the contents of the first row and the final two rows.
+   */
+  @Test
+  public void testMultiStripeWithNull() throws Exception {
+    TypeDescription schema = createMyStruct();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(100000)
+                                         .compress(CompressionKind.NONE)
+                                         .bufferSize(10000));
+    Random rand = new Random(100);
+    VectorizedRowBatch batch = schema.createRowBatch();
+    // one null row, 19998 non-null rows, one null row: 20000 rows in total
+    addRow(writer, batch, null, null, true, 100);
+    for (int i = 2; i < 20000; i++) {
+      addRow(writer, batch, rand.nextInt(1), "a", true, 100);
+    }
+    addRow(writer, batch, null, null, true, 100);
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    // check the stats
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(20000, reader.getNumberOfRows());
+    assertEquals(20000, stats[0].getNumberOfValues());
+
+    assertEquals(0, ((IntegerColumnStatistics) stats[1]).getMaximum());
+    assertEquals(0, ((IntegerColumnStatistics) stats[1]).getMinimum());
+    assertEquals(true, ((IntegerColumnStatistics) stats[1]).isSumDefined());
+    assertEquals(0, ((IntegerColumnStatistics) stats[1]).getSum());
+    assertEquals("count: 19998 hasNull: true min: 0 max: 0 sum: 0",
+        stats[1].toString());
+
+    assertEquals("a", ((StringColumnStatistics) stats[2]).getMaximum());
+    assertEquals("a", ((StringColumnStatistics) stats[2]).getMinimum());
+    assertEquals(19998, stats[2].getNumberOfValues());
+    assertEquals("count: 19998 hasNull: true min: a max: a sum: 19998",
+        stats[2].toString());
+
+    // check the schema
+    assertEquals("struct<a:int,b:string,c:boolean,d:array<struct<z:int>>>",
+        reader.getSchema().toString());
+
+    RecordReader rows = reader.rows();
+
+    List<Boolean> expected = Lists.newArrayList();
+    for (StripeInformation sinfo : reader.getStripes()) {
+      expected.add(false);
+    }
+    // only the first and last stripe will have PRESENT stream
+    expected.set(0, true);
+    expected.set(expected.size() - 1, true);
+
+    List<Boolean> got = Lists.newArrayList();
+    // check if the stripe footer contains PRESENT stream
+    for (StripeInformation sinfo : reader.getStripes()) {
+      OrcProto.StripeFooter sf =
+        ((RecordReaderImpl) rows).readStripeFooter(sinfo);
+      got.add(sf.toString().indexOf(OrcProto.Stream.Kind.PRESENT.toString())
+              != -1);
+    }
+    assertEquals(expected, got);
+
+    batch = reader.getSchema().createRowBatch();
+    LongColumnVector aColumn = (LongColumnVector) batch.cols[0];
+    BytesColumnVector bColumn = (BytesColumnVector) batch.cols[1];
+    LongColumnVector cColumn = (LongColumnVector) batch.cols[2];
+    ListColumnVector dColumn = (ListColumnVector) batch.cols[3];
+    LongColumnVector dElements =
+        (LongColumnVector)(((StructColumnVector) dColumn.child).fields[0]);
+    assertEquals(true , rows.nextBatch(batch));
+    assertEquals(1024, batch.size);
+
+    // row 1
+    assertEquals(true, aColumn.isNull[0]);
+    assertEquals(true, bColumn.isNull[0]);
+    assertEquals(1, cColumn.vector[0]);
+    assertEquals(0, dColumn.offsets[0]);
+    // check the list length of row 0 itself, like the other row-1 checks
+    assertEquals(1, dColumn.lengths[0]);
+    assertEquals(100, dElements.vector[0]);
+
+    rows.seekToRow(19998);
+    // the seek must leave a final batch to read
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(2, batch.size);
+
+    // last-1 row
+    assertEquals(0, aColumn.vector[0]);
+    assertEquals("a", bColumn.toString(0));
+    assertEquals(1, cColumn.vector[0]);
+    assertEquals(0, dColumn.offsets[0]);
+    assertEquals(1, dColumn.lengths[0]);
+    assertEquals(100, dElements.vector[0]);
+
+    // last row
+    assertEquals(true, aColumn.isNull[1]);
+    assertEquals(true, bColumn.isNull[1]);
+    assertEquals(1, cColumn.vector[1]);
+    assertEquals(1, dColumn.offsets[1]);
+    assertEquals(1, dColumn.lengths[1]);
+    assertEquals(100, dElements.vector[1]);
+
+    assertEquals(false, rows.nextBatch(batch));
+    rows.close();
+  }
+
+  /**
+   * Writes 20000 rows containing no null values, then checks the file
+   * statistics, that no stripe footer mentions a PRESENT stream, and the
+   * contents of the final two rows after seeking to row 19998.
+   */
+  @Test
+  public void testMultiStripeWithoutNull() throws Exception {
+    TypeDescription schema = createMyStruct();
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+        .setSchema(schema)
+        .stripeSize(100000)
+        .compress(CompressionKind.NONE)
+        .bufferSize(10000);
+    Writer writer = OrcFile.createWriter(testFilePath, writerOptions);
+    Random rand = new Random(100);
+    VectorizedRowBatch batch = schema.createRowBatch();
+    // 19999 "a" rows followed by one "b" row
+    int written = 0;
+    while (written < 19999) {
+      addRow(writer, batch, rand.nextInt(1), "a", true, 100);
+      written++;
+    }
+    addRow(writer, batch, 0, "b", true, 100);
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+
+    // file-level statistics
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(20000, reader.getNumberOfRows());
+    assertEquals(20000, stats[0].getNumberOfValues());
+
+    IntegerColumnStatistics intStats = (IntegerColumnStatistics) stats[1];
+    assertEquals(0, intStats.getMaximum());
+    assertEquals(0, intStats.getMinimum());
+    assertEquals(true, intStats.isSumDefined());
+    assertEquals(0, intStats.getSum());
+    assertEquals("count: 20000 hasNull: false min: 0 max: 0 sum: 0",
+        stats[1].toString());
+
+    StringColumnStatistics strStats = (StringColumnStatistics) stats[2];
+    assertEquals("b", strStats.getMaximum());
+    assertEquals("a", strStats.getMinimum());
+    assertEquals(20000, stats[2].getNumberOfValues());
+    assertEquals("count: 20000 hasNull: false min: a max: b sum: 20000",
+        stats[2].toString());
+
+    // schema as seen by the reader
+    assertEquals("struct<a:int,b:string,c:boolean,d:array<struct<z:int>>>",
+        reader.getSchema().toString());
+
+    RecordReader rows = reader.rows();
+
+    // none of the stripes will have PRESENT stream
+    List<Boolean> expected = Lists.newArrayList();
+    List<Boolean> got = Lists.newArrayList();
+    String presentName = OrcProto.Stream.Kind.PRESENT.toString();
+    for (StripeInformation stripe : reader.getStripes()) {
+      expected.add(false);
+      OrcProto.StripeFooter footer =
+          ((RecordReaderImpl) rows).readStripeFooter(stripe);
+      got.add(footer.toString().contains(presentName));
+    }
+    assertEquals(expected, got);
+
+    rows.seekToRow(19998);
+
+    VectorizedRowBatch readBatch = reader.getSchema().createRowBatch();
+    LongColumnVector aColumn = (LongColumnVector) readBatch.cols[0];
+    BytesColumnVector bColumn = (BytesColumnVector) readBatch.cols[1];
+    LongColumnVector cColumn = (LongColumnVector) readBatch.cols[2];
+    ListColumnVector dColumn = (ListColumnVector) readBatch.cols[3];
+    StructColumnVector dStruct = (StructColumnVector) dColumn.child;
+    LongColumnVector dElements = (LongColumnVector) dStruct.fields[0];
+
+    assertEquals(true, rows.nextBatch(readBatch));
+    assertEquals(2, readBatch.size);
+
+    // second-to-last row of the file
+    assertEquals(0, aColumn.vector[0]);
+    assertEquals("a", bColumn.toString(0));
+    assertEquals(1, cColumn.vector[0]);
+    assertEquals(0, dColumn.offsets[0]);
+    assertEquals(1, dColumn.lengths[0]);
+    assertEquals(100, dElements.vector[0]);
+
+    // last row of the file
+    assertEquals(0, aColumn.vector[1]);
+    assertEquals("b", bColumn.toString(1));
+    assertEquals(1, cColumn.vector[1]);
+    assertEquals(1, dColumn.offsets[1]);
+    assertEquals(1, dColumn.lengths[1]);
+    assertEquals(100, dElements.vector[1]);
+    rows.close();
+  }
+
+  @Test
+  public void testColumnsWithNullAndCompression() throws Exception {
+    TypeDescription schema = createMyStruct();
+    Writer writer = OrcFile.createWriter(testFilePath,
+                                         OrcFile.writerOptions(conf)
+                                         .setSchema(schema)
+                                         .stripeSize(100000)
+                                         .bufferSize(10000));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    addRow(writer, batch, 3, "a", true, 100);
+    addRow(writer, batch, null, "b", true, 100);
+    addRow(writer, batch, 3, null, false, 100);
+    addRow(writer, batch, 3, "d", true, 100);
+    addRow(writer, batch, 2, "e", true, 100);
+    addRow(writer, batch, 2, "f", true, 100);
+    addRow(writer, batch, 2, "g", true, 100);
+    addRow(writer, batch, 2, "h", true, 100);
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    // check the stats
+    ColumnStatistics[] stats = reader.getStatistics();
+    assertEquals(8, reader.getNumberOfRows());
+    assertEquals(8, stats[0].getNumberOfValues());
+
+    assertEquals(3, ((IntegerColumnStatistics) stats[1]).getMaximum());
+    assertEquals(2, ((IntegerColumnStatistics) stats[1]).getMinimum());
+    assertEquals(true, ((IntegerColumnStatistics) stats[1]).isSumDefined());
+    assertEquals(17, ((IntegerColumnStatistics) stats[1]).getSum());
+    assertEquals("count: 7 hasNull: true min: 2 max: 3 sum: 17",
+        stats[1].toString());
+
+    assertEquals("h", ((StringColumnStatistics) stats[2]).getMaximum());
+    assertEquals("a", ((StringColumnStatistics) stats[2]).getMinimum());
+    assertEquals(7, stats[2].getNumberOfValues());
+    assertEquals("count: 7 hasNull: true min: a max: h sum: 7",
+        stats[2].toString());
+
+    // check the inspectors
+    batch = reader.getSchema().createRowBatch();
+    LongColumnVector aColumn = (LongColumnVector) batch.cols[0];
+    BytesColumnVector bColumn = (BytesColumnVector) batch.cols[1];
+    LongColumnVector cColumn = (LongColumnVector) batch.cols[2];
+    ListColumnVector dColumn = (ListColumnVector) batch.cols[3];
+    LongColumnVector dElements =
+        (LongColumnVector)(((StructColumnVector) dColumn.child).fields[0]);
+    Assert.assertEquals("struct<a:int,b:string,c:boolean,d:array<struct<z:int>>>",
+        reader.getSchema().toString());
+
+    RecordReader rows = reader.rows();
+    // only the last strip will have PRESENT stream
+    List<Boolean> expected = Lists.newArrayList();
+    for (StripeInformation sinfo : reader.getStripes()) {
+      expected.add(false);
+    }
+    expected.set(expected.size() - 1, true);
+
+    List<Boolean> got = Lists.newArrayList();
+    // check if the strip footer contains PRESENT stream
+    for (StripeInformation sinfo : reader.getStripes()) {
+      OrcProto.StripeFooter sf =
+        ((RecordReaderImpl) rows).readStripeFooter(sinfo);
+      got.add(sf.toString().indexOf(OrcProto.Stream.Kind.PRESENT.toString())
+              != -1);
+    }
+    assertEquals(expected, got);
+
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(8, batch.size);
+
+    // row 1
+    assertEquals(3, aColumn.vector[0]);
+    assertEquals("a", bColumn.toString(0));
+    assertEquals(1, cColumn.vector[0]);
+    assertEquals(0, dColumn.offsets[0]);
+    assertEquals(1, dColumn.lengths[0]);
+    assertEquals(100, dElements.vector[0]);
+
+    // row 2
+    assertEquals(true, aColumn.isNull[1]);
+    assertEquals("b", bColumn.toString(1));
+    assertEquals(1, cColumn.vector[1]);
+    assertEquals(1, dColumn.offsets[1]);
+    assertEquals(1, dColumn.lengths[1]);
+    assertEquals(100, dElements.vector[1]);
+
+    // row 3
+    assertEquals(3, aColumn.vector[2]);
+    assertEquals(true, bColumn.isNull[2]);
+    assertEquals(0, cColumn.vector[2]);
+    assertEquals(2, dColumn.offsets[2]);
+    assertEquals(1, dColumn.lengths[2]);
+    assertEquals(100, dElements.vector[2]);
+
+    rows.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/test/org/apache/orc/TestOrcTimezone1.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/TestOrcTimezone1.java b/orc/src/test/org/apache/orc/TestOrcTimezone1.java
new file mode 100644
index 0000000..72dc455
--- /dev/null
+++ b/orc/src/test/org/apache/orc/TestOrcTimezone1.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+
+import java.io.File;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.TimeZone;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+
+/** Parameterized over (writer, reader) time zone ID pairs: timestamps are written while the
+ * writer zone is the JVM default and read back while the reader zone is the JVM default.
+ */
+@RunWith(Parameterized.class)
+public class TestOrcTimezone1 {
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  String writerTimeZone;
+  String readerTimeZone;
+  static TimeZone defaultTimeZone = TimeZone.getDefault(); // restored after every test
+
+  public TestOrcTimezone1(String writerTZ, String readerTZ) {
+    this.writerTimeZone = writerTZ;
+    this.readerTimeZone = readerTZ;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> result = Arrays.asList(new Object[][]{
+        /* Extreme timezones */
+        {"GMT-12:00", "GMT+14:00"},
+        /* No difference in DST */
+        {"America/Los_Angeles", "America/Los_Angeles"}, /* same timezone both with DST */
+        {"Europe/Berlin", "Europe/Berlin"}, /* same as above but europe */
+        {"America/Phoenix", "Asia/Kolkata"} /* Writer no DST, Reader no DST */,
+        {"Europe/Berlin", "America/Los_Angeles"} /* Writer DST, Reader DST */,
+        {"Europe/Berlin", "America/Chicago"} /* Writer DST, Reader DST */,
+        /* With DST difference */
+        {"Europe/Berlin", "UTC"},
+        {"UTC", "Europe/Berlin"} /* Writer no DST, Reader DST */,
+        {"America/Los_Angeles", "Asia/Kolkata"} /* Writer DST, Reader no DST */,
+        {"Europe/Berlin", "Asia/Kolkata"} /* Writer DST, Reader no DST */,
+        /* Timezone offsets for the reader has changed historically */
+        {"Asia/Saigon", "Pacific/Enderbury"},
+        {"UTC", "Asia/Jerusalem"},
+
+        // NOTE:
+        // "1995-01-01 03:00:00.688888888" is not a valid time in the Pacific/Enderbury timezone.
+        // On 1995-01-01 00:00:00 the GMT offset moved from -11:00 hr to +13:00, which makes all
+        // values on 1995-01-01 invalid. To see this, try the following with joda time:
+        // new MutableDateTime("1995-01-01", DateTimeZone.forTimeZone(readerTimeZone));
+    });
+    return result;
+  }
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false); // non-recursive delete of anything already at that path
+  }
+
+  @After
+  public void restoreTimeZone() {
+    TimeZone.setDefault(defaultTimeZone);
+  }
+
+  @Test
+  public void testTimestampWriter() throws Exception {
+    TypeDescription schema = TypeDescription.createTimestamp();
+
+    TimeZone.setDefault(TimeZone.getTimeZone(writerTimeZone)); // write under the writer zone
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+            .bufferSize(10000));
+    assertEquals(writerTimeZone, TimeZone.getDefault().getID());
+    List<String> ts = Lists.newArrayList();
+    ts.add("2003-01-01 01:00:00.000000222");
+    ts.add("1996-08-02 09:00:00.723100809");
+    ts.add("1999-01-01 02:00:00.999999999");
+    ts.add("1995-01-02 03:00:00.688888888");
+    ts.add("2002-01-01 04:00:00.1");
+    ts.add("2010-03-02 05:00:00.000009001");
+    ts.add("2005-01-01 06:00:00.000002229");
+    ts.add("2006-01-01 07:00:00.900203003");
+    ts.add("2003-01-01 08:00:00.800000007");
+    ts.add("1998-11-02 10:00:00.857340643");
+    ts.add("2008-10-02 11:00:00.0");
+    ts.add("2037-01-01 00:00:00.000999");
+    ts.add("2014-03-28 00:00:00.0");
+    VectorizedRowBatch batch = schema.createRowBatch();
+    TimestampColumnVector times = (TimestampColumnVector) batch.cols[0];
+    for (String t : ts) {
+      times.set(batch.size++, Timestamp.valueOf(t));
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    TimeZone.setDefault(TimeZone.getTimeZone(readerTimeZone)); // read under the reader zone
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(readerTimeZone, TimeZone.getDefault().getID());
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    times = (TimestampColumnVector) batch.cols[0];
+    int idx = 0; // position in ts of the next expected value, carried across batches
+    while (rows.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(ts.get(idx++), times.asScratchTimestamp(r).toString());
+      }
+    }
+    rows.close();
+  }
+
+  @Test
+  public void testReadTimestampFormat_0_11() throws Exception {
+    TimeZone.setDefault(TimeZone.getTimeZone(readerTimeZone));
+    Path oldFilePath = new Path(getClass().getClassLoader().
+        getSystemResource("orc-file-11-format.orc").getPath());
+    Reader reader = OrcFile.createReader(oldFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    TypeDescription schema = reader.getSchema();
+    int col = schema.getFieldNames().indexOf("ts");
+    VectorizedRowBatch batch = schema.createRowBatch(10);
+    TimestampColumnVector ts = (TimestampColumnVector) batch.cols[col];
+
+    boolean[] include = new boolean[schema.getMaximumId() + 1];
+    include[schema.getChildren().get(col).getId()] = true; // select only the "ts" column
+    RecordReader rows = reader.rows
+        (new Reader.Options().include(include));
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(Timestamp.valueOf("2000-03-12 15:00:00"),
+        ts.asScratchTimestamp(0));
+
+    // seek to row 7499 and check its contents; that batch is expected to hold exactly one row
+    rows.seekToRow(7499);
+    assertEquals(true, rows.nextBatch(batch));
+    assertEquals(1, batch.size);
+    assertEquals(Timestamp.valueOf("2000-03-12 15:00:01"),
+        ts.asScratchTimestamp(0));
+
+    // no further batch may be returned after that; then close the reader
+    Assert.assertEquals(false, rows.nextBatch(batch));
+    rows.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/orc/src/test/org/apache/orc/TestOrcTimezone2.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/TestOrcTimezone2.java b/orc/src/test/org/apache/orc/TestOrcTimezone2.java
new file mode 100644
index 0000000..4a02855
--- /dev/null
+++ b/orc/src/test/org/apache/orc/TestOrcTimezone2.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+
+/** Timestamp write/read round trip over 500 (writer, reader) time zone ID pairs that are picked
+ * with a seeded Random from TimeZone.getAvailableIDs().
+ */
+@RunWith(Parameterized.class)
+public class TestOrcTimezone2 {
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  String writerTimeZone;
+  String readerTimeZone;
+  static TimeZone defaultTimeZone = TimeZone.getDefault(); // restored after every test
+
+  public TestOrcTimezone2(String writerTZ, String readerTZ) {
+    this.writerTimeZone = writerTZ;
+    this.readerTimeZone = readerTZ;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    String[] allTimeZones = TimeZone.getAvailableIDs();
+    Random rand = new Random(123); // fixed seed: the same index sequence is drawn on every run
+    int len = allTimeZones.length;
+    int n = 500; // number of (writer, reader) pairs to generate
+    Object[][] data = new Object[n][];
+    for (int i = 0; i < n; i++) {
+      int wIdx = rand.nextInt(len);
+      int rIdx = rand.nextInt(len);
+      data[i] = new Object[2];
+      data[i][0] = allTimeZones[wIdx];
+      data[i][1] = allTimeZones[rIdx];
+    }
+    return Arrays.asList(data);
+  }
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false); // non-recursive delete of anything already at that path
+  }
+
+  @After
+  public void restoreTimeZone() {
+    TimeZone.setDefault(defaultTimeZone);
+  }
+
+  @Test
+  public void testTimestampWriter() throws Exception {
+    TypeDescription schema = TypeDescription.createTimestamp();
+
+    TimeZone.setDefault(TimeZone.getTimeZone(writerTimeZone)); // write under the writer zone
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema)
+            .stripeSize(100000).bufferSize(10000));
+    assertEquals(writerTimeZone, TimeZone.getDefault().getID());
+    List<String> ts = Lists.newArrayList();
+    ts.add("2003-01-01 01:00:00.000000222");
+    ts.add("1999-01-01 02:00:00.999999999");
+    ts.add("1995-01-02 03:00:00.688888888");
+    ts.add("2002-01-01 04:00:00.1");
+    ts.add("2010-03-02 05:00:00.000009001");
+    ts.add("2005-01-01 06:00:00.000002229");
+    ts.add("2006-01-01 07:00:00.900203003");
+    ts.add("2003-01-01 08:00:00.800000007");
+    ts.add("1996-08-02 09:00:00.723100809");
+    ts.add("1998-11-02 10:00:00.857340643");
+    ts.add("2008-10-02 11:00:00.0");
+    ts.add("2037-01-01 00:00:00.000999");
+    VectorizedRowBatch batch = schema.createRowBatch();
+    TimestampColumnVector tsc = (TimestampColumnVector) batch.cols[0];
+    for (String t : ts) {
+      tsc.set(batch.size++, Timestamp.valueOf(t));
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    TimeZone.setDefault(TimeZone.getTimeZone(readerTimeZone)); // read under the reader zone
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(readerTimeZone, TimeZone.getDefault().getID());
+    RecordReader rows = reader.rows();
+    int idx = 0; // position in ts of the next expected value, carried across batches
+    batch = reader.getSchema().createRowBatch();
+    tsc = (TimestampColumnVector) batch.cols[0];
+    while (rows.nextBatch(batch)) {
+      for (int r=0; r < batch.size; ++r) {
+        assertEquals(ts.get(idx++), tsc.asScratchTimestamp(r).toString());
+      }
+    }
+    rows.close();
+  }
+}