Posted to commits@hive.apache.org by om...@apache.org on 2015/12/14 22:36:14 UTC

[1/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

Repository: hive
Updated Branches:
  refs/heads/master 49dc6452a -> 06e39ebe0


http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java b/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
deleted file mode 100644
index 151f30d..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.util;
-
-/**
- * Estimation of memory footprint of object
- */
-public enum JavaDataModel {
-
-  JAVA32 {
-    @Override
-    public int object() {
-      return JAVA32_OBJECT;
-    }
-
-    @Override
-    public int array() {
-      return JAVA32_ARRAY;
-    }
-
-    @Override
-    public int ref() {
-      return JAVA32_REF;
-    }
-
-    @Override
-    public int hashMap(int entry) {
-      // base  = JAVA32_OBJECT + PRIMITIVES1 * 4 + JAVA32_FIELDREF * 3 + JAVA32_ARRAY;
-      // entry = JAVA32_OBJECT + JAVA32_FIELDREF + PRIMITIVES1
-      return hashMapBase() + hashMapEntry() * entry;
-    }
-
-    @Override
-    public int hashMapBase() {
-      return 64;
-    }
-
-    @Override
-    public int hashMapEntry() {
-      return 24;
-    }
-
-    @Override
-    public int hashSet(int entry) {
-      // hashMap += JAVA32_OBJECT
-      return hashSetBase() + hashSetEntry() * entry;
-    }
-
-    @Override
-    public int hashSetBase() {
-      return 80;
-    }
-
-    @Override
-    public int hashSetEntry() {
-      return 24;
-    }
-
-    @Override
-    public int linkedHashMap(int entry) {
-      // hashMap += JAVA32_FIELDREF + PRIMITIVES1
-      // hashMap.entry += JAVA32_FIELDREF * 2
-      return 72 + 32 * entry;
-    }
-
-    @Override
-    public int linkedList(int entry) {
-      // base  = JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_FIELDREF;
-      // entry = JAVA32_OBJECT + JAVA32_FIELDREF * 2
-      return linkedListBase() + linkedListEntry() * entry;
-     }
-
-     @Override
-     public int linkedListBase() {
-       return 28;
-     }
-
-     @Override
-     public int linkedListEntry() {
-       return 24;
-     }
-
-    @Override
-    public int arrayList() {
-      // JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_ARRAY;
-      return 44;
-    }
-
-    @Override
-    public int memoryAlign() {
-      return 8;
-    }
-  }, JAVA64 {
-    @Override
-    public int object() {
-      return JAVA64_OBJECT;
-    }
-
-    @Override
-    public int array() {
-      return JAVA64_ARRAY;
-    }
-
-    @Override
-    public int ref() {
-      return JAVA64_REF;
-    }
-
-    @Override
-    public int hashMap(int entry) {
-      // base  = JAVA64_OBJECT + PRIMITIVES1 * 4 + JAVA64_FIELDREF * 3 + JAVA64_ARRAY;
-      // entry = JAVA64_OBJECT + JAVA64_FIELDREF + PRIMITIVES1
-      return hashMapBase() + hashMapEntry() * entry;
-    }
-
-    @Override
-    public int hashMapBase() {
-      return 112;
-    }
-
-
-    @Override
-    public int hashMapEntry() {
-      return 44;
-    }
-
-    @Override
-    public int hashSet(int entry) {
-      // hashMap += JAVA64_OBJECT
-      return hashSetBase() + hashSetEntry() * entry;
-     }
-
-     @Override
-     public int hashSetBase() {
-       return 144;
-     }
-
-     @Override
-     public int hashSetEntry() {
-       return 44;
-     }
-
-    @Override
-    public int linkedHashMap(int entry) {
-      // hashMap += JAVA64_FIELDREF + PRIMITIVES1
-      // hashMap.entry += JAVA64_FIELDREF * 2
-      return 128 + 60 * entry;
-    }
-
-    @Override
-    public int linkedList(int entry) {
-      // base  = JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_FIELDREF;
-      // entry = JAVA64_OBJECT + JAVA64_FIELDREF * 2
-      return linkedListBase() + linkedListEntry() * entry;
-     }
-
-     @Override
-     public int linkedListBase() {
-       return 48;
-     }
-
-     @Override
-     public int linkedListEntry() {
-       return 48;
-     }
-
-    @Override
-    public int arrayList() {
-      // JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_ARRAY;
-      return 80;
-    }
-
-    @Override
-    public int memoryAlign() {
-      return 8;
-    }
-  };
-
-  public abstract int object();
-  public abstract int array();
-  public abstract int ref();
-  public abstract int hashMap(int entry);
-  public abstract int hashMapBase();
-  public abstract int hashMapEntry();
-  public abstract int hashSetBase();
-  public abstract int hashSetEntry();
-  public abstract int hashSet(int entry);
-  public abstract int linkedHashMap(int entry);
-  public abstract int linkedListBase();
-  public abstract int linkedListEntry();
-  public abstract int linkedList(int entry);
-  public abstract int arrayList();
-  public abstract int memoryAlign();
-
-  // ascii string
-  public int lengthFor(String string) {
-    return lengthForStringOfLength(string.length());
-  }
-
-  public int lengthForRandom() {
-    // boolean + double + AtomicLong
-    return object() + primitive1() + primitive2() + object() + primitive2();
-  }
-
-  public int primitive1() {
-    return PRIMITIVES1;
-  }
-  public int primitive2() {
-    return PRIMITIVES2;
-  }
-
-  public static int alignUp(int value, int align) {
-    return (value + align - 1) & ~(align - 1);
-  }
-
-  public static final int JAVA32_META = 12;
-  public static final int JAVA32_ARRAY_META = 16;
-  public static final int JAVA32_REF = 4;
-  public static final int JAVA32_OBJECT = 16;   // JAVA32_META + JAVA32_REF
-  public static final int JAVA32_ARRAY = 20;    // JAVA32_ARRAY_META + JAVA32_REF
-
-  public static final int JAVA64_META = 24;
-  public static final int JAVA64_ARRAY_META = 32;
-  public static final int JAVA64_REF = 8;
-  public static final int JAVA64_OBJECT = 32;   // JAVA64_META + JAVA64_REF
-  public static final int JAVA64_ARRAY = 40;    // JAVA64_ARRAY_META + JAVA64_REF
-
-  public static final int PRIMITIVES1 = 4;      // void, boolean, byte, short, int, float
-  public static final int PRIMITIVES2 = 8;      // long, double
-
-  public static final int PRIMITIVE_BYTE = 1;    // byte
-
-  private static JavaDataModel current;
-
-  public static JavaDataModel get() {
-    if (current != null) {
-      return current;
-    }
-    try {
-      String props = System.getProperty("sun.arch.data.model");
-      if ("32".equals(props)) {
-        return current = JAVA32;
-      }
-    } catch (Exception e) {
-      // ignore
-    }
-    // TODO: separate model is needed for compressedOops, which can be guessed from memory size.
-    return current = JAVA64;
-  }
-
-  public static int round(int size) {
-    JavaDataModel model = get();
-    if (model == JAVA32 || size % 8 == 0) {
-      return size;
-    }
-    return ((size + 8) >> 3) << 3;
-  }
-
-  private int lengthForPrimitiveArrayOfSize(int primitiveSize, int length) {
-    return alignUp(array() + primitiveSize*length, memoryAlign());
-  }
-
-  public int lengthForByteArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
-  }
-  public int lengthForObjectArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(ref(), length);
-  }
-  public int lengthForLongArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(primitive2(), length);
-  }
-  public int lengthForDoubleArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(primitive2(), length);
-  }
-  public int lengthForIntArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(primitive1(), length);
-  }
-  public int lengthForBooleanArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
-  }
-  public int lengthForTimestampArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(lengthOfTimestamp(), length);
-  }
-  public int lengthForDateArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(lengthOfDate(), length);
-  }
-  public int lengthForDecimalArrayOfSize(int length) {
-    return lengthForPrimitiveArrayOfSize(lengthOfDecimal(), length);
-  }
-
-  public int lengthOfDecimal() {
-    // object overhead + 8 bytes for intCompact + 4 bytes for precision
-    // + 4 bytes for scale + size of BigInteger
-    return object() + 2 * primitive2() + lengthOfBigInteger();
-  }
-
-  private int lengthOfBigInteger() {
-    // object overhead + 4 bytes for bitCount + 4 bytes for bitLength
-    // + 4 bytes for firstNonzeroByteNum + 4 bytes for firstNonzeroIntNum +
-    // + 4 bytes for lowestSetBit + 5 bytes for size of magnitude (since max precision
-    // is only 38 for HiveDecimal) + 7 bytes of padding (since java memory allocations
-    // are 8 byte aligned)
-    return object() + 4 * primitive2();
-  }
-
-  public int lengthOfTimestamp() {
-    // object overhead + 4 bytes for int (nanos) + 4 bytes of padding
-    return object() + primitive2();
-  }
-
-  public int lengthOfDate() {
-    // object overhead + 8 bytes for long (fastTime) + 16 bytes for cdate
-    return object() + 3 * primitive2();
-  }
-
-  public int lengthForStringOfLength(int strLen) {
-    return object() + primitive1() * 3 + array() + strLen;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
index 40674ea..554033c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
@@ -153,8 +153,14 @@ public class TestFileDump {
           (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
-    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
-        100000, CompressionKind.ZLIB, 10000, 1000);
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .fileSystem(fs)
+            .inspector(inspector)
+            .batchSize(1000)
+            .compress(CompressionKind.ZLIB)
+            .stripeSize(100000)
+            .rowIndexStride(1000));
     Random r1 = new Random(1);
     String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
         "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
@@ -263,8 +269,15 @@ public class TestFileDump {
     Configuration conf = new Configuration();
     conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
     conf.setFloat(HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, 0.49f);
-    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
-        100000, CompressionKind.ZLIB, 10000, 1000);
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .fileSystem(fs)
+            .batchSize(1000)
+            .inspector(inspector)
+            .stripeSize(100000)
+            .compress(CompressionKind.ZLIB)
+            .rowIndexStride(1000)
+            .bufferSize(10000));
     Random r1 = new Random(1);
     String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
         "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
@@ -319,6 +332,7 @@ public class TestFileDump {
         .compress(CompressionKind.ZLIB)
         .bufferSize(10000)
         .rowIndexStride(1000)
+        .batchSize(1000)
         .bloomFilterColumns("S");
     Writer writer = OrcFile.createWriter(testFilePath, options);
     Random r1 = new Random(1);
@@ -368,7 +382,8 @@ public class TestFileDump {
         .bufferSize(10000)
         .rowIndexStride(1000)
         .bloomFilterColumns("l")
-        .bloomFilterFpp(0.01);
+        .bloomFilterFpp(0.01)
+        .batchSize(1000);
     Writer writer = OrcFile.createWriter(testFilePath, options);
     Random r1 = new Random(1);
     String[] words = new String[]{"It", "was", "the", "best", "of", "times,",

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
index 2fd13c7..f41a7ba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -1215,7 +1215,7 @@ public class TestNewIntegerEncoding {
         .encodingStrategy(encodingStrategy));
 
     List<Timestamp> tslist = Lists.newArrayList();
-    tslist.add(Timestamp.valueOf("9999-01-01 00:00:00"));
+    tslist.add(Timestamp.valueOf("2099-01-01 00:00:00"));
     tslist.add(Timestamp.valueOf("2003-01-01 00:00:00"));
     tslist.add(Timestamp.valueOf("1999-01-01 00:00:00"));
     tslist.add(Timestamp.valueOf("1995-01-01 00:00:00"));

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index ebe3096..a7e657c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -655,7 +655,8 @@ public class TestOrcFile {
         OrcFile.writerOptions(conf)
             .inspector(inspector)
             .stripeSize(100000)
-            .bufferSize(10000));
+            .bufferSize(10000)
+            .batchSize(1000));
     for (int i = 0; i < 11000; i++) {
       if (i >= 5000) {
         if (i >= 10000) {
@@ -1260,6 +1261,7 @@ public class TestOrcFile {
                                          .inspector(inspector)
                                          .stripeSize(1000)
                                          .compress(CompressionKind.NONE)
+                                         .batchSize(1000)
                                          .bufferSize(100)
                                          .blockPadding(false));
     OrcStruct row = new OrcStruct(3);
@@ -1835,8 +1837,9 @@ public class TestOrcFile {
     @Override
     public void addedRow(int count) throws IOException {
       rows += count;
-      if (rows % 100 == 0) {
+      if (rows >= 100) {
         callback.checkMemory(rate);
+        rows = 0;
       }
     }
   }
@@ -1858,6 +1861,7 @@ public class TestOrcFile {
                                          .bufferSize(100)
                                          .rowIndexStride(0)
                                          .memory(memory)
+                                         .batchSize(100)
                                          .version(OrcFile.Version.V_0_11));
     assertEquals(testFilePath, memory.path);
     for(int i=0; i < 2500; ++i) {
@@ -1894,6 +1898,7 @@ public class TestOrcFile {
                                          .bufferSize(100)
                                          .rowIndexStride(0)
                                          .memory(memory)
+                                         .batchSize(100)
                                          .version(OrcFile.Version.V_0_12));
     assertEquals(testFilePath, memory.path);
     for(int i=0; i < 2500; ++i) {

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 966621c..ab1d2aa 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -867,7 +867,7 @@ public class TestOrcRawRecordMerger {
     Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
         OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
         .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
-        .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
+        .stripeSize(1).memory(mgr).batchSize(2).version(OrcFile.Version.V_0_11));
     String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
        "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
     for(int i=0; i < values.length; ++i) {
@@ -878,7 +878,8 @@ public class TestOrcRawRecordMerger {
     // write a delta
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
-        .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
+        .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5)
+        .finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
     values = new String[]{"0.0", null, null, "1.1", null, null, null,
         "ignore.7"};
@@ -972,7 +973,7 @@ public class TestOrcRawRecordMerger {
         .bucket(BUCKET).inspector(inspector).filesystem(fs);
     options.orcOptions(OrcFile.writerOptions(conf)
       .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
-      .memory(mgr));
+      .memory(mgr).batchSize(2));
     options.finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
     String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
@@ -983,7 +984,8 @@ public class TestOrcRawRecordMerger {
     ru.close(false);
 
     // write a delta
-    options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5);
+    options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+        .recordIdColumn(5);
     ru = of.getRecordUpdater(root, options);
     values = new String[]{"0.0", null, null, "1.1", null, null, null,
         "ignore.7"};
@@ -1020,7 +1022,7 @@ public class TestOrcRawRecordMerger {
 
     // loop through the 5 splits and read each
     for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i);
+      System.out.println("starting split " + i + " = " + splits[i]);
       rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
       NullWritable key = rr.createKey();
       OrcStruct value = rr.createValue();

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
index a409be8..6803abd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump-bloomfilter.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter.out b/ql/src/test/resources/orc-file-dump-bloomfilter.out
index 7c3db78..1654e33 100644
--- a/ql/src/test/resources/orc-file-dump-bloomfilter.out
+++ b/ql/src/test/resources/orc-file-dump-bloomfilter.out
@@ -1,5 +1,5 @@
 Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 21000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump-bloomfilter2.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter2.out b/ql/src/test/resources/orc-file-dump-bloomfilter2.out
index a4f006b..1f6e046 100644
--- a/ql/src/test/resources/orc-file-dump-bloomfilter2.out
+++ b/ql/src/test/resources/orc-file-dump-bloomfilter2.out
@@ -1,5 +1,5 @@
 Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 21000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
index 8ad856d..64cf0e9 100644
--- a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -1,5 +1,5 @@
 Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 21000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump.json
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump.json b/ql/src/test/resources/orc-file-dump.json
index 25fd63b..0376d8a 100644
--- a/ql/src/test/resources/orc-file-dump.json
+++ b/ql/src/test/resources/orc-file-dump.json
@@ -1,7 +1,7 @@
 {
   "fileName": "TestFileDump.testDump.orc",
   "fileVersion": "0.12",
-  "writerVersion": "HIVE_4243",
+  "writerVersion": "HIVE_12055",
   "numberOfRows": 21000,
   "compression": "ZLIB",
   "compressionBufferSize": 4096,

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump.out b/ql/src/test/resources/orc-file-dump.out
index 5aaa0f3..57356d3 100644
--- a/ql/src/test/resources/orc-file-dump.out
+++ b/ql/src/test/resources/orc-file-dump.out
@@ -1,5 +1,5 @@
 Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 21000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-has-null.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-has-null.out b/ql/src/test/resources/orc-file-has-null.out
index 438c27c..0e915c6 100644
--- a/ql/src/test/resources/orc-file-has-null.out
+++ b/ql/src/test/resources/orc-file-has-null.out
@@ -1,5 +1,5 @@
 Structure for TestOrcFile.testHasNull.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 20000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/orc_file_dump.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_file_dump.q.out b/ql/src/test/results/clientpositive/orc_file_dump.q.out
index 43c38a8..4c73bac 100644
--- a/ql/src/test/results/clientpositive/orc_file_dump.q.out
+++ b/ql/src/test/results/clientpositive/orc_file_dump.q.out
@@ -93,7 +93,7 @@ PREHOOK: Input: default@orc_ppd
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 1049
 Compression: ZLIB
 Compression size: 262144
@@ -213,7 +213,7 @@ PREHOOK: Input: default@orc_ppd
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 1049
 Compression: ZLIB
 Compression size: 262144
@@ -345,7 +345,7 @@ PREHOOK: Input: default@orc_ppd_part@ds=2015/hr=10
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 1049
 Compression: ZLIB
 Compression size: 262144

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/orc_merge10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_merge10.q.out b/ql/src/test/results/clientpositive/orc_merge10.q.out
index a415776..776ca9a 100644
--- a/ql/src/test/results/clientpositive/orc_merge10.q.out
+++ b/ql/src/test/results/clientpositive/orc_merge10.q.out
@@ -517,7 +517,7 @@ PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 242
 Compression: SNAPPY
 Compression size: 4096
@@ -579,7 +579,7 @@ PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 242
 Compression: SNAPPY
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/orc_merge11.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_merge11.q.out b/ql/src/test/results/clientpositive/orc_merge11.q.out
index a7e3d47..65e3d8b 100644
--- a/ql/src/test/results/clientpositive/orc_merge11.q.out
+++ b/ql/src/test/results/clientpositive/orc_merge11.q.out
@@ -72,7 +72,7 @@ PREHOOK: Input: default@orcfile_merge1
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 50000
 Compression: ZLIB
 Compression size: 4096
@@ -133,7 +133,7 @@ ________________________________________________________________________________
 -- END ORC FILE DUMP --
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 50000
 Compression: ZLIB
 Compression size: 4096
@@ -217,7 +217,7 @@ PREHOOK: Input: default@orcfile_merge1
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 100000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/orc_merge10.q.out b/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
index d41671a..8b6a595 100644
--- a/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
+++ b/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
@@ -552,7 +552,7 @@ PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 242
 Compression: SNAPPY
 Compression size: 4096
@@ -629,7 +629,7 @@ PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 242
 Compression: SNAPPY
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/orc_merge11.q.out b/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
index a7e3d47..65e3d8b 100644
--- a/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
+++ b/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
@@ -72,7 +72,7 @@ PREHOOK: Input: default@orcfile_merge1
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 50000
 Compression: ZLIB
 Compression size: 4096
@@ -133,7 +133,7 @@ ________________________________________________________________________________
 -- END ORC FILE DUMP --
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 50000
 Compression: ZLIB
 Compression size: 4096
@@ -217,7 +217,7 @@ PREHOOK: Input: default@orcfile_merge1
 #### A masked pattern was here ####
 -- BEGIN ORC FILE DUMP --
 #### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
 Rows: 100000
 Compression: ZLIB
 Compression size: 4096

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java b/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
new file mode 100644
index 0000000..151f30d
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+/**
+ * Estimation of memory footprint of object
+ */
+public enum JavaDataModel {
+
+  JAVA32 {
+    @Override
+    public int object() {
+      return JAVA32_OBJECT;
+    }
+
+    @Override
+    public int array() {
+      return JAVA32_ARRAY;
+    }
+
+    @Override
+    public int ref() {
+      return JAVA32_REF;
+    }
+
+    @Override
+    public int hashMap(int entry) {
+      // base  = JAVA32_OBJECT + PRIMITIVES1 * 4 + JAVA32_FIELDREF * 3 + JAVA32_ARRAY;
+      // entry = JAVA32_OBJECT + JAVA32_FIELDREF + PRIMITIVES1
+      return hashMapBase() + hashMapEntry() * entry;
+    }
+
+    @Override
+    public int hashMapBase() {
+      return 64;
+    }
+
+    @Override
+    public int hashMapEntry() {
+      return 24;
+    }
+
+    @Override
+    public int hashSet(int entry) {
+      // hashMap += JAVA32_OBJECT
+      return hashSetBase() + hashSetEntry() * entry;
+    }
+
+    @Override
+    public int hashSetBase() {
+      return 80;
+    }
+
+    @Override
+    public int hashSetEntry() {
+      return 24;
+    }
+
+    @Override
+    public int linkedHashMap(int entry) {
+      // hashMap += JAVA32_FIELDREF + PRIMITIVES1
+      // hashMap.entry += JAVA32_FIELDREF * 2
+      return 72 + 32 * entry;
+    }
+
+    @Override
+    public int linkedList(int entry) {
+      // base  = JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_FIELDREF;
+      // entry = JAVA32_OBJECT + JAVA32_FIELDREF * 2
+      return linkedListBase() + linkedListEntry() * entry;
+     }
+
+     @Override
+     public int linkedListBase() {
+       return 28;
+     }
+
+     @Override
+     public int linkedListEntry() {
+       return 24;
+     }
+
+    @Override
+    public int arrayList() {
+      // JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_ARRAY;
+      return 44;
+    }
+
+    @Override
+    public int memoryAlign() {
+      return 8;
+    }
+  }, JAVA64 {
+    @Override
+    public int object() {
+      return JAVA64_OBJECT;
+    }
+
+    @Override
+    public int array() {
+      return JAVA64_ARRAY;
+    }
+
+    @Override
+    public int ref() {
+      return JAVA64_REF;
+    }
+
+    @Override
+    public int hashMap(int entry) {
+      // base  = JAVA64_OBJECT + PRIMITIVES1 * 4 + JAVA64_FIELDREF * 3 + JAVA64_ARRAY;
+      // entry = JAVA64_OBJECT + JAVA64_FIELDREF + PRIMITIVES1
+      return hashMapBase() + hashMapEntry() * entry;
+    }
+
+    @Override
+    public int hashMapBase() {
+      return 112;
+    }
+
+
+    @Override
+    public int hashMapEntry() {
+      return 44;
+    }
+
+    @Override
+    public int hashSet(int entry) {
+      // hashMap += JAVA64_OBJECT
+      return hashSetBase() + hashSetEntry() * entry;
+     }
+
+     @Override
+     public int hashSetBase() {
+       return 144;
+     }
+
+     @Override
+     public int hashSetEntry() {
+       return 44;
+     }
+
+    @Override
+    public int linkedHashMap(int entry) {
+      // hashMap += JAVA64_FIELDREF + PRIMITIVES1
+      // hashMap.entry += JAVA64_FIELDREF * 2
+      return 128 + 60 * entry;
+    }
+
+    @Override
+    public int linkedList(int entry) {
+      // base  = JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_FIELDREF;
+      // entry = JAVA64_OBJECT + JAVA64_FIELDREF * 2
+      return linkedListBase() + linkedListEntry() * entry;
+     }
+
+     @Override
+     public int linkedListBase() {
+       return 48;
+     }
+
+     @Override
+     public int linkedListEntry() {
+       return 48;
+     }
+
+    @Override
+    public int arrayList() {
+      // JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_ARRAY;
+      return 80;
+    }
+
+    @Override
+    public int memoryAlign() {
+      return 8;
+    }
+  };
+
+  public abstract int object();
+  public abstract int array();
+  public abstract int ref();
+  public abstract int hashMap(int entry);
+  public abstract int hashMapBase();
+  public abstract int hashMapEntry();
+  public abstract int hashSetBase();
+  public abstract int hashSetEntry();
+  public abstract int hashSet(int entry);
+  public abstract int linkedHashMap(int entry);
+  public abstract int linkedListBase();
+  public abstract int linkedListEntry();
+  public abstract int linkedList(int entry);
+  public abstract int arrayList();
+  public abstract int memoryAlign();
+
+  // ascii string
+  public int lengthFor(String string) {
+    return lengthForStringOfLength(string.length());
+  }
+
+  public int lengthForRandom() {
+    // boolean + double + AtomicLong
+    return object() + primitive1() + primitive2() + object() + primitive2();
+  }
+
+  public int primitive1() {
+    return PRIMITIVES1;
+  }
+  public int primitive2() {
+    return PRIMITIVES2;
+  }
+
+  public static int alignUp(int value, int align) {
+    return (value + align - 1) & ~(align - 1);
+  }
+
+  public static final int JAVA32_META = 12;
+  public static final int JAVA32_ARRAY_META = 16;
+  public static final int JAVA32_REF = 4;
+  public static final int JAVA32_OBJECT = 16;   // JAVA32_META + JAVA32_REF
+  public static final int JAVA32_ARRAY = 20;    // JAVA32_ARRAY_META + JAVA32_REF
+
+  public static final int JAVA64_META = 24;
+  public static final int JAVA64_ARRAY_META = 32;
+  public static final int JAVA64_REF = 8;
+  public static final int JAVA64_OBJECT = 32;   // JAVA64_META + JAVA64_REF
+  public static final int JAVA64_ARRAY = 40;    // JAVA64_ARRAY_META + JAVA64_REF
+
+  public static final int PRIMITIVES1 = 4;      // void, boolean, byte, short, int, float
+  public static final int PRIMITIVES2 = 8;      // long, double
+
+  public static final int PRIMITIVE_BYTE = 1;    // byte
+
+  private static JavaDataModel current;
+
+  public static JavaDataModel get() {
+    if (current != null) {
+      return current;
+    }
+    try {
+      String props = System.getProperty("sun.arch.data.model");
+      if ("32".equals(props)) {
+        return current = JAVA32;
+      }
+    } catch (Exception e) {
+      // ignore
+    }
+    // TODO: separate model is needed for compressedOops, which can be guessed from memory size.
+    return current = JAVA64;
+  }
+
+  public static int round(int size) {
+    JavaDataModel model = get();
+    if (model == JAVA32 || size % 8 == 0) {
+      return size;
+    }
+    return ((size + 8) >> 3) << 3;
+  }
+
+  private int lengthForPrimitiveArrayOfSize(int primitiveSize, int length) {
+    return alignUp(array() + primitiveSize*length, memoryAlign());
+  }
+
+  public int lengthForByteArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
+  }
+  public int lengthForObjectArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(ref(), length);
+  }
+  public int lengthForLongArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(primitive2(), length);
+  }
+  public int lengthForDoubleArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(primitive2(), length);
+  }
+  public int lengthForIntArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(primitive1(), length);
+  }
+  public int lengthForBooleanArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
+  }
+  public int lengthForTimestampArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(lengthOfTimestamp(), length);
+  }
+  public int lengthForDateArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(lengthOfDate(), length);
+  }
+  public int lengthForDecimalArrayOfSize(int length) {
+    return lengthForPrimitiveArrayOfSize(lengthOfDecimal(), length);
+  }
+
+  public int lengthOfDecimal() {
+    // object overhead + 8 bytes for intCompact + 4 bytes for precision
+    // + 4 bytes for scale + size of BigInteger
+    return object() + 2 * primitive2() + lengthOfBigInteger();
+  }
+
+  private int lengthOfBigInteger() {
+    // object overhead + 4 bytes for bitCount + 4 bytes for bitLength
+    // + 4 bytes for firstNonzeroByteNum + 4 bytes for firstNonzeroIntNum +
+    // + 4 bytes for lowestSetBit + 5 bytes for size of magnitude (since max precision
+    // is only 38 for HiveDecimal) + 7 bytes of padding (since java memory allocations
+    // are 8 byte aligned)
+    return object() + 4 * primitive2();
+  }
+
+  public int lengthOfTimestamp() {
+    // object overhead + 4 bytes for int (nanos) + 4 bytes of padding
+    return object() + primitive2();
+  }
+
+  public int lengthOfDate() {
+    // object overhead + 8 bytes for long (fastTime) + 16 bytes for cdate
+    return object() + 3 * primitive2();
+  }
+
+  public int lengthForStringOfLength(int strLen) {
+    return object() + primitive1() * 3 + array() + strLen;
+  }
+}
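
The JavaDataModel enum above, relocated from ql to storage-api in this commit, is a helper for
estimating the heap footprint of in-memory objects. A small usage sketch, assuming the class is on
the classpath exactly as added above:

    import org.apache.hadoop.hive.ql.util.JavaDataModel;

    public class JavaDataModelSketch {
      public static void main(String[] args) {
        // Selects JAVA32 or JAVA64 based on the "sun.arch.data.model" system property.
        JavaDataModel model = JavaDataModel.get();

        // ASCII string of length 10: object header + 3 ints + array header + payload.
        int stringBytes = model.lengthForStringOfLength(10);

        // HashMap with 100 entries: fixed base cost plus a per-entry cost.
        int mapBytes = model.hashMap(100);

        // Primitive arrays are rounded up to the 8-byte alignment from memoryAlign().
        int longArrayBytes = model.lengthForLongArrayOfSize(100);

        System.out.println(stringBytes + " " + mapBytes + " " + longArrayBytes);
      }
    }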

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
new file mode 100644
index 0000000..bb0b8f2
--- /dev/null
+++ b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * BloomFilter is a probabilistic data structure for set membership checks. BloomFilters are
+ * highly space efficient compared to a HashSet. Because of its probabilistic nature, false
+ * positives (test() returns true for an element that was never added) are possible, but false
+ * negatives are not (if an element was added, test() will never return false). The false
+ * positive probability is configurable (default: 5%); the lower the false positive probability,
+ * the greater the space requirement.
+ * Bloom filters are sensitive to the number of elements that will be inserted. The expected
+ * number of entries must be specified when the filter is created; if the number of insertions
+ * exceeds that estimate, the false positive probability will increase accordingly.
+ *
+ * Internally, this implementation uses the Murmur3 fast non-cryptographic hash algorithm.
+ * Although Murmur2 is slightly faster than Murmur3 in Java, it suffers from hash collisions
+ * for specific sequences of repeating bytes. See the following link for more info:
+ * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw
+ */
+public class BloomFilter {
+  public static final double DEFAULT_FPP = 0.05;
+  protected BitSet bitSet;
+  protected int numBits;
+  protected int numHashFunctions;
+
+  public BloomFilter() {
+  }
+
+  public BloomFilter(long expectedEntries) {
+    this(expectedEntries, DEFAULT_FPP);
+  }
+
+  public BloomFilter(long expectedEntries, double fpp) {
+    checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+    checkArgument(fpp > 0.0 && fpp < 1.0, "False positive probability should be > 0.0 & < 1.0");
+    int nb = optimalNumOfBits(expectedEntries, fpp);
+    // make 'm' multiple of 64
+    this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+    this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, numBits);
+    this.bitSet = new BitSet(numBits);
+  }
+
+  /**
+   * A constructor to support rebuilding the BloomFilter from a serialized representation.
+   * @param bits
+   * @param numBits
+   * @param numFuncs
+   */
+  public BloomFilter(List<Long> bits, int numBits, int numFuncs) {
+    super();
+    long[] copied = new long[bits.size()];
+    for (int i = 0; i < bits.size(); i++) copied[i] = bits.get(i);
+    bitSet = new BitSet(copied);
+    this.numBits = numBits;
+    numHashFunctions = numFuncs;
+  }
+
+  static int optimalNumOfHashFunctions(long n, long m) {
+    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
+  }
+
+  static int optimalNumOfBits(long n, double p) {
+    return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
+  }
+
+  public void add(byte[] val) {
+    if (val == null) {
+      addBytes(val, -1, -1);
+    } else {
+      addBytes(val, 0, val.length);
+    }
+  }
+
+  public void addBytes(byte[] val, int offset, int length) {
+    // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
+    // by Kirsch et al. From the abstract: 'only two hash functions are necessary to effectively
+    // implement a Bloom filter without any loss in the asymptotic false positive probability'
+
+    // Split the 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned
+    // in the above paper
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+        Murmur3.hash64(val, offset, length);
+    addHash(hash64);
+  }
+
+  private void addHash(long hash64) {
+    int hash1 = (int) hash64;
+    int hash2 = (int) (hash64 >>> 32);
+
+    for (int i = 1; i <= numHashFunctions; i++) {
+      int combinedHash = hash1 + (i * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      int pos = combinedHash % numBits;
+      bitSet.set(pos);
+    }
+  }
+
+  public void addString(String val) {
+    if (val == null) {
+      add(null);
+    } else {
+      add(val.getBytes());
+    }
+  }
+
+  public void addLong(long val) {
+    addHash(getLongHash(val));
+  }
+
+  public void addDouble(double val) {
+    addLong(Double.doubleToLongBits(val));
+  }
+
+  public boolean test(byte[] val) {
+    if (val == null) {
+      return testBytes(val, -1, -1);
+    }
+    return testBytes(val, 0, val.length);
+  }
+
+  public boolean testBytes(byte[] val, int offset, int length) {
+    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+        Murmur3.hash64(val, offset, length);
+    return testHash(hash64);
+  }
+
+  private boolean testHash(long hash64) {
+    int hash1 = (int) hash64;
+    int hash2 = (int) (hash64 >>> 32);
+
+    for (int i = 1; i <= numHashFunctions; i++) {
+      int combinedHash = hash1 + (i * hash2);
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      int pos = combinedHash % numBits;
+      if (!bitSet.get(pos)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public boolean testString(String val) {
+    if (val == null) {
+      return test(null);
+    } else {
+      return test(val.getBytes());
+    }
+  }
+
+  public boolean testLong(long val) {
+    return testHash(getLongHash(val));
+  }
+
+  // Thomas Wang's integer hash function
+  // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+  private long getLongHash(long key) {
+    key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+    key = key ^ (key >> 24);
+    key = (key + (key << 3)) + (key << 8); // key * 265
+    key = key ^ (key >> 14);
+    key = (key + (key << 2)) + (key << 4); // key * 21
+    key = key ^ (key >> 28);
+    key = key + (key << 31);
+    return key;
+  }
+
+  public boolean testDouble(double val) {
+    return testLong(Double.doubleToLongBits(val));
+  }
+
+  public long sizeInBytes() {
+    return getBitSize() / 8;
+  }
+
+  public int getBitSize() {
+    return bitSet.getData().length * Long.SIZE;
+  }
+
+  public int getNumHashFunctions() {
+    return numHashFunctions;
+  }
+
+  public long[] getBitSet() {
+    return bitSet.getData();
+  }
+
+  @Override
+  public String toString() {
+    return "m: " + numBits + " k: " + numHashFunctions;
+  }
+
+  /**
+   * Merge the specified bloom filter with current bloom filter.
+   *
+   * @param that - bloom filter to merge
+   */
+  public void merge(BloomFilter that) {
+    if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
+      this.bitSet.putAll(that.bitSet);
+    } else {
+      throw new IllegalArgumentException("BloomFilters are not compatible for merging." +
+          " this - " + this.toString() + " that - " + that.toString());
+    }
+  }
+
+  public void reset() {
+    this.bitSet.clear();
+  }
+
+  /**
+   * Bare metal bit set implementation. For performance reasons, this implementation does not check
+   * for index bounds nor expand the bit set size if the specified index is greater than the size.
+   */
+  public class BitSet {
+    private final long[] data;
+
+    public BitSet(long bits) {
+      this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
+    }
+
+    /**
+     * Deserialize long array as bit set.
+     *
+     * @param data - bit array
+     */
+    public BitSet(long[] data) {
+      assert data.length > 0 : "data length is zero!";
+      this.data = data;
+    }
+
+    /**
+     * Sets the bit at specified index.
+     *
+     * @param index - position
+     */
+    public void set(int index) {
+      data[index >>> 6] |= (1L << index);
+    }
+
+    /**
+     * Returns true if the bit is set in the specified index.
+     *
+     * @param index - position
+     * @return - value at the bit position
+     */
+    public boolean get(int index) {
+      return (data[index >>> 6] & (1L << index)) != 0;
+    }
+
+    /**
+     * Number of bits
+     */
+    public long bitSize() {
+      return (long) data.length * Long.SIZE;
+    }
+
+    public long[] getData() {
+      return data;
+    }
+
+    /**
+     * Combines the two BitArrays using bitwise OR.
+     */
+    public void putAll(BitSet array) {
+      assert data.length == array.data.length :
+          "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
+      for (int i = 0; i < data.length; i++) {
+        data[i] |= array.data[i];
+      }
+    }
+
+    /**
+     * Clear the bit set.
+     */
+    public void clear() {
+      Arrays.fill(data, 0);
+    }
+  }
+}
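
The BloomFilter added above exposes typed add/test methods backed by the Murmur3 hash and a bare
bit set. A minimal usage sketch, assuming the class is available exactly as added in this commit:

    import org.apache.hive.common.util.BloomFilter;

    public class BloomFilterSketch {
      public static void main(String[] args) {
        // Size the filter for the expected number of entries and a 1% false positive rate.
        BloomFilter filter = new BloomFilter(10000, 0.01);

        filter.addString("hive");
        filter.addLong(12055L);

        // test*() never returns false for a value that was added; it may occasionally return
        // true for a value that was not (a false positive).
        System.out.println(filter.testString("hive"));   // true
        System.out.println(filter.testLong(12055L));     // true
        System.out.println(filter.testString("orc"));    // almost certainly false
      }
    }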

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/java/org/apache/hive/common/util/Murmur3.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hive/common/util/Murmur3.java b/storage-api/src/java/org/apache/hive/common/util/Murmur3.java
new file mode 100644
index 0000000..88c3514
--- /dev/null
+++ b/storage-api/src/java/org/apache/hive/common/util/Murmur3.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+/**
+ * Murmur3 is the successor to Murmur2 among fast non-cryptographic hash algorithms.
+ *
+ * Murmur3 32 and 128 bit variants.
+ * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
+ * 128-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
+ *
+ * This is a public domain code with no copyrights.
+ * From homepage of MurmurHash (https://code.google.com/p/smhasher/),
+ * "All MurmurHash versions are public domain software, and the author disclaims all copyright
+ * to their code."
+ */
+public class Murmur3 {
+  // from 64-bit linear congruential generator
+  public static final long NULL_HASHCODE = 2862933555777941757L;
+
+  // Constants for 32 bit variant
+  private static final int C1_32 = 0xcc9e2d51;
+  private static final int C2_32 = 0x1b873593;
+  private static final int R1_32 = 15;
+  private static final int R2_32 = 13;
+  private static final int M_32 = 5;
+  private static final int N_32 = 0xe6546b64;
+
+  // Constants for 128 bit variant
+  private static final long C1 = 0x87c37b91114253d5L;
+  private static final long C2 = 0x4cf5ad432745937fL;
+  private static final int R1 = 31;
+  private static final int R2 = 27;
+  private static final int R3 = 33;
+  private static final int M = 5;
+  private static final int N1 = 0x52dce729;
+  private static final int N2 = 0x38495ab5;
+
+  private static final int DEFAULT_SEED = 104729;
+
+  /**
+   * Murmur3 32-bit variant.
+   *
+   * @param data - input byte array
+   * @return - hashcode
+   */
+  public static int hash32(byte[] data) {
+    return hash32(data, data.length, DEFAULT_SEED);
+  }
+
+  /**
+   * Murmur3 32-bit variant.
+   *
+   * @param data   - input byte array
+   * @param length - length of array
+   * @param seed   - seed. (default 0)
+   * @return - hashcode
+   */
+  public static int hash32(byte[] data, int length, int seed) {
+    int hash = seed;
+    final int nblocks = length >> 2;
+
+    // body
+    for (int i = 0; i < nblocks; i++) {
+      int i_4 = i << 2;
+      int k = (data[i_4] & 0xff)
+          | ((data[i_4 + 1] & 0xff) << 8)
+          | ((data[i_4 + 2] & 0xff) << 16)
+          | ((data[i_4 + 3] & 0xff) << 24);
+
+      // mix functions
+      k *= C1_32;
+      k = Integer.rotateLeft(k, R1_32);
+      k *= C2_32;
+      hash ^= k;
+      hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+    }
+
+    // tail
+    int idx = nblocks << 2;
+    int k1 = 0;
+    switch (length - idx) {
+      case 3:
+        k1 ^= data[idx + 2] << 16;
+      case 2:
+        k1 ^= data[idx + 1] << 8;
+      case 1:
+        k1 ^= data[idx];
+
+        // mix functions
+        k1 *= C1_32;
+        k1 = Integer.rotateLeft(k1, R1_32);
+        k1 *= C2_32;
+        hash ^= k1;
+    }
+
+    // finalization
+    hash ^= length;
+    hash ^= (hash >>> 16);
+    hash *= 0x85ebca6b;
+    hash ^= (hash >>> 13);
+    hash *= 0xc2b2ae35;
+    hash ^= (hash >>> 16);
+
+    return hash;
+  }
+
+  /**
+   * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
+   *
+   * @param data - input byte array
+   * @return - hashcode
+   */
+  public static long hash64(byte[] data) {
+    return hash64(data, 0, data.length, DEFAULT_SEED);
+  }
+
+  public static long hash64(byte[] data, int offset, int length) {
+    return hash64(data, offset, length, DEFAULT_SEED);
+  }
+
+  /**
+   * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
+   *
+   * @param data   - input byte array
+   * @param length - length of array
+   * @param seed   - seed. (default is 0)
+   * @return - hashcode
+   */
+  public static long hash64(byte[] data, int offset, int length, int seed) {
+    long hash = seed;
+    final int nblocks = length >> 3;
+
+    // body
+    for (int i = 0; i < nblocks; i++) {
+      final int i8 = i << 3;
+      long k = ((long) data[offset + i8] & 0xff)
+          | (((long) data[offset + i8 + 1] & 0xff) << 8)
+          | (((long) data[offset + i8 + 2] & 0xff) << 16)
+          | (((long) data[offset + i8 + 3] & 0xff) << 24)
+          | (((long) data[offset + i8 + 4] & 0xff) << 32)
+          | (((long) data[offset + i8 + 5] & 0xff) << 40)
+          | (((long) data[offset + i8 + 6] & 0xff) << 48)
+          | (((long) data[offset + i8 + 7] & 0xff) << 56);
+
+      // mix functions
+      k *= C1;
+      k = Long.rotateLeft(k, R1);
+      k *= C2;
+      hash ^= k;
+      hash = Long.rotateLeft(hash, R2) * M + N1;
+    }
+
+    // tail
+    long k1 = 0;
+    int tailStart = nblocks << 3;
+    switch (length - tailStart) {
+      case 7:
+        k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
+      case 6:
+        k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
+      case 5:
+        k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
+      case 4:
+        k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
+      case 3:
+        k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
+      case 2:
+        k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
+      case 1:
+        k1 ^= ((long) data[offset + tailStart] & 0xff);
+        k1 *= C1;
+        k1 = Long.rotateLeft(k1, R1);
+        k1 *= C2;
+        hash ^= k1;
+    }
+
+    // finalization
+    hash ^= length;
+    hash = fmix64(hash);
+
+    return hash;
+  }
+
+  /**
+   * Murmur3 128-bit variant.
+   *
+   * @param data - input byte array
+   * @return - hashcode (2 longs)
+   */
+  public static long[] hash128(byte[] data) {
+    return hash128(data, 0, data.length, DEFAULT_SEED);
+  }
+
+  /**
+   * Murmur3 128-bit variant.
+   *
+   * @param data   - input byte array
+   * @param offset - start offset into the array
+   * @param length - number of bytes to hash
+   * @param seed   - seed (the overload without a seed uses DEFAULT_SEED)
+   * @return - hashcode (2 longs)
+   */
+  public static long[] hash128(byte[] data, int offset, int length, int seed) {
+    long h1 = seed;
+    long h2 = seed;
+    final int nblocks = length >> 4;
+
+    // body
+    for (int i = 0; i < nblocks; i++) {
+      final int i16 = i << 4;
+      long k1 = ((long) data[offset + i16] & 0xff)
+          | (((long) data[offset + i16 + 1] & 0xff) << 8)
+          | (((long) data[offset + i16 + 2] & 0xff) << 16)
+          | (((long) data[offset + i16 + 3] & 0xff) << 24)
+          | (((long) data[offset + i16 + 4] & 0xff) << 32)
+          | (((long) data[offset + i16 + 5] & 0xff) << 40)
+          | (((long) data[offset + i16 + 6] & 0xff) << 48)
+          | (((long) data[offset + i16 + 7] & 0xff) << 56);
+
+      long k2 = ((long) data[offset + i16 + 8] & 0xff)
+          | (((long) data[offset + i16 + 9] & 0xff) << 8)
+          | (((long) data[offset + i16 + 10] & 0xff) << 16)
+          | (((long) data[offset + i16 + 11] & 0xff) << 24)
+          | (((long) data[offset + i16 + 12] & 0xff) << 32)
+          | (((long) data[offset + i16 + 13] & 0xff) << 40)
+          | (((long) data[offset + i16 + 14] & 0xff) << 48)
+          | (((long) data[offset + i16 + 15] & 0xff) << 56);
+
+      // mix functions for k1
+      k1 *= C1;
+      k1 = Long.rotateLeft(k1, R1);
+      k1 *= C2;
+      h1 ^= k1;
+      h1 = Long.rotateLeft(h1, R2);
+      h1 += h2;
+      h1 = h1 * M + N1;
+
+      // mix functions for k2
+      k2 *= C2;
+      k2 = Long.rotateLeft(k2, R3);
+      k2 *= C1;
+      h2 ^= k2;
+      h2 = Long.rotateLeft(h2, R1);
+      h2 += h1;
+      h2 = h2 * M + N2;
+    }
+
+    // tail
+    long k1 = 0;
+    long k2 = 0;
+    int tailStart = nblocks << 4;
+    switch (length - tailStart) {
+      case 15:
+        k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
+      case 14:
+        k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
+      case 13:
+        k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
+      case 12:
+        k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
+      case 11:
+        k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
+      case 10:
+        k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
+      case 9:
+        k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
+        k2 *= C2;
+        k2 = Long.rotateLeft(k2, R3);
+        k2 *= C1;
+        h2 ^= k2;
+
+      case 8:
+        k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
+      case 7:
+        k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
+      case 6:
+        k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
+      case 5:
+        k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
+      case 4:
+        k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
+      case 3:
+        k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
+      case 2:
+        k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
+      case 1:
+        k1 ^= (long) (data[offset + tailStart] & 0xff);
+        k1 *= C1;
+        k1 = Long.rotateLeft(k1, R1);
+        k1 *= C2;
+        h1 ^= k1;
+    }
+
+    // finalization
+    h1 ^= length;
+    h2 ^= length;
+
+    h1 += h2;
+    h2 += h1;
+
+    h1 = fmix64(h1);
+    h2 = fmix64(h2);
+
+    h1 += h2;
+    h2 += h1;
+
+    return new long[]{h1, h2};
+  }
+
+  private static long fmix64(long h) {
+    h ^= (h >>> 33);
+    h *= 0xff51afd7ed558ccdL;
+    h ^= (h >>> 33);
+    h *= 0xc4ceb9fe1a85ec53L;
+    h ^= (h >>> 33);
+    return h;
+  }
+}
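
For quick reference, a minimal usage sketch of the hash API exposed by the Murmur3 class above; the input bytes and seed are illustrative and not taken from this patch:

    import org.apache.hive.common.util.Murmur3;

    public class Murmur3Example {
      public static void main(String[] args) {
        byte[] data = "hello orc".getBytes();

        // 32-bit variant with an explicit seed
        int h32 = Murmur3.hash32(data, data.length, 123);

        // 64-bit variant (MSB 8 bytes of the 128-bit variant), default seed
        long h64 = Murmur3.hash64(data);

        // 128-bit variant over an (offset, length) range of the array
        long[] h128 = Murmur3.hash128(data, 0, data.length, 123);

        System.out.println(h32 + " " + h64 + " " + h128[0] + ":" + h128[1]);
      }
    }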

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java
----------------------------------------------------------------------
diff --git a/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java b/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java
new file mode 100644
index 0000000..5facc7c
--- /dev/null
+++ b/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Tests for Murmur3 variants.
+ */
+public class TestMurmur3 {
+
+  @Test
+  public void testHashCodesM3_32_string() {
+    String key = "test";
+    int seed = 123;
+    HashFunction hf = Hashing.murmur3_32(seed);
+    int hc1 = hf.hashBytes(key.getBytes()).asInt();
+    int hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
+    assertEquals(hc1, hc2);
+
+    key = "testkey";
+    hc1 = hf.hashBytes(key.getBytes()).asInt();
+    hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
+    assertEquals(hc1, hc2);
+  }
+
+  @Test
+  public void testHashCodesM3_32_ints() {
+    int seed = 123;
+    Random rand = new Random(seed);
+    HashFunction hf = Hashing.murmur3_32(seed);
+    for (int i = 0; i < 1000; i++) {
+      int val = rand.nextInt();
+      byte[] data = ByteBuffer.allocate(4).putInt(val).array();
+      int hc1 = hf.hashBytes(data).asInt();
+      int hc2 = Murmur3.hash32(data, data.length, seed);
+      assertEquals(hc1, hc2);
+    }
+  }
+
+  @Test
+  public void testHashCodesM3_32_longs() {
+    int seed = 123;
+    Random rand = new Random(seed);
+    HashFunction hf = Hashing.murmur3_32(seed);
+    for (int i = 0; i < 1000; i++) {
+      long val = rand.nextLong();
+      byte[] data = ByteBuffer.allocate(8).putLong(val).array();
+      int hc1 = hf.hashBytes(data).asInt();
+      int hc2 = Murmur3.hash32(data, data.length, seed);
+      assertEquals(hc1, hc2);
+    }
+  }
+
+  @Test
+  public void testHashCodesM3_32_double() {
+    int seed = 123;
+    Random rand = new Random(seed);
+    HashFunction hf = Hashing.murmur3_32(seed);
+    for (int i = 0; i < 1000; i++) {
+      double val = rand.nextDouble();
+      byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
+      int hc1 = hf.hashBytes(data).asInt();
+      int hc2 = Murmur3.hash32(data, data.length, seed);
+      assertEquals(hc1, hc2);
+    }
+  }
+
+  @Test
+  public void testHashCodesM3_128_string() {
+    String key = "test";
+    int seed = 123;
+    HashFunction hf = Hashing.murmur3_128(seed);
+    // guava stores the hashcodes in little endian order
+    ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+    buf.put(hf.hashBytes(key.getBytes()).asBytes());
+    buf.flip();
+    long gl1 = buf.getLong();
+    long gl2 = buf.getLong(8);
+    long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed);
+    long m1 = hc[0];
+    long m2 = hc[1];
+    assertEquals(gl1, m1);
+    assertEquals(gl2, m2);
+
+    key = "testkey128_testkey128";
+    buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+    buf.put(hf.hashBytes(key.getBytes()).asBytes());
+    buf.flip();
+    gl1 = buf.getLong();
+    gl2 = buf.getLong(8);
+    byte[] keyBytes = key.getBytes();
+    hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed);
+    m1 = hc[0];
+    m2 = hc[1];
+    assertEquals(gl1, m1);
+    assertEquals(gl2, m2);
+
+    byte[] offsetKeyBytes = new byte[keyBytes.length + 35];
+    Arrays.fill(offsetKeyBytes, (byte) -1);
+    System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length);
+    hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed);
+    assertEquals(gl1, hc[0]);
+    assertEquals(gl2, hc[1]);
+  }
+
+  @Test
+  public void testHashCodeM3_64() {
+    byte[] origin = ("It was the best of times, it was the worst of times," +
+        " it was the age of wisdom, it was the age of foolishness," +
+        " it was the epoch of belief, it was the epoch of incredulity," +
+        " it was the season of Light, it was the season of Darkness," +
+        " it was the spring of hope, it was the winter of despair," +
+        " we had everything before us, we had nothing before us," +
+        " we were all going direct to Heaven," +
+        " we were all going direct the other way.").getBytes();
+    long hash = Murmur3.hash64(origin, 0, origin.length);
+    assertEquals(305830725663368540L, hash);
+
+    byte[] originOffset = new byte[origin.length + 150];
+    Arrays.fill(originOffset, (byte) 123);
+    System.arraycopy(origin, 0, originOffset, 150, origin.length);
+    hash = Murmur3.hash64(originOffset, 150, origin.length);
+    assertEquals(305830725663368540L, hash);
+  }
+
+  @Test
+  public void testHashCodesM3_128_ints() {
+    int seed = 123;
+    Random rand = new Random(seed);
+    HashFunction hf = Hashing.murmur3_128(seed);
+    for (int i = 0; i < 1000; i++) {
+      int val = rand.nextInt();
+      byte[] data = ByteBuffer.allocate(4).putInt(val).array();
+      // guava stores the hashcodes in little endian order
+      ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+      buf.put(hf.hashBytes(data).asBytes());
+      buf.flip();
+      long gl1 = buf.getLong();
+      long gl2 = buf.getLong(8);
+      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
+      long m1 = hc[0];
+      long m2 = hc[1];
+      assertEquals(gl1, m1);
+      assertEquals(gl2, m2);
+
+      byte[] offsetData = new byte[data.length + 50];
+      System.arraycopy(data, 0, offsetData, 50, data.length);
+      hc = Murmur3.hash128(offsetData, 50, data.length, seed);
+      assertEquals(gl1, hc[0]);
+      assertEquals(gl2, hc[1]);
+    }
+  }
+
+  @Test
+  public void testHashCodesM3_128_longs() {
+    int seed = 123;
+    Random rand = new Random(seed);
+    HashFunction hf = Hashing.murmur3_128(seed);
+    for (int i = 0; i < 1000; i++) {
+      long val = rand.nextLong();
+      byte[] data = ByteBuffer.allocate(8).putLong(val).array();
+      // guava stores the hashcodes in little endian order
+      ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+      buf.put(hf.hashBytes(data).asBytes());
+      buf.flip();
+      long gl1 = buf.getLong();
+      long gl2 = buf.getLong(8);
+      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
+      long m1 = hc[0];
+      long m2 = hc[1];
+      assertEquals(gl1, m1);
+      assertEquals(gl2, m2);
+    }
+  }
+
+  @Test
+  public void testHashCodesM3_128_double() {
+    int seed = 123;
+    Random rand = new Random(seed);
+    HashFunction hf = Hashing.murmur3_128(seed);
+    for (int i = 0; i < 1000; i++) {
+      double val = rand.nextDouble();
+      byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
+      // guava stores the hashcodes in little endian order
+      ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+      buf.put(hf.hashBytes(data).asBytes());
+      buf.flip();
+      long gl1 = buf.getLong();
+      long gl2 = buf.getLong(8);
+      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
+      long m1 = hc[0];
+      long m2 = hc[1];
+      assertEquals(gl1, m1);
+      assertEquals(gl2, m2);
+    }
+  }
+}


[5/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

Posted by om...@apache.org.
HIVE-12055. Move WriterImpl over to orc module.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/06e39ebe
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/06e39ebe
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/06e39ebe

Branch: refs/heads/master
Commit: 06e39ebe07d7854df669529e73f1c461f3c7d9d4
Parents: 49dc645
Author: Owen O'Malley <om...@apache.org>
Authored: Mon Dec 14 13:35:39 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Dec 14 13:35:39 2015 -0800

----------------------------------------------------------------------
 .../apache/hive/common/util/BloomFilter.java    |  309 --
 .../org/apache/hive/common/util/Murmur3.java    |  335 --
 .../apache/hive/common/util/TestMurmur3.java    |  224 --
 orc/src/java/org/apache/orc/BloomFilterIO.java  |   43 +
 orc/src/java/org/apache/orc/OrcFile.java        |   22 +
 .../java/org/apache/orc/TypeDescription.java    |   26 +-
 .../java/org/apache/orc/impl/WriterImpl.java    | 2912 +++++++++++++++
 .../hive/ql/io/filters/BloomFilterIO.java       |   44 -
 .../apache/hadoop/hive/ql/io/orc/FileDump.java  |    2 +-
 .../hadoop/hive/ql/io/orc/JsonFileDump.java     |    2 +-
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java   |   30 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   15 +
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |    2 +-
 .../apache/hadoop/hive/ql/io/orc/Writer.java    |    2 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       | 3394 ++----------------
 .../hadoop/hive/ql/util/JavaDataModel.java      |  335 --
 .../hadoop/hive/ql/io/orc/TestFileDump.java     |   25 +-
 .../hive/ql/io/orc/TestNewIntegerEncoding.java  |    2 +-
 .../hadoop/hive/ql/io/orc/TestOrcFile.java      |    9 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   12 +-
 .../hive/ql/io/orc/TestRecordReaderImpl.java    |    2 +-
 .../resources/orc-file-dump-bloomfilter.out     |    2 +-
 .../resources/orc-file-dump-bloomfilter2.out    |    2 +-
 .../orc-file-dump-dictionary-threshold.out      |    2 +-
 ql/src/test/resources/orc-file-dump.json        |    2 +-
 ql/src/test/resources/orc-file-dump.out         |    2 +-
 ql/src/test/resources/orc-file-has-null.out     |    2 +-
 .../results/clientpositive/orc_file_dump.q.out  |    6 +-
 .../results/clientpositive/orc_merge10.q.out    |    4 +-
 .../results/clientpositive/orc_merge11.q.out    |    6 +-
 .../clientpositive/tez/orc_merge10.q.out        |    4 +-
 .../clientpositive/tez/orc_merge11.q.out        |    6 +-
 .../hadoop/hive/ql/util/JavaDataModel.java      |  335 ++
 .../apache/hive/common/util/BloomFilter.java    |  309 ++
 .../org/apache/hive/common/util/Murmur3.java    |  335 ++
 .../apache/hive/common/util/TestMurmur3.java    |  224 ++
 36 files changed, 4489 insertions(+), 4499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/common/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java
deleted file mode 100644
index bb0b8f2..0000000
--- a/common/src/java/org/apache/hive/common/util/BloomFilter.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.common.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * BloomFilter is a probabilistic data structure for set membership check. BloomFilters are
- * highly space efficient when compared to using a HashSet. Because of the probabilistic nature of
- * bloom filter false positive (element not present in bloom filter but test() says true) are
- * possible but false negatives are not possible (if element is present then test() will never
- * say false). The false positive probability is configurable (default: 5%) depending on which
- * storage requirement may increase or decrease. Lower the false positive probability greater
- * is the space requirement.
- * Bloom filters are sensitive to number of elements that will be inserted in the bloom filter.
- * During the creation of bloom filter expected number of entries must be specified. If the number
- * of insertions exceed the specified initial number of entries then false positive probability will
- * increase accordingly.
- *
- * Internally, this implementation of bloom filter uses Murmur3 fast non-cryptographic hash
- * algorithm. Although Murmur2 is slightly faster than Murmur3 in Java, it suffers from hash
- * collisions for specific sequence of repeating bytes. Check the following link for more info
- * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw
- */
-public class BloomFilter {
-  public static final double DEFAULT_FPP = 0.05;
-  protected BitSet bitSet;
-  protected int numBits;
-  protected int numHashFunctions;
-
-  public BloomFilter() {
-  }
-
-  public BloomFilter(long expectedEntries) {
-    this(expectedEntries, DEFAULT_FPP);
-  }
-
-  public BloomFilter(long expectedEntries, double fpp) {
-    checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
-    checkArgument(fpp > 0.0 && fpp < 1.0, "False positive probability should be > 0.0 & < 1.0");
-    int nb = optimalNumOfBits(expectedEntries, fpp);
-    // make 'm' multiple of 64
-    this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
-    this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, numBits);
-    this.bitSet = new BitSet(numBits);
-  }
-
-  /**
-   * A constructor to support rebuilding the BloomFilter from a serialized representation.
-   * @param bits
-   * @param numBits
-   * @param numFuncs
-   */
-  public BloomFilter(List<Long> bits, int numBits, int numFuncs) {
-    super();
-    long[] copied = new long[bits.size()];
-    for (int i = 0; i < bits.size(); i++) copied[i] = bits.get(i);
-    bitSet = new BitSet(copied);
-    this.numBits = numBits;
-    numHashFunctions = numFuncs;
-  }
-
-  static int optimalNumOfHashFunctions(long n, long m) {
-    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
-  }
-
-  static int optimalNumOfBits(long n, double p) {
-    return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
-  }
-
-  public void add(byte[] val) {
-    if (val == null) {
-      addBytes(val, -1, -1);
-    } else {
-      addBytes(val, 0, val.length);
-    }
-  }
-
-  public void addBytes(byte[] val, int offset, int length) {
-    // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
-    // by Kirsch et.al. From abstract 'only two hash functions are necessary to effectively
-    // implement a Bloom filter without any loss in the asymptotic false positive probability'
-
-    // Lets split up 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned
-    // in the above paper
-    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
-        Murmur3.hash64(val, offset, length);
-    addHash(hash64);
-  }
-
-  private void addHash(long hash64) {
-    int hash1 = (int) hash64;
-    int hash2 = (int) (hash64 >>> 32);
-
-    for (int i = 1; i <= numHashFunctions; i++) {
-      int combinedHash = hash1 + (i * hash2);
-      // hashcode should be positive, flip all the bits if it's negative
-      if (combinedHash < 0) {
-        combinedHash = ~combinedHash;
-      }
-      int pos = combinedHash % numBits;
-      bitSet.set(pos);
-    }
-  }
-
-  public void addString(String val) {
-    if (val == null) {
-      add(null);
-    } else {
-      add(val.getBytes());
-    }
-  }
-
-  public void addLong(long val) {
-    addHash(getLongHash(val));
-  }
-
-  public void addDouble(double val) {
-    addLong(Double.doubleToLongBits(val));
-  }
-
-  public boolean test(byte[] val) {
-    if (val == null) {
-      return testBytes(val, -1, -1);
-    }
-    return testBytes(val, 0, val.length);
-  }
-
-  public boolean testBytes(byte[] val, int offset, int length) {
-    long hash64 = val == null ? Murmur3.NULL_HASHCODE :
-        Murmur3.hash64(val, offset, length);
-    return testHash(hash64);
-  }
-
-  private boolean testHash(long hash64) {
-    int hash1 = (int) hash64;
-    int hash2 = (int) (hash64 >>> 32);
-
-    for (int i = 1; i <= numHashFunctions; i++) {
-      int combinedHash = hash1 + (i * hash2);
-      // hashcode should be positive, flip all the bits if it's negative
-      if (combinedHash < 0) {
-        combinedHash = ~combinedHash;
-      }
-      int pos = combinedHash % numBits;
-      if (!bitSet.get(pos)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public boolean testString(String val) {
-    if (val == null) {
-      return test(null);
-    } else {
-      return test(val.getBytes());
-    }
-  }
-
-  public boolean testLong(long val) {
-    return testHash(getLongHash(val));
-  }
-
-  // Thomas Wang's integer hash function
-  // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
-  private long getLongHash(long key) {
-    key = (~key) + (key << 21); // key = (key << 21) - key - 1;
-    key = key ^ (key >> 24);
-    key = (key + (key << 3)) + (key << 8); // key * 265
-    key = key ^ (key >> 14);
-    key = (key + (key << 2)) + (key << 4); // key * 21
-    key = key ^ (key >> 28);
-    key = key + (key << 31);
-    return key;
-  }
-
-  public boolean testDouble(double val) {
-    return testLong(Double.doubleToLongBits(val));
-  }
-
-  public long sizeInBytes() {
-    return getBitSize() / 8;
-  }
-
-  public int getBitSize() {
-    return bitSet.getData().length * Long.SIZE;
-  }
-
-  public int getNumHashFunctions() {
-    return numHashFunctions;
-  }
-
-  public long[] getBitSet() {
-    return bitSet.getData();
-  }
-
-  @Override
-  public String toString() {
-    return "m: " + numBits + " k: " + numHashFunctions;
-  }
-
-  /**
-   * Merge the specified bloom filter with current bloom filter.
-   *
-   * @param that - bloom filter to merge
-   */
-  public void merge(BloomFilter that) {
-    if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
-      this.bitSet.putAll(that.bitSet);
-    } else {
-      throw new IllegalArgumentException("BloomFilters are not compatible for merging." +
-          " this - " + this.toString() + " that - " + that.toString());
-    }
-  }
-
-  public void reset() {
-    this.bitSet.clear();
-  }
-
-  /**
-   * Bare metal bit set implementation. For performance reasons, this implementation does not check
-   * for index bounds nor expand the bit set size if the specified index is greater than the size.
-   */
-  public class BitSet {
-    private final long[] data;
-
-    public BitSet(long bits) {
-      this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
-    }
-
-    /**
-     * Deserialize long array as bit set.
-     *
-     * @param data - bit array
-     */
-    public BitSet(long[] data) {
-      assert data.length > 0 : "data length is zero!";
-      this.data = data;
-    }
-
-    /**
-     * Sets the bit at specified index.
-     *
-     * @param index - position
-     */
-    public void set(int index) {
-      data[index >>> 6] |= (1L << index);
-    }
-
-    /**
-     * Returns true if the bit is set in the specified index.
-     *
-     * @param index - position
-     * @return - value at the bit position
-     */
-    public boolean get(int index) {
-      return (data[index >>> 6] & (1L << index)) != 0;
-    }
-
-    /**
-     * Number of bits
-     */
-    public long bitSize() {
-      return (long) data.length * Long.SIZE;
-    }
-
-    public long[] getData() {
-      return data;
-    }
-
-    /**
-     * Combines the two BitArrays using bitwise OR.
-     */
-    public void putAll(BitSet array) {
-      assert data.length == array.data.length :
-          "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
-      for (int i = 0; i < data.length; i++) {
-        data[i] |= array.data[i];
-      }
-    }
-
-    /**
-     * Clear the bit set.
-     */
-    public void clear() {
-      Arrays.fill(data, 0);
-    }
-  }
-}
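
Although this file is only moving between modules, a minimal usage sketch of the BloomFilter API it defines may help reviewers; the entry count and false positive probability below are illustrative:

    import org.apache.hive.common.util.BloomFilter;

    public class BloomFilterExample {
      public static void main(String[] args) {
        // size the filter for ~10000 expected entries at a 1% false positive probability
        BloomFilter bf = new BloomFilter(10000, 0.01);

        bf.addString("alpha");
        bf.addLong(42L);

        System.out.println(bf.testString("alpha")); // true
        System.out.println(bf.testLong(42L));       // true
        System.out.println(bf.testString("beta"));  // usually false; may be a false positive
      }
    }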

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/common/src/java/org/apache/hive/common/util/Murmur3.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/Murmur3.java b/common/src/java/org/apache/hive/common/util/Murmur3.java
deleted file mode 100644
index 88c3514..0000000
--- a/common/src/java/org/apache/hive/common/util/Murmur3.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.common.util;
-
-/**
- * Murmur3 is successor to Murmur2 fast non-crytographic hash algorithms.
- *
- * Murmur3 32 and 128 bit variants.
- * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
- * 128-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
- *
- * This is a public domain code with no copyrights.
- * From homepage of MurmurHash (https://code.google.com/p/smhasher/),
- * "All MurmurHash versions are public domain software, and the author disclaims all copyright
- * to their code."
- */
-public class Murmur3 {
-  // from 64-bit linear congruential generator
-  public static final long NULL_HASHCODE = 2862933555777941757L;
-
-  // Constants for 32 bit variant
-  private static final int C1_32 = 0xcc9e2d51;
-  private static final int C2_32 = 0x1b873593;
-  private static final int R1_32 = 15;
-  private static final int R2_32 = 13;
-  private static final int M_32 = 5;
-  private static final int N_32 = 0xe6546b64;
-
-  // Constants for 128 bit variant
-  private static final long C1 = 0x87c37b91114253d5L;
-  private static final long C2 = 0x4cf5ad432745937fL;
-  private static final int R1 = 31;
-  private static final int R2 = 27;
-  private static final int R3 = 33;
-  private static final int M = 5;
-  private static final int N1 = 0x52dce729;
-  private static final int N2 = 0x38495ab5;
-
-  private static final int DEFAULT_SEED = 104729;
-
-  /**
-   * Murmur3 32-bit variant.
-   *
-   * @param data - input byte array
-   * @return - hashcode
-   */
-  public static int hash32(byte[] data) {
-    return hash32(data, data.length, DEFAULT_SEED);
-  }
-
-  /**
-   * Murmur3 32-bit variant.
-   *
-   * @param data   - input byte array
-   * @param length - length of array
-   * @param seed   - seed. (default 0)
-   * @return - hashcode
-   */
-  public static int hash32(byte[] data, int length, int seed) {
-    int hash = seed;
-    final int nblocks = length >> 2;
-
-    // body
-    for (int i = 0; i < nblocks; i++) {
-      int i_4 = i << 2;
-      int k = (data[i_4] & 0xff)
-          | ((data[i_4 + 1] & 0xff) << 8)
-          | ((data[i_4 + 2] & 0xff) << 16)
-          | ((data[i_4 + 3] & 0xff) << 24);
-
-      // mix functions
-      k *= C1_32;
-      k = Integer.rotateLeft(k, R1_32);
-      k *= C2_32;
-      hash ^= k;
-      hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
-    }
-
-    // tail
-    int idx = nblocks << 2;
-    int k1 = 0;
-    switch (length - idx) {
-      case 3:
-        k1 ^= data[idx + 2] << 16;
-      case 2:
-        k1 ^= data[idx + 1] << 8;
-      case 1:
-        k1 ^= data[idx];
-
-        // mix functions
-        k1 *= C1_32;
-        k1 = Integer.rotateLeft(k1, R1_32);
-        k1 *= C2_32;
-        hash ^= k1;
-    }
-
-    // finalization
-    hash ^= length;
-    hash ^= (hash >>> 16);
-    hash *= 0x85ebca6b;
-    hash ^= (hash >>> 13);
-    hash *= 0xc2b2ae35;
-    hash ^= (hash >>> 16);
-
-    return hash;
-  }
-
-  /**
-   * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
-   *
-   * @param data - input byte array
-   * @return - hashcode
-   */
-  public static long hash64(byte[] data) {
-    return hash64(data, 0, data.length, DEFAULT_SEED);
-  }
-
-  public static long hash64(byte[] data, int offset, int length) {
-    return hash64(data, offset, length, DEFAULT_SEED);
-  }
-
-  /**
-   * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
-   *
-   * @param data   - input byte array
-   * @param length - length of array
-   * @param seed   - seed. (default is 0)
-   * @return - hashcode
-   */
-  public static long hash64(byte[] data, int offset, int length, int seed) {
-    long hash = seed;
-    final int nblocks = length >> 3;
-
-    // body
-    for (int i = 0; i < nblocks; i++) {
-      final int i8 = i << 3;
-      long k = ((long) data[offset + i8] & 0xff)
-          | (((long) data[offset + i8 + 1] & 0xff) << 8)
-          | (((long) data[offset + i8 + 2] & 0xff) << 16)
-          | (((long) data[offset + i8 + 3] & 0xff) << 24)
-          | (((long) data[offset + i8 + 4] & 0xff) << 32)
-          | (((long) data[offset + i8 + 5] & 0xff) << 40)
-          | (((long) data[offset + i8 + 6] & 0xff) << 48)
-          | (((long) data[offset + i8 + 7] & 0xff) << 56);
-
-      // mix functions
-      k *= C1;
-      k = Long.rotateLeft(k, R1);
-      k *= C2;
-      hash ^= k;
-      hash = Long.rotateLeft(hash, R2) * M + N1;
-    }
-
-    // tail
-    long k1 = 0;
-    int tailStart = nblocks << 3;
-    switch (length - tailStart) {
-      case 7:
-        k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
-      case 6:
-        k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
-      case 5:
-        k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
-      case 4:
-        k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
-      case 3:
-        k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
-      case 2:
-        k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
-      case 1:
-        k1 ^= ((long) data[offset + tailStart] & 0xff);
-        k1 *= C1;
-        k1 = Long.rotateLeft(k1, R1);
-        k1 *= C2;
-        hash ^= k1;
-    }
-
-    // finalization
-    hash ^= length;
-    hash = fmix64(hash);
-
-    return hash;
-  }
-
-  /**
-   * Murmur3 128-bit variant.
-   *
-   * @param data - input byte array
-   * @return - hashcode (2 longs)
-   */
-  public static long[] hash128(byte[] data) {
-    return hash128(data, 0, data.length, DEFAULT_SEED);
-  }
-
-  /**
-   * Murmur3 128-bit variant.
-   *
-   * @param data   - input byte array
-   * @param offset - the first element of array
-   * @param length - length of array
-   * @param seed   - seed. (default is 0)
-   * @return - hashcode (2 longs)
-   */
-  public static long[] hash128(byte[] data, int offset, int length, int seed) {
-    long h1 = seed;
-    long h2 = seed;
-    final int nblocks = length >> 4;
-
-    // body
-    for (int i = 0; i < nblocks; i++) {
-      final int i16 = i << 4;
-      long k1 = ((long) data[offset + i16] & 0xff)
-          | (((long) data[offset + i16 + 1] & 0xff) << 8)
-          | (((long) data[offset + i16 + 2] & 0xff) << 16)
-          | (((long) data[offset + i16 + 3] & 0xff) << 24)
-          | (((long) data[offset + i16 + 4] & 0xff) << 32)
-          | (((long) data[offset + i16 + 5] & 0xff) << 40)
-          | (((long) data[offset + i16 + 6] & 0xff) << 48)
-          | (((long) data[offset + i16 + 7] & 0xff) << 56);
-
-      long k2 = ((long) data[offset + i16 + 8] & 0xff)
-          | (((long) data[offset + i16 + 9] & 0xff) << 8)
-          | (((long) data[offset + i16 + 10] & 0xff) << 16)
-          | (((long) data[offset + i16 + 11] & 0xff) << 24)
-          | (((long) data[offset + i16 + 12] & 0xff) << 32)
-          | (((long) data[offset + i16 + 13] & 0xff) << 40)
-          | (((long) data[offset + i16 + 14] & 0xff) << 48)
-          | (((long) data[offset + i16 + 15] & 0xff) << 56);
-
-      // mix functions for k1
-      k1 *= C1;
-      k1 = Long.rotateLeft(k1, R1);
-      k1 *= C2;
-      h1 ^= k1;
-      h1 = Long.rotateLeft(h1, R2);
-      h1 += h2;
-      h1 = h1 * M + N1;
-
-      // mix functions for k2
-      k2 *= C2;
-      k2 = Long.rotateLeft(k2, R3);
-      k2 *= C1;
-      h2 ^= k2;
-      h2 = Long.rotateLeft(h2, R1);
-      h2 += h1;
-      h2 = h2 * M + N2;
-    }
-
-    // tail
-    long k1 = 0;
-    long k2 = 0;
-    int tailStart = nblocks << 4;
-    switch (length - tailStart) {
-      case 15:
-        k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
-      case 14:
-        k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
-      case 13:
-        k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
-      case 12:
-        k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
-      case 11:
-        k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
-      case 10:
-        k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
-      case 9:
-        k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
-        k2 *= C2;
-        k2 = Long.rotateLeft(k2, R3);
-        k2 *= C1;
-        h2 ^= k2;
-
-      case 8:
-        k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
-      case 7:
-        k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
-      case 6:
-        k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
-      case 5:
-        k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
-      case 4:
-        k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
-      case 3:
-        k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
-      case 2:
-        k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
-      case 1:
-        k1 ^= (long) (data[offset + tailStart] & 0xff);
-        k1 *= C1;
-        k1 = Long.rotateLeft(k1, R1);
-        k1 *= C2;
-        h1 ^= k1;
-    }
-
-    // finalization
-    h1 ^= length;
-    h2 ^= length;
-
-    h1 += h2;
-    h2 += h1;
-
-    h1 = fmix64(h1);
-    h2 = fmix64(h2);
-
-    h1 += h2;
-    h2 += h1;
-
-    return new long[]{h1, h2};
-  }
-
-  private static long fmix64(long h) {
-    h ^= (h >>> 33);
-    h *= 0xff51afd7ed558ccdL;
-    h ^= (h >>> 33);
-    h *= 0xc4ceb9fe1a85ec53L;
-    h ^= (h >>> 33);
-    return h;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/common/src/test/org/apache/hive/common/util/TestMurmur3.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestMurmur3.java b/common/src/test/org/apache/hive/common/util/TestMurmur3.java
deleted file mode 100644
index 5facc7c..0000000
--- a/common/src/test/org/apache/hive/common/util/TestMurmur3.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.common.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-import java.util.Random;
-
-/**
- * Tests for Murmur3 variants.
- */
-public class TestMurmur3 {
-
-  @Test
-  public void testHashCodesM3_32_string() {
-    String key = "test";
-    int seed = 123;
-    HashFunction hf = Hashing.murmur3_32(seed);
-    int hc1 = hf.hashBytes(key.getBytes()).asInt();
-    int hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
-    assertEquals(hc1, hc2);
-
-    key = "testkey";
-    hc1 = hf.hashBytes(key.getBytes()).asInt();
-    hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
-    assertEquals(hc1, hc2);
-  }
-
-  @Test
-  public void testHashCodesM3_32_ints() {
-    int seed = 123;
-    Random rand = new Random(seed);
-    HashFunction hf = Hashing.murmur3_32(seed);
-    for (int i = 0; i < 1000; i++) {
-      int val = rand.nextInt();
-      byte[] data = ByteBuffer.allocate(4).putInt(val).array();
-      int hc1 = hf.hashBytes(data).asInt();
-      int hc2 = Murmur3.hash32(data, data.length, seed);
-      assertEquals(hc1, hc2);
-    }
-  }
-
-  @Test
-  public void testHashCodesM3_32_longs() {
-    int seed = 123;
-    Random rand = new Random(seed);
-    HashFunction hf = Hashing.murmur3_32(seed);
-    for (int i = 0; i < 1000; i++) {
-      long val = rand.nextLong();
-      byte[] data = ByteBuffer.allocate(8).putLong(val).array();
-      int hc1 = hf.hashBytes(data).asInt();
-      int hc2 = Murmur3.hash32(data, data.length, seed);
-      assertEquals(hc1, hc2);
-    }
-  }
-
-  @Test
-  public void testHashCodesM3_32_double() {
-    int seed = 123;
-    Random rand = new Random(seed);
-    HashFunction hf = Hashing.murmur3_32(seed);
-    for (int i = 0; i < 1000; i++) {
-      double val = rand.nextDouble();
-      byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
-      int hc1 = hf.hashBytes(data).asInt();
-      int hc2 = Murmur3.hash32(data, data.length, seed);
-      assertEquals(hc1, hc2);
-    }
-  }
-
-  @Test
-  public void testHashCodesM3_128_string() {
-    String key = "test";
-    int seed = 123;
-    HashFunction hf = Hashing.murmur3_128(seed);
-    // guava stores the hashcodes in little endian order
-    ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
-    buf.put(hf.hashBytes(key.getBytes()).asBytes());
-    buf.flip();
-    long gl1 = buf.getLong();
-    long gl2 = buf.getLong(8);
-    long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed);
-    long m1 = hc[0];
-    long m2 = hc[1];
-    assertEquals(gl1, m1);
-    assertEquals(gl2, m2);
-
-    key = "testkey128_testkey128";
-    buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
-    buf.put(hf.hashBytes(key.getBytes()).asBytes());
-    buf.flip();
-    gl1 = buf.getLong();
-    gl2 = buf.getLong(8);
-    byte[] keyBytes = key.getBytes();
-    hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed);
-    m1 = hc[0];
-    m2 = hc[1];
-    assertEquals(gl1, m1);
-    assertEquals(gl2, m2);
-
-    byte[] offsetKeyBytes = new byte[keyBytes.length + 35];
-    Arrays.fill(offsetKeyBytes, (byte) -1);
-    System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length);
-    hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed);
-    assertEquals(gl1, hc[0]);
-    assertEquals(gl2, hc[1]);
-  }
-
-  @Test
-  public void testHashCodeM3_64() {
-    byte[] origin = ("It was the best of times, it was the worst of times," +
-        " it was the age of wisdom, it was the age of foolishness," +
-        " it was the epoch of belief, it was the epoch of incredulity," +
-        " it was the season of Light, it was the season of Darkness," +
-        " it was the spring of hope, it was the winter of despair," +
-        " we had everything before us, we had nothing before us," +
-        " we were all going direct to Heaven," +
-        " we were all going direct the other way.").getBytes();
-    long hash = Murmur3.hash64(origin, 0, origin.length);
-    assertEquals(305830725663368540L, hash);
-
-    byte[] originOffset = new byte[origin.length + 150];
-    Arrays.fill(originOffset, (byte) 123);
-    System.arraycopy(origin, 0, originOffset, 150, origin.length);
-    hash = Murmur3.hash64(originOffset, 150, origin.length);
-    assertEquals(305830725663368540L, hash);
-  }
-
-  @Test
-  public void testHashCodesM3_128_ints() {
-    int seed = 123;
-    Random rand = new Random(seed);
-    HashFunction hf = Hashing.murmur3_128(seed);
-    for (int i = 0; i < 1000; i++) {
-      int val = rand.nextInt();
-      byte[] data = ByteBuffer.allocate(4).putInt(val).array();
-      // guava stores the hashcodes in little endian order
-      ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
-      buf.put(hf.hashBytes(data).asBytes());
-      buf.flip();
-      long gl1 = buf.getLong();
-      long gl2 = buf.getLong(8);
-      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
-      long m1 = hc[0];
-      long m2 = hc[1];
-      assertEquals(gl1, m1);
-      assertEquals(gl2, m2);
-
-      byte[] offsetData = new byte[data.length + 50];
-      System.arraycopy(data, 0, offsetData, 50, data.length);
-      hc = Murmur3.hash128(offsetData, 50, data.length, seed);
-      assertEquals(gl1, hc[0]);
-      assertEquals(gl2, hc[1]);
-    }
-  }
-
-  @Test
-  public void testHashCodesM3_128_longs() {
-    int seed = 123;
-    Random rand = new Random(seed);
-    HashFunction hf = Hashing.murmur3_128(seed);
-    for (int i = 0; i < 1000; i++) {
-      long val = rand.nextLong();
-      byte[] data = ByteBuffer.allocate(8).putLong(val).array();
-      // guava stores the hashcodes in little endian order
-      ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
-      buf.put(hf.hashBytes(data).asBytes());
-      buf.flip();
-      long gl1 = buf.getLong();
-      long gl2 = buf.getLong(8);
-      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
-      long m1 = hc[0];
-      long m2 = hc[1];
-      assertEquals(gl1, m1);
-      assertEquals(gl2, m2);
-    }
-  }
-
-  @Test
-  public void testHashCodesM3_128_double() {
-    int seed = 123;
-    Random rand = new Random(seed);
-    HashFunction hf = Hashing.murmur3_128(seed);
-    for (int i = 0; i < 1000; i++) {
-      double val = rand.nextDouble();
-      byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
-      // guava stores the hashcodes in little endian order
-      ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
-      buf.put(hf.hashBytes(data).asBytes());
-      buf.flip();
-      long gl1 = buf.getLong();
-      long gl2 = buf.getLong(8);
-      long[] hc = Murmur3.hash128(data, 0, data.length, seed);
-      long m1 = hc[0];
-      long m2 = hc[1];
-      assertEquals(gl1, m1);
-      assertEquals(gl2, m2);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/BloomFilterIO.java b/orc/src/java/org/apache/orc/BloomFilterIO.java
new file mode 100644
index 0000000..1406266
--- /dev/null
+++ b/orc/src/java/org/apache/orc/BloomFilterIO.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hive.common.util.BloomFilter;
+
+import com.google.common.primitives.Longs;
+
+public class BloomFilterIO extends BloomFilter {
+
+  public BloomFilterIO(long expectedEntries) {
+    super(expectedEntries, DEFAULT_FPP);
+  }
+
+  public BloomFilterIO(long expectedEntries, double fpp) {
+    super(expectedEntries, fpp);
+  }
+
+  /**
+   * Initializes the BloomFilter from the given ORC protobuf BloomFilter.
+   */
+  public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
+    this.bitSet = new BitSet(Longs.toArray(bloomFilter.getBitsetList()));
+    this.numHashFunctions = bloomFilter.getNumHashFunctions();
+    this.numBits = (int) this.bitSet.bitSize();
+  }
+}
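
A sketch of how the new OrcProto-based constructor could be exercised; the protobuf builder calls (newBuilder, addAllBitset, setNumHashFunctions) are assumed from the getters used above and are not part of this diff:

    import com.google.common.primitives.Longs;
    import org.apache.orc.BloomFilterIO;
    import org.apache.orc.OrcProto;

    public class BloomFilterIORoundTrip {
      public static void main(String[] args) {
        BloomFilterIO original = new BloomFilterIO(1000, 0.05);
        original.addLong(7L);

        // serialize the filter state into the ORC protobuf message (builder names assumed)
        OrcProto.BloomFilter proto = OrcProto.BloomFilter.newBuilder()
            .addAllBitset(Longs.asList(original.getBitSet()))
            .setNumHashFunctions(original.getNumHashFunctions())
            .build();

        // rebuild it with the constructor added in this commit
        BloomFilterIO restored = new BloomFilterIO(proto);
        System.out.println(restored.testLong(7L)); // true
      }
    }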

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcFile.java b/orc/src/java/org/apache/orc/OrcFile.java
index 9ea0b52..98226f9 100644
--- a/orc/src/java/org/apache/orc/OrcFile.java
+++ b/orc/src/java/org/apache/orc/OrcFile.java
@@ -23,7 +23,9 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.MemoryManager;
+import org.apache.orc.impl.WriterImpl;
 
 /**
  * Contains factory methods to read or write ORC files.
@@ -102,6 +104,8 @@ public class OrcFile {
     ORIGINAL(0),
     HIVE_8732(1), // corrupted stripe/file maximum column statistics
     HIVE_4243(2), // use real column names from Hive tables
+    HIVE_12055(3), // vectorized writer
+
     // Don't use any magic numbers here except for the below:
     FUTURE(Integer.MAX_VALUE); // a version from a future writer
 
@@ -138,6 +142,7 @@ public class OrcFile {
       return values[val];
     }
   }
+  public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_12055;
 
   public enum EncodingStrategy {
     SPEED, COMPRESSION
@@ -511,4 +516,21 @@ public class OrcFile {
     return memoryManager.get();
   }
 
+  /**
+   * Create an ORC file writer. This is the public interface for creating
+   * writers going forward and new options will only be added to this method.
+   * @param path filename to write to
+   * @param opts the options
+   * @return a new ORC file writer
+   * @throws IOException
+   */
+  public static Writer createWriter(Path path,
+                                    WriterOptions opts
+                                    ) throws IOException {
+    FileSystem fs = opts.getFileSystem() == null ?
+        path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
+
+    return new WriterImpl(fs, path, opts);
+  }
+
 }
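
A rough end-to-end sketch of the new createWriter(Path, WriterOptions) entry point; writerOptions(conf), setSchema(...) and addRowBatch(...) are assumed from the surrounding API in this patch series and may differ slightly in this revision:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class OrcWriterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.createStruct()
            .addField("x", TypeDescription.createLong());

        // the new public entry point added by this patch
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch();
        // ... populate batch.cols and batch.size, then:
        writer.addRowBatch(batch);
        writer.close();
      }
    }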

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index fc945e4..f97a113 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -275,7 +275,7 @@ public class TypeDescription {
     return maxId;
   }
 
-  private ColumnVector createColumn() {
+  private ColumnVector createColumn(int maxSize) {
     switch (category) {
       case BOOLEAN:
       case BYTE:
@@ -298,7 +298,7 @@ public class TypeDescription {
       case STRUCT: {
         ColumnVector[] fieldVector = new ColumnVector[children.size()];
         for(int i=0; i < fieldVector.length; ++i) {
-          fieldVector[i] = children.get(i).createColumn();
+          fieldVector[i] = children.get(i).createColumn(maxSize);
         }
         return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
                 fieldVector);
@@ -306,38 +306,42 @@ public class TypeDescription {
       case UNION: {
         ColumnVector[] fieldVector = new ColumnVector[children.size()];
         for(int i=0; i < fieldVector.length; ++i) {
-          fieldVector[i] = children.get(i).createColumn();
+          fieldVector[i] = children.get(i).createColumn(maxSize);
         }
         return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
             fieldVector);
       }
       case LIST:
         return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            children.get(0).createColumn());
+            children.get(0).createColumn(maxSize));
       case MAP:
         return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            children.get(0).createColumn(), children.get(1).createColumn());
+            children.get(0).createColumn(maxSize),
+            children.get(1).createColumn(maxSize));
       default:
         throw new IllegalArgumentException("Unknown type " + category);
     }
   }
 
-  public VectorizedRowBatch createRowBatch() {
+  public VectorizedRowBatch createRowBatch(int maxSize) {
     VectorizedRowBatch result;
     if (category == Category.STRUCT) {
-      result = new VectorizedRowBatch(children.size(),
-          VectorizedRowBatch.DEFAULT_SIZE);
+      result = new VectorizedRowBatch(children.size(), maxSize);
       for(int i=0; i < result.cols.length; ++i) {
-        result.cols[i] = children.get(i).createColumn();
+        result.cols[i] = children.get(i).createColumn(maxSize);
       }
     } else {
-      result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE);
-      result.cols[0] = createColumn();
+      result = new VectorizedRowBatch(1, maxSize);
+      result.cols[0] = createColumn(maxSize);
     }
     result.reset();
     return result;
   }
 
+  public VectorizedRowBatch createRowBatch() {
+    return createRowBatch(VectorizedRowBatch.DEFAULT_SIZE);
+  }
+
   /**
    * Get the kind of this type.
    * @return get the category for this type.
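
A small sketch of the new maxSize overload of createRowBatch; the TypeDescription factory methods (createStruct, addField, createString, createInt) are assumed to exist as elsewhere in the orc module, and the field names are illustrative:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;

    public class RowBatchSizing {
      public static void main(String[] args) {
        TypeDescription schema = TypeDescription.createStruct()
            .addField("name", TypeDescription.createString())
            .addField("age", TypeDescription.createInt());

        // column vectors allocated for only 64 rows
        VectorizedRowBatch small = schema.createRowBatch(64);
        // the no-argument overload keeps the old behaviour (VectorizedRowBatch.DEFAULT_SIZE rows)
        VectorizedRowBatch standard = schema.createRowBatch();

        System.out.println(small.numCols + " columns; default batch holds "
            + VectorizedRowBatch.DEFAULT_SIZE + " rows");
      }
    }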


[4/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
new file mode 100644
index 0000000..5157d4d
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -0,0 +1,2912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.BloomFilterIO;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized, like most Stream objects, so from the
+ * creation of an OrcFile onwards all access to a single instance has to be
+ * from a single thread.
+ *
+ * There are no known cases where such cross-thread access happens today.
+ *
+ * Caveat: the MemoryManager is created when the WriterOptions are created and
+ * has to be confined to a single thread as well.
+ *
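+ * A rough usage sketch, assuming the OrcFile factory and the
+ * Writer#addRowBatch API exposed by this module, given a Hadoop Path and
+ * Configuration:
+ * <pre>
+ *   TypeDescription schema =
+ *       TypeDescription.fromString("struct&lt;x:int,y:string&gt;");
+ *   Writer writer = OrcFile.createWriter(path,
+ *       OrcFile.writerOptions(conf).setSchema(schema));
+ *   VectorizedRowBatch batch = schema.createRowBatch();
+ *   // fill batch.cols and set batch.size, then:
+ *   writer.addRowBatch(batch);
+ *   writer.close();
+ * </pre>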
+ */
+public class WriterImpl implements Writer, MemoryManager.Callback {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
+
+  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+  private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+  // threshold above which buffer size will be automatically resized
+  private static final int COLUMN_COUNT_THRESHOLD = 1000;
+
+  private final FileSystem fs;
+  private final Path path;
+  private final long defaultStripeSize;
+  private long adjustedStripeSize;
+  private final int rowIndexStride;
+  private final CompressionKind compress;
+  private final CompressionCodec codec;
+  private final boolean addBlockPadding;
+  private final int bufferSize;
+  private final long blockSize;
+  private final double paddingTolerance;
+  private final TypeDescription schema;
+
+  // the streams that make up the current stripe
+  private final Map<StreamName, BufferedStream> streams =
+    new TreeMap<StreamName, BufferedStream>();
+
+  private FSDataOutputStream rawWriter = null;
+  // the compressed metadata information outStream
+  private OutStream writer = null;
+  // a protobuf CodedOutputStream wrapped around the metadata outStream
+  private CodedOutputStream protobufWriter = null;
+  private long headerLength;
+  private int columnCount;
+  private long rowCount = 0;
+  private long rowsInStripe = 0;
+  private long rawDataSize = 0;
+  private int rowsInIndex = 0;
+  private int stripesAtLastFlush = -1;
+  private final List<OrcProto.StripeInformation> stripes =
+    new ArrayList<OrcProto.StripeInformation>();
+  private final Map<String, ByteString> userMetadata =
+    new TreeMap<String, ByteString>();
+  private final StreamFactory streamFactory = new StreamFactory();
+  private final TreeWriter treeWriter;
+  private final boolean buildIndex;
+  private final MemoryManager memoryManager;
+  private final OrcFile.Version version;
+  private final Configuration conf;
+  private final OrcFile.WriterCallback callback;
+  private final OrcFile.WriterContext callbackContext;
+  private final OrcFile.EncodingStrategy encodingStrategy;
+  private final OrcFile.CompressionStrategy compressionStrategy;
+  private final boolean[] bloomFilterColumns;
+  private final double bloomFilterFpp;
+  private boolean writeTimeZone;
+
+  public WriterImpl(FileSystem fs,
+                    Path path,
+                    OrcFile.WriterOptions opts) throws IOException {
+    this.fs = fs;
+    this.path = path;
+    this.conf = opts.getConfiguration();
+    this.callback = opts.getCallback();
+    this.schema = opts.getSchema();
+    if (callback != null) {
+      callbackContext = new OrcFile.WriterContext(){
+
+        @Override
+        public Writer getWriter() {
+          return WriterImpl.this;
+        }
+      };
+    } else {
+      callbackContext = null;
+    }
+    this.adjustedStripeSize = opts.getStripeSize();
+    this.defaultStripeSize = opts.getStripeSize();
+    this.version = opts.getVersion();
+    this.encodingStrategy = opts.getEncodingStrategy();
+    this.compressionStrategy = opts.getCompressionStrategy();
+    this.addBlockPadding = opts.getBlockPadding();
+    this.blockSize = opts.getBlockSize();
+    this.paddingTolerance = opts.getPaddingTolerance();
+    this.compress = opts.getCompress();
+    this.rowIndexStride = opts.getRowIndexStride();
+    this.memoryManager = opts.getMemoryManager();
+    buildIndex = rowIndexStride > 0;
+    codec = createCodec(compress);
+    int numColumns = schema.getMaximumId() + 1;
+    this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
+        numColumns, opts.getBufferSize());
+    if (version == OrcFile.Version.V_0_11) {
+      /* do not write bloom filters for ORC v11 */
+      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
+    } else {
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
+    }
+    this.bloomFilterFpp = opts.getBloomFilterFpp();
+    treeWriter = createTreeWriter(schema, streamFactory, false);
+    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+      throw new IllegalArgumentException("Row stride must be at least " +
+          MIN_ROW_INDEX_STRIDE);
+    }
+
+    // ensure that we are able to handle callbacks before we register ourselves
+    memoryManager.addWriter(path, opts.getStripeSize(), this);
+  }
+
+  @VisibleForTesting
+  public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+                                           int bs) {
+    // The worst case is that there are 2 big streams per column and
+    // we want to guarantee that each stream gets ~10 buffers.
+    // This keeps buffers small enough that we don't get really small stripe
+    // sizes.
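+    // Worked example, assuming the stock 64MB default stripe size: with a
+    // 1,000 column schema the estimate is 64MB / (20 * 1000) ~= 3.3KB, which
+    // getClosestBufferSize rounds up to 4KB; the result is then capped at the
+    // caller-requested buffer size.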
+    int estBufferSize = (int) (stripeSize / (20 * numColumns));
+    estBufferSize = getClosestBufferSize(estBufferSize);
+    if (estBufferSize > bs) {
+      estBufferSize = bs;
+    } else {
+      LOG.info("WIDE TABLE - Number of columns: " + numColumns +
+          " Chosen compression buffer size: " + estBufferSize);
+    }
+    return estBufferSize;
+  }
+
+  private static int getClosestBufferSize(int estBufferSize) {
+    final int kb4 = 4 * 1024;
+    final int kb8 = 8 * 1024;
+    final int kb16 = 16 * 1024;
+    final int kb32 = 32 * 1024;
+    final int kb64 = 64 * 1024;
+    final int kb128 = 128 * 1024;
+    final int kb256 = 256 * 1024;
+    if (estBufferSize <= kb4) {
+      return kb4;
+    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+      return kb8;
+    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+      return kb16;
+    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+      return kb32;
+    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+      return kb64;
+    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+      return kb128;
+    } else {
+      return kb256;
+    }
+  }
+
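+  // For example, createCodec(CompressionKind.ZLIB) returns a new ZlibCodec,
+  // while CompressionKind.NONE yields null, so callers must handle the
+  // uncompressed case (see StreamFactory.isCompressed below).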
+  public static CompressionCodec createCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return null;
+      case ZLIB:
+        return new ZlibCodec();
+      case SNAPPY:
+        return new SnappyCodec();
+      case LZO:
+        try {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          if (loader == null) {
+            loader = WriterImpl.class.getClassLoader();
+          }
+          @SuppressWarnings("unchecked")
+          Class<? extends CompressionCodec> lzo =
+              (Class<? extends CompressionCodec>)
+              loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+          return lzo.newInstance();
+        } catch (ClassNotFoundException e) {
+          throw new IllegalArgumentException("LZO is not available.", e);
+        } catch (InstantiationException e) {
+          throw new IllegalArgumentException("Problem initializing LZO", e);
+        } catch (IllegalAccessException e) {
+          throw new IllegalArgumentException("Insufficient access to LZO", e);
+        }
+      default:
+        throw new IllegalArgumentException("Unknown compression codec: " +
+            kind);
+    }
+  }
+
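+  // Callback from the MemoryManager. For example, if all writers are scaled
+  // down to 0.5, a writer opened with a 64MB stripe size flushes a stripe as
+  // soon as its in-memory estimate exceeds 32MB (illustrative numbers).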
+  @Override
+  public boolean checkMemory(double newScale) throws IOException {
+    long limit = (long) Math.round(adjustedStripeSize * newScale);
+    long size = estimateStripeSize();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+                limit);
+    }
+    if (size > limit) {
+      flushStripe();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This class is used to hold the contents of streams as they are buffered.
+   * The TreeWriters write to the outStream and the codec compresses the
+   * data as buffers fill up and stores them in the output list. When the
+   * stripe is being written, the whole stream is written to the file.
+   */
+  private class BufferedStream implements OutStream.OutputReceiver {
+    private final OutStream outStream;
+    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+    BufferedStream(String name, int bufferSize,
+                   CompressionCodec codec) throws IOException {
+      outStream = new OutStream(name, bufferSize, codec, this);
+    }
+
+    /**
+     * Receive a buffer from the compression codec.
+     * @param buffer the buffer to save
+     */
+    @Override
+    public void output(ByteBuffer buffer) {
+      output.add(buffer);
+    }
+
+    /**
+     * Get the number of bytes in buffers that are allocated to this stream.
+     * @return number of bytes in buffers
+     */
+    public long getBufferSize() {
+      long result = 0;
+      for(ByteBuffer buf: output) {
+        result += buf.capacity();
+      }
+      return outStream.getBufferSize() + result;
+    }
+
+    /**
+     * Flush the stream to the codec.
+     * @throws IOException
+     */
+    public void flush() throws IOException {
+      outStream.flush();
+    }
+
+    /**
+     * Clear all of the buffers.
+     * @throws IOException
+     */
+    public void clear() throws IOException {
+      outStream.clear();
+      output.clear();
+    }
+
+    /**
+     * Check the state of the suppress flag in the output stream.
+     * @return value of suppress flag
+     */
+    public boolean isSuppressed() {
+      return outStream.isSuppressed();
+    }
+
+    /**
+     * Get the number of bytes that will be written to the output. Assumes
+     * the stream has already been flushed.
+     * @return the number of bytes
+     */
+    public long getOutputSize() {
+      long result = 0;
+      for(ByteBuffer buffer: output) {
+        result += buffer.remaining();
+      }
+      return result;
+    }
+
+    /**
+     * Write the saved compressed buffers to the OutputStream.
+     * @param out the stream to write to
+     * @throws IOException
+     */
+    void spillTo(OutputStream out) throws IOException {
+      for(ByteBuffer buffer: output) {
+        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+          buffer.remaining());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return outStream.toString();
+    }
+  }
+
+  /**
+   * An output receiver that writes the ByteBuffers to the output stream
+   * as they are received.
+   */
+  private class DirectStream implements OutStream.OutputReceiver {
+    private final FSDataOutputStream output;
+
+    DirectStream(FSDataOutputStream output) {
+      this.output = output;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+        buffer.remaining());
+    }
+  }
+
+  private static class RowIndexPositionRecorder implements PositionRecorder {
+    private final OrcProto.RowIndexEntry.Builder builder;
+
+    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void addPosition(long position) {
+      builder.addPositions(position);
+    }
+  }
+
+  /**
+   * Interface from the Writer to the TreeWriters. This limits the visibility
+   * that the TreeWriters have into the Writer.
+   */
+  private class StreamFactory {
+    /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return the outStream that the section needs to be written to.
+     * @throws IOException
+     */
+    public OutStream createStream(int column,
+                                  OrcProto.Stream.Kind kind
+                                  ) throws IOException {
+      final StreamName name = new StreamName(column, kind);
+      final EnumSet<CompressionCodec.Modifier> modifiers;
+
+      switch (kind) {
+        case BLOOM_FILTER:
+        case DATA:
+        case DICTIONARY_DATA:
+          if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
+            modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
+                CompressionCodec.Modifier.TEXT);
+          } else {
+            modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+                CompressionCodec.Modifier.TEXT);
+          }
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
+              CompressionCodec.Modifier.BINARY);
+          break;
+        default:
+          LOG.warn("Missing ORC compression modifiers for " + kind);
+          modifiers = null;
+          break;
+      }
+
+      BufferedStream result = streams.get(name);
+      if (result == null) {
+        result = new BufferedStream(name.toString(), bufferSize,
+            codec == null ? codec : codec.modify(modifiers));
+        streams.put(name, result);
+      }
+      return result.outStream;
+    }
+
+    /**
+     * Get the next column id.
+     * @return a number from 0 to the number of columns - 1
+     */
+    public int getNextColumnId() {
+      return columnCount++;
+    }
+
+    /**
+     * Get the stride rate of the row index.
+     */
+    public int getRowIndexStride() {
+      return rowIndexStride;
+    }
+
+    /**
+     * Should the writer be building the row index?
+     * @return true if we are building the index
+     */
+    public boolean buildIndex() {
+      return buildIndex;
+    }
+
+    /**
+     * Is the ORC file compressed?
+     * @return are the streams compressed
+     */
+    public boolean isCompressed() {
+      return codec != null;
+    }
+
+    /**
+     * Get the encoding strategy to use.
+     * @return encoding strategy
+     */
+    public OrcFile.EncodingStrategy getEncodingStrategy() {
+      return encodingStrategy;
+    }
+
+    /**
+     * Get the compression strategy to use.
+     * @return compression strategy
+     */
+    public OrcFile.CompressionStrategy getCompressionStrategy() {
+      return compressionStrategy;
+    }
+
+    /**
+     * Get the bloom filter columns
+     * @return bloom filter columns
+     */
+    public boolean[] getBloomFilterColumns() {
+      return bloomFilterColumns;
+    }
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    public double getBloomFilterFPP() {
+      return bloomFilterFpp;
+    }
+
+    /**
+     * Get the writer's configuration.
+     * @return configuration
+     */
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Get the version of the file to write.
+     */
+    public OrcFile.Version getVersion() {
+      return version;
+    }
+
+    public void useWriterTimeZone(boolean val) {
+      writeTimeZone = val;
+    }
+
+    public boolean hasWriterTimeZone() {
+      return writeTimeZone;
+    }
+  }
+
+  /**
+   * The parent class of all of the writers for each column. Each column
+   * is written by an instance of this class. The compound types (struct,
+   * list, map, and union) have children tree writers that write the children
+   * types.
+   */
+  private abstract static class TreeWriter {
+    protected final int id;
+    protected final BitFieldWriter isPresent;
+    private final boolean isCompressed;
+    protected final ColumnStatisticsImpl indexStatistics;
+    protected final ColumnStatisticsImpl stripeColStatistics;
+    private final ColumnStatisticsImpl fileStatistics;
+    protected TreeWriter[] childrenWriters;
+    protected final RowIndexPositionRecorder rowIndexPosition;
+    private final OrcProto.RowIndex.Builder rowIndex;
+    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+    private final PositionedOutputStream rowIndexStream;
+    private final PositionedOutputStream bloomFilterStream;
+    protected final BloomFilterIO bloomFilter;
+    protected final boolean createBloomFilter;
+    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
+    private boolean foundNulls;
+    private OutStream isPresentOutStream;
+    private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
+    private final StreamFactory streamFactory;
+
+    /**
+     * Create a tree writer.
+     * @param columnId the column id of the column to write
+     * @param schema the row schema
+     * @param streamFactory limited access to the Writer's data.
+     * @param nullable can the value be null?
+     * @throws IOException
+     */
+    TreeWriter(int columnId,
+               TypeDescription schema,
+               StreamFactory streamFactory,
+               boolean nullable) throws IOException {
+      this.streamFactory = streamFactory;
+      this.isCompressed = streamFactory.isCompressed();
+      this.id = columnId;
+      if (nullable) {
+        isPresentOutStream = streamFactory.createStream(id,
+            OrcProto.Stream.Kind.PRESENT);
+        isPresent = new BitFieldWriter(isPresentOutStream, 1);
+      } else {
+        isPresent = null;
+      }
+      this.foundNulls = false;
+      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+      indexStatistics = ColumnStatisticsImpl.create(schema);
+      stripeColStatistics = ColumnStatisticsImpl.create(schema);
+      fileStatistics = ColumnStatisticsImpl.create(schema);
+      childrenWriters = new TreeWriter[0];
+      rowIndex = OrcProto.RowIndex.newBuilder();
+      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+      stripeStatsBuilders = Lists.newArrayList();
+      if (streamFactory.buildIndex()) {
+        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+      } else {
+        rowIndexStream = null;
+      }
+      if (createBloomFilter) {
+        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+      } else {
+        bloomFilterEntry = null;
+        bloomFilterIndex = null;
+        bloomFilterStream = null;
+        bloomFilter = null;
+      }
+    }
+
+    protected OrcProto.RowIndex.Builder getRowIndex() {
+      return rowIndex;
+    }
+
+    protected ColumnStatisticsImpl getStripeStatistics() {
+      return stripeColStatistics;
+    }
+
+    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+      return rowIndexEntry;
+    }
+
+    IntegerWriter createIntegerWriter(PositionedOutputStream output,
+                                      boolean signed, boolean isDirectV2,
+                                      StreamFactory writer) {
+      if (isDirectV2) {
+        boolean alignedBitpacking = false;
+        if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
+          alignedBitpacking = true;
+        }
+        return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+      } else {
+        return new RunLengthIntegerWriter(output, signed);
+      }
+    }
+
+    boolean isNewWriteFormat(StreamFactory writer) {
+      return writer.getVersion() != OrcFile.Version.V_0_11;
+    }
+
+    /**
+     * Handle the top level object write.
+     *
+     * This default method is used for all types except structs, which are the
+     * typical case. VectorizedRowBatch assumes the top level object is a
+     * struct, so we use the first column for all other types.
+     * @param batch the batch to write from
+     * @param offset the row to start on
+     * @param length the number of rows to write
+     * @throws IOException
+     */
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException {
+      writeBatch(batch.cols[0], offset, length);
+    }
+
+    /**
+     * Write the values from the given vector from offset for length elements.
+     * @param vector the vector to write from
+     * @param offset the first value from the vector to write
+     * @param length the number of values from the vector to write
+     * @throws IOException
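+     *
+     * For example, a slice of length 5 whose isNull flags are
+     * {false, true, false, true, false} writes PRESENT bits 1,0,1,0,1,
+     * increments the non-null count by 3, and marks the statistics as
+     * containing nulls.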
+     */
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      if (vector.noNulls) {
+        indexStatistics.increment(length);
+        if (isPresent != null) {
+          for (int i = 0; i < length; ++i) {
+            isPresent.write(1);
+          }
+        }
+      } else {
+        if (vector.isRepeating) {
+          boolean isNull = vector.isNull[0];
+          if (isPresent != null) {
+            for (int i = 0; i < length; ++i) {
+              isPresent.write(isNull ? 0 : 1);
+            }
+          }
+          if (isNull) {
+            foundNulls = true;
+            indexStatistics.setNull();
+          } else {
+            indexStatistics.increment(length);
+          }
+        } else {
+          // count the number of non-null values
+          int nonNullCount = 0;
+          for(int i = 0; i < length; ++i) {
+            boolean isNull = vector.isNull[i + offset];
+            if (!isNull) {
+              nonNullCount += 1;
+            }
+            if (isPresent != null) {
+              isPresent.write(isNull ? 0 : 1);
+            }
+          }
+          indexStatistics.increment(nonNullCount);
+          if (nonNullCount != length) {
+            foundNulls = true;
+            indexStatistics.setNull();
+          }
+        }
+      }
+    }
+
+    private void removeIsPresentPositions() {
+      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+        OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+        List<Long> positions = entry.getPositionsList();
+        // bit streams use 3 positions if uncompressed, 4 if compressed
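+        // (roughly: the byte offset into the stream, plus the offset within
+        // the compression chunk when compressed, the run-length position, and
+        // the bit position within the current byte)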
+        positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+        entry.clearPositions();
+        entry.addAllPositions(positions);
+      }
+    }
+
+    /**
+     * Write the stripe out to the file.
+     * @param builder the stripe footer that contains the information about the
+     *                layout of the stripe. The TreeWriter is required to update
+     *                the footer with its information.
+     * @param requiredIndexEntries the number of index entries that are
+     *                             required. This is a check to make sure the
+     *                             row index is well formed.
+     * @throws IOException
+     */
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      if (isPresent != null) {
+        isPresent.flush();
+
+        // if no nulls are found in a stream, then suppress the stream
+        if(!foundNulls) {
+          isPresentOutStream.suppress();
+          // since isPresent bitstream is suppressed, update the index to
+          // remove the positions of the isPresent stream
+          if (rowIndexStream != null) {
+            removeIsPresentPositions();
+          }
+        }
+      }
+
+      // merge the stripe-level column statistics into the file statistics and
+      // record them in the stripe statistics
+      OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
+      writeStripeStatistics(stripeStatsBuilder, this);
+      stripeStatsBuilders.add(stripeStatsBuilder);
+
+      // reset the flag for next stripe
+      foundNulls = false;
+
+      builder.addColumns(getEncoding());
+      if (streamFactory.hasWriterTimeZone()) {
+        builder.setWriterTimezone(TimeZone.getDefault().getID());
+      }
+      if (rowIndexStream != null) {
+        if (rowIndex.getEntryCount() != requiredIndexEntries) {
+          throw new IllegalArgumentException("Column has wrong number of " +
+               "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+               requiredIndexEntries);
+        }
+        rowIndex.build().writeTo(rowIndexStream);
+        rowIndexStream.flush();
+      }
+      rowIndex.clear();
+      rowIndexEntry.clear();
+
+      // write the bloom filter index to the output stream
+      if (bloomFilterStream != null) {
+        bloomFilterIndex.build().writeTo(bloomFilterStream);
+        bloomFilterStream.flush();
+        bloomFilterIndex.clear();
+        bloomFilterEntry.clear();
+      }
+    }
+
+    private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
+        TreeWriter treeWriter) {
+      treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
+      builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
+      treeWriter.stripeColStatistics.reset();
+      for (TreeWriter child : treeWriter.getChildrenWriters()) {
+        writeStripeStatistics(builder, child);
+      }
+    }
+
+    TreeWriter[] getChildrenWriters() {
+      return childrenWriters;
+    }
+
+    /**
+     * Get the encoding for this column.
+     * @return the information about the encoding of this column
+     */
+    OrcProto.ColumnEncoding getEncoding() {
+      return OrcProto.ColumnEncoding.newBuilder().setKind(
+          OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    /**
+     * Create a row index entry with the previous location and the current
+     * index statistics. Also merges the index statistics into the file
+     * statistics before they are cleared. Finally, it records the start of the
+     * next index and ensures all of the children columns also create an entry.
+     * @throws IOException
+     */
+    void createRowIndexEntry() throws IOException {
+      stripeColStatistics.merge(indexStatistics);
+      rowIndexEntry.setStatistics(indexStatistics.serialize());
+      indexStatistics.reset();
+      rowIndex.addEntry(rowIndexEntry);
+      rowIndexEntry.clear();
+      addBloomFilterEntry();
+      recordPosition(rowIndexPosition);
+      for(TreeWriter child: childrenWriters) {
+        child.createRowIndexEntry();
+      }
+    }
+
+    void addBloomFilterEntry() {
+      if (createBloomFilter) {
+        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+        bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+        bloomFilterEntry.clear();
+      }
+    }
+
+    /**
+     * Record the current position in each of this column's streams.
+     * @param recorder where should the locations be recorded
+     * @throws IOException
+     */
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      if (isPresent != null) {
+        isPresent.getPosition(recorder);
+      }
+    }
+
+    /**
+     * Estimate how much memory the writer is consuming excluding the streams.
+     * @return the number of bytes.
+     */
+    long estimateMemory() {
+      long result = 0;
+      for (TreeWriter child: childrenWriters) {
+        result += child.estimateMemory();
+      }
+      return result;
+    }
+  }
+
+  private static class BooleanTreeWriter extends TreeWriter {
+    private final BitFieldWriter writer;
+
+    BooleanTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      PositionedOutputStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.writer = new BitFieldWriter(out, 1);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int value = vec.vector[0] == 0 ? 0 : 1;
+          indexStatistics.updateBoolean(value != 0, length);
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int value = vec.vector[i + offset] == 0 ? 0 : 1;
+            writer.write(value);
+            indexStatistics.updateBoolean(value != 0, 1);
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
+  private static class ByteTreeWriter extends TreeWriter {
+    private final RunLengthByteWriter writer;
+
+    ByteTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.writer = new RunLengthByteWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA));
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte value = (byte) vec.vector[0];
+          indexStatistics.updateInteger(value, length);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            byte value = (byte) vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateInteger(value, 1);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
+  private static class IntegerTreeWriter extends TreeWriter {
+    private final IntegerWriter writer;
+    private boolean isDirectV2 = true;
+
+    IntegerTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      OutStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          long value = vec.vector[0];
+          indexStatistics.updateInteger(value, length);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            long value = vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateInteger(value, 1);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
+  private static class FloatTreeWriter extends TreeWriter {
+    private final PositionedOutputStream stream;
+    private final SerializationUtils utils;
+
+    FloatTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.stream = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.utils = new SerializationUtils();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DoubleColumnVector vec = (DoubleColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          float value = (float) vec.vector[0];
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            bloomFilter.addDouble(value);
+          }
+          for(int i=0; i < length; ++i) {
+            utils.writeFloat(stream, value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            float value = (float) vec.vector[i + offset];
+            utils.writeFloat(stream, value);
+            indexStatistics.updateDouble(value);
+            if (createBloomFilter) {
+              bloomFilter.addDouble(value);
+            }
+          }
+        }
+      }
+    }
+
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      stream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      stream.getPosition(recorder);
+    }
+  }
+
+  private static class DoubleTreeWriter extends TreeWriter {
+    private final PositionedOutputStream stream;
+    private final SerializationUtils utils;
+
+    DoubleTreeWriter(int columnId,
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.stream = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.utils = new SerializationUtils();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DoubleColumnVector vec = (DoubleColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          double value = vec.vector[0];
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            bloomFilter.addDouble(value);
+          }
+          for(int i=0; i < length; ++i) {
+            utils.writeDouble(stream, value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            double value = vec.vector[i + offset];
+            utils.writeDouble(stream, value);
+            indexStatistics.updateDouble(value);
+            if (createBloomFilter) {
+              bloomFilter.addDouble(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      stream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      stream.getPosition(recorder);
+    }
+  }
+
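+  // String-family writers start out building a dictionary; if the ratio of
+  // distinct keys to non-null rows crosses the configured threshold they fall
+  // back to direct encoding and suppress the DICTIONARY_DATA stream.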
+  private static abstract class StringBaseTreeWriter extends TreeWriter {
+    private static final int INITIAL_DICTIONARY_SIZE = 4096;
+    private final OutStream stringOutput;
+    private final IntegerWriter lengthOutput;
+    private final IntegerWriter rowOutput;
+    protected final StringRedBlackTree dictionary =
+        new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+    protected final DynamicIntArray rows = new DynamicIntArray();
+    protected final PositionedOutputStream directStreamOutput;
+    protected final IntegerWriter directLengthOutput;
+    private final List<OrcProto.RowIndexEntry> savedRowIndex =
+        new ArrayList<OrcProto.RowIndexEntry>();
+    private final boolean buildIndex;
+    private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+    // If the number of keys in a dictionary is greater than this fraction of
+    // the total number of non-null rows, turn off dictionary encoding
+    private final double dictionaryKeySizeThreshold;
+    protected boolean useDictionaryEncoding = true;
+    private boolean isDirectV2 = true;
+    private boolean doneDictionaryCheck;
+    private final boolean strideDictionaryCheck;
+
+    StringBaseTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      stringOutput = writer.createStream(id,
+          OrcProto.Stream.Kind.DICTIONARY_DATA);
+      lengthOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      rowOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(0L);
+      buildIndex = writer.buildIndex();
+      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+      directLengthOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      Configuration conf = writer.getConfiguration();
+      dictionaryKeySizeThreshold =
+          OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+      strideDictionaryCheck =
+          OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+      doneDictionaryCheck = false;
+    }
+
+    private boolean checkDictionaryEncoding() {
+      if (!doneDictionaryCheck) {
+        // Set the flag indicating whether or not to use dictionary encoding
+        // based on whether or not the fraction of distinct keys over number of
+        // non-null rows is less than the configured threshold
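+        // e.g. assuming the stock 0.8 default, a stripe with 10,000 non-null
+        // rows keeps dictionary encoding only while the dictionary holds at
+        // most 8,000 distinct keys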
+        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+        doneDictionaryCheck = true;
+      }
+      return useDictionaryEncoding;
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      // if the stripe has too few rows, the dictionary check may not have
+      // happened yet, so do it again here.
+      checkDictionaryEncoding();
+
+      if (useDictionaryEncoding) {
+        flushDictionary();
+      } else {
+        // flush out any leftover entries from the dictionary
+        if (rows.size() > 0) {
+          flushDictionary();
+        }
+
+        // suppress the stream for every stripe if dictionary is disabled
+        stringOutput.suppress();
+      }
+
+      // we need to build the row index before calling super, since it
+      // writes it out.
+      super.writeStripe(builder, requiredIndexEntries);
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+      directStreamOutput.flush();
+      directLengthOutput.flush();
+      // reset all of the fields to be ready for the next stripe.
+      dictionary.clear();
+      savedRowIndex.clear();
+      rowIndexValueCount.clear();
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(0L);
+
+      if (!useDictionaryEncoding) {
+        // record the start positions of the first index stride of the next
+        // stripe, i.e. the beginning of the direct streams when dictionary
+        // encoding is disabled
+        recordDirectStreamPosition();
+      }
+    }
+
+    private void flushDictionary() throws IOException {
+      final int[] dumpOrder = new int[dictionary.size()];
+
+      if (useDictionaryEncoding) {
+        // Write the dictionary by traversing the red-black tree writing out
+        // the bytes and lengths; and creating the map from the original order
+        // to the final sorted order.
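+        // For example, if values arrive as "banana" then "apple", their ids in
+        // insertion order are banana=0, apple=1; the sorted traversal assigns
+        // apple=0, banana=1, so dumpOrder becomes {1, 0} and the buffered rows
+        // are rewritten through it below.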
+
+        dictionary.visit(new StringRedBlackTree.Visitor() {
+          private int currentId = 0;
+          @Override
+          public void visit(StringRedBlackTree.VisitorContext context
+                           ) throws IOException {
+            context.writeBytes(stringOutput);
+            lengthOutput.write(context.getLength());
+            dumpOrder[context.getOriginalPosition()] = currentId++;
+          }
+        });
+      } else {
+        // for direct encoding, we don't want the dictionary data stream
+        stringOutput.suppress();
+      }
+      int length = rows.size();
+      int rowIndexEntry = 0;
+      OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+      Text text = new Text();
+      // write the values translated into the dump order.
+      for(int i = 0; i <= length; ++i) {
+        // now that we are writing out the row values, we can finalize the
+        // row index
+        if (buildIndex) {
+          while (i == rowIndexValueCount.get(rowIndexEntry) &&
+              rowIndexEntry < savedRowIndex.size()) {
+            OrcProto.RowIndexEntry.Builder base =
+                savedRowIndex.get(rowIndexEntry++).toBuilder();
+            if (useDictionaryEncoding) {
+              rowOutput.getPosition(new RowIndexPositionRecorder(base));
+            } else {
+              PositionRecorder posn = new RowIndexPositionRecorder(base);
+              directStreamOutput.getPosition(posn);
+              directLengthOutput.getPosition(posn);
+            }
+            rowIndex.addEntry(base.build());
+          }
+        }
+        if (i != length) {
+          if (useDictionaryEncoding) {
+            rowOutput.write(dumpOrder[rows.get(i)]);
+          } else {
+            dictionary.getText(text, rows.get(i));
+            directStreamOutput.write(text.getBytes(), 0, text.getLength());
+            directLengthOutput.write(text.getLength());
+          }
+        }
+      }
+      rows.clear();
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // Returns the encoding used for the last call to writeStripe
+      if (useDictionaryEncoding) {
+        if(isDirectV2) {
+          return OrcProto.ColumnEncoding.newBuilder().setKind(
+              OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+              setDictionarySize(dictionary.size()).build();
+        }
+        return OrcProto.ColumnEncoding.newBuilder().setKind(
+            OrcProto.ColumnEncoding.Kind.DICTIONARY).
+            setDictionarySize(dictionary.size()).build();
+      } else {
+        if(isDirectV2) {
+          return OrcProto.ColumnEncoding.newBuilder().setKind(
+              OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+        }
+        return OrcProto.ColumnEncoding.newBuilder().setKind(
+            OrcProto.ColumnEncoding.Kind.DIRECT).build();
+      }
+    }
+
+    /**
+     * This method doesn't call the super method, because unlike most of the
+     * other TreeWriters, this one can't record the position in the streams
+     * until the stripe is being flushed. Therefore it saves all of the entries
+     * and augments them with the final information as the stripe is written.
+     * @throws IOException
+     */
+    @Override
+    void createRowIndexEntry() throws IOException {
+      getStripeStatistics().merge(indexStatistics);
+      OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+      rowIndexEntry.setStatistics(indexStatistics.serialize());
+      indexStatistics.reset();
+      OrcProto.RowIndexEntry base = rowIndexEntry.build();
+      savedRowIndex.add(base);
+      rowIndexEntry.clear();
+      addBloomFilterEntry();
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(Long.valueOf(rows.size()));
+      if (strideDictionaryCheck) {
+        checkDictionaryEncoding();
+      }
+      if (!useDictionaryEncoding) {
+        if (rows.size() > 0) {
+          flushDictionary();
+          // just record the start positions of next index stride
+          recordDirectStreamPosition();
+        } else {
+          // record the start positions of next index stride
+          recordDirectStreamPosition();
+          getRowIndex().addEntry(base);
+        }
+      }
+    }
+
+    private void recordDirectStreamPosition() throws IOException {
+      directStreamOutput.getPosition(rowIndexPosition);
+      directLengthOutput.getPosition(rowIndexPosition);
+    }
+
+    @Override
+    long estimateMemory() {
+      return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+    }
+  }
+
+  private static class StringTreeWriter extends StringBaseTreeWriter {
+    StringTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(vec.vector[0], vec.start[0],
+                  vec.length[0]);
+              directLengthOutput.write(vec.length[0]);
+            }
+          }
+          indexStatistics.updateString(vec.vector[0], vec.start[0],
+              vec.length[0], length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]));
+            } else {
+              directStreamOutput.write(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+              directLengthOutput.write(vec.length[offset + i]);
+            }
+            indexStatistics.updateString(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i], 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Under the covers, char is written to ORC the same way as string.
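+   * The value is space padded (or truncated) to the declared length first;
+   * for example, a char(5) value "ab" is stored as "ab" followed by three
+   * spaces.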
+   */
+  private static class CharTreeWriter extends StringBaseTreeWriter {
+    private final int itemLength;
+    private final byte[] padding;
+
+    CharTreeWriter(int columnId,
+        TypeDescription schema,
+        StreamFactory writer,
+        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      itemLength = schema.getMaxLength();
+      padding = new byte[itemLength];
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte[] ptr;
+          int ptrOffset;
+          if (vec.length[0] >= itemLength) {
+            ptr = vec.vector[0];
+            ptrOffset = vec.start[0];
+          } else {
+            ptr = padding;
+            ptrOffset = 0;
+            System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
+                vec.length[0]);
+            Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
+          }
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(ptr, ptrOffset, itemLength);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(ptr, ptrOffset, itemLength);
+              directLengthOutput.write(itemLength);
+            }
+          }
+          indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            byte[] ptr;
+            int ptrOffset;
+            if (vec.length[offset + i] >= itemLength) {
+              ptr = vec.vector[offset + i];
+              ptrOffset = vec.start[offset + i];
+            } else {
+              // the value is shorter than the declared length, so pad it
+              ptr = padding;
+              ptrOffset = 0;
+              System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
+                  ptr, 0, vec.length[offset + i]);
+              Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
+            }
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(ptr, ptrOffset, itemLength));
+            } else {
+              directStreamOutput.write(ptr, ptrOffset, itemLength);
+              directLengthOutput.write(itemLength);
+            }
+            indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Under the covers, varchar is written to ORC the same way as string.
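+   * Values longer than the declared maximum are truncated; for example, a
+   * varchar(3) value "abcdef" stores only "abc".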
+   */
+  private static class VarcharTreeWriter extends StringBaseTreeWriter {
+    private final int maxLength;
+
+    VarcharTreeWriter(int columnId,
+        TypeDescription schema,
+        StreamFactory writer,
+        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      maxLength = schema.getMaxLength();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int itemLength = Math.min(vec.length[0], maxLength);
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(vec.vector[0], vec.start[0],
+                  itemLength);
+              directLengthOutput.write(itemLength);
+            }
+          }
+          indexStatistics.updateString(vec.vector[0], vec.start[0],
+              itemLength, length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int itemLength = Math.min(vec.length[offset + i], maxLength);
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength));
+            } else {
+              directStreamOutput.write(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength);
+              directLengthOutput.write(itemLength);
+            }
+            indexStatistics.updateString(vec.vector[offset + i],
+                vec.start[offset + i], itemLength, 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static class BinaryTreeWriter extends TreeWriter {
+    private final PositionedOutputStream stream;
+    private final IntegerWriter length;
+    private boolean isDirectV2 = true;
+
+    BinaryTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.stream = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.length = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          for(int i=0; i < length; ++i) {
+            stream.write(vec.vector[0], vec.start[0],
+                  vec.length[0]);
+            this.length.write(vec.length[0]);
+          }
+          indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+              vec.length[0], length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            stream.write(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+            this.length.write(vec.length[offset + i]);
+            indexStatistics.updateBinary(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i], 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      stream.flush();
+      length.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      stream.getPosition(recorder);
+      length.getPosition(recorder);
+    }
+  }
+
+  public static final int MILLIS_PER_SECOND = 1000;
+  static final int NANOS_PER_SECOND = 1000000000;
+  static final int MILLIS_PER_NANO  = 1000000;
+  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+  private static class TimestampTreeWriter extends TreeWriter {
+    private final IntegerWriter seconds;
+    private final IntegerWriter nanos;
+    private final boolean isDirectV2;
+    private final long base_timestamp;
+
+    TimestampTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.seconds = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
+      this.nanos = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+      // for unit tests to set different time zones
+      this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+      writer.useWriterTimeZone(true);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          long value = vec.vector[0];
+          long valueMillis = value / MILLIS_PER_NANO;
+          indexStatistics.updateTimestamp(valueMillis);
+          if (createBloomFilter) {
+            bloomFilter.addLong(valueMillis);
+          }
+          final long secs = value / NANOS_PER_SECOND - base_timestamp;
+          final long nano = formatNanos((int) (value % NANOS_PER_SECOND));
+          for(int i=0; i < length; ++i) {
+            seconds.write(secs);
+            nanos.write(nano);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            long value = vec.vector[i + offset];
+            long valueMillis = value / MILLIS_PER_NANO;
+            long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
+            int valueNanos = (int) (value % NANOS_PER_SECOND);
+            if (valueNanos < 0) {
+              valueNanos += NANOS_PER_SECOND;
+            }
+            seconds.write(valueSecs);
+            nanos.write(formatNanos(valueNanos));
+            indexStatistics.updateTimestamp(valueMillis);
+            if (createBloomFilter) {
+              bloomFilter.addLong(valueMillis);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      seconds.flush();
+      nanos.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    private static long formatNanos(int nanos) {
+      if (nanos == 0) {
+        return 0;
+      } else if (nanos % 100 != 0) {
+        return ((long) nanos) << 3;
+      } else {
+        nanos /= 100;
+        int trailingZeros = 1;
+        while (nanos % 10 == 0 && trailingZeros < 7) {
+          nanos /= 10;
+          trailingZeros += 1;
+        }
+        return ((long) nanos) << 3 | trailingZeros;
+      }
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      seconds.getPosition(recorder);
+      nanos.getPosition(recorder);
+    }
+  }
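
The trick in formatNanos is that the low three bits of the written value record how many trailing decimal zeros were stripped, which keeps the SECONDARY stream small for common nanosecond values. A minimal sketch of the matching decode step (decodeNanos is an illustrative name, not part of this patch):

    // Inverse of formatNanos: a non-zero low-3-bit value z means z+1 trailing
    // decimal zeros were stripped, so multiply the remaining digits by 10^(z+1).
    static int decodeNanos(long serialized) {
      int zeros = (int) (serialized & 7);
      int result = (int) (serialized >>> 3);
      if (zeros != 0) {
        for (int i = 0; i <= zeros; ++i) {
          result *= 10;
        }
      }
      return result;
    }
    // Example: formatNanos(123456000) == (123456 << 3) | 2 == 987650,
    // and decodeNanos(987650) == 123456 * 1000 == 123456000.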
+
+  private static class DateTreeWriter extends TreeWriter {
+    private final IntegerWriter writer;
+    private final boolean isDirectV2;
+
+    DateTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      OutStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int value = (int) vec.vector[0];
+          indexStatistics.updateDate(value);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int value = (int) vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateDate(value);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+  }
+
+  private static class DecimalTreeWriter extends TreeWriter {
+    private final PositionedOutputStream valueStream;
+    private final IntegerWriter scaleStream;
+    private final boolean isDirectV2;
+
+    DecimalTreeWriter(int columnId,
+                        TypeDescription schema,
+                        StreamFactory writer,
+                        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+      this.scaleStream = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DecimalColumnVector vec = (DecimalColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          HiveDecimal value = vec.vector[0].getHiveDecimal();
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            bloomFilter.addString(value.toString());
+          }
+          for(int i=0; i < length; ++i) {
+            SerializationUtils.writeBigInteger(valueStream,
+                value.unscaledValue());
+            scaleStream.write(value.scale());
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
+            SerializationUtils.writeBigInteger(valueStream,
+                value.unscaledValue());
+            scaleStream.write(value.scale());
+            indexStatistics.updateDecimal(value);
+            if (createBloomFilter) {
+              bloomFilter.addString(value.toString());
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      valueStream.flush();
+      scaleStream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      valueStream.getPosition(recorder);
+      scaleStream.getPosition(recorder);
+    }
+  }
+
+  private static class StructTreeWriter extends TreeWriter {
+    StructTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters = new TreeWriter[children.size()];
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i] = createTreeWriter(
+          children.get(i), writer,
+          true);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException {
+      // update the statistics for the root column
+      indexStatistics.increment(length);
+      // I'm assuming that the root column isn't nullable so that I don't need
+      // to update isPresent.
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+      }
+    }
+
+    private static void writeFields(StructColumnVector vector,
+                                    TreeWriter[] childrenWriters,
+                                    int offset, int length) throws IOException {
+      for(int field=0; field < childrenWriters.length; ++field) {
+        childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+      }
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      StructColumnVector vec = (StructColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          writeFields(vec, childrenWriters, offset, length);
+        }
+      } else if (vector.noNulls) {
+        writeFields(vec, childrenWriters, offset, length);
+      } else {
+        // write the records in runs
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            if (!started) {
+              started = true;
+              currentRun = i;
+            }
+          } else if (started) {
+            started = false;
+            writeFields(vec, childrenWriters, offset + currentRun,
+                i - currentRun);
+          }
+        }
+        if (started) {
+          writeFields(vec, childrenWriters, offset + currentRun,
+              length - currentRun);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+  }
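
When the struct column has nulls, the loop above batches consecutive non-null rows so each child receives one writeBatch call per run instead of one per row. A small trace under assumed inputs:

    // Assume offset = 0, length = 5, vec.isNull = {false, false, true, false, false}.
    // The run detection issues exactly two calls:
    //   writeFields(vec, childrenWriters, 0, 2);   // rows 0-1
    //   writeFields(vec, childrenWriters, 3, 2);   // rows 3-4, flushed after the loop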
+
+  private static class ListTreeWriter extends TreeWriter {
+    private final IntegerWriter lengths;
+    private final boolean isDirectV2;
+
+    ListTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      childrenWriters = new TreeWriter[1];
+      childrenWriters[0] =
+        createTreeWriter(schema.getChildren().get(0), writer, true);
+      lengths = createIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      ListColumnVector vec = (ListColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int childOffset = (int) vec.offsets[0];
+          int childLength = (int) vec.lengths[0];
+          for(int i=0; i < length; ++i) {
+            lengths.write(childLength);
+            childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(childLength);
+          }
+        }
+      } else {
+        // write the elements in runs
+        int currentOffset = 0;
+        int currentLength = 0;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            int nextLength = (int) vec.lengths[offset + i];
+            int nextOffset = (int) vec.offsets[offset + i];
+            lengths.write(nextLength);
+            if (currentLength == 0) {
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else if (currentOffset + currentLength != nextOffset) {
+              childrenWriters[0].writeBatch(vec.child, currentOffset,
+                  currentLength);
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else {
+              currentLength += nextLength;
+            }
+          }
+        }
+        if (currentLength != 0) {
+          childrenWriters[0].writeBatch(vec.child, currentOffset,
+              currentLength);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      lengths.flush();
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      lengths.getPosition(recorder);
+    }
+  }
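
The non-repeating branch records every list length but merges child ranges that sit back to back in vec.child, so contiguous lists collapse into a single child writeBatch call. A worked example with assumed offsets and lengths:

    // Three non-null lists with (offset, length) = (0, 3), (3, 2), (7, 1):
    //   the lengths stream receives 3, 2, 1, while the child writer sees only
    //   childrenWriters[0].writeBatch(vec.child, 0, 5);  // (0,3) and (3,2) are contiguous
    //   childrenWriters[0].writeBatch(vec.child, 7, 1);  // the gap before offset 7 starts a new run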
+
+  private static class MapTreeWriter extends TreeWriter {
+    private final IntegerWriter lengths;
+    private final boolean isDirectV2;
+
+    MapTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      childrenWriters = new TreeWriter[2];
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters[0] =
+        createTreeWriter(children.get(0), writer, true);
+      childrenWriters[1] =
+        createTreeWriter(children.get(1), writer, true);
+      lengths = createIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      MapColumnVector vec = (MapColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int childOffset = (int) vec.offsets[0];
+          int childLength = (int) vec.lengths[0];
+          for(int i=0; i < length; ++i) {
+            lengths.write(childLength);
+            childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
+            childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(childLength);
+          }
+        }
+      } else {
+        // write the elements in runs
+        int currentOffset = 0;
+        int currentLength = 0;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            int nextLength = (int) vec.lengths[offset + i];
+            int nextOffset = (int) vec.offsets[offset + i];
+            lengths.write(nextLength);
+            if (currentLength == 0) {
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else if (currentOffset + currentLength != nextOffset) {
+              childrenWriters[0].writeBatch(vec.keys, currentOffset,
+                  currentLength);
+              childrenWriters[1].writeBatch(vec.values, currentOffset,
+                  currentLength);
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else {
+              currentLength += nextLength;
+            }
+          }
+        }
+        if (currentLength != 0) {
+          childrenWriters[0].writeBatch(vec.keys, currentOffset,
+              currentLength);
+          childrenWriters[1].writeBatch(vec.values, currentOffset,
+              currentLength);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      lengths.flush();
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      lengths.getPosition(recorder);
+    }
+  }
+
+  private static class UnionTreeWriter extends TreeWriter {
+    private final RunLengthByteWriter tags;
+
+    UnionTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters = new TreeWriter[children.size()];
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i] =
+            createTreeWriter(children.get(i), writer, true);
+      }
+      tags =
+        new RunLengthByteWriter(writer.createStream(columnId,
+            OrcProto.Stream.Kind.DATA));
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      UnionColumnVector vec = (UnionColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte tag = (byte) vec.tags[0];
+          for(int i=0; i < length; ++i) {
+            tags.write(tag);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(tag);
+          }
+          childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+        }
+      } else {
+        // write the records in runs of the same tag
+        byte prevTag = 0;
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            byte tag = (byte) vec.tags[offset + i];
+            tags.write(tag);
+            if (!started) {
+              started = true;
+              currentRun = i;
+              prevTag = tag;
+            } else if (tag != prevTag) {
+              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                  offset + currentRun, i - currentRun);
+              currentRun = i;
+              prevTag = tag;
+            }
+          } else if (started) {
+            started = false;
+            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                offset + currentRun, i - currentRun);
+          }
+        }
+        if (started) {
+          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+              offset + currentRun, length - currentRun);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      tags.flush();
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      tags.getPosition(recorder);
+    }
+  }
+
+  private static TreeWriter createTreeWriter(TypeDescription schema,
+                                             StreamFactory streamFactory,
+                                             boolean nullable) throws IOException {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case BYTE:
+        return new ByteTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case SHORT:
+      case INT:
+      case LONG:
+        return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case FLOAT:
+        return new FloatTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DOUBLE:
+        return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case STRING:
+        return new StringTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case CHAR:
+        return new CharTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case VARCHAR:
+        return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case BINARY:
+        return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case TIMESTAMP:
+        return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DATE:
+        return new DateTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DECIMAL:
+        return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case STRUCT:
+        return new StructTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case MAP:
+        return new MapTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case LIST:
+        return new ListTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case UNION:
+        return new UnionTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      default:
+        throw new IllegalArgumentException("Bad category: " +
+            schema.getCategory());
+    }
+  }
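
Because every writer pulls its id from getNextColumnId() before its children are built, column ids fall out in a pre-order walk of the schema, matching the ids TypeDescription assigns. A small illustration, assuming the standard TypeDescription builder methods:

    // struct<name:string,age:int> creates three writers with pre-order ids:
    //   0 -> StructTreeWriter (root), 1 -> StringTreeWriter, 2 -> IntegerTreeWriter
    TypeDescription schema = TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("age", TypeDescription.createInt());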
+
+  private static void writeTypes(OrcProto.Footer.Builder builder,
+                                 TypeDescription schema) {
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(OrcProto.Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
+        break;
+      case LIST:
+        type.setKind(OrcProto.Type.Kind.LIST);
+        type.addSubtypes(children.get(0).getId());
+        break;
+      case MAP:
+        type.setKind(OrcProto.Type.Kind.MAP);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        break;
+      case STRUCT:
+        type.setKind(OrcProto.Type.Kind.STRUCT);
+        for(TypeD

<TRUNCATED>

[3/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java b/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java
deleted file mode 100644
index 878efbe..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.filters;
-
-import org.apache.orc.OrcProto;
-import org.apache.hive.common.util.BloomFilter;
-
-import com.google.common.primitives.Longs;
-
-public class BloomFilterIO extends BloomFilter {
-
-  public BloomFilterIO(long expectedEntries) {
-    super(expectedEntries, DEFAULT_FPP);
-  }
-
-  public BloomFilterIO(long expectedEntries, double fpp) {
-    super(expectedEntries, fpp);
-  }
-
-/**
- * Initializes the BloomFilter from the given Orc BloomFilter
- */
-  public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
-    this.bitSet = new BitSet(Longs.toArray(bloomFilter.getBitsetList()));
-    this.numHashFunctions = bloomFilter.getNumHashFunctions();
-    this.numBits = (int) this.bitSet.bitSize();
-  }
-}
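
The class is only relocated, not removed: the hunks below switch callers to org.apache.orc.BloomFilterIO. A hedged sketch of caller code after the move, assuming the relocated class keeps the constructors shown in the deleted file:

    import org.apache.orc.BloomFilterIO;

    // expectedEntries and the false-positive probability are illustrative values
    BloomFilterIO filter = new BloomFilterIO(10000, 0.05);
    filter.addLong(42L);
    boolean maybe = filter.testLong(42L);   // true, possibly a false positive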

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
index a242a37..5bcb281 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
index b746390..e3e234e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
@@ -24,8 +24,8 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
 import org.codehaus.jettison.json.JSONArray;
+import org.apache.orc.BloomFilterIO;
 import org.apache.orc.BinaryColumnStatistics;
 import org.apache.orc.BooleanColumnStatistics;
 import org.apache.orc.ColumnStatistics;

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 975825a..58e5da1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.orc.FileMetadata;
@@ -88,6 +89,11 @@ public final class OrcFile extends org.apache.orc.OrcFile {
   public static class WriterOptions extends org.apache.orc.OrcFile.WriterOptions {
     private boolean explicitSchema = false;
     private ObjectInspector inspector = null;
+    // Setting the default batch size to 1000 makes the memory check at 5000
+    // rows work the same as the row by row writer. (If it was the default 1024,
+    // the smallest stripe size would be 5120 rows, which changes the output
+    // of some of the tests.)
+    private int batchSize = 1000;
 
     WriterOptions(Properties tableProperties, Configuration conf) {
       super(tableProperties, conf);
@@ -249,6 +255,19 @@ public final class OrcFile extends org.apache.orc.OrcFile {
       super.memory(value);
       return this;
     }
+
+    protected WriterOptions batchSize(int maxSize) {
+      batchSize = maxSize;
+      return this;
+    }
+
+    ObjectInspector getInspector() {
+      return inspector;
+    }
+
+    int getBatchSize() {
+      return batchSize;
+    }
   }
 
   /**
@@ -286,16 +305,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     FileSystem fs = opts.getFileSystem() == null ?
       path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
 
-    return new WriterImpl(fs, path, opts.getConfiguration(), opts.inspector,
-                          opts.getSchema(),
-                          opts.getStripeSize(), opts.getCompress(),
-                          opts.getBufferSize(), opts.getRowIndexStride(),
-                          opts.getMemoryManager(), opts.getBlockPadding(),
-                          opts.getVersion(), opts.getCallback(),
-                          opts.getEncodingStrategy(),
-                          opts.getCompressionStrategy(),
-                          opts.getPaddingTolerance(), opts.getBlockSize(),
-                          opts.getBloomFilterColumns(), opts.getBloomFilterFpp());
+    return new WriterImpl(fs, path, opts);
   }
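
Collapsing the long constructor into WriterImpl(fs, path, opts) routes every setting through WriterOptions, including the new batchSize that keeps the 5000-row memory check behaving like the old row-by-row writer. A rough usage sketch; conf, rowInspector, and someRow are assumed to exist in the caller:

    Path file = new Path("/tmp/example.orc");              // illustrative path
    OrcFile.WriterOptions opts =
        OrcFile.writerOptions(conf).inspector(rowInspector);
    Writer writer = OrcFile.createWriter(file, opts);
    writer.addRow(someRow);
    writer.close();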
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index e31fd0b..30c2fad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -63,6 +63,7 @@ public class ReaderImpl implements Reader {
   private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
 
   protected final FileSystem fileSystem;
+  private final long maxLength;
   protected final Path path;
   protected final org.apache.orc.CompressionKind compressionKind;
   protected final CompressionCodec codec;
@@ -329,6 +330,7 @@ public class ReaderImpl implements Reader {
     this.fileSystem = fs;
     this.path = path;
     this.conf = options.getConfiguration();
+    this.maxLength = options.getMaxLength();
 
     FileMetadata fileMetadata = options.getFileMetadata();
     if (fileMetadata != null) {
@@ -859,4 +861,17 @@ public class ReaderImpl implements Reader {
   public DataReader createDefaultDataReader(boolean useZeroCopy) {
     return RecordReaderUtils.createDefaultDataReader(fileSystem, path, useZeroCopy, codec);
   }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("ORC Reader(");
+    buffer.append(path);
+    if (maxLength != -1) {
+      buffer.append(", ");
+      buffer.append(maxLength);
+    }
+    buffer.append(")");
+    return buffer.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 607003f..a85bfef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 1f5927a..92f5ab8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 public interface Writer extends org.apache.orc.Writer {
 
   /**
-   * Add a row to the ORC file.
+   * Add a row to the end of the ORC file.
    * @param row the row to add
    * @throws IOException
    */


[2/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 993be71..8cfb402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -18,52 +18,15 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
+import java.util.Set;
 
-import org.apache.orc.BinaryColumnStatistics;
-import org.apache.orc.impl.BitFieldWriter;
-import org.apache.orc.impl.ColumnStatisticsImpl;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.impl.DynamicIntArray;
-import org.apache.orc.impl.IntegerWriter;
-import org.apache.orc.impl.MemoryManager;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.PositionRecorder;
-import org.apache.orc.impl.PositionedOutputStream;
-import org.apache.orc.impl.RunLengthByteWriter;
-import org.apache.orc.impl.RunLengthIntegerWriter;
-import org.apache.orc.impl.RunLengthIntegerWriterV2;
-import org.apache.orc.impl.SerializationUtils;
-import org.apache.orc.impl.SnappyCodec;
-import org.apache.orc.impl.StreamName;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.impl.StringRedBlackTree;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ZlibCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -74,10 +37,6 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
-import org.apache.orc.CompressionCodec.Modifier;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -98,17 +57,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -128,3184 +78,240 @@ import com.google.protobuf.CodedOutputStream;
  * thread as well.
  * 
  */
-public class WriterImpl implements Writer, MemoryManager.Callback {
-
-  private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
-  static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
-
-  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
-  private static final int MIN_ROW_INDEX_STRIDE = 1000;
+public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer {
 
-  // threshold above which buffer size will be automatically resized
-  private static final int COLUMN_COUNT_THRESHOLD = 1000;
-
-  private final FileSystem fs;
-  private final Path path;
-  private final long defaultStripeSize;
-  private long adjustedStripeSize;
-  private final int rowIndexStride;
-  private final CompressionKind compress;
-  private final CompressionCodec codec;
-  private final boolean addBlockPadding;
-  private final int bufferSize;
-  private final long blockSize;
-  private final double paddingTolerance;
-  private final TypeDescription schema;
-
-  // the streams that make up the current stripe
-  private final Map<StreamName, BufferedStream> streams =
-    new TreeMap<StreamName, BufferedStream>();
-
-  private FSDataOutputStream rawWriter = null;
-  // the compressed metadata information outStream
-  private OutStream writer = null;
-  // a protobuf outStream around streamFactory
-  private CodedOutputStream protobufWriter = null;
-  private long headerLength;
-  private int columnCount;
-  private long rowCount = 0;
-  private long rowsInStripe = 0;
-  private long rawDataSize = 0;
-  private int rowsInIndex = 0;
-  private int stripesAtLastFlush = -1;
-  private final List<OrcProto.StripeInformation> stripes =
-    new ArrayList<OrcProto.StripeInformation>();
-  private final Map<String, ByteString> userMetadata =
-    new TreeMap<String, ByteString>();
-  private final StreamFactory streamFactory = new StreamFactory();
-  private final TreeWriter treeWriter;
-  private final boolean buildIndex;
-  private final MemoryManager memoryManager;
-  private final OrcFile.Version version;
-  private final Configuration conf;
-  private final OrcFile.WriterCallback callback;
-  private final OrcFile.WriterContext callbackContext;
-  private final OrcFile.EncodingStrategy encodingStrategy;
-  private final OrcFile.CompressionStrategy compressionStrategy;
-  private final boolean[] bloomFilterColumns;
-  private final double bloomFilterFpp;
-  private boolean writeTimeZone;
+  private final ObjectInspector inspector;
+  private final VectorizedRowBatch internalBatch;
+  private final StructField[] fields;
 
   WriterImpl(FileSystem fs,
-      Path path,
-      Configuration conf,
-      ObjectInspector inspector,
-      TypeDescription schema,
-      long stripeSize,
-      CompressionKind compress,
-      int bufferSize,
-      int rowIndexStride,
-      MemoryManager memoryManager,
-      boolean addBlockPadding,
-      OrcFile.Version version,
-      OrcFile.WriterCallback callback,
-      OrcFile.EncodingStrategy encodingStrategy,
-      OrcFile.CompressionStrategy compressionStrategy,
-      double paddingTolerance,
-      long blockSizeValue,
-      String bloomFilterColumnNames,
-      double bloomFilterFpp) throws IOException {
-    this.fs = fs;
-    this.path = path;
-    this.conf = conf;
-    this.callback = callback;
-    this.schema = schema;
-    if (callback != null) {
-      callbackContext = new OrcFile.WriterContext(){
-
-        @Override
-        public Writer getWriter() {
-          return WriterImpl.this;
-        }
-      };
-    } else {
-      callbackContext = null;
-    }
-    this.adjustedStripeSize = stripeSize;
-    this.defaultStripeSize = stripeSize;
-    this.version = version;
-    this.encodingStrategy = encodingStrategy;
-    this.compressionStrategy = compressionStrategy;
-    this.addBlockPadding = addBlockPadding;
-    this.blockSize = blockSizeValue;
-    this.paddingTolerance = paddingTolerance;
-    this.compress = compress;
-    this.rowIndexStride = rowIndexStride;
-    this.memoryManager = memoryManager;
-    buildIndex = rowIndexStride > 0;
-    codec = createCodec(compress);
-    int numColumns = schema.getMaximumId() + 1;
-    this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
-        numColumns, bufferSize);
-    if (version == OrcFile.Version.V_0_11) {
-      /* do not write bloom filters for ORC v11 */
-      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
-    } else {
-      this.bloomFilterColumns =
-          OrcUtils.includeColumns(bloomFilterColumnNames, schema);
-    }
-    this.bloomFilterFpp = bloomFilterFpp;
-    treeWriter = createTreeWriter(inspector, schema, streamFactory, false);
-    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
-      throw new IllegalArgumentException("Row stride must be at least " +
-          MIN_ROW_INDEX_STRIDE);
-    }
-
-    // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, stripeSize, this);
-  }
-
-  @VisibleForTesting
-  static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) {
-    // The worst case is that there are 2 big streams per column and
-    // we want to guarantee that each stream gets ~10 buffers.
-    // This keeps buffers small enough that we don't get really small stripe
-    // sizes.
-    int estBufferSize = (int) (stripeSize / (20 * numColumns));
-    estBufferSize = getClosestBufferSize(estBufferSize);
-    if (estBufferSize > bs) {
-      estBufferSize = bs;
-    } else {
-      LOG.info("WIDE TABLE - Number of columns: " + numColumns +
-          " Chosen compression buffer size: " + estBufferSize);
-    }
-    return estBufferSize;
-  }
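
The heuristic removed here (it moves into the orc module with the rest of the writer) assumes roughly two large streams per column and tries to give each about ten compression buffers per stripe. A worked example under those assumptions:

    // defaultStripeSize = 64 MB, 1000 columns, requested bufferSize = 256 KB:
    //   estBufferSize = 67108864 / (20 * 1000) ~= 3355 bytes
    //   getClosestBufferSize(3355) rounds up to the 4 KB bucket
    //   4 KB <= 256 KB, so the "WIDE TABLE" message is logged and 4 KB is used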
-
-  private static int getClosestBufferSize(int estBufferSize) {
-    final int kb4 = 4 * 1024;
-    final int kb8 = 8 * 1024;
-    final int kb16 = 16 * 1024;
-    final int kb32 = 32 * 1024;
-    final int kb64 = 64 * 1024;
-    final int kb128 = 128 * 1024;
-    final int kb256 = 256 * 1024;
-    if (estBufferSize <= kb4) {
-      return kb4;
-    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
-      return kb8;
-    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
-      return kb16;
-    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
-      return kb32;
-    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
-      return kb64;
-    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
-      return kb128;
+             Path path,
+             OrcFile.WriterOptions opts) throws IOException {
+    super(fs, path, opts);
+    this.inspector = opts.getInspector();
+    internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+    if (inspector instanceof StructObjectInspector) {
+      List<? extends StructField> fieldList =
+          ((StructObjectInspector) inspector).getAllStructFieldRefs();
+      fields = new StructField[fieldList.size()];
+      fieldList.toArray(fields);
     } else {
-      return kb256;
-    }
-  }
-
-  public static CompressionCodec createCodec(CompressionKind kind) {
-    switch (kind) {
-      case NONE:
-        return null;
-      case ZLIB:
-        return new ZlibCodec();
-      case SNAPPY:
-        return new SnappyCodec();
-      case LZO:
-        try {
-          Class<? extends CompressionCodec> lzo =
-              (Class<? extends CompressionCodec>)
-                  JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
-          return lzo.newInstance();
-        } catch (ClassNotFoundException e) {
-          throw new IllegalArgumentException("LZO is not available.", e);
-        } catch (InstantiationException e) {
-          throw new IllegalArgumentException("Problem initializing LZO", e);
-        } catch (IllegalAccessException e) {
-          throw new IllegalArgumentException("Insufficient access to LZO", e);
-        }
-      default:
-        throw new IllegalArgumentException("Unknown compression codec: " +
-            kind);
-    }
-  }
-
-  @Override
-  public boolean checkMemory(double newScale) throws IOException {
-    long limit = (long) Math.round(adjustedStripeSize * newScale);
-    long size = estimateStripeSize();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
-                limit);
-    }
-    if (size > limit) {
-      flushStripe();
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * This class is used to hold the contents of streams as they are buffered.
-   * The TreeWriters write to the outStream and the codec compresses the
-   * data as buffers fill up and stores them in the output list. When the
-   * stripe is being written, the whole stream is written to the file.
-   */
-  private class BufferedStream implements OutStream.OutputReceiver {
-    private final OutStream outStream;
-    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
-    BufferedStream(String name, int bufferSize,
-                   CompressionCodec codec) throws IOException {
-      outStream = new OutStream(name, bufferSize, codec, this);
-    }
-
-    /**
-     * Receive a buffer from the compression codec.
-     * @param buffer the buffer to save
-     * @throws IOException
-     */
-    @Override
-    public void output(ByteBuffer buffer) {
-      output.add(buffer);
-    }
-
-    /**
-     * Get the number of bytes in buffers that are allocated to this stream.
-     * @return number of bytes in buffers
-     */
-    public long getBufferSize() {
-      long result = 0;
-      for(ByteBuffer buf: output) {
-        result += buf.capacity();
-      }
-      return outStream.getBufferSize() + result;
-    }
-
-    /**
-     * Flush the stream to the codec.
-     * @throws IOException
-     */
-    public void flush() throws IOException {
-      outStream.flush();
-    }
-
-    /**
-     * Clear all of the buffers.
-     * @throws IOException
-     */
-    public void clear() throws IOException {
-      outStream.clear();
-      output.clear();
-    }
-
-    /**
-     * Check the state of suppress flag in output stream
-     * @return value of suppress flag
-     */
-    public boolean isSuppressed() {
-      return outStream.isSuppressed();
-    }
-
-    /**
-     * Get the number of bytes that will be written to the output. Assumes
-     * the stream has already been flushed.
-     * @return the number of bytes
-     */
-    public long getOutputSize() {
-      long result = 0;
-      for(ByteBuffer buffer: output) {
-        result += buffer.remaining();
-      }
-      return result;
-    }
-
-    /**
-     * Write the saved compressed buffers to the OutputStream.
-     * @param out the stream to write to
-     * @throws IOException
-     */
-    void spillTo(OutputStream out) throws IOException {
-      for(ByteBuffer buffer: output) {
-        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-          buffer.remaining());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return outStream.toString();
-    }
-  }
-
-  /**
-   * An output receiver that writes the ByteBuffers to the output stream
-   * as they are received.
-   */
-  private class DirectStream implements OutStream.OutputReceiver {
-    private final FSDataOutputStream output;
-
-    DirectStream(FSDataOutputStream output) {
-      this.output = output;
-    }
-
-    @Override
-    public void output(ByteBuffer buffer) throws IOException {
-      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-        buffer.remaining());
-    }
-  }
-
-  private static class RowIndexPositionRecorder implements PositionRecorder {
-    private final OrcProto.RowIndexEntry.Builder builder;
-
-    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
-      this.builder = builder;
-    }
-
-    @Override
-    public void addPosition(long position) {
-      builder.addPositions(position);
+      fields = null;
     }
   }
 
-  /**
-   * Interface from the Writer to the TreeWriters. This limits the visibility
-   * that the TreeWriters have into the Writer.
-   */
-  private class StreamFactory {
-    /**
-     * Create a stream to store part of a column.
-     * @param column the column id for the stream
-     * @param kind the kind of stream
-     * @return The output outStream that the section needs to be written to.
-     * @throws IOException
-     */
-    public OutStream createStream(int column,
-                                  OrcProto.Stream.Kind kind
-                                  ) throws IOException {
-      final StreamName name = new StreamName(column, kind);
-      final EnumSet<CompressionCodec.Modifier> modifiers;
-
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-          if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
-            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
-          } else {
-            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
-          break;
-        default:
-          LOG.warn("Missing ORC compression modifiers for " + kind);
-          modifiers = null;
-          break;
-      }
-
-      BufferedStream result = streams.get(name);
-      if (result == null) {
-        result = new BufferedStream(name.toString(), bufferSize,
-            codec == null ? codec : codec.modify(modifiers));
-        streams.put(name, result);
-      }
-      return result.outStream;
-    }
-
-    /**
-     * Get the next column id.
-     * @return a number from 0 to the number of columns - 1
-     */
-    public int getNextColumnId() {
-      return columnCount++;
-    }
-
-    /**
-     * Get the stride rate of the row index.
-     */
-    public int getRowIndexStride() {
-      return rowIndexStride;
-    }
-
-    /**
-     * Should the writer build a row index?
-     * @return true if we are building the index
-     */
-    public boolean buildIndex() {
-      return buildIndex;
-    }
-
-    /**
-     * Is the ORC file compressed?
-     * @return true if the streams are compressed
-     */
-    public boolean isCompressed() {
-      return codec != null;
-    }
-
-    /**
-     * Get the encoding strategy to use.
-     * @return encoding strategy
-     */
-    public OrcFile.EncodingStrategy getEncodingStrategy() {
-      return encodingStrategy;
-    }
-
-    /**
-     * Get the compression strategy to use.
-     * @return compression strategy
-     */
-    public OrcFile.CompressionStrategy getCompressionStrategy() {
-      return compressionStrategy;
-    }
-
-    /**
-     * Get the bloom filter columns
-     * @return bloom filter columns
-     */
-    public boolean[] getBloomFilterColumns() {
-      return bloomFilterColumns;
-    }
-
-    /**
-     * Get bloom filter false positive percentage.
-     * @return fpp
-     */
-    public double getBloomFilterFPP() {
-      return bloomFilterFpp;
-    }
-
-    /**
-     * Get the writer's configuration.
-     * @return configuration
-     */
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    /**
-     * Get the version of the file to write.
-     */
-    public OrcFile.Version getVersion() {
-      return version;
-    }
-
-    public void useWriterTimeZone(boolean val) {
-      writeTimeZone = val;
-    }
-
-    public boolean hasWriterTimeZone() {
-      return writeTimeZone;
-    }
-  }
+  private static final long NANOS_PER_MILLI = 1000000;
 
   /**
-   * The parent class of all of the writers for each column. Each column
-   * is written by an instance of this class. The compound types (struct,
-   * list, map, and union) have child tree writers that write their child
-   * types.
+   * Set the value for a given column within a batch.
+   * @param rowId the row to set
+   * @param column the column to set
+   * @param inspector the object inspector used to interpret obj
+   * @param obj the value to use
    */
-  private abstract static class TreeWriter {
-    protected final int id;
-    protected final ObjectInspector inspector;
-    protected final BitFieldWriter isPresent;
-    private final boolean isCompressed;
-    protected final ColumnStatisticsImpl indexStatistics;
-    protected final ColumnStatisticsImpl stripeColStatistics;
-    private final ColumnStatisticsImpl fileStatistics;
-    protected TreeWriter[] childrenWriters;
-    protected final RowIndexPositionRecorder rowIndexPosition;
-    private final OrcProto.RowIndex.Builder rowIndex;
-    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
-    private final PositionedOutputStream rowIndexStream;
-    private final PositionedOutputStream bloomFilterStream;
-    protected final BloomFilterIO bloomFilter;
-    protected final boolean createBloomFilter;
-    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
-    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
-    private boolean foundNulls;
-    private OutStream isPresentOutStream;
-    private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
-    private final StreamFactory streamFactory;
-
-    /**
-     * Create a tree writer.
-     * @param columnId the column id of the column to write
-     * @param inspector the object inspector to use
-     * @param schema the row schema
-     * @param streamFactory limited access to the Writer's data.
-     * @param nullable can the value be null?
-     * @throws IOException
-     */
-    TreeWriter(int columnId, ObjectInspector inspector,
-               TypeDescription schema,
-               StreamFactory streamFactory,
-               boolean nullable) throws IOException {
-      this.streamFactory = streamFactory;
-      this.isCompressed = streamFactory.isCompressed();
-      this.id = columnId;
-      this.inspector = inspector;
-      if (nullable) {
-        isPresentOutStream = streamFactory.createStream(id,
-            OrcProto.Stream.Kind.PRESENT);
-        isPresent = new BitFieldWriter(isPresentOutStream, 1);
-      } else {
-        isPresent = null;
-      }
-      this.foundNulls = false;
-      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
-      indexStatistics = ColumnStatisticsImpl.create(schema);
-      stripeColStatistics = ColumnStatisticsImpl.create(schema);
-      fileStatistics = ColumnStatisticsImpl.create(schema);
-      childrenWriters = new TreeWriter[0];
-      rowIndex = OrcProto.RowIndex.newBuilder();
-      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
-      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
-      stripeStatsBuilders = Lists.newArrayList();
-      if (streamFactory.buildIndex()) {
-        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
-      } else {
-        rowIndexStream = null;
-      }
-      if (createBloomFilter) {
-        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
-        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
-        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
-        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
-            streamFactory.getBloomFilterFPP());
-      } else {
-        bloomFilterEntry = null;
-        bloomFilterIndex = null;
-        bloomFilterStream = null;
-        bloomFilter = null;
-      }
-    }
-
-    protected OrcProto.RowIndex.Builder getRowIndex() {
-      return rowIndex;
-    }
-
-    protected ColumnStatisticsImpl getStripeStatistics() {
-      return stripeColStatistics;
-    }
-
-    protected ColumnStatisticsImpl getFileStatistics() {
-      return fileStatistics;
-    }
-
-    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
-      return rowIndexEntry;
-    }
-
-    IntegerWriter createIntegerWriter(PositionedOutputStream output,
-                                      boolean signed, boolean isDirectV2,
-                                      StreamFactory writer) {
-      if (isDirectV2) {
-        boolean alignedBitpacking = false;
-        if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
-          alignedBitpacking = true;
-        }
-        return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
-      } else {
-        return new RunLengthIntegerWriter(output, signed);
-      }
-    }
-
-    boolean isNewWriteFormat(StreamFactory writer) {
-      return writer.getVersion() != OrcFile.Version.V_0_11;
-    }
-
-    /**
-     * Add a new value to the column.
-     * @param obj the object to write
-     * @throws IOException
-     */
-    void write(Object obj) throws IOException {
-      if (obj != null) {
-        indexStatistics.increment();
-      } else {
-        indexStatistics.setNull();
-      }
-      if (isPresent != null) {
-        isPresent.write(obj == null ? 0 : 1);
-        if(obj == null) {
-          foundNulls = true;
-        }
-      }
-    }
-
-    /**
-     * Handle the top level object write.
-     *
-     * This default method is used for all types except structs, which are the
-     * typical case at the top level. VectorizedRowBatch assumes the top level
-     * object is a struct, so for every other type we write into the first column.
-     * @param batch the batch to write from
-     * @param offset the row to start on
-     * @param length the number of rows to write
-     * @throws IOException
-     */
-    void writeRootBatch(VectorizedRowBatch batch, int offset,
-                        int length) throws IOException {
-      writeBatch(batch.cols[0], offset, length);
-    }
-
-    /**
-     * Write the values from the given vector from offset for length elements.
-     * @param vector the vector to write from
-     * @param offset the first value from the vector to write
-     * @param length the number of values from the vector to write
-     * @throws IOException
-     */
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      if (vector.noNulls) {
-        indexStatistics.increment(length);
-        if (isPresent != null) {
-          for (int i = 0; i < length; ++i) {
-            isPresent.write(1);
-          }
-        }
-      } else {
-        if (vector.isRepeating) {
-          boolean isNull = vector.isNull[0];
-          if (isPresent != null) {
-            for (int i = 0; i < length; ++i) {
-              isPresent.write(isNull ? 0 : 1);
+  static void setColumn(int rowId, ColumnVector column,
+                        ObjectInspector inspector, Object obj) {
+    if (obj == null) {
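+      // noNulls is a batch-wide hint, so it must be cleared as soon as any row is null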
+      column.noNulls = false;
+      column.isNull[rowId] = true;
+    } else {
+      switch (inspector.getCategory()) {
+        case PRIMITIVE:
+          switch (((PrimitiveObjectInspector) inspector)
+              .getPrimitiveCategory()) {
+            case BOOLEAN: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] =
+                  ((BooleanObjectInspector) inspector).get(obj) ? 1 : 0;
+              break;
             }
-          }
-          if (isNull) {
-            foundNulls = true;
-            indexStatistics.setNull();
-          } else {
-            indexStatistics.increment(length);
-          }
-        } else {
-          // count the number of non-null values
-          int nonNullCount = 0;
-          for(int i = 0; i < length; ++i) {
-            boolean isNull = vector.isNull[i + offset];
-            if (!isNull) {
-              nonNullCount += 1;
+            case BYTE: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((ByteObjectInspector) inspector).get(obj);
+              break;
             }
-            if (isPresent != null) {
-              isPresent.write(isNull ? 0 : 1);
+            case SHORT: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] =
+                  ((ShortObjectInspector) inspector).get(obj);
+              break;
+            }
+            case INT: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((IntObjectInspector) inspector).get(obj);
+              break;
+            }
+            case LONG: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((LongObjectInspector) inspector).get(obj);
+              break;
+            }
+            case FLOAT: {
+              DoubleColumnVector vector = (DoubleColumnVector) column;
+              vector.vector[rowId] =
+                  ((FloatObjectInspector) inspector).get(obj);
+              break;
+            }
+            case DOUBLE: {
+              DoubleColumnVector vector = (DoubleColumnVector) column;
+              vector.vector[rowId] =
+                  ((DoubleObjectInspector) inspector).get(obj);
+              break;
+            }
+            case BINARY: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              BytesWritable blob = ((BinaryObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj);
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case STRING: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              Text blob = ((StringObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj);
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case VARCHAR: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              Text blob = ((HiveVarcharObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj).getTextValue();
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case CHAR: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              Text blob = ((HiveCharObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj).getTextValue();
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case TIMESTAMP: {
+              LongColumnVector vector = (LongColumnVector) column;
+              Timestamp ts = ((TimestampObjectInspector) inspector)
+                  .getPrimitiveJavaObject(obj);
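+              // getTime() already includes whole milliseconds, so only the
+              // sub-millisecond remainder of getNanos() is added back here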
+              vector.vector[rowId] = ts.getTime() * NANOS_PER_MILLI +
+                  (ts.getNanos() % NANOS_PER_MILLI);
+              break;
+            }
+            case DATE: {
+              LongColumnVector vector = (LongColumnVector) column;
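+              // DateWritable.getDays() gives the number of days since the epoch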
+              vector.vector[rowId] = ((DateObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj).getDays();
+              break;
+            }
+            case DECIMAL: {
+              DecimalColumnVector vector = (DecimalColumnVector) column;
+              vector.set(rowId, ((HiveDecimalObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj));
+              break;
             }
           }
-          indexStatistics.increment(nonNullCount);
-          if (nonNullCount != length) {
-            foundNulls = true;
-            indexStatistics.setNull();
-          }
-        }
-      }
-    }
-
-    private void removeIsPresentPositions() {
-      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
-        OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
-        List<Long> positions = entry.getPositionsList();
-        // bit streams use 3 positions if uncompressed, 4 if compressed
-        positions = positions.subList(isCompressed ? 4 : 3, positions.size());
-        entry.clearPositions();
-        entry.addAllPositions(positions);
-      }
-    }
-
-    /**
-     * Write the stripe out to the file.
-     * @param builder the stripe footer that contains the information about the
-     *                layout of the stripe. The TreeWriter is required to update
-     *                the footer with its information.
-     * @param requiredIndexEntries the number of index entries that are
-     *                             required. This is used to check that the
-     *                             row index is well formed.
-     * @throws IOException
-     */
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      if (isPresent != null) {
-        isPresent.flush();
-
-        // if no nulls are found in a stream, then suppress the stream
-        if(!foundNulls) {
-          isPresentOutStream.suppress();
-          // since isPresent bitstream is suppressed, update the index to
-          // remove the positions of the isPresent stream
-          if (rowIndexStream != null) {
-            removeIsPresentPositions();
+          break;
+        case STRUCT: {
+          StructColumnVector vector = (StructColumnVector) column;
+          StructObjectInspector oi = (StructObjectInspector) inspector;
+          List<? extends StructField> fields = oi.getAllStructFieldRefs();
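+          // recurse into each struct field, writing it into the matching child vector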
+          for (int c = 0; c < vector.fields.length; ++c) {
+            StructField field = fields.get(c);
+            setColumn(rowId, vector.fields[c], field.getFieldObjectInspector(),
+                oi.getStructFieldData(obj, field));
           }
+          break;
         }
-      }
-
-      // merge stripe-level column statistics to file statistics and write it to
-      // stripe statistics
-      OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
-      writeStripeStatistics(stripeStatsBuilder, this);
-      stripeStatsBuilders.add(stripeStatsBuilder);
-
-      // reset the flag for next stripe
-      foundNulls = false;
-
-      builder.addColumns(getEncoding());
-      if (streamFactory.hasWriterTimeZone()) {
-        builder.setWriterTimezone(TimeZone.getDefault().getID());
-      }
-      if (rowIndexStream != null) {
-        if (rowIndex.getEntryCount() != requiredIndexEntries) {
-          throw new IllegalArgumentException("Column has wrong number of " +
-               "index entries found: " + rowIndex.getEntryCount() + " expected: " +
-               requiredIndexEntries);
+        case UNION: {
+          UnionColumnVector vector = (UnionColumnVector) column;
+          UnionObjectInspector oi = (UnionObjectInspector) inspector;
+          int tag = oi.getTag(obj);
+          vector.tags[rowId] = tag;
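+          // only the child selected by the tag receives a value for this row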
+          setColumn(rowId, vector.fields[tag],
+              oi.getObjectInspectors().get(tag), oi.getField(obj));
+          break;
         }
-        rowIndex.build().writeTo(rowIndexStream);
-        rowIndexStream.flush();
-      }
-      rowIndex.clear();
-      rowIndexEntry.clear();
-
-      // write the bloom filter to out stream
-      if (bloomFilterStream != null) {
-        bloomFilterIndex.build().writeTo(bloomFilterStream);
-        bloomFilterStream.flush();
-        bloomFilterIndex.clear();
-        bloomFilterEntry.clear();
-      }
-    }
-
-    private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
-        TreeWriter treeWriter) {
-      treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
-      builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
-      treeWriter.stripeColStatistics.reset();
-      for (TreeWriter child : treeWriter.getChildrenWriters()) {
-        writeStripeStatistics(builder, child);
-      }
-    }
-
-    TreeWriter[] getChildrenWriters() {
-      return childrenWriters;
-    }
-
-    /**
-     * Get the encoding for this column.
-     * @return the information about the encoding of this column
-     */
-    OrcProto.ColumnEncoding getEncoding() {
-      return OrcProto.ColumnEncoding.newBuilder().setKind(
-          OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    /**
-     * Create a row index entry with the previous location and the current
-     * index statistics. Also merges the index statistics into the stripe
-     * statistics before they are cleared. Finally, it records the start of the
-     * next index entry and ensures all of the child columns also create an entry.
-     * @throws IOException
-     */
-    void createRowIndexEntry() throws IOException {
-      stripeColStatistics.merge(indexStatistics);
-      rowIndexEntry.setStatistics(indexStatistics.serialize());
-      indexStatistics.reset();
-      rowIndex.addEntry(rowIndexEntry);
-      rowIndexEntry.clear();
-      addBloomFilterEntry();
-      recordPosition(rowIndexPosition);
-      for(TreeWriter child: childrenWriters) {
-        child.createRowIndexEntry();
-      }
-    }
-
-    void addBloomFilterEntry() {
-      if (createBloomFilter) {
-        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
-        bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
-        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
-        bloomFilter.reset();
-        bloomFilterEntry.clear();
-      }
-    }
-
-    /**
-     * Record the current position in each of this column's streams.
-     * @param recorder where the positions should be recorded
-     * @throws IOException
-     */
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      if (isPresent != null) {
-        isPresent.getPosition(recorder);
-      }
-    }
-
-    /**
-     * Estimate how much memory the writer is consuming excluding the streams.
-     * @return the number of bytes.
-     */
-    long estimateMemory() {
-      long result = 0;
-      for (TreeWriter child: childrenWriters) {
-        result += child.estimateMemory();
-      }
-      return result;
-    }
-  }
-
-  private static class BooleanTreeWriter extends TreeWriter {
-    private final BitFieldWriter writer;
-
-    BooleanTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      PositionedOutputStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.writer = new BitFieldWriter(out, 1);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        boolean val = ((BooleanObjectInspector) inspector).get(obj);
-        indexStatistics.updateBoolean(val, 1);
-        writer.write(val ? 1 : 0);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int value = vec.vector[0] == 0 ? 0 : 1;
-          indexStatistics.updateBoolean(value != 0, length);
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
+        case LIST: {
+          ListColumnVector vector = (ListColumnVector) column;
+          ListObjectInspector oi = (ListObjectInspector) inspector;
+          int offset = vector.childCount;
+          int length = oi.getListLength(obj);
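+          // elements are flattened into the shared child vector; this row's
+          // values occupy child[offset, offset + length)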
+          vector.offsets[rowId] = offset;
+          vector.lengths[rowId] = length;
+          vector.child.ensureSize(offset + length, true);
+          vector.childCount += length;
+          for (int c = 0; c < length; ++c) {
+            setColumn(offset + c, vector.child,
+                oi.getListElementObjectInspector(),
+                oi.getListElement(obj, c));
           }
+          break;
         }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int value = vec.vector[i + offset] == 0 ? 0 : 1;
-            writer.write(value);
-            indexStatistics.updateBoolean(value != 0, 1);
+        case MAP: {
+          MapColumnVector vector = (MapColumnVector) column;
+          MapObjectInspector oi = (MapObjectInspector) inspector;
+          int offset = vector.childCount;
+          Set map = oi.getMap(obj).entrySet();
+          int length = map.size();
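+          // keys and values are flattened into parallel child vectors; this row's
+          // entries occupy [offset, offset + length) in both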
+          vector.offsets[rowId] = offset;
+          vector.lengths[rowId] = length;
+          vector.keys.ensureSize(offset + length, true);
+          vector.values.ensureSize(offset + length, true);
+          vector.childCount += length;
+          for (Object item: map) {
+            Map.Entry pair = (Map.Entry) item;
+            setColumn(offset, vector.keys, oi.getMapKeyObjectInspector(),
+                pair.getKey());
+            setColumn(offset, vector.values, oi.getMapValueObjectInspector(),
+                pair.getValue());
+            offset += 1;
           }
+          break;
         }
+        default:
+          throw new IllegalArgumentException("Unknown ObjectInspector kind " +
+              inspector.getCategory());
       }
     }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
   }
 
-  private static class ByteTreeWriter extends TreeWriter {
-    private final RunLengthByteWriter writer;
-
-    ByteTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.writer = new RunLengthByteWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA));
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        byte val = ((ByteObjectInspector) inspector).get(obj);
-        indexStatistics.updateInteger(val, 1);
-        if (createBloomFilter) {
-          bloomFilter.addLong(val);
-        }
-        writer.write(val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          byte value = (byte) vec.vector[0];
-          indexStatistics.updateInteger(value, length);
-          if (createBloomFilter) {
-            bloomFilter.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            byte value = (byte) vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateInteger(value, 1);
-            if (createBloomFilter) {
-              bloomFilter.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
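+  /**
+   * Push any rows buffered in internalBatch down to the underlying writer
+   * via addRowBatch and reset the batch for reuse.
+   */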
+  void flushInternalBatch() throws IOException {
+    if (internalBatch.size != 0) {
+      super.addRowBatch(internalBatch);
+      internalBatch.reset();
     }
   }
 
-  private static class IntegerTreeWriter extends TreeWriter {
-    private final IntegerWriter writer;
-    private final ShortObjectInspector shortInspector;
-    private final IntObjectInspector intInspector;
-    private final LongObjectInspector longInspector;
-    private boolean isDirectV2 = true;
-
-    IntegerTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      OutStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      if (inspector instanceof IntObjectInspector) {
-        intInspector = (IntObjectInspector) inspector;
-        shortInspector = null;
-        longInspector = null;
-      } else {
-        intInspector = null;
-        if (inspector instanceof LongObjectInspector) {
-          longInspector = (LongObjectInspector) inspector;
-          shortInspector = null;
-        } else {
-          shortInspector = (ShortObjectInspector) inspector;
-          longInspector = null;
-        }
-      }
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        long val;
-        if (intInspector != null) {
-          val = intInspector.get(obj);
-        } else if (longInspector != null) {
-          val = longInspector.get(obj);
-        } else {
-          val = shortInspector.get(obj);
-        }
-        indexStatistics.updateInteger(val, 1);
-        if (createBloomFilter) {
-          // integers are converted to longs in column statistics and during SARG evaluation
-          bloomFilter.addLong(val);
-        }
-        writer.write(val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          long value = vec.vector[0];
-          indexStatistics.updateInteger(value, length);
-          if (createBloomFilter) {
-            bloomFilter.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            long value = vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateInteger(value, 1);
-            if (createBloomFilter) {
-              bloomFilter.addLong(value);
-            }
-          }
-        }
+  @Override
+  public void addRow(Object row) throws IOException {
+    int rowId = internalBatch.size++;
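+    // a top-level struct spreads its fields across the batch columns;
+    // any other top-level type is written into the single column cols[0]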
+    if (fields != null) {
+      StructObjectInspector soi = (StructObjectInspector) inspector;
+      for(int i=0; i < fields.length; ++i) {
+        setColumn(rowId, internalBatch.cols[i],
+            fields[i].getFieldObjectInspector(),
+            soi.getStructFieldData(row, fields[i]));
       }
+    } else {
+      setColumn(rowId, internalBatch.cols[0], inspector, row);
     }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
+    if (internalBatch.size == internalBatch.getMaxSize()) {
+      flushInternalBatch();
     }
   }
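For orientation, a minimal usage sketch of the buffered row path above; path,
conf, inspector and rows are illustrative, and the writer is assumed to come
from OrcFile.createWriter with inspector-based options:

    Writer writer = OrcFile.createWriter(path,
        OrcFile.writerOptions(conf).inspector(inspector));
    for (Object row : rows) {
      writer.addRow(row);   // buffered into internalBatch, flushed a batch at a time
    }
    writer.close();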
 
-  private static class FloatTreeWriter extends TreeWriter {
-    private final PositionedOutputStream stream;
-    private final SerializationUtils utils;
+  @Override
+  public long writeIntermediateFooter() throws IOException {
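+    // push buffered rows into the file before the intermediate footer is written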
+    flushInternalBatch();
+    return super.writeIntermediateFooter();
+  }
 
-    FloatTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.utils = new SerializationUtils();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        float val = ((FloatObjectInspector) inspector).get(obj);
-        indexStatistics.updateDouble(val);
-        if (createBloomFilter) {
-          // floats are converted to doubles in column statistics and during SARG evaluation
-          bloomFilter.addDouble(val);
-        }
-        utils.writeFloat(stream, val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DoubleColumnVector vec = (DoubleColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          float value = (float) vec.vector[0];
-          indexStatistics.updateDouble(value);
-          if (createBloomFilter) {
-            bloomFilter.addDouble(value);
-          }
-          for(int i=0; i < length; ++i) {
-            utils.writeFloat(stream, value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            float value = (float) vec.vector[i + offset];
-            utils.writeFloat(stream, value);
-            indexStatistics.updateDouble(value);
-            if (createBloomFilter) {
-              bloomFilter.addDouble(value);
-            }
-          }
-        }
-      }
-    }
-
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      stream.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-    }
-  }
-
-  private static class DoubleTreeWriter extends TreeWriter {
-    private final PositionedOutputStream stream;
-    private final SerializationUtils utils;
-
-    DoubleTreeWriter(int columnId,
-                    ObjectInspector inspector,
-                    TypeDescription schema,
-                    StreamFactory writer,
-                    boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.utils = new SerializationUtils();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        double val = ((DoubleObjectInspector) inspector).get(obj);
-        indexStatistics.updateDouble(val);
-        if (createBloomFilter) {
-          bloomFilter.addDouble(val);
-        }
-        utils.writeDouble(stream, val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DoubleColumnVector vec = (DoubleColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          double value = vec.vector[0];
-          indexStatistics.updateDouble(value);
-          if (createBloomFilter) {
-            bloomFilter.addDouble(value);
-          }
-          for(int i=0; i < length; ++i) {
-            utils.writeDouble(stream, value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            double value = vec.vector[i + offset];
-            utils.writeDouble(stream, value);
-            indexStatistics.updateDouble(value);
-            if (createBloomFilter) {
-              bloomFilter.addDouble(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      stream.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-    }
-  }
-
-  private static abstract class StringBaseTreeWriter extends TreeWriter {
-    private static final int INITIAL_DICTIONARY_SIZE = 4096;
-    private final OutStream stringOutput;
-    private final IntegerWriter lengthOutput;
-    private final IntegerWriter rowOutput;
-    protected final StringRedBlackTree dictionary =
-        new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
-    protected final DynamicIntArray rows = new DynamicIntArray();
-    protected final PositionedOutputStream directStreamOutput;
-    protected final IntegerWriter directLengthOutput;
-    private final List<OrcProto.RowIndexEntry> savedRowIndex =
-        new ArrayList<OrcProto.RowIndexEntry>();
-    private final boolean buildIndex;
-    private final List<Long> rowIndexValueCount = new ArrayList<Long>();
-    // If the number of keys in a dictionary is greater than this fraction of
-    // the total number of non-null rows, turn off dictionary encoding
-    private final double dictionaryKeySizeThreshold;
-    protected boolean useDictionaryEncoding = true;
-    private boolean isDirectV2 = true;
-    private boolean doneDictionaryCheck;
-    private final boolean strideDictionaryCheck;
-
-    StringBaseTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      stringOutput = writer.createStream(id,
-          OrcProto.Stream.Kind.DICTIONARY_DATA);
-      lengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      rowOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(0L);
-      buildIndex = writer.buildIndex();
-      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      directLengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      Configuration conf = writer.getConfiguration();
-      dictionaryKeySizeThreshold =
-          OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
-      strideDictionaryCheck =
-          OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
-      doneDictionaryCheck = false;
-    }
-
-    /**
-     * Method to retrieve text values from the value object, which can be overridden
-     * by subclasses.
-     * @param obj the value object
-     * @return the Text value extracted from obj
-     */
-    Text getTextValue(Object obj) {
-      return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        Text val = getTextValue(obj);
-        if (useDictionaryEncoding) {
-          rows.add(dictionary.add(val));
-        } else {
-          // write data and length
-          directStreamOutput.write(val.getBytes(), 0, val.getLength());
-          directLengthOutput.write(val.getLength());
-        }
-        indexStatistics.updateString(val);
-        if (createBloomFilter) {
-          bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
-        }
-      }
-    }
-
-    private boolean checkDictionaryEncoding() {
-      if (!doneDictionaryCheck) {
-        // Set the flag indicating whether or not to use dictionary encoding
-        // based on whether or not the fraction of distinct keys over number of
-        // non-null rows is less than the configured threshold
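-        // e.g. 2,000 distinct keys over 10,000 non-null rows gives a ratio of 0.2;
-        // dictionary encoding stays on while the ratio is at or below the threshold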
-        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
-        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
-        doneDictionaryCheck = true;
-      }
-      return useDictionaryEncoding;
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      // if the stripe has fewer rows than dictionaryCheckAfterRows, the
-      // dictionary check has not happened yet, so do it here.
-      checkDictionaryEncoding();
-
-      if (useDictionaryEncoding) {
-        flushDictionary();
-      } else {
-        // flush out any leftover entries from the dictionary
-        if (rows.size() > 0) {
-          flushDictionary();
-        }
-
-        // suppress the stream for every stripe if dictionary is disabled
-        stringOutput.suppress();
-      }
-
-      // we need to build the row index before calling super, since it
-      // writes it out.
-      super.writeStripe(builder, requiredIndexEntries);
-      stringOutput.flush();
-      lengthOutput.flush();
-      rowOutput.flush();
-      directStreamOutput.flush();
-      directLengthOutput.flush();
-      // reset all of the fields to be ready for the next stripe.
-      dictionary.clear();
-      savedRowIndex.clear();
-      rowIndexValueCount.clear();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(0L);
-
-      if (!useDictionaryEncoding) {
-        // record the start positions of the first index stride of the next stripe,
-        // i.e. the beginning of the direct streams when dictionary encoding is disabled
-        recordDirectStreamPosition();
-      }
-    }
-
-    private void flushDictionary() throws IOException {
-      final int[] dumpOrder = new int[dictionary.size()];
-
-      if (useDictionaryEncoding) {
-        // Write the dictionary by traversing the red-black tree writing out
-        // the bytes and lengths; and creating the map from the original order
-        // to the final sorted order.
-
-        dictionary.visit(new StringRedBlackTree.Visitor() {
-          private int currentId = 0;
-          @Override
-          public void visit(StringRedBlackTree.VisitorContext context
-                           ) throws IOException {
-            context.writeBytes(stringOutput);
-            lengthOutput.write(context.getLength());
-            dumpOrder[context.getOriginalPosition()] = currentId++;
-          }
-        });
-      } else {
-        // for direct encoding, we don't want the dictionary data stream
-        stringOutput.suppress();
-      }
-      int length = rows.size();
-      int rowIndexEntry = 0;
-      OrcProto.RowIndex.Builder rowIndex = getRowIndex();
-      Text text = new Text();
-      // write the values translated into the dump order.
-      for(int i = 0; i <= length; ++i) {
-        // now that we are writing out the row values, we can finalize the
-        // row index
-        if (buildIndex) {
-          while (i == rowIndexValueCount.get(rowIndexEntry) &&
-              rowIndexEntry < savedRowIndex.size()) {
-            OrcProto.RowIndexEntry.Builder base =
-                savedRowIndex.get(rowIndexEntry++).toBuilder();
-            if (useDictionaryEncoding) {
-              rowOutput.getPosition(new RowIndexPositionRecorder(base));
-            } else {
-              PositionRecorder posn = new RowIndexPositionRecorder(base);
-              directStreamOutput.getPosition(posn);
-              directLengthOutput.getPosition(posn);
-            }
-            rowIndex.addEntry(base.build());
-          }
-        }
-        if (i != length) {
-          if (useDictionaryEncoding) {
-            rowOutput.write(dumpOrder[rows.get(i)]);
-          } else {
-            dictionary.getText(text, rows.get(i));
-            directStreamOutput.write(text.getBytes(), 0, text.getLength());
-            directLengthOutput.write(text.getLength());
-          }
-        }
-      }
-      rows.clear();
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      // Returns the encoding used for the last call to writeStripe
-      if (useDictionaryEncoding) {
-        if(isDirectV2) {
-          return OrcProto.ColumnEncoding.newBuilder().setKind(
-              OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
-              setDictionarySize(dictionary.size()).build();
-        }
-        return OrcProto.ColumnEncoding.newBuilder().setKind(
-            OrcProto.ColumnEncoding.Kind.DICTIONARY).
-            setDictionarySize(dictionary.size()).build();
-      } else {
-        if(isDirectV2) {
-          return OrcProto.ColumnEncoding.newBuilder().setKind(
-              OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-        }
-        return OrcProto.ColumnEncoding.newBuilder().setKind(
-            OrcProto.ColumnEncoding.Kind.DIRECT).build();
-      }
-    }
-
-    /**
-     * This method doesn't call the super method, because unlike most of the
-     * other TreeWriters, this one can't record the position in the streams
-     * until the stripe is being flushed. Therefore it saves all of the entries
-     * and augments them with the final information as the stripe is written.
-     * @throws IOException
-     */
-    @Override
-    void createRowIndexEntry() throws IOException {
-      getStripeStatistics().merge(indexStatistics);
-      OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
-      rowIndexEntry.setStatistics(indexStatistics.serialize());
-      indexStatistics.reset();
-      OrcProto.RowIndexEntry base = rowIndexEntry.build();
-      savedRowIndex.add(base);
-      rowIndexEntry.clear();
-      addBloomFilterEntry();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(Long.valueOf(rows.size()));
-      if (strideDictionaryCheck) {
-        checkDictionaryEncoding();
-      }
-      if (!useDictionaryEncoding) {
-        if (rows.size() > 0) {
-          flushDictionary();
-          // just record the start positions of next index stride
-          recordDirectStreamPosition();
-        } else {
-          // record the start positions of next index stride
-          recordDirectStreamPosition();
-          getRowIndex().addEntry(base);
-        }
-      }
-    }
-
-    private void recordDirectStreamPosition() throws IOException {
-      directStreamOutput.getPosition(rowIndexPosition);
-      directLengthOutput.getPosition(rowIndexPosition);
-    }
-
-    @Override
-    long estimateMemory() {
-      return rows.getSizeInBytes() + dictionary.getSizeInBytes();
-    }
-  }
-
-  private static class StringTreeWriter extends StringBaseTreeWriter {
-    StringTreeWriter(int columnId,
-                   ObjectInspector inspector,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(vec.vector[0], vec.start[0],
-                  vec.length[0]);
-              directLengthOutput.write(vec.length[0]);
-            }
-          }
-          indexStatistics.updateString(vec.vector[0], vec.start[0],
-              vec.length[0], length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]));
-            } else {
-              directStreamOutput.write(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-              directLengthOutput.write(vec.length[offset + i]);
-            }
-            indexStatistics.updateString(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i], 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Under the covers, char is written to ORC the same way as string.
-   */
-  private static class CharTreeWriter extends StringBaseTreeWriter {
-    private final int itemLength;
-    private final byte[] padding;
-
-    CharTreeWriter(int columnId,
-        ObjectInspector inspector,
-        TypeDescription schema,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      itemLength = schema.getMaxLength();
-      padding = new byte[itemLength];
-    }
-
-    /**
-     * Override base class implementation to support char values.
-     */
-    @Override
-    Text getTextValue(Object obj) {
-      return (((HiveCharObjectInspector) inspector)
-          .getPrimitiveWritableObject(obj)).getTextValue();
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          byte[] ptr;
-          int ptrOffset;
-          if (vec.length[0] >= itemLength) {
-            ptr = vec.vector[0];
-            ptrOffset = vec.start[0];
-          } else {
-            ptr = padding;
-            ptrOffset = 0;
-            System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
-                vec.length[0]);
-            Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
-          }
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(ptr, ptrOffset, itemLength);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(ptr, ptrOffset, itemLength);
-              directLengthOutput.write(itemLength);
-            }
-          }
-          indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(ptr, ptrOffset, itemLength);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            byte[] ptr;
-            int ptrOffset;
-            if (vec.length[offset + i] >= itemLength) {
-              ptr = vec.vector[offset + i];
-              ptrOffset = vec.start[offset + i];
-            } else {
-              // it is the wrong length, so copy it
-              ptr = padding;
-              ptrOffset = 0;
-              System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
-                  ptr, 0, vec.length[offset + i]);
-              Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
-            }
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(ptr, ptrOffset, itemLength));
-            } else {
-              directStreamOutput.write(ptr, ptrOffset, itemLength);
-              directLengthOutput.write(itemLength);
-            }
-            indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(ptr, ptrOffset, itemLength);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Under the covers, varchar is written to ORC the same way as string.
-   */
-  private static class VarcharTreeWriter extends StringBaseTreeWriter {
-    private final int maxLength;
-
-    VarcharTreeWriter(int columnId,
-        ObjectInspector inspector,
-        TypeDescription schema,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      maxLength = schema.getMaxLength();
-    }
-
-    /**
-     * Override base class implementation to support varchar values.
-     */
-    @Override
-    Text getTextValue(Object obj) {
-      return (((HiveVarcharObjectInspector) inspector)
-          .getPrimitiveWritableObject(obj)).getTextValue();
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int itemLength = Math.min(vec.length[0], maxLength);
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(vec.vector[0], vec.start[0],
-                  itemLength);
-              directLengthOutput.write(itemLength);
-            }
-          }
-          indexStatistics.updateString(vec.vector[0], vec.start[0],
-              itemLength, length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int itemLength = Math.min(vec.length[offset + i], maxLength);
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength));
-            } else {
-              directStreamOutput.write(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength);
-              directLengthOutput.write(itemLength);
-            }
-            indexStatistics.updateString(vec.vector[offset + i],
-                vec.start[offset + i], itemLength, 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private static class BinaryTreeWriter extends TreeWriter {
-    private final PositionedOutputStream stream;
-    private final IntegerWriter length;
-    private boolean isDirectV2 = true;
-
-    BinaryTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.length = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        BytesWritable val =
-            ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
-        stream.write(val.getBytes(), 0, val.getLength());
-        length.write(val.getLength());
-        indexStatistics.updateBinary(val);
-        if (createBloomFilter) {
-          bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
-        }
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          for(int i=0; i < length; ++i) {
-            stream.write(vec.vector[0], vec.start[0],
-                vec.length[0]);
-            this.length.write(vec.length[0]);
-          }
-          indexStatistics.updateBinary(vec.vector[0], vec.start[0],
-              vec.length[0], length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            stream.write(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i]);
-            this.length.write(vec.length[offset + i]);
-            indexStatistics.updateBinary(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i], 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      stream.flush();
-      length.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-      length.getPosition(recorder);
-    }
-  }
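
BinaryTreeWriter splits each value across two streams: the raw bytes go to DATA and the per-value byte count goes to LENGTH, with getEncoding reporting DIRECT_V2 when the newer write format is in use and DIRECT otherwise. A toy illustration of the two-stream layout, with plain Java collections standing in for ORC's OutStream and IntegerWriter:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BinaryStreamsSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream data = new ByteArrayOutputStream(); // stands in for the DATA stream
    List<Integer> lengths = new ArrayList<>();                // stands in for the LENGTH stream

    for (String s : new String[]{"ab", "cdef", ""}) {
      byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
      data.write(bytes);            // values are concatenated...
      lengths.add(bytes.length);    // ...and the lengths let a reader split them apart
    }
    System.out.println(data.size() + " data bytes, lengths=" + lengths); // 6 data bytes, lengths=[2, 4, 0]
  }
}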
-
-  static final int MILLIS_PER_SECOND = 1000;
-  static final int NANOS_PER_SECOND = 1000000000;
-  static final int MILLIS_PER_NANO  = 1000000;
-  static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
-
-  private static class TimestampTreeWriter extends TreeWriter {
-    private final IntegerWriter seconds;
-    private final IntegerWriter nanos;
-    private final boolean isDirectV2;
-    private final long base_timestamp;
-
-    TimestampTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.seconds = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
-      this.nanos = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-      // for unit tests to set different time zones
-      this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
-      writer.useWriterTimeZone(true);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        Timestamp val =
-            ((TimestampObjectInspector) inspector).
-                getPrimitiveJavaObject(obj);
-        indexStatistics.updateTimestamp(val);
-        seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp);
-        nanos.write(formatNanos(val.getNanos()));
-        if (createBloomFilter) {
-          bloomFilter.addLong(val.getTime());
-        }
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          long value = vec.vector[0];
-          long valueMillis = value / MILLIS_PER_NANO;
-          indexStatistics.updateTimestamp(valueMillis);
-          if (createBloomFilter) {
-            bloomFilter.addLong(valueMillis);
-          }
-          final long secs = value / NANOS_PER_SECOND - base_timestamp;
-          final long nano = formatNanos((int) (value % NANOS_PER_SECOND));
-          for(int i=0; i < length; ++i) {
-            seconds.write(secs);
-            nanos.write(nano);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            long value = vec.vector[i + offset];
-            long valueMillis = value / MILLIS_PER_NANO;
-            long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
-            int valueNanos = (int) (value % NANOS_PER_SECOND);
-            if (valueNanos < 0) {
-              valueNanos += NANOS_PER_SECOND;
-            }
-            seconds.write(valueSecs);
-            nanos.write(formatNanos(valueNanos));
-            indexStatistics.updateTimestamp(valueMillis);
-            if (createBloomFilter) {
-              bloomFilter.addLong(valueMillis);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      seconds.flush();
-      nanos.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    private static long formatNanos(int nanos) {
-      if (nanos == 0) {
-        return 0;
-      } else if (nanos % 100 != 0) {
-        return ((long) nanos) << 3;
-      } else {
-        nanos /= 100;
-        int trailingZeros = 1;
-        while (nanos % 10 == 0 && trailingZeros < 7) {
-          nanos /= 10;
-          trailingZeros += 1;
-        }
-        return ((long) nanos) << 3 | trailingZeros;
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      seconds.getPosition(recorder);
-      nanos.getPosition(recorder);
-    }
-  }
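
TimestampTreeWriter writes seconds relative to the 2015-01-01 base above on the DATA stream and a packed nanosecond value on the SECONDARY stream; formatNanos strips trailing decimal zeros and records how many were removed in the low three bits, which keeps the encoded integers small. Note that the per-row branch folds a negative nanosecond remainder back into [0, 1e9) before encoding, while the repeating branch encodes value % NANOS_PER_SECOND as-is, so the two branches behave differently for pre-1970 timestamps with a fractional-second component. The following self-contained sketch repeats formatNanos and pairs it with a decode reconstructed from that packing rule (the decoder is the editor's illustration, not a line from this commit):

public class NanosEncodingSketch {
  // same packing as formatNanos above: low 3 bits z mean the nanos were divided by 10^(z+1)
  static long formatNanos(int nanos) {
    if (nanos == 0) {
      return 0;
    } else if (nanos % 100 != 0) {
      return ((long) nanos) << 3;
    } else {
      nanos /= 100;
      int trailingZeros = 1;
      while (nanos % 10 == 0 && trailingZeros < 7) {
        nanos /= 10;
        trailingZeros += 1;
      }
      return ((long) nanos) << 3 | trailingZeros;
    }
  }

  // inverse of the packing above: restore the stripped powers of ten
  static int parseNanos(long serialized) {
    int zeros = (int) (serialized & 7);
    int result = (int) (serialized >>> 3);
    if (zeros != 0) {
      for (int i = 0; i <= zeros; ++i) {
        result *= 10;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    int nanos = 500_000_000;                  // half a second
    long packed = formatNanos(nanos);         // 5 << 3 | 7 == 47
    System.out.println(packed + " -> " + parseNanos(packed)); // 47 -> 500000000
  }
}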
-
-  private static class DateTreeWriter extends TreeWriter {
-    private final IntegerWriter writer;
-    private final boolean isDirectV2;
-
-    DateTreeWriter(int columnId,
-                   ObjectInspector inspector,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      OutStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        // Using the Writable here as it's used directly for writing as well as for stats.
-        DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
-        indexStatistics.updateDate(val);
-        writer.write(val.getDays());
-        if (createBloomFilter) {
-          bloomFilter.addLong(val.getDays());
-        }
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int value = (int) vec.vector[0];
-          indexStatistics.updateDate(value);
-          if (createBloomFilter) {
-            bloomFilter.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int value = (int) vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateDate(value);
-            if (createBloomFilter) {
-              bloomFilter.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-  }
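
DateTreeWriter stores each date as a single signed integer, the day count since the Unix epoch that DateWritable.getDays returns, run through the same DIRECT/DIRECT_V2 integer encoding as the other scalar writers. For reference, the same day count computed with java.time (shown for the 2015-01-01 date that the timestamp writer uses as its base):

import java.time.LocalDate;

public class DaysSinceEpochSketch {
  public static void main(String[] args) {
    long days = LocalDate.of(2015, 1, 1).toEpochDay(); // days since 1970-01-01
    System.out.println(days); // 16436
  }
}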
-
-  private static class DecimalTreeWriter extends TreeWriter {
-    private final PositionedOutputStream valueStream;
-    private final IntegerWriter scaleStream;
-    private final boolean isDirectV2;
-
-    DecimalTreeWriter(int columnId,
-                        ObjectInspector inspector,
-                        TypeDescription schema,
-                        StreamFactory writer,
-                        boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      this.scaleStream = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector).
-            getPrimitiveJavaObject(obj);
-        if (decimal == null) {
-          return;
-        }
-        SerializationUtils.writeBigInteger(valueStream,
-            decimal.unscaledValue());
-        scaleStream.write(decimal.scale());
-        indexStatistics.updateDecimal(decimal);
-        if (createBloomFilter) {
-          bloomFilter.addString(decimal.toString());
-        }
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DecimalColumnVector vec = (DecimalColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          HiveDecimal value = vec.vector[0].getHiveDecimal();
-          indexStatistics.updateDecimal(value);
-          if (createBloomFilter) {
-            bloomFilter.addString(value.toString());
-          }
-          for(int i=0; i < length; ++i) {
-            SerializationUtils.writeBigInteger(valueStream,
-                value.unscaledValue());
-            scaleStream.write(value.scale());
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
-            SerializationUtils.writeBigInteger(valueStream,
-                value.unscaledValue());
-            scaleStream.write(value.scale());
-            indexStatistics.updateDecimal(value);
-            if (createBloomFilter) {
-              bloomFilter.addString(value.toString());
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      valueStream.flush();
-      scaleStream.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      valueStream.getPosition(recorder);
-      scaleStream.getPosition(recorder);
-    }
-  }
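
DecimalTreeWriter splits each value into its unscaled digits, written to DATA via SerializationUtils.writeBigInteger, and its scale, written to the SECONDARY stream; write() also guards against the object inspector handing back a null HiveDecimal. A small sketch of that split, using java.math.BigDecimal in place of HiveDecimal and plain prints in place of the two streams:

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalSplitSketch {
  public static void main(String[] args) {
    BigDecimal value = new BigDecimal("123.45");
    BigInteger unscaled = value.unscaledValue(); // 12345 -> DATA stream
    int scale = value.scale();                   // 2     -> SECONDARY stream
    System.out.println(unscaled + " / 10^" + scale + " = " + value);
  }
}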
-
-  private static class StructTreeWriter extends TreeWriter {
-    private final List<? extends StructField> fields;
-    StructTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      List<TypeDescription> children = schema.getChildren();
-      if (inspector != null) {
-        StructObjectInspector structObjectInspector =
-            (StructObjectInspector) inspector;
-        fields = structObjectInspector.getAllStructFieldRefs();
-      } else {
-        fields = null;
-      }
-      childrenWriters = new TreeWriter[children.size()];
-      for(int i=0; i < childrenWriters.length; ++i) {
-        ObjectInspector childOI;
-        if (fields != null && i < fields.size()) {
-          childOI = fields.get(i).getFieldObjectInspector();
-        } else {
-          childOI = null;
-        }
-        childrenWriters[i] = createTreeWriter(
-          childOI, children.get(i), writer,
-          true);
-      }
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        StructObjectInspector insp = (StructObjectInspecto

<TRUNCATED>