Posted to commits@hive.apache.org by om...@apache.org on 2015/12/14 22:36:15 UTC

[2/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 993be71..8cfb402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -18,52 +18,15 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
+import java.util.Set;
 
-import org.apache.orc.BinaryColumnStatistics;
-import org.apache.orc.impl.BitFieldWriter;
-import org.apache.orc.impl.ColumnStatisticsImpl;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.impl.DynamicIntArray;
-import org.apache.orc.impl.IntegerWriter;
-import org.apache.orc.impl.MemoryManager;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.PositionRecorder;
-import org.apache.orc.impl.PositionedOutputStream;
-import org.apache.orc.impl.RunLengthByteWriter;
-import org.apache.orc.impl.RunLengthIntegerWriter;
-import org.apache.orc.impl.RunLengthIntegerWriterV2;
-import org.apache.orc.impl.SerializationUtils;
-import org.apache.orc.impl.SnappyCodec;
-import org.apache.orc.impl.StreamName;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.impl.StringRedBlackTree;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ZlibCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -74,10 +37,6 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
-import org.apache.orc.CompressionCodec.Modifier;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -98,17 +57,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -128,3184 +78,240 @@ import com.google.protobuf.CodedOutputStream;
  * thread as well.
  * 
  */
-public class WriterImpl implements Writer, MemoryManager.Callback {
-
-  private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
-  static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
-
-  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
-  private static final int MIN_ROW_INDEX_STRIDE = 1000;
+public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer {
 
-  // threshold above which buffer size will be automatically resized
-  private static final int COLUMN_COUNT_THRESHOLD = 1000;
-
-  private final FileSystem fs;
-  private final Path path;
-  private final long defaultStripeSize;
-  private long adjustedStripeSize;
-  private final int rowIndexStride;
-  private final CompressionKind compress;
-  private final CompressionCodec codec;
-  private final boolean addBlockPadding;
-  private final int bufferSize;
-  private final long blockSize;
-  private final double paddingTolerance;
-  private final TypeDescription schema;
-
-  // the streams that make up the current stripe
-  private final Map<StreamName, BufferedStream> streams =
-    new TreeMap<StreamName, BufferedStream>();
-
-  private FSDataOutputStream rawWriter = null;
-  // the compressed metadata information outStream
-  private OutStream writer = null;
-  // a protobuf outStream around streamFactory
-  private CodedOutputStream protobufWriter = null;
-  private long headerLength;
-  private int columnCount;
-  private long rowCount = 0;
-  private long rowsInStripe = 0;
-  private long rawDataSize = 0;
-  private int rowsInIndex = 0;
-  private int stripesAtLastFlush = -1;
-  private final List<OrcProto.StripeInformation> stripes =
-    new ArrayList<OrcProto.StripeInformation>();
-  private final Map<String, ByteString> userMetadata =
-    new TreeMap<String, ByteString>();
-  private final StreamFactory streamFactory = new StreamFactory();
-  private final TreeWriter treeWriter;
-  private final boolean buildIndex;
-  private final MemoryManager memoryManager;
-  private final OrcFile.Version version;
-  private final Configuration conf;
-  private final OrcFile.WriterCallback callback;
-  private final OrcFile.WriterContext callbackContext;
-  private final OrcFile.EncodingStrategy encodingStrategy;
-  private final OrcFile.CompressionStrategy compressionStrategy;
-  private final boolean[] bloomFilterColumns;
-  private final double bloomFilterFpp;
-  private boolean writeTimeZone;
+  private final ObjectInspector inspector;
+  private final VectorizedRowBatch internalBatch;
+  private final StructField[] fields;
 
   WriterImpl(FileSystem fs,
-      Path path,
-      Configuration conf,
-      ObjectInspector inspector,
-      TypeDescription schema,
-      long stripeSize,
-      CompressionKind compress,
-      int bufferSize,
-      int rowIndexStride,
-      MemoryManager memoryManager,
-      boolean addBlockPadding,
-      OrcFile.Version version,
-      OrcFile.WriterCallback callback,
-      OrcFile.EncodingStrategy encodingStrategy,
-      OrcFile.CompressionStrategy compressionStrategy,
-      double paddingTolerance,
-      long blockSizeValue,
-      String bloomFilterColumnNames,
-      double bloomFilterFpp) throws IOException {
-    this.fs = fs;
-    this.path = path;
-    this.conf = conf;
-    this.callback = callback;
-    this.schema = schema;
-    if (callback != null) {
-      callbackContext = new OrcFile.WriterContext(){
-
-        @Override
-        public Writer getWriter() {
-          return WriterImpl.this;
-        }
-      };
-    } else {
-      callbackContext = null;
-    }
-    this.adjustedStripeSize = stripeSize;
-    this.defaultStripeSize = stripeSize;
-    this.version = version;
-    this.encodingStrategy = encodingStrategy;
-    this.compressionStrategy = compressionStrategy;
-    this.addBlockPadding = addBlockPadding;
-    this.blockSize = blockSizeValue;
-    this.paddingTolerance = paddingTolerance;
-    this.compress = compress;
-    this.rowIndexStride = rowIndexStride;
-    this.memoryManager = memoryManager;
-    buildIndex = rowIndexStride > 0;
-    codec = createCodec(compress);
-    int numColumns = schema.getMaximumId() + 1;
-    this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
-        numColumns, bufferSize);
-    if (version == OrcFile.Version.V_0_11) {
-      /* do not write bloom filters for ORC v11 */
-      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
-    } else {
-      this.bloomFilterColumns =
-          OrcUtils.includeColumns(bloomFilterColumnNames, schema);
-    }
-    this.bloomFilterFpp = bloomFilterFpp;
-    treeWriter = createTreeWriter(inspector, schema, streamFactory, false);
-    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
-      throw new IllegalArgumentException("Row stride must be at least " +
-          MIN_ROW_INDEX_STRIDE);
-    }
-
-    // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, stripeSize, this);
-  }
-
-  @VisibleForTesting
-  static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) {
-    // The worst case is that there are 2 big streams per column, and
-    // we want to guarantee that each stream gets ~10 buffers.
-    // This keeps buffers small enough that we don't get really small stripe
-    // sizes.
-    int estBufferSize = (int) (stripeSize / (20 * numColumns));
-    estBufferSize = getClosestBufferSize(estBufferSize);
-    if (estBufferSize > bs) {
-      estBufferSize = bs;
-    } else {
-      LOG.info("WIDE TABLE - Number of columns: " + numColumns +
-          " Chosen compression buffer size: " + estBufferSize);
-    }
-    return estBufferSize;
-  }
-
-  private static int getClosestBufferSize(int estBufferSize) {
-    final int kb4 = 4 * 1024;
-    final int kb8 = 8 * 1024;
-    final int kb16 = 16 * 1024;
-    final int kb32 = 32 * 1024;
-    final int kb64 = 64 * 1024;
-    final int kb128 = 128 * 1024;
-    final int kb256 = 256 * 1024;
-    if (estBufferSize <= kb4) {
-      return kb4;
-    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
-      return kb8;
-    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
-      return kb16;
-    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
-      return kb32;
-    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
-      return kb64;
-    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
-      return kb128;
+             Path path,
+             OrcFile.WriterOptions opts) throws IOException {
+    super(fs, path, opts);
+    this.inspector = opts.getInspector();
+    internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
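+    // Cache the top-level struct fields so that addRow() does not have to
+    // re-fetch them for every row; non-struct roots leave fields as null.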
+    if (inspector instanceof StructObjectInspector) {
+      List<? extends StructField> fieldList =
+          ((StructObjectInspector) inspector).getAllStructFieldRefs();
+      fields = new StructField[fieldList.size()];
+      fieldList.toArray(fields);
     } else {
-      return kb256;
-    }
-  }
-
-  public static CompressionCodec createCodec(CompressionKind kind) {
-    switch (kind) {
-      case NONE:
-        return null;
-      case ZLIB:
-        return new ZlibCodec();
-      case SNAPPY:
-        return new SnappyCodec();
-      case LZO:
-        try {
-          Class<? extends CompressionCodec> lzo =
-              (Class<? extends CompressionCodec>)
-                  JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
-          return lzo.newInstance();
-        } catch (ClassNotFoundException e) {
-          throw new IllegalArgumentException("LZO is not available.", e);
-        } catch (InstantiationException e) {
-          throw new IllegalArgumentException("Problem initializing LZO", e);
-        } catch (IllegalAccessException e) {
-          throw new IllegalArgumentException("Insufficient access to LZO", e);
-        }
-      default:
-        throw new IllegalArgumentException("Unknown compression codec: " +
-            kind);
-    }
-  }
-
-  @Override
-  public boolean checkMemory(double newScale) throws IOException {
-    long limit = (long) Math.round(adjustedStripeSize * newScale);
-    long size = estimateStripeSize();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
-                limit);
-    }
-    if (size > limit) {
-      flushStripe();
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * This class is used to hold the contents of streams as they are buffered.
-   * The TreeWriters write to the outStream and the codec compresses the
-   * data as buffers fill up and stores them in the output list. When the
-   * stripe is being written, the whole stream is written to the file.
-   */
-  private class BufferedStream implements OutStream.OutputReceiver {
-    private final OutStream outStream;
-    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
-    BufferedStream(String name, int bufferSize,
-                   CompressionCodec codec) throws IOException {
-      outStream = new OutStream(name, bufferSize, codec, this);
-    }
-
-    /**
-     * Receive a buffer from the compression codec.
-     * @param buffer the buffer to save
-     * @throws IOException
-     */
-    @Override
-    public void output(ByteBuffer buffer) {
-      output.add(buffer);
-    }
-
-    /**
-     * Get the number of bytes in buffers that are allocated to this stream.
-     * @return number of bytes in buffers
-     */
-    public long getBufferSize() {
-      long result = 0;
-      for(ByteBuffer buf: output) {
-        result += buf.capacity();
-      }
-      return outStream.getBufferSize() + result;
-    }
-
-    /**
-     * Flush the stream to the codec.
-     * @throws IOException
-     */
-    public void flush() throws IOException {
-      outStream.flush();
-    }
-
-    /**
-     * Clear all of the buffers.
-     * @throws IOException
-     */
-    public void clear() throws IOException {
-      outStream.clear();
-      output.clear();
-    }
-
-    /**
-     * Check the state of suppress flag in output stream
-     * @return value of suppress flag
-     */
-    public boolean isSuppressed() {
-      return outStream.isSuppressed();
-    }
-
-    /**
-     * Get the number of bytes that will be written to the output. Assumes
-     * the stream has already been flushed.
-     * @return the number of bytes
-     */
-    public long getOutputSize() {
-      long result = 0;
-      for(ByteBuffer buffer: output) {
-        result += buffer.remaining();
-      }
-      return result;
-    }
-
-    /**
-     * Write the saved compressed buffers to the OutputStream.
-     * @param out the stream to write to
-     * @throws IOException
-     */
-    void spillTo(OutputStream out) throws IOException {
-      for(ByteBuffer buffer: output) {
-        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-          buffer.remaining());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return outStream.toString();
-    }
-  }
-
-  /**
-   * An output receiver that writes the ByteBuffers to the output stream
-   * as they are received.
-   */
-  private class DirectStream implements OutStream.OutputReceiver {
-    private final FSDataOutputStream output;
-
-    DirectStream(FSDataOutputStream output) {
-      this.output = output;
-    }
-
-    @Override
-    public void output(ByteBuffer buffer) throws IOException {
-      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-        buffer.remaining());
-    }
-  }
-
-  private static class RowIndexPositionRecorder implements PositionRecorder {
-    private final OrcProto.RowIndexEntry.Builder builder;
-
-    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
-      this.builder = builder;
-    }
-
-    @Override
-    public void addPosition(long position) {
-      builder.addPositions(position);
+      fields = null;
     }
   }
 
-  /**
-   * Interface from the Writer to the TreeWriters. This limits the visibility
-   * that the TreeWriters have into the Writer.
-   */
-  private class StreamFactory {
-    /**
-     * Create a stream to store part of a column.
-     * @param column the column id for the stream
-     * @param kind the kind of stream
-     * @return The output outStream that the section needs to be written to.
-     * @throws IOException
-     */
-    public OutStream createStream(int column,
-                                  OrcProto.Stream.Kind kind
-                                  ) throws IOException {
-      final StreamName name = new StreamName(column, kind);
-      final EnumSet<CompressionCodec.Modifier> modifiers;
-
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-          if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
-            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
-          } else {
-            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
-          break;
-        default:
-          LOG.warn("Missing ORC compression modifiers for " + kind);
-          modifiers = null;
-          break;
-      }
-
-      BufferedStream result = streams.get(name);
-      if (result == null) {
-        result = new BufferedStream(name.toString(), bufferSize,
-            codec == null ? codec : codec.modify(modifiers));
-        streams.put(name, result);
-      }
-      return result.outStream;
-    }
-
-    /**
-     * Get the next column id.
-     * @return a number from 0 to the number of columns - 1
-     */
-    public int getNextColumnId() {
-      return columnCount++;
-    }
-
-    /**
-     * Get the stride rate of the row index.
-     */
-    public int getRowIndexStride() {
-      return rowIndexStride;
-    }
-
-    /**
-     * Whether we should be building the row index.
-     * @return true if we are building the index
-     */
-    public boolean buildIndex() {
-      return buildIndex;
-    }
-
-    /**
-     * Is the ORC file compressed?
-     * @return are the streams compressed
-     */
-    public boolean isCompressed() {
-      return codec != null;
-    }
-
-    /**
-     * Get the encoding strategy to use.
-     * @return encoding strategy
-     */
-    public OrcFile.EncodingStrategy getEncodingStrategy() {
-      return encodingStrategy;
-    }
-
-    /**
-     * Get the compression strategy to use.
-     * @return compression strategy
-     */
-    public OrcFile.CompressionStrategy getCompressionStrategy() {
-      return compressionStrategy;
-    }
-
-    /**
-     * Get the bloom filter columns
-     * @return bloom filter columns
-     */
-    public boolean[] getBloomFilterColumns() {
-      return bloomFilterColumns;
-    }
-
-    /**
-     * Get bloom filter false positive percentage.
-     * @return fpp
-     */
-    public double getBloomFilterFPP() {
-      return bloomFilterFpp;
-    }
-
-    /**
-     * Get the writer's configuration.
-     * @return configuration
-     */
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    /**
-     * Get the version of the file to write.
-     */
-    public OrcFile.Version getVersion() {
-      return version;
-    }
-
-    public void useWriterTimeZone(boolean val) {
-      writeTimeZone = val;
-    }
-
-    public boolean hasWriterTimeZone() {
-      return writeTimeZone;
-    }
-  }
+  private static final long NANOS_PER_MILLI = 1000000;
 
   /**
-   * The parent class of all of the writers for each column. Each column
-   * is written by an instance of this class. The compound types (struct,
-   * list, map, and union) have children tree writers that write the children
-   * types.
+   * Set the value for a given column within a batch.
+   * @param rowId the row to set
+   * @param column the column to set
+   * @param inspector the object inspector used to interpret obj
+   * @param obj the value to use
    */
-  private abstract static class TreeWriter {
-    protected final int id;
-    protected final ObjectInspector inspector;
-    protected final BitFieldWriter isPresent;
-    private final boolean isCompressed;
-    protected final ColumnStatisticsImpl indexStatistics;
-    protected final ColumnStatisticsImpl stripeColStatistics;
-    private final ColumnStatisticsImpl fileStatistics;
-    protected TreeWriter[] childrenWriters;
-    protected final RowIndexPositionRecorder rowIndexPosition;
-    private final OrcProto.RowIndex.Builder rowIndex;
-    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
-    private final PositionedOutputStream rowIndexStream;
-    private final PositionedOutputStream bloomFilterStream;
-    protected final BloomFilterIO bloomFilter;
-    protected final boolean createBloomFilter;
-    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
-    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
-    private boolean foundNulls;
-    private OutStream isPresentOutStream;
-    private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
-    private final StreamFactory streamFactory;
-
-    /**
-     * Create a tree writer.
-     * @param columnId the column id of the column to write
-     * @param inspector the object inspector to use
-     * @param schema the row schema
-     * @param streamFactory limited access to the Writer's data.
-     * @param nullable can the value be null?
-     * @throws IOException
-     */
-    TreeWriter(int columnId, ObjectInspector inspector,
-               TypeDescription schema,
-               StreamFactory streamFactory,
-               boolean nullable) throws IOException {
-      this.streamFactory = streamFactory;
-      this.isCompressed = streamFactory.isCompressed();
-      this.id = columnId;
-      this.inspector = inspector;
-      if (nullable) {
-        isPresentOutStream = streamFactory.createStream(id,
-            OrcProto.Stream.Kind.PRESENT);
-        isPresent = new BitFieldWriter(isPresentOutStream, 1);
-      } else {
-        isPresent = null;
-      }
-      this.foundNulls = false;
-      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
-      indexStatistics = ColumnStatisticsImpl.create(schema);
-      stripeColStatistics = ColumnStatisticsImpl.create(schema);
-      fileStatistics = ColumnStatisticsImpl.create(schema);
-      childrenWriters = new TreeWriter[0];
-      rowIndex = OrcProto.RowIndex.newBuilder();
-      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
-      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
-      stripeStatsBuilders = Lists.newArrayList();
-      if (streamFactory.buildIndex()) {
-        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
-      } else {
-        rowIndexStream = null;
-      }
-      if (createBloomFilter) {
-        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
-        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
-        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
-        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
-            streamFactory.getBloomFilterFPP());
-      } else {
-        bloomFilterEntry = null;
-        bloomFilterIndex = null;
-        bloomFilterStream = null;
-        bloomFilter = null;
-      }
-    }
-
-    protected OrcProto.RowIndex.Builder getRowIndex() {
-      return rowIndex;
-    }
-
-    protected ColumnStatisticsImpl getStripeStatistics() {
-      return stripeColStatistics;
-    }
-
-    protected ColumnStatisticsImpl getFileStatistics() {
-      return fileStatistics;
-    }
-
-    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
-      return rowIndexEntry;
-    }
-
-    IntegerWriter createIntegerWriter(PositionedOutputStream output,
-                                      boolean signed, boolean isDirectV2,
-                                      StreamFactory writer) {
-      if (isDirectV2) {
-        boolean alignedBitpacking = false;
-        if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
-          alignedBitpacking = true;
-        }
-        return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
-      } else {
-        return new RunLengthIntegerWriter(output, signed);
-      }
-    }
-
-    boolean isNewWriteFormat(StreamFactory writer) {
-      return writer.getVersion() != OrcFile.Version.V_0_11;
-    }
-
-    /**
-     * Add a new value to the column.
-     * @param obj the object to write
-     * @throws IOException
-     */
-    void write(Object obj) throws IOException {
-      if (obj != null) {
-        indexStatistics.increment();
-      } else {
-        indexStatistics.setNull();
-      }
-      if (isPresent != null) {
-        isPresent.write(obj == null ? 0 : 1);
-        if(obj == null) {
-          foundNulls = true;
-        }
-      }
-    }
-
-    /**
-     * Handle the top level object write.
-     *
-     * This default method is used for all types except structs, which are the
-     * typical case. VectorizedRowBatch assumes the top level object is a
-     * struct, so we use the first column for all other types.
-     * @param batch the batch to write from
-     * @param offset the row to start on
-     * @param length the number of rows to write
-     * @throws IOException
-     */
-    void writeRootBatch(VectorizedRowBatch batch, int offset,
-                        int length) throws IOException {
-      writeBatch(batch.cols[0], offset, length);
-    }
-
-    /**
-     * Write the values from the given vector from offset for length elements.
-     * @param vector the vector to write from
-     * @param offset the first value from the vector to write
-     * @param length the number of values from the vector to write
-     * @throws IOException
-     */
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      if (vector.noNulls) {
-        indexStatistics.increment(length);
-        if (isPresent != null) {
-          for (int i = 0; i < length; ++i) {
-            isPresent.write(1);
-          }
-        }
-      } else {
-        if (vector.isRepeating) {
-          boolean isNull = vector.isNull[0];
-          if (isPresent != null) {
-            for (int i = 0; i < length; ++i) {
-              isPresent.write(isNull ? 0 : 1);
+  static void setColumn(int rowId, ColumnVector column,
+                        ObjectInspector inspector, Object obj) {
+    if (obj == null) {
+      column.noNulls = false;
+      column.isNull[rowId] = true;
+    } else {
+      switch (inspector.getCategory()) {
+        case PRIMITIVE:
+          switch (((PrimitiveObjectInspector) inspector)
+              .getPrimitiveCategory()) {
+            case BOOLEAN: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] =
+                  ((BooleanObjectInspector) inspector).get(obj) ? 1 : 0;
+              break;
             }
-          }
-          if (isNull) {
-            foundNulls = true;
-            indexStatistics.setNull();
-          } else {
-            indexStatistics.increment(length);
-          }
-        } else {
-          // count the number of non-null values
-          int nonNullCount = 0;
-          for(int i = 0; i < length; ++i) {
-            boolean isNull = vector.isNull[i + offset];
-            if (!isNull) {
-              nonNullCount += 1;
+            case BYTE: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((ByteObjectInspector) inspector).get(obj);
+              break;
             }
-            if (isPresent != null) {
-              isPresent.write(isNull ? 0 : 1);
+            case SHORT: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] =
+                  ((ShortObjectInspector) inspector).get(obj);
+              break;
+            }
+            case INT: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((IntObjectInspector) inspector).get(obj);
+              break;
+            }
+            case LONG: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((LongObjectInspector) inspector).get(obj);
+              break;
+            }
+            case FLOAT: {
+              DoubleColumnVector vector = (DoubleColumnVector) column;
+              vector.vector[rowId] =
+                  ((FloatObjectInspector) inspector).get(obj);
+              break;
+            }
+            case DOUBLE: {
+              DoubleColumnVector vector = (DoubleColumnVector) column;
+              vector.vector[rowId] =
+                  ((DoubleObjectInspector) inspector).get(obj);
+              break;
+            }
+            case BINARY: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              BytesWritable blob = ((BinaryObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj);
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case STRING: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              Text blob = ((StringObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj);
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case VARCHAR: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              Text blob = ((HiveVarcharObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj).getTextValue();
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case CHAR: {
+              BytesColumnVector vector = (BytesColumnVector) column;
+              Text blob = ((HiveCharObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj).getTextValue();
+              vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+              break;
+            }
+            case TIMESTAMP: {
+              LongColumnVector vector = (LongColumnVector) column;
+              Timestamp ts = ((TimestampObjectInspector) inspector)
+                  .getPrimitiveJavaObject(obj);
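+              // getTime() is in milliseconds and already includes the
+              // millisecond part of getNanos(), so only the sub-millisecond
+              // remainder is added back when converting to nanoseconds.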
+              vector.vector[rowId] = ts.getTime() * NANOS_PER_MILLI +
+                  (ts.getNanos() % NANOS_PER_MILLI);
+              break;
+            }
+            case DATE: {
+              LongColumnVector vector = (LongColumnVector) column;
+              vector.vector[rowId] = ((DateObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj).getDays();
+              break;
+            }
+            case DECIMAL: {
+              DecimalColumnVector vector = (DecimalColumnVector) column;
+              vector.set(rowId, ((HiveDecimalObjectInspector) inspector)
+                  .getPrimitiveWritableObject(obj));
+              break;
             }
           }
-          indexStatistics.increment(nonNullCount);
-          if (nonNullCount != length) {
-            foundNulls = true;
-            indexStatistics.setNull();
-          }
-        }
-      }
-    }
-
-    private void removeIsPresentPositions() {
-      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
-        OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
-        List<Long> positions = entry.getPositionsList();
-        // bit streams use 3 positions if uncompressed, 4 if compressed
-        positions = positions.subList(isCompressed ? 4 : 3, positions.size());
-        entry.clearPositions();
-        entry.addAllPositions(positions);
-      }
-    }
-
-    /**
-     * Write the stripe out to the file.
-     * @param builder the stripe footer that contains the information about the
-     *                layout of the stripe. The TreeWriter is required to update
-     *                the footer with its information.
-     * @param requiredIndexEntries the number of index entries that are
-     *                             required. this is to check to make sure the
-     *                             row index is well formed.
-     * @throws IOException
-     */
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      if (isPresent != null) {
-        isPresent.flush();
-
-        // if no nulls are found in a stream, then suppress the stream
-        if(!foundNulls) {
-          isPresentOutStream.suppress();
-          // since isPresent bitstream is suppressed, update the index to
-          // remove the positions of the isPresent stream
-          if (rowIndexStream != null) {
-            removeIsPresentPositions();
+          break;
+        case STRUCT: {
+          StructColumnVector vector = (StructColumnVector) column;
+          StructObjectInspector oi = (StructObjectInspector) inspector;
+          List<? extends StructField> fields = oi.getAllStructFieldRefs();
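+          // Recursively write each struct member into its child vector.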
+          for (int c = 0; c < vector.fields.length; ++c) {
+            StructField field = fields.get(c);
+            setColumn(rowId, vector.fields[c], field.getFieldObjectInspector(),
+                oi.getStructFieldData(obj, field));
           }
+          break;
         }
-      }
-
-      // merge stripe-level column statistics to file statistics and write it to
-      // stripe statistics
-      OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
-      writeStripeStatistics(stripeStatsBuilder, this);
-      stripeStatsBuilders.add(stripeStatsBuilder);
-
-      // reset the flag for next stripe
-      foundNulls = false;
-
-      builder.addColumns(getEncoding());
-      if (streamFactory.hasWriterTimeZone()) {
-        builder.setWriterTimezone(TimeZone.getDefault().getID());
-      }
-      if (rowIndexStream != null) {
-        if (rowIndex.getEntryCount() != requiredIndexEntries) {
-          throw new IllegalArgumentException("Column has wrong number of " +
-               "index entries found: " + rowIndex.getEntryCount() + " expected: " +
-               requiredIndexEntries);
+        case UNION: {
+          UnionColumnVector vector = (UnionColumnVector) column;
+          UnionObjectInspector oi = (UnionObjectInspector) inspector;
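+          // Record which branch of the union this row uses, then write the
+          // value into that branch's child vector.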
+          int tag = oi.getTag(obj);
+          vector.tags[rowId] = tag;
+          setColumn(rowId, vector.fields[tag],
+              oi.getObjectInspectors().get(tag), oi.getField(obj));
+          break;
         }
-        rowIndex.build().writeTo(rowIndexStream);
-        rowIndexStream.flush();
-      }
-      rowIndex.clear();
-      rowIndexEntry.clear();
-
-      // write the bloom filter to out stream
-      if (bloomFilterStream != null) {
-        bloomFilterIndex.build().writeTo(bloomFilterStream);
-        bloomFilterStream.flush();
-        bloomFilterIndex.clear();
-        bloomFilterEntry.clear();
-      }
-    }
-
-    private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
-        TreeWriter treeWriter) {
-      treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
-      builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
-      treeWriter.stripeColStatistics.reset();
-      for (TreeWriter child : treeWriter.getChildrenWriters()) {
-        writeStripeStatistics(builder, child);
-      }
-    }
-
-    TreeWriter[] getChildrenWriters() {
-      return childrenWriters;
-    }
-
-    /**
-     * Get the encoding for this column.
-     * @return the information about the encoding of this column
-     */
-    OrcProto.ColumnEncoding getEncoding() {
-      return OrcProto.ColumnEncoding.newBuilder().setKind(
-          OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    /**
-     * Create a row index entry with the previous location and the current
-     * index statistics. Also merges the index statistics into the file
-     * statistics before they are cleared. Finally, it records the start of the
-     * next index and ensures all of the children columns also create an entry.
-     * @throws IOException
-     */
-    void createRowIndexEntry() throws IOException {
-      stripeColStatistics.merge(indexStatistics);
-      rowIndexEntry.setStatistics(indexStatistics.serialize());
-      indexStatistics.reset();
-      rowIndex.addEntry(rowIndexEntry);
-      rowIndexEntry.clear();
-      addBloomFilterEntry();
-      recordPosition(rowIndexPosition);
-      for(TreeWriter child: childrenWriters) {
-        child.createRowIndexEntry();
-      }
-    }
-
-    void addBloomFilterEntry() {
-      if (createBloomFilter) {
-        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
-        bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
-        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
-        bloomFilter.reset();
-        bloomFilterEntry.clear();
-      }
-    }
-
-    /**
-     * Record the current position in each of this column's streams.
-     * @param recorder where should the locations be recorded
-     * @throws IOException
-     */
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      if (isPresent != null) {
-        isPresent.getPosition(recorder);
-      }
-    }
-
-    /**
-     * Estimate how much memory the writer is consuming excluding the streams.
-     * @return the number of bytes.
-     */
-    long estimateMemory() {
-      long result = 0;
-      for (TreeWriter child: childrenWriters) {
-        result += child.estimateMemory();
-      }
-      return result;
-    }
-  }
-
-  private static class BooleanTreeWriter extends TreeWriter {
-    private final BitFieldWriter writer;
-
-    BooleanTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      PositionedOutputStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.writer = new BitFieldWriter(out, 1);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        boolean val = ((BooleanObjectInspector) inspector).get(obj);
-        indexStatistics.updateBoolean(val, 1);
-        writer.write(val ? 1 : 0);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int value = vec.vector[0] == 0 ? 0 : 1;
-          indexStatistics.updateBoolean(value != 0, length);
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
+        case LIST: {
+          ListColumnVector vector = (ListColumnVector) column;
+          ListObjectInspector oi = (ListObjectInspector) inspector;
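+          // List elements are appended to a shared child vector; the offset
+          // and length record where this row's slice starts and how long it is.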
+          int offset = vector.childCount;
+          int length = oi.getListLength(obj);
+          vector.offsets[rowId] = offset;
+          vector.lengths[rowId] = length;
+          vector.child.ensureSize(offset + length, true);
+          vector.childCount += length;
+          for (int c = 0; c < length; ++c) {
+            setColumn(offset + c, vector.child,
+                oi.getListElementObjectInspector(),
+                oi.getListElement(obj, c));
           }
+          break;
         }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int value = vec.vector[i + offset] == 0 ? 0 : 1;
-            writer.write(value);
-            indexStatistics.updateBoolean(value != 0, 1);
+        case MAP: {
+          MapColumnVector vector = (MapColumnVector) column;
+          MapObjectInspector oi = (MapObjectInspector) inspector;
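+          // Keys and values are appended to parallel child vectors; each map
+          // entry occupies the same offset in both.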
+          int offset = vector.childCount;
+          Set map = oi.getMap(obj).entrySet();
+          int length = map.size();
+          vector.offsets[rowId] = offset;
+          vector.lengths[rowId] = length;
+          vector.keys.ensureSize(offset + length, true);
+          vector.values.ensureSize(offset + length, true);
+          vector.childCount += length;
+          for (Object item: map) {
+            Map.Entry pair = (Map.Entry) item;
+            setColumn(offset, vector.keys, oi.getMapKeyObjectInspector(),
+                pair.getKey());
+            setColumn(offset, vector.values, oi.getMapValueObjectInspector(),
+                pair.getValue());
+            offset += 1;
           }
+          break;
         }
+        default:
+          throw new IllegalArgumentException("Unknown ObjectInspector kind " +
+              inspector.getCategory());
       }
     }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
   }
 
-  private static class ByteTreeWriter extends TreeWriter {
-    private final RunLengthByteWriter writer;
-
-    ByteTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.writer = new RunLengthByteWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA));
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        byte val = ((ByteObjectInspector) inspector).get(obj);
-        indexStatistics.updateInteger(val, 1);
-        if (createBloomFilter) {
-          bloomFilter.addLong(val);
-        }
-        writer.write(val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          byte value = (byte) vec.vector[0];
-          indexStatistics.updateInteger(value, length);
-          if (createBloomFilter) {
-            bloomFilter.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            byte value = (byte) vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateInteger(value, 1);
-            if (createBloomFilter) {
-              bloomFilter.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
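+  /**
+   * Push any rows buffered in the internal batch down to the vectorized
+   * writer in the base class, then reset the batch.
+   */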
+  void flushInternalBatch() throws IOException {
+    if (internalBatch.size != 0) {
+      super.addRowBatch(internalBatch);
+      internalBatch.reset();
     }
   }
 
-  private static class IntegerTreeWriter extends TreeWriter {
-    private final IntegerWriter writer;
-    private final ShortObjectInspector shortInspector;
-    private final IntObjectInspector intInspector;
-    private final LongObjectInspector longInspector;
-    private boolean isDirectV2 = true;
-
-    IntegerTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      OutStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      if (inspector instanceof IntObjectInspector) {
-        intInspector = (IntObjectInspector) inspector;
-        shortInspector = null;
-        longInspector = null;
-      } else {
-        intInspector = null;
-        if (inspector instanceof LongObjectInspector) {
-          longInspector = (LongObjectInspector) inspector;
-          shortInspector = null;
-        } else {
-          shortInspector = (ShortObjectInspector) inspector;
-          longInspector = null;
-        }
-      }
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        long val;
-        if (intInspector != null) {
-          val = intInspector.get(obj);
-        } else if (longInspector != null) {
-          val = longInspector.get(obj);
-        } else {
-          val = shortInspector.get(obj);
-        }
-        indexStatistics.updateInteger(val, 1);
-        if (createBloomFilter) {
-          // integers are converted to longs in column statistics and during SARG evaluation
-          bloomFilter.addLong(val);
-        }
-        writer.write(val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          long value = vec.vector[0];
-          indexStatistics.updateInteger(value, length);
-          if (createBloomFilter) {
-            bloomFilter.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            long value = vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateInteger(value, 1);
-            if (createBloomFilter) {
-              bloomFilter.addLong(value);
-            }
-          }
-        }
+  @Override
+  public void addRow(Object row) throws IOException {
+    int rowId = internalBatch.size++;
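+    // A top-level struct row is decomposed field by field into the batch's
+    // column vectors; any other root type is written to column 0.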
+    if (fields != null) {
+      StructObjectInspector soi = (StructObjectInspector) inspector;
+      for(int i=0; i < fields.length; ++i) {
+        setColumn(rowId, internalBatch.cols[i],
+            fields[i].getFieldObjectInspector(),
+            soi.getStructFieldData(row, fields[i]));
       }
+    } else {
+      setColumn(rowId, internalBatch.cols[0], inspector, row);
     }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
+    if (internalBatch.size == internalBatch.getMaxSize()) {
+      flushInternalBatch();
     }
   }
 
-  private static class FloatTreeWriter extends TreeWriter {
-    private final PositionedOutputStream stream;
-    private final SerializationUtils utils;
+  @Override
+  public long writeIntermediateFooter() throws IOException {
+    flushInternalBatch();
+    return super.writeIntermediateFooter();
+  }
 
-    FloatTreeWriter(int columnId,
-                      ObjectInspector inspector,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.utils = new SerializationUtils();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        float val = ((FloatObjectInspector) inspector).get(obj);
-        indexStatistics.updateDouble(val);
-        if (createBloomFilter) {
-          // floats are converted to doubles in column statistics and during SARG evaluation
-          bloomFilter.addDouble(val);
-        }
-        utils.writeFloat(stream, val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DoubleColumnVector vec = (DoubleColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          float value = (float) vec.vector[0];
-          indexStatistics.updateDouble(value);
-          if (createBloomFilter) {
-            bloomFilter.addDouble(value);
-          }
-          for(int i=0; i < length; ++i) {
-            utils.writeFloat(stream, value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            float value = (float) vec.vector[i + offset];
-            utils.writeFloat(stream, value);
-            indexStatistics.updateDouble(value);
-            if (createBloomFilter) {
-              bloomFilter.addDouble(value);
-            }
-          }
-        }
-      }
-    }
-
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      stream.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-    }
-  }
-
-  private static class DoubleTreeWriter extends TreeWriter {
-    private final PositionedOutputStream stream;
-    private final SerializationUtils utils;
-
-    DoubleTreeWriter(int columnId,
-                    ObjectInspector inspector,
-                    TypeDescription schema,
-                    StreamFactory writer,
-                    boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.utils = new SerializationUtils();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        double val = ((DoubleObjectInspector) inspector).get(obj);
-        indexStatistics.updateDouble(val);
-        if (createBloomFilter) {
-          bloomFilter.addDouble(val);
-        }
-        utils.writeDouble(stream, val);
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DoubleColumnVector vec = (DoubleColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          double value = vec.vector[0];
-          indexStatistics.updateDouble(value);
-          if (createBloomFilter) {
-            bloomFilter.addDouble(value);
-          }
-          for(int i=0; i < length; ++i) {
-            utils.writeDouble(stream, value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            double value = vec.vector[i + offset];
-            utils.writeDouble(stream, value);
-            indexStatistics.updateDouble(value);
-            if (createBloomFilter) {
-              bloomFilter.addDouble(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      stream.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-    }
-  }
-
-  private static abstract class StringBaseTreeWriter extends TreeWriter {
-    private static final int INITIAL_DICTIONARY_SIZE = 4096;
-    private final OutStream stringOutput;
-    private final IntegerWriter lengthOutput;
-    private final IntegerWriter rowOutput;
-    protected final StringRedBlackTree dictionary =
-        new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
-    protected final DynamicIntArray rows = new DynamicIntArray();
-    protected final PositionedOutputStream directStreamOutput;
-    protected final IntegerWriter directLengthOutput;
-    private final List<OrcProto.RowIndexEntry> savedRowIndex =
-        new ArrayList<OrcProto.RowIndexEntry>();
-    private final boolean buildIndex;
-    private final List<Long> rowIndexValueCount = new ArrayList<Long>();
-    // If the number of keys in a dictionary is greater than this fraction of
-    // the total number of non-null rows, turn off dictionary encoding
-    private final double dictionaryKeySizeThreshold;
-    protected boolean useDictionaryEncoding = true;
-    private boolean isDirectV2 = true;
-    private boolean doneDictionaryCheck;
-    private final boolean strideDictionaryCheck;
-
-    StringBaseTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      stringOutput = writer.createStream(id,
-          OrcProto.Stream.Kind.DICTIONARY_DATA);
-      lengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      rowOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(0L);
-      buildIndex = writer.buildIndex();
-      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      directLengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      Configuration conf = writer.getConfiguration();
-      dictionaryKeySizeThreshold =
-          OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
-      strideDictionaryCheck =
-          OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
-      doneDictionaryCheck = false;
-    }
-
-    /**
-     * Retrieve the Text value from the value object; subclasses override
-     * this to unwrap their own writable types.
-     * @param obj  value
-     * @return Text text value from obj
-     */
-    Text getTextValue(Object obj) {
-      return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        Text val = getTextValue(obj);
-        if (useDictionaryEncoding) {
-          rows.add(dictionary.add(val));
-        } else {
-          // write data and length
-          directStreamOutput.write(val.getBytes(), 0, val.getLength());
-          directLengthOutput.write(val.getLength());
-        }
-        indexStatistics.updateString(val);
-        if (createBloomFilter) {
-          bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
-        }
-      }
-    }
-
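-    // Illustrative note, added for clarity (not part of the original
-    // change): assuming the default threshold of 0.8, a stride with 1000
-    // non-null rows and 100 distinct keys gives ratio = 0.1 <= 0.8, so
-    // dictionary encoding stays on; 900 distinct keys would give 0.9 > 0.8
-    // and switch the column to direct encoding.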
-    private boolean checkDictionaryEncoding() {
-      if (!doneDictionaryCheck) {
-        // Set the flag indicating whether to use dictionary encoding, based
-        // on whether the fraction of distinct keys over the number of
-        // non-null rows is at most the configured threshold
-        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
-        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
-        doneDictionaryCheck = true;
-      }
-      return useDictionaryEncoding;
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      // If the stripe has fewer rows than dictionaryCheckAfterRows, the
-      // dictionary check will not have happened yet, so do it here.
-      checkDictionaryEncoding();
-
-      if (useDictionaryEncoding) {
-        flushDictionary();
-      } else {
-        // flush out any leftover entries from the dictionary
-        if (rows.size() > 0) {
-          flushDictionary();
-        }
-
-        // suppress the dictionary data stream on every stripe while
-        // dictionary encoding is disabled
-        stringOutput.suppress();
-      }
-
-      // we need to build the row index before calling super, since it
-      // writes it out.
-      super.writeStripe(builder, requiredIndexEntries);
-      stringOutput.flush();
-      lengthOutput.flush();
-      rowOutput.flush();
-      directStreamOutput.flush();
-      directLengthOutput.flush();
-      // reset all of the fields to be ready for the next stripe.
-      dictionary.clear();
-      savedRowIndex.clear();
-      rowIndexValueCount.clear();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(0L);
-
-      if (!useDictionaryEncoding) {
-        // record the start positions of the first index stride of the next
-        // stripe, i.e. the beginning of the direct streams, when dictionary
-        // encoding is disabled
-        recordDirectStreamPosition();
-      }
-    }
-
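-    // Worked example, added for clarity (not part of the original change):
-    // after adding "banana", "apple", "banana", the tree holds apple
-    // (original position 1) and banana (original position 0). The sorted
-    // visit below emits apple then banana and builds dumpOrder = {1, 0},
-    // so the saved row ids {0, 1, 0} are written out as {1, 0, 1}.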
-    private void flushDictionary() throws IOException {
-      final int[] dumpOrder = new int[dictionary.size()];
-
-      if (useDictionaryEncoding) {
-        // Write the dictionary by traversing the red-black tree, writing out
-        // the bytes and lengths, and creating the map from the original
-        // order to the final sorted order.
-
-        dictionary.visit(new StringRedBlackTree.Visitor() {
-          private int currentId = 0;
-          @Override
-          public void visit(StringRedBlackTree.VisitorContext context
-                           ) throws IOException {
-            context.writeBytes(stringOutput);
-            lengthOutput.write(context.getLength());
-            dumpOrder[context.getOriginalPosition()] = currentId++;
-          }
-        });
-      } else {
-        // for direct encoding, we don't want the dictionary data stream
-        stringOutput.suppress();
-      }
-      int length = rows.size();
-      int rowIndexEntry = 0;
-      OrcProto.RowIndex.Builder rowIndex = getRowIndex();
-      Text text = new Text();
-      // write the values translated into the dump order.
-      for(int i = 0; i <= length; ++i) {
-        // now that we are writing out the row values, we can finalize the
-        // row index
-        if (buildIndex) {
-          while (i == rowIndexValueCount.get(rowIndexEntry) &&
-              rowIndexEntry < savedRowIndex.size()) {
-            OrcProto.RowIndexEntry.Builder base =
-                savedRowIndex.get(rowIndexEntry++).toBuilder();
-            if (useDictionaryEncoding) {
-              rowOutput.getPosition(new RowIndexPositionRecorder(base));
-            } else {
-              PositionRecorder posn = new RowIndexPositionRecorder(base);
-              directStreamOutput.getPosition(posn);
-              directLengthOutput.getPosition(posn);
-            }
-            rowIndex.addEntry(base.build());
-          }
-        }
-        if (i != length) {
-          if (useDictionaryEncoding) {
-            rowOutput.write(dumpOrder[rows.get(i)]);
-          } else {
-            dictionary.getText(text, rows.get(i));
-            directStreamOutput.write(text.getBytes(), 0, text.getLength());
-            directLengthOutput.write(text.getLength());
-          }
-        }
-      }
-      rows.clear();
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      // Returns the encoding used for the last call to writeStripe
-      if (useDictionaryEncoding) {
-        if(isDirectV2) {
-          return OrcProto.ColumnEncoding.newBuilder().setKind(
-              OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
-              setDictionarySize(dictionary.size()).build();
-        }
-        return OrcProto.ColumnEncoding.newBuilder().setKind(
-            OrcProto.ColumnEncoding.Kind.DICTIONARY).
-            setDictionarySize(dictionary.size()).build();
-      } else {
-        if(isDirectV2) {
-          return OrcProto.ColumnEncoding.newBuilder().setKind(
-              OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-        }
-        return OrcProto.ColumnEncoding.newBuilder().setKind(
-            OrcProto.ColumnEncoding.Kind.DIRECT).build();
-      }
-    }
-
-    /**
-     * This method doesn't call the super method, because unlike most of the
-     * other TreeWriters, this one can't record the position in the streams
-     * until the stripe is being flushed. Therefore it saves all of the entries
-     * and augments them with the final information as the stripe is written.
-     * @throws IOException
-     */
-    @Override
-    void createRowIndexEntry() throws IOException {
-      getStripeStatistics().merge(indexStatistics);
-      OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
-      rowIndexEntry.setStatistics(indexStatistics.serialize());
-      indexStatistics.reset();
-      OrcProto.RowIndexEntry base = rowIndexEntry.build();
-      savedRowIndex.add(base);
-      rowIndexEntry.clear();
-      addBloomFilterEntry();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(Long.valueOf(rows.size()));
-      if (strideDictionaryCheck) {
-        checkDictionaryEncoding();
-      }
-      if (!useDictionaryEncoding) {
-        if (rows.size() > 0) {
-          flushDictionary();
-          // just record the start positions of the next index stride
-          recordDirectStreamPosition();
-        } else {
-          // record the start positions of the next index stride
-          recordDirectStreamPosition();
-          getRowIndex().addEntry(base);
-        }
-      }
-    }
-
-    private void recordDirectStreamPosition() throws IOException {
-      directStreamOutput.getPosition(rowIndexPosition);
-      directLengthOutput.getPosition(rowIndexPosition);
-    }
-
-    @Override
-    long estimateMemory() {
-      return rows.getSizeInBytes() + dictionary.getSizeInBytes();
-    }
-  }
-
-  private static class StringTreeWriter extends StringBaseTreeWriter {
-    StringTreeWriter(int columnId,
-                   ObjectInspector inspector,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(vec.vector[0], vec.start[0],
-                  vec.length[0]);
-              directLengthOutput.write(vec.length[0]);
-            }
-          }
-          indexStatistics.updateString(vec.vector[0], vec.start[0],
-              vec.length[0], length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]));
-            } else {
-              directStreamOutput.write(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-              directLengthOutput.write(vec.length[offset + i]);
-            }
-            indexStatistics.updateString(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i], 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Under the covers, char is written to ORC the same way as string.
-   */
-  private static class CharTreeWriter extends StringBaseTreeWriter {
-    private final int itemLength;
-    private final byte[] padding;
-
-    CharTreeWriter(int columnId,
-        ObjectInspector inspector,
-        TypeDescription schema,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      itemLength = schema.getMaxLength();
-      padding = new byte[itemLength];
-    }
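-    // Illustrative note (not part of the original change): for a char(5)
-    // column, itemLength is 5, so the value "ab" is copied into the padding
-    // buffer and space-filled to "ab   " before being written; every
-    // emitted value is exactly itemLength bytes.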
-
-    /**
-     * Override base class implementation to support char values.
-     */
-    @Override
-    Text getTextValue(Object obj) {
-      return (((HiveCharObjectInspector) inspector)
-          .getPrimitiveWritableObject(obj)).getTextValue();
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          byte[] ptr;
-          int ptrOffset;
-          if (vec.length[0] >= itemLength) {
-            ptr = vec.vector[0];
-            ptrOffset = vec.start[0];
-          } else {
-            ptr = padding;
-            ptrOffset = 0;
-            System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
-                vec.length[0]);
-            Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
-          }
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(ptr, ptrOffset, itemLength);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(ptr, ptrOffset, itemLength);
-              directLengthOutput.write(itemLength);
-            }
-          }
-          indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(ptr, ptrOffset, itemLength);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            byte[] ptr;
-            int ptrOffset;
-            if (vec.length[offset + i] >= itemLength) {
-              ptr = vec.vector[offset + i];
-              ptrOffset = vec.start[offset + i];
-            } else {
-              // the value is too short, so copy it and pad with spaces
-              ptr = padding;
-              ptrOffset = 0;
-              System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
-                  ptr, 0, vec.length[offset + i]);
-              Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
-            }
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(ptr, ptrOffset, itemLength));
-            } else {
-              directStreamOutput.write(ptr, ptrOffset, itemLength);
-              directLengthOutput.write(itemLength);
-            }
-            indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(ptr, ptrOffset, itemLength);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Under the covers, varchar is written to ORC the same way as string.
-   */
-  private static class VarcharTreeWriter extends StringBaseTreeWriter {
-    private final int maxLength;
-
-    VarcharTreeWriter(int columnId,
-        ObjectInspector inspector,
-        TypeDescription schema,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      maxLength = schema.getMaxLength();
-    }
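-    // Illustrative note (not part of the original change): values longer
-    // than maxLength are truncated via Math.min below, so a varchar(3)
-    // value "abcdef" is written as "abc"; shorter values pass through
-    // unchanged.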
-
-    /**
-     * Override base class implementation to support varchar values.
-     */
-    @Override
-    Text getTextValue(Object obj) {
-      return (((HiveVarcharObjectInspector) inspector)
-          .getPrimitiveWritableObject(obj)).getTextValue();
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int itemLength = Math.min(vec.length[0], maxLength);
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(vec.vector[0], vec.start[0],
-                  itemLength);
-              directLengthOutput.write(itemLength);
-            }
-          }
-          indexStatistics.updateString(vec.vector[0], vec.start[0],
-              itemLength, length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int itemLength = Math.min(vec.length[offset + i], maxLength);
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength));
-            } else {
-              directStreamOutput.write(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength);
-              directLengthOutput.write(itemLength);
-            }
-            indexStatistics.updateString(vec.vector[offset + i],
-                vec.start[offset + i], itemLength, 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private static class BinaryTreeWriter extends TreeWriter {
-    private final PositionedOutputStream stream;
-    private final IntegerWriter length;
-    private boolean isDirectV2 = true;
-
-    BinaryTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.length = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        BytesWritable val =
-            ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
-        stream.write(val.getBytes(), 0, val.getLength());
-        length.write(val.getLength());
-        indexStatistics.updateBinary(val);
-        if (createBloomFilter) {
-          bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
-        }
-      }
-    }
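-    // Illustrative note (not part of the original change): a 5-byte value
-    // appends its raw bytes to the DATA stream and the integer 5 to the
-    // LENGTH stream; no offsets are stored, the lengths alone delimit
-    // values.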
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          for(int i=0; i < length; ++i) {
-            stream.write(vec.vector[0], vec.start[0],
-                  vec.length[0]);
-            this.length.write(vec.length[0]);
-          }
-          indexStatistics.updateBinary(vec.vector[0], vec.start[0],
-              vec.length[0], length);
-          if (createBloomFilter) {
-            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            stream.write(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i]);
-            this.length.write(vec.length[offset + i]);
-            indexStatistics.updateBinary(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i], 1);
-            if (createBloomFilter) {
-              bloomFilter.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      stream.flush();
-      length.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-      length.getPosition(recorder);
-    }
-  }
-
-  static final int MILLIS_PER_SECOND = 1000;
-  static final int NANOS_PER_SECOND = 1000000000;
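-  // note: despite its name, this constant is the number of nanoseconds per
-  // millisecond; dividing a nanosecond value by it yields milliseconds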
-  static final int MILLIS_PER_NANO  = 1000000;
-  static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
-
-  private static class TimestampTreeWriter extends TreeWriter {
-    private final IntegerWriter seconds;
-    private final IntegerWriter nanos;
-    private final boolean isDirectV2;
-    private final long base_timestamp;
-
-    TimestampTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.seconds = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
-      this.nanos = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-      // the base timestamp is computed in the JVM's default time zone,
-      // which lets unit tests exercise different time zones
-      this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
-      writer.useWriterTimeZone(true);
-    }
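-    // Illustrative note (not part of the original change): seconds are
-    // stored relative to base_timestamp, so a timestamp one second after
-    // 2015-01-01 00:00:00 (in the JVM's default time zone) is encoded as
-    // seconds = 1, nanos = 0.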
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        Timestamp val =
-            ((TimestampObjectInspector) inspector).
-                getPrimitiveJavaObject(obj);
-        indexStatistics.updateTimestamp(val);
-        seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp);
-        nanos.write(formatNanos(val.getNanos()));
-        if (createBloomFilter) {
-          bloomFilter.addLong(val.getTime());
-        }
-      }
-    }
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          long value = vec.vector[0];
-          long valueMillis = value / MILLIS_PER_NANO;
-          indexStatistics.updateTimestamp(valueMillis);
-          if (createBloomFilter) {
-            bloomFilter.addLong(valueMillis);
-          }
-          // mirror the non-repeating branch below: normalize a negative
-          // remainder for pre-epoch values before encoding the nanos
-          int valueNanos = (int) (value % NANOS_PER_SECOND);
-          if (valueNanos < 0) {
-            valueNanos += NANOS_PER_SECOND;
-          }
-          final long secs = value / NANOS_PER_SECOND - base_timestamp;
-          final long nano = formatNanos(valueNanos);
-          for(int i=0; i < length; ++i) {
-            seconds.write(secs);
-            nanos.write(nano);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            long value = vec.vector[i + offset];
-            long valueMillis = value / MILLIS_PER_NANO;
-            long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
-            int valueNanos = (int) (value % NANOS_PER_SECOND);
-            if (valueNanos < 0) {
-              valueNanos += NANOS_PER_SECOND;
-            }
-            seconds.write(valueSecs);
-            nanos.write(formatNanos(valueNanos));
-            indexStatistics.updateTimestamp(valueMillis);
-            if (createBloomFilter) {
-              bloomFilter.addLong(valueMillis);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      seconds.flush();
-      nanos.flush();
-      recordPosition(rowIndexPosition);
-    }
-
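-    // Worked example, added for clarity (not part of the original change):
-    // for nanos = 1000, dividing by 100 leaves 10 with trailingZeros = 1,
-    // and one more division by 10 leaves 1 with trailingZeros = 2, so the
-    // result is (1 << 3) | 2 = 10; a reader recovers 1 * 10^(2+1) = 1000.
-    // A value like 123456789 is not divisible by 100 and is just shifted:
-    // 123456789 << 3.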
-    private static long formatNanos(int nanos) {
-      if (nanos == 0) {
-        return 0;
-      } else if (nanos % 100 != 0) {
-        return ((long) nanos) << 3;
-      } else {
-        nanos /= 100;
-        int trailingZeros = 1;
-        while (nanos % 10 == 0 && trailingZeros < 7) {
-          nanos /= 10;
-          trailingZeros += 1;
-        }
-        return ((long) nanos) << 3 | trailingZeros;
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      seconds.getPosition(recorder);
-      nanos.getPosition(recorder);
-    }
-  }
-
-  private static class DateTreeWriter extends TreeWriter {
-    private final IntegerWriter writer;
-    private final boolean isDirectV2;
-
-    DateTreeWriter(int columnId,
-                   ObjectInspector inspector,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      OutStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        // Using the Writable here as it's used directly for writing as well as for stats.
-        DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
-        indexStatistics.updateDate(val);
-        writer.write(val.getDays());
-        if (createBloomFilter) {
-          bloomFilter.addLong(val.getDays());
-        }
-      }
-    }
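-    // Illustrative note (not part of the original change): DateWritable
-    // represents a date as days since the Unix epoch, so 1970-01-01 is
-    // written as 0 and 1970-02-01 as 31.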
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int value = (int) vec.vector[0];
-          indexStatistics.updateDate(value);
-          if (createBloomFilter) {
-            bloomFilter.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int value = (int) vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateDate(value);
-            if (createBloomFilter) {
-              bloomFilter.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      writer.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-  }
-
-  private static class DecimalTreeWriter extends TreeWriter {
-    private final PositionedOutputStream valueStream;
-    private final IntegerWriter scaleStream;
-    private final boolean isDirectV2;
-
-    DecimalTreeWriter(int columnId,
-                        ObjectInspector inspector,
-                        TypeDescription schema,
-                        StreamFactory writer,
-                        boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      this.scaleStream = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding getEncoding() {
-      if (isDirectV2) {
-        return OrcProto.ColumnEncoding.newBuilder()
-            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-      }
-      return OrcProto.ColumnEncoding.newBuilder()
-          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector).
-            getPrimitiveJavaObject(obj);
-        if (decimal == null) {
-          return;
-        }
-        SerializationUtils.writeBigInteger(valueStream,
-            decimal.unscaledValue());
-        scaleStream.write(decimal.scale());
-        indexStatistics.updateDecimal(decimal);
-        if (createBloomFilter) {
-          bloomFilter.addString(decimal.toString());
-        }
-      }
-    }
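-    // Illustrative note (not part of the original change): the decimal
-    // 12.34 has unscaledValue() = 1234 and scale() = 2; the unscaled value
-    // goes to the DATA stream and the scale to the SECONDARY stream, so a
-    // reader rebuilds 1234 * 10^-2.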
-
-    @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DecimalColumnVector vec = (DecimalColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          HiveDecimal value = vec.vector[0].getHiveDecimal();
-          indexStatistics.updateDecimal(value);
-          if (createBloomFilter) {
-            bloomFilter.addString(value.toString());
-          }
-          for(int i=0; i < length; ++i) {
-            SerializationUtils.writeBigInteger(valueStream,
-                value.unscaledValue());
-            scaleStream.write(value.scale());
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
-            SerializationUtils.writeBigInteger(valueStream,
-                value.unscaledValue());
-            scaleStream.write(value.scale());
-            indexStatistics.updateDecimal(value);
-            if (createBloomFilter) {
-              bloomFilter.addString(value.toString());
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
-      valueStream.flush();
-      scaleStream.flush();
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      valueStream.getPosition(recorder);
-      scaleStream.getPosition(recorder);
-    }
-  }
-
-  private static class StructTreeWriter extends TreeWriter {
-    private final List<? extends StructField> fields;
-    StructTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, inspector, schema, writer, nullable);
-      List<TypeDescription> children = schema.getChildren();
-      if (inspector != null) {
-        StructObjectInspector structObjectInspector =
-            (StructObjectInspector) inspector;
-        fields = structObjectInspector.getAllStructFieldRefs();
-      } else {
-        fields = null;
-      }
-      childrenWriters = new TreeWriter[children.size()];
-      for(int i=0; i < childrenWriters.length; ++i) {
-        ObjectInspector childOI;
-        if (fields != null && i < fields.size()) {
-          childOI = fields.get(i).getFieldObjectInspector();
-        } else {
-          childOI = null;
-        }
-        childrenWriters[i] = createTreeWriter(
-          childOI, children.get(i), writer,
-          true);
-      }
-      recordPosition(rowIndexPosition);
-    }
-
-    @Override
-    void write(Object obj) throws IOException {
-      super.write(obj);
-      if (obj != null) {
-        StructObjectInspector insp = (StructObjectInspecto

<TRUNCATED>