You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/12/14 22:36:17 UTC

[4/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.

http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
new file mode 100644
index 0000000..5157d4d
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -0,0 +1,2912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.BloomFilterIO;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized like most Stream objects, so from the creation
+ * of an OrcFile and all access to a single instance has to be from a single
+ * thread.
+ *
+ * There are no known cases where these happen between different threads today.
+ *
+ * Caveat: the MemoryManager is created during WriterOptions create, that has
+ * to be confined to a single thread as well.
+ *
+ */
+public class WriterImpl implements Writer, MemoryManager.Callback {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
+
+  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+  private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+  // threshold above which buffer size will be automatically resized
+  private static final int COLUMN_COUNT_THRESHOLD = 1000;
+
+  private final FileSystem fs;
+  private final Path path;
+  private final long defaultStripeSize;
+  private long adjustedStripeSize;
+  private final int rowIndexStride;
+  private final CompressionKind compress;
+  private final CompressionCodec codec;
+  private final boolean addBlockPadding;
+  private final int bufferSize;
+  private final long blockSize;
+  private final double paddingTolerance;
+  private final TypeDescription schema;
+
+  // the streams that make up the current stripe
+  private final Map<StreamName, BufferedStream> streams =
+    new TreeMap<StreamName, BufferedStream>();
+
+  private FSDataOutputStream rawWriter = null;
+  // the compressed metadata information outStream
+  private OutStream writer = null;
+  // a protobuf outStream around streamFactory
+  private CodedOutputStream protobufWriter = null;
+  private long headerLength;
+  private int columnCount;
+  private long rowCount = 0;
+  private long rowsInStripe = 0;
+  private long rawDataSize = 0;
+  private int rowsInIndex = 0;
+  private int stripesAtLastFlush = -1;
+  private final List<OrcProto.StripeInformation> stripes =
+    new ArrayList<OrcProto.StripeInformation>();
+  private final Map<String, ByteString> userMetadata =
+    new TreeMap<String, ByteString>();
+  private final StreamFactory streamFactory = new StreamFactory();
+  private final TreeWriter treeWriter;
+  private final boolean buildIndex;
+  private final MemoryManager memoryManager;
+  private final OrcFile.Version version;
+  private final Configuration conf;
+  private final OrcFile.WriterCallback callback;
+  private final OrcFile.WriterContext callbackContext;
+  private final OrcFile.EncodingStrategy encodingStrategy;
+  private final OrcFile.CompressionStrategy compressionStrategy;
+  private final boolean[] bloomFilterColumns;
+  private final double bloomFilterFpp;
+  private boolean writeTimeZone;
+
+  public WriterImpl(FileSystem fs,
+                    Path path,
+                    OrcFile.WriterOptions opts) throws IOException {
+    this.fs = fs;
+    this.path = path;
+    this.conf = opts.getConfiguration();
+    this.callback = opts.getCallback();
+    this.schema = opts.getSchema();
+    if (callback != null) {
+      callbackContext = new OrcFile.WriterContext(){
+
+        @Override
+        public Writer getWriter() {
+          return WriterImpl.this;
+        }
+      };
+    } else {
+      callbackContext = null;
+    }
+    this.adjustedStripeSize = opts.getStripeSize();
+    this.defaultStripeSize = opts.getStripeSize();
+    this.version = opts.getVersion();
+    this.encodingStrategy = opts.getEncodingStrategy();
+    this.compressionStrategy = opts.getCompressionStrategy();
+    this.addBlockPadding = opts.getBlockPadding();
+    this.blockSize = opts.getBlockSize();
+    this.paddingTolerance = opts.getPaddingTolerance();
+    this.compress = opts.getCompress();
+    this.rowIndexStride = opts.getRowIndexStride();
+    this.memoryManager = opts.getMemoryManager();
+    buildIndex = rowIndexStride > 0;
+    codec = createCodec(compress);
+    int numColumns = schema.getMaximumId() + 1;
+    this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
+        numColumns, opts.getBufferSize());
+    if (version == OrcFile.Version.V_0_11) {
+      /* do not write bloom filters for ORC v11 */
+      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
+    } else {
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
+    }
+    this.bloomFilterFpp = opts.getBloomFilterFpp();
+    treeWriter = createTreeWriter(schema, streamFactory, false);
+    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+      throw new IllegalArgumentException("Row stride must be at least " +
+          MIN_ROW_INDEX_STRIDE);
+    }
+
+    // ensure that we are able to handle callbacks before we register ourselves
+    memoryManager.addWriter(path, opts.getStripeSize(), this);
+  }
+
+  @VisibleForTesting
+  public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+                                           int bs) {
+    // The worst case is that there are 2 big streams per a column and
+    // we want to guarantee that each stream gets ~10 buffers.
+    // This keeps buffers small enough that we don't get really small stripe
+    // sizes.
+    int estBufferSize = (int) (stripeSize / (20 * numColumns));
+    estBufferSize = getClosestBufferSize(estBufferSize);
+    if (estBufferSize > bs) {
+      estBufferSize = bs;
+    } else {
+      LOG.info("WIDE TABLE - Number of columns: " + numColumns +
+          " Chosen compression buffer size: " + estBufferSize);
+    }
+    return estBufferSize;
+  }
+
+  private static int getClosestBufferSize(int estBufferSize) {
+    final int kb4 = 4 * 1024;
+    final int kb8 = 8 * 1024;
+    final int kb16 = 16 * 1024;
+    final int kb32 = 32 * 1024;
+    final int kb64 = 64 * 1024;
+    final int kb128 = 128 * 1024;
+    final int kb256 = 256 * 1024;
+    if (estBufferSize <= kb4) {
+      return kb4;
+    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+      return kb8;
+    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+      return kb16;
+    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+      return kb32;
+    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+      return kb64;
+    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+      return kb128;
+    } else {
+      return kb256;
+    }
+  }
+
+  public static CompressionCodec createCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return null;
+      case ZLIB:
+        return new ZlibCodec();
+      case SNAPPY:
+        return new SnappyCodec();
+      case LZO:
+        try {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          if (loader == null) {
+            loader = WriterImpl.class.getClassLoader();
+          }
+          @SuppressWarnings("unchecked")
+          Class<? extends CompressionCodec> lzo =
+              (Class<? extends CompressionCodec>)
+              loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+          return lzo.newInstance();
+        } catch (ClassNotFoundException e) {
+          throw new IllegalArgumentException("LZO is not available.", e);
+        } catch (InstantiationException e) {
+          throw new IllegalArgumentException("Problem initializing LZO", e);
+        } catch (IllegalAccessException e) {
+          throw new IllegalArgumentException("Insufficient access to LZO", e);
+        }
+      default:
+        throw new IllegalArgumentException("Unknown compression codec: " +
+            kind);
+    }
+  }
+
+  @Override
+  public boolean checkMemory(double newScale) throws IOException {
+    long limit = (long) Math.round(adjustedStripeSize * newScale);
+    long size = estimateStripeSize();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+                limit);
+    }
+    if (size > limit) {
+      flushStripe();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This class is used to hold the contents of streams as they are buffered.
+   * The TreeWriters write to the outStream and the codec compresses the
+   * data as buffers fill up and stores them in the output list. When the
+   * stripe is being written, the whole stream is written to the file.
+   */
+  private class BufferedStream implements OutStream.OutputReceiver {
+    private final OutStream outStream;
+    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+    BufferedStream(String name, int bufferSize,
+                   CompressionCodec codec) throws IOException {
+      outStream = new OutStream(name, bufferSize, codec, this);
+    }
+
+    /**
+     * Receive a buffer from the compression codec.
+     * @param buffer the buffer to save
+     */
+    @Override
+    public void output(ByteBuffer buffer) {
+      output.add(buffer);
+    }
+
+    /**
+     * Get the number of bytes in buffers that are allocated to this stream.
+     * @return number of bytes in buffers
+     */
+    public long getBufferSize() {
+      long result = 0;
+      for(ByteBuffer buf: output) {
+        result += buf.capacity();
+      }
+      return outStream.getBufferSize() + result;
+    }
+
+    /**
+     * Flush the stream to the codec.
+     * @throws IOException
+     */
+    public void flush() throws IOException {
+      outStream.flush();
+    }
+
+    /**
+     * Clear all of the buffers.
+     * @throws IOException
+     */
+    public void clear() throws IOException {
+      outStream.clear();
+      output.clear();
+    }
+
+    /**
+     * Check the state of suppress flag in output stream
+     * @return value of suppress flag
+     */
+    public boolean isSuppressed() {
+      return outStream.isSuppressed();
+    }
+
+    /**
+     * Get the number of bytes that will be written to the output. Assumes
+     * the stream has already been flushed.
+     * @return the number of bytes
+     */
+    public long getOutputSize() {
+      long result = 0;
+      for(ByteBuffer buffer: output) {
+        result += buffer.remaining();
+      }
+      return result;
+    }
+
+    /**
+     * Write the saved compressed buffers to the OutputStream.
+     * @param out the stream to write to
+     * @throws IOException
+     */
+    void spillTo(OutputStream out) throws IOException {
+      for(ByteBuffer buffer: output) {
+        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+          buffer.remaining());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return outStream.toString();
+    }
+  }
+
+  /**
+   * An output receiver that writes the ByteBuffers to the output stream
+   * as they are received.
+   */
+  private class DirectStream implements OutStream.OutputReceiver {
+    private final FSDataOutputStream output;
+
+    DirectStream(FSDataOutputStream output) {
+      this.output = output;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+        buffer.remaining());
+    }
+  }
+
+  private static class RowIndexPositionRecorder implements PositionRecorder {
+    private final OrcProto.RowIndexEntry.Builder builder;
+
+    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void addPosition(long position) {
+      builder.addPositions(position);
+    }
+  }
+
+  /**
+   * Interface from the Writer to the TreeWriters. This limits the visibility
+   * that the TreeWriters have into the Writer.
+   */
+  private class StreamFactory {
+    /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return The output outStream that the section needs to be written to.
+     * @throws IOException
+     */
+    public OutStream createStream(int column,
+                                  OrcProto.Stream.Kind kind
+                                  ) throws IOException {
+      final StreamName name = new StreamName(column, kind);
+      final EnumSet<CompressionCodec.Modifier> modifiers;
+
+      switch (kind) {
+        case BLOOM_FILTER:
+        case DATA:
+        case DICTIONARY_DATA:
+          if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
+            modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
+                CompressionCodec.Modifier.TEXT);
+          } else {
+            modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+                CompressionCodec.Modifier.TEXT);
+          }
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
+              CompressionCodec.Modifier.BINARY);
+          break;
+        default:
+          LOG.warn("Missing ORC compression modifiers for " + kind);
+          modifiers = null;
+          break;
+      }
+
+      BufferedStream result = streams.get(name);
+      if (result == null) {
+        result = new BufferedStream(name.toString(), bufferSize,
+            codec == null ? codec : codec.modify(modifiers));
+        streams.put(name, result);
+      }
+      return result.outStream;
+    }
+
+    /**
+     * Get the next column id.
+     * @return a number from 0 to the number of columns - 1
+     */
+    public int getNextColumnId() {
+      return columnCount++;
+    }
+
+    /**
+     * Get the stride rate of the row index.
+     */
+    public int getRowIndexStride() {
+      return rowIndexStride;
+    }
+
+    /**
+     * Should be building the row index.
+     * @return true if we are building the index
+     */
+    public boolean buildIndex() {
+      return buildIndex;
+    }
+
+    /**
+     * Is the ORC file compressed?
+     * @return are the streams compressed
+     */
+    public boolean isCompressed() {
+      return codec != null;
+    }
+
+    /**
+     * Get the encoding strategy to use.
+     * @return encoding strategy
+     */
+    public OrcFile.EncodingStrategy getEncodingStrategy() {
+      return encodingStrategy;
+    }
+
+    /**
+     * Get the compression strategy to use.
+     * @return compression strategy
+     */
+    public OrcFile.CompressionStrategy getCompressionStrategy() {
+      return compressionStrategy;
+    }
+
+    /**
+     * Get the bloom filter columns
+     * @return bloom filter columns
+     */
+    public boolean[] getBloomFilterColumns() {
+      return bloomFilterColumns;
+    }
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    public double getBloomFilterFPP() {
+      return bloomFilterFpp;
+    }
+
+    /**
+     * Get the writer's configuration.
+     * @return configuration
+     */
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    /**
+     * Get the version of the file to write.
+     */
+    public OrcFile.Version getVersion() {
+      return version;
+    }
+
+    public void useWriterTimeZone(boolean val) {
+      writeTimeZone = val;
+    }
+
+    public boolean hasWriterTimeZone() {
+      return writeTimeZone;
+    }
+  }
+
+  /**
+   * The parent class of all of the writers for each column. Each column
+   * is written by an instance of this class. The compound types (struct,
+   * list, map, and union) have children tree writers that write the children
+   * types.
+   */
+  private abstract static class TreeWriter {
+    protected final int id;
+    protected final BitFieldWriter isPresent;
+    private final boolean isCompressed;
+    protected final ColumnStatisticsImpl indexStatistics;
+    protected final ColumnStatisticsImpl stripeColStatistics;
+    private final ColumnStatisticsImpl fileStatistics;
+    protected TreeWriter[] childrenWriters;
+    protected final RowIndexPositionRecorder rowIndexPosition;
+    private final OrcProto.RowIndex.Builder rowIndex;
+    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+    private final PositionedOutputStream rowIndexStream;
+    private final PositionedOutputStream bloomFilterStream;
+    protected final BloomFilterIO bloomFilter;
+    protected final boolean createBloomFilter;
+    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
+    private boolean foundNulls;
+    private OutStream isPresentOutStream;
+    private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
+    private final StreamFactory streamFactory;
+
+    /**
+     * Create a tree writer.
+     * @param columnId the column id of the column to write
+     * @param schema the row schema
+     * @param streamFactory limited access to the Writer's data.
+     * @param nullable can the value be null?
+     * @throws IOException
+     */
+    TreeWriter(int columnId,
+               TypeDescription schema,
+               StreamFactory streamFactory,
+               boolean nullable) throws IOException {
+      this.streamFactory = streamFactory;
+      this.isCompressed = streamFactory.isCompressed();
+      this.id = columnId;
+      if (nullable) {
+        isPresentOutStream = streamFactory.createStream(id,
+            OrcProto.Stream.Kind.PRESENT);
+        isPresent = new BitFieldWriter(isPresentOutStream, 1);
+      } else {
+        isPresent = null;
+      }
+      this.foundNulls = false;
+      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+      indexStatistics = ColumnStatisticsImpl.create(schema);
+      stripeColStatistics = ColumnStatisticsImpl.create(schema);
+      fileStatistics = ColumnStatisticsImpl.create(schema);
+      childrenWriters = new TreeWriter[0];
+      rowIndex = OrcProto.RowIndex.newBuilder();
+      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+      stripeStatsBuilders = Lists.newArrayList();
+      if (streamFactory.buildIndex()) {
+        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+      } else {
+        rowIndexStream = null;
+      }
+      if (createBloomFilter) {
+        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+        bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+      } else {
+        bloomFilterEntry = null;
+        bloomFilterIndex = null;
+        bloomFilterStream = null;
+        bloomFilter = null;
+      }
+    }
+
+    protected OrcProto.RowIndex.Builder getRowIndex() {
+      return rowIndex;
+    }
+
+    protected ColumnStatisticsImpl getStripeStatistics() {
+      return stripeColStatistics;
+    }
+
+    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+      return rowIndexEntry;
+    }
+
+    IntegerWriter createIntegerWriter(PositionedOutputStream output,
+                                      boolean signed, boolean isDirectV2,
+                                      StreamFactory writer) {
+      if (isDirectV2) {
+        boolean alignedBitpacking = false;
+        if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
+          alignedBitpacking = true;
+        }
+        return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+      } else {
+        return new RunLengthIntegerWriter(output, signed);
+      }
+    }
+
+    boolean isNewWriteFormat(StreamFactory writer) {
+      return writer.getVersion() != OrcFile.Version.V_0_11;
+    }
+
+    /**
+     * Handle the top level object write.
+     *
+     * This default method is used for all types except structs, which are the
+     * typical case. VectorizedRowBatch assumes the top level object is a
+     * struct, so we use the first column for all other types.
+     * @param batch the batch to write from
+     * @param offset the row to start on
+     * @param length the number of rows to write
+     * @throws IOException
+     */
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException {
+      writeBatch(batch.cols[0], offset, length);
+    }
+
+    /**
+     * Write the values from the given vector from offset for length elements.
+     * @param vector the vector to write from
+     * @param offset the first value from the vector to write
+     * @param length the number of values from the vector to write
+     * @throws IOException
+     */
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      if (vector.noNulls) {
+        indexStatistics.increment(length);
+        if (isPresent != null) {
+          for (int i = 0; i < length; ++i) {
+            isPresent.write(1);
+          }
+        }
+      } else {
+        if (vector.isRepeating) {
+          boolean isNull = vector.isNull[0];
+          if (isPresent != null) {
+            for (int i = 0; i < length; ++i) {
+              isPresent.write(isNull ? 0 : 1);
+            }
+          }
+          if (isNull) {
+            foundNulls = true;
+            indexStatistics.setNull();
+          } else {
+            indexStatistics.increment(length);
+          }
+        } else {
+          // count the number of non-null values
+          int nonNullCount = 0;
+          for(int i = 0; i < length; ++i) {
+            boolean isNull = vector.isNull[i + offset];
+            if (!isNull) {
+              nonNullCount += 1;
+            }
+            if (isPresent != null) {
+              isPresent.write(isNull ? 0 : 1);
+            }
+          }
+          indexStatistics.increment(nonNullCount);
+          if (nonNullCount != length) {
+            foundNulls = true;
+            indexStatistics.setNull();
+          }
+        }
+      }
+    }
+
+    private void removeIsPresentPositions() {
+      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+        OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+        List<Long> positions = entry.getPositionsList();
+        // bit streams use 3 positions if uncompressed, 4 if compressed
+        positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+        entry.clearPositions();
+        entry.addAllPositions(positions);
+      }
+    }
+
+    /**
+     * Write the stripe out to the file.
+     * @param builder the stripe footer that contains the information about the
+     *                layout of the stripe. The TreeWriter is required to update
+     *                the footer with its information.
+     * @param requiredIndexEntries the number of index entries that are
+     *                             required. this is to check to make sure the
+     *                             row index is well formed.
+     * @throws IOException
+     */
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      if (isPresent != null) {
+        isPresent.flush();
+
+        // if no nulls are found in a stream, then suppress the stream
+        if(!foundNulls) {
+          isPresentOutStream.suppress();
+          // since isPresent bitstream is suppressed, update the index to
+          // remove the positions of the isPresent stream
+          if (rowIndexStream != null) {
+            removeIsPresentPositions();
+          }
+        }
+      }
+
+      // merge stripe-level column statistics to file statistics and write it to
+      // stripe statistics
+      OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
+      writeStripeStatistics(stripeStatsBuilder, this);
+      stripeStatsBuilders.add(stripeStatsBuilder);
+
+      // reset the flag for next stripe
+      foundNulls = false;
+
+      builder.addColumns(getEncoding());
+      if (streamFactory.hasWriterTimeZone()) {
+        builder.setWriterTimezone(TimeZone.getDefault().getID());
+      }
+      if (rowIndexStream != null) {
+        if (rowIndex.getEntryCount() != requiredIndexEntries) {
+          throw new IllegalArgumentException("Column has wrong number of " +
+               "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+               requiredIndexEntries);
+        }
+        rowIndex.build().writeTo(rowIndexStream);
+        rowIndexStream.flush();
+      }
+      rowIndex.clear();
+      rowIndexEntry.clear();
+
+      // write the bloom filter to out stream
+      if (bloomFilterStream != null) {
+        bloomFilterIndex.build().writeTo(bloomFilterStream);
+        bloomFilterStream.flush();
+        bloomFilterIndex.clear();
+        bloomFilterEntry.clear();
+      }
+    }
+
+    private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
+        TreeWriter treeWriter) {
+      treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
+      builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
+      treeWriter.stripeColStatistics.reset();
+      for (TreeWriter child : treeWriter.getChildrenWriters()) {
+        writeStripeStatistics(builder, child);
+      }
+    }
+
+    TreeWriter[] getChildrenWriters() {
+      return childrenWriters;
+    }
+
+    /**
+     * Get the encoding for this column.
+     * @return the information about the encoding of this column
+     */
+    OrcProto.ColumnEncoding getEncoding() {
+      return OrcProto.ColumnEncoding.newBuilder().setKind(
+          OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    /**
+     * Create a row index entry with the previous location and the current
+     * index statistics. Also merges the index statistics into the file
+     * statistics before they are cleared. Finally, it records the start of the
+     * next index and ensures all of the children columns also create an entry.
+     * @throws IOException
+     */
+    void createRowIndexEntry() throws IOException {
+      stripeColStatistics.merge(indexStatistics);
+      rowIndexEntry.setStatistics(indexStatistics.serialize());
+      indexStatistics.reset();
+      rowIndex.addEntry(rowIndexEntry);
+      rowIndexEntry.clear();
+      addBloomFilterEntry();
+      recordPosition(rowIndexPosition);
+      for(TreeWriter child: childrenWriters) {
+        child.createRowIndexEntry();
+      }
+    }
+
+    void addBloomFilterEntry() {
+      if (createBloomFilter) {
+        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+        bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+        bloomFilterEntry.clear();
+      }
+    }
+
+    /**
+     * Record the current position in each of this column's streams.
+     * @param recorder where should the locations be recorded
+     * @throws IOException
+     */
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      if (isPresent != null) {
+        isPresent.getPosition(recorder);
+      }
+    }
+
+    /**
+     * Estimate how much memory the writer is consuming excluding the streams.
+     * @return the number of bytes.
+     */
+    long estimateMemory() {
+      long result = 0;
+      for (TreeWriter child: childrenWriters) {
+        result += child.estimateMemory();
+      }
+      return result;
+    }
+  }
+
+  private static class BooleanTreeWriter extends TreeWriter {
+    private final BitFieldWriter writer;
+
+    BooleanTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      PositionedOutputStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.writer = new BitFieldWriter(out, 1);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int value = vec.vector[0] == 0 ? 0 : 1;
+          indexStatistics.updateBoolean(value != 0, length);
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int value = vec.vector[i + offset] == 0 ? 0 : 1;
+            writer.write(value);
+            indexStatistics.updateBoolean(value != 0, 1);
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
+  private static class ByteTreeWriter extends TreeWriter {
+    private final RunLengthByteWriter writer;
+
+    ByteTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.writer = new RunLengthByteWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA));
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte value = (byte) vec.vector[0];
+          indexStatistics.updateInteger(value, length);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            byte value = (byte) vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateInteger(value, 1);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
+  private static class IntegerTreeWriter extends TreeWriter {
+    private final IntegerWriter writer;
+    private boolean isDirectV2 = true;
+
+    IntegerTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      OutStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          long value = vec.vector[0];
+          indexStatistics.updateInteger(value, length);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            long value = vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateInteger(value, 1);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+  }
+
+  private static class FloatTreeWriter extends TreeWriter {
+    private final PositionedOutputStream stream;
+    private final SerializationUtils utils;
+
+    FloatTreeWriter(int columnId,
+                      TypeDescription schema,
+                      StreamFactory writer,
+                      boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.stream = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.utils = new SerializationUtils();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DoubleColumnVector vec = (DoubleColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          float value = (float) vec.vector[0];
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            bloomFilter.addDouble(value);
+          }
+          for(int i=0; i < length; ++i) {
+            utils.writeFloat(stream, value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            float value = (float) vec.vector[i + offset];
+            utils.writeFloat(stream, value);
+            indexStatistics.updateDouble(value);
+            if (createBloomFilter) {
+              bloomFilter.addDouble(value);
+            }
+          }
+        }
+      }
+    }
+
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      stream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      stream.getPosition(recorder);
+    }
+  }
+
+  private static class DoubleTreeWriter extends TreeWriter {
+    private final PositionedOutputStream stream;
+    private final SerializationUtils utils;
+
+    DoubleTreeWriter(int columnId,
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.stream = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.utils = new SerializationUtils();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DoubleColumnVector vec = (DoubleColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          double value = vec.vector[0];
+          indexStatistics.updateDouble(value);
+          if (createBloomFilter) {
+            bloomFilter.addDouble(value);
+          }
+          for(int i=0; i < length; ++i) {
+            utils.writeDouble(stream, value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            double value = vec.vector[i + offset];
+            utils.writeDouble(stream, value);
+            indexStatistics.updateDouble(value);
+            if (createBloomFilter) {
+              bloomFilter.addDouble(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      stream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      stream.getPosition(recorder);
+    }
+  }
+
+  private static abstract class StringBaseTreeWriter extends TreeWriter {
+    private static final int INITIAL_DICTIONARY_SIZE = 4096;
+    private final OutStream stringOutput;
+    private final IntegerWriter lengthOutput;
+    private final IntegerWriter rowOutput;
+    protected final StringRedBlackTree dictionary =
+        new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+    protected final DynamicIntArray rows = new DynamicIntArray();
+    protected final PositionedOutputStream directStreamOutput;
+    protected final IntegerWriter directLengthOutput;
+    private final List<OrcProto.RowIndexEntry> savedRowIndex =
+        new ArrayList<OrcProto.RowIndexEntry>();
+    private final boolean buildIndex;
+    private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+    // If the number of keys in a dictionary is greater than this fraction of
+    //the total number of non-null rows, turn off dictionary encoding
+    private final double dictionaryKeySizeThreshold;
+    protected boolean useDictionaryEncoding = true;
+    private boolean isDirectV2 = true;
+    private boolean doneDictionaryCheck;
+    private final boolean strideDictionaryCheck;
+
+    StringBaseTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      stringOutput = writer.createStream(id,
+          OrcProto.Stream.Kind.DICTIONARY_DATA);
+      lengthOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      rowOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(0L);
+      buildIndex = writer.buildIndex();
+      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+      directLengthOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      Configuration conf = writer.getConfiguration();
+      dictionaryKeySizeThreshold =
+          OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+      strideDictionaryCheck =
+          OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+      doneDictionaryCheck = false;
+    }
+
+    private boolean checkDictionaryEncoding() {
+      if (!doneDictionaryCheck) {
+        // Set the flag indicating whether or not to use dictionary encoding
+        // based on whether or not the fraction of distinct keys over number of
+        // non-null rows is less than the configured threshold
+        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+        doneDictionaryCheck = true;
+      }
+      return useDictionaryEncoding;
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
+      // checking would not have happened. So do it again here.
+      checkDictionaryEncoding();
+
+      if (useDictionaryEncoding) {
+        flushDictionary();
+      } else {
+        // flushout any left over entries from dictionary
+        if (rows.size() > 0) {
+          flushDictionary();
+        }
+
+        // suppress the stream for every stripe if dictionary is disabled
+        stringOutput.suppress();
+      }
+
+      // we need to build the rowindex before calling super, since it
+      // writes it out.
+      super.writeStripe(builder, requiredIndexEntries);
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+      directStreamOutput.flush();
+      directLengthOutput.flush();
+      // reset all of the fields to be ready for the next stripe.
+      dictionary.clear();
+      savedRowIndex.clear();
+      rowIndexValueCount.clear();
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(0L);
+
+      if (!useDictionaryEncoding) {
+        // record the start positions of first index stride of next stripe i.e
+        // beginning of the direct streams when dictionary is disabled
+        recordDirectStreamPosition();
+      }
+    }
+
+    private void flushDictionary() throws IOException {
+      final int[] dumpOrder = new int[dictionary.size()];
+
+      if (useDictionaryEncoding) {
+        // Write the dictionary by traversing the red-black tree writing out
+        // the bytes and lengths; and creating the map from the original order
+        // to the final sorted order.
+
+        dictionary.visit(new StringRedBlackTree.Visitor() {
+          private int currentId = 0;
+          @Override
+          public void visit(StringRedBlackTree.VisitorContext context
+                           ) throws IOException {
+            context.writeBytes(stringOutput);
+            lengthOutput.write(context.getLength());
+            dumpOrder[context.getOriginalPosition()] = currentId++;
+          }
+        });
+      } else {
+        // for direct encoding, we don't want the dictionary data stream
+        stringOutput.suppress();
+      }
+      int length = rows.size();
+      int rowIndexEntry = 0;
+      OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+      Text text = new Text();
+      // write the values translated into the dump order.
+      for(int i = 0; i <= length; ++i) {
+        // now that we are writing out the row values, we can finalize the
+        // row index
+        if (buildIndex) {
+          while (i == rowIndexValueCount.get(rowIndexEntry) &&
+              rowIndexEntry < savedRowIndex.size()) {
+            OrcProto.RowIndexEntry.Builder base =
+                savedRowIndex.get(rowIndexEntry++).toBuilder();
+            if (useDictionaryEncoding) {
+              rowOutput.getPosition(new RowIndexPositionRecorder(base));
+            } else {
+              PositionRecorder posn = new RowIndexPositionRecorder(base);
+              directStreamOutput.getPosition(posn);
+              directLengthOutput.getPosition(posn);
+            }
+            rowIndex.addEntry(base.build());
+          }
+        }
+        if (i != length) {
+          if (useDictionaryEncoding) {
+            rowOutput.write(dumpOrder[rows.get(i)]);
+          } else {
+            dictionary.getText(text, rows.get(i));
+            directStreamOutput.write(text.getBytes(), 0, text.getLength());
+            directLengthOutput.write(text.getLength());
+          }
+        }
+      }
+      rows.clear();
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // Returns the encoding used for the last call to writeStripe
+      if (useDictionaryEncoding) {
+        if(isDirectV2) {
+          return OrcProto.ColumnEncoding.newBuilder().setKind(
+              OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+              setDictionarySize(dictionary.size()).build();
+        }
+        return OrcProto.ColumnEncoding.newBuilder().setKind(
+            OrcProto.ColumnEncoding.Kind.DICTIONARY).
+            setDictionarySize(dictionary.size()).build();
+      } else {
+        if(isDirectV2) {
+          return OrcProto.ColumnEncoding.newBuilder().setKind(
+              OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+        }
+        return OrcProto.ColumnEncoding.newBuilder().setKind(
+            OrcProto.ColumnEncoding.Kind.DIRECT).build();
+      }
+    }
+
+    /**
+     * This method doesn't call the super method, because unlike most of the
+     * other TreeWriters, this one can't record the position in the streams
+     * until the stripe is being flushed. Therefore it saves all of the entries
+     * and augments them with the final information as the stripe is written.
+     * @throws IOException
+     */
+    @Override
+    void createRowIndexEntry() throws IOException {
+      getStripeStatistics().merge(indexStatistics);
+      OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+      rowIndexEntry.setStatistics(indexStatistics.serialize());
+      indexStatistics.reset();
+      OrcProto.RowIndexEntry base = rowIndexEntry.build();
+      savedRowIndex.add(base);
+      rowIndexEntry.clear();
+      addBloomFilterEntry();
+      recordPosition(rowIndexPosition);
+      rowIndexValueCount.add(Long.valueOf(rows.size()));
+      if (strideDictionaryCheck) {
+        checkDictionaryEncoding();
+      }
+      if (!useDictionaryEncoding) {
+        if (rows.size() > 0) {
+          flushDictionary();
+          // just record the start positions of next index stride
+          recordDirectStreamPosition();
+        } else {
+          // record the start positions of next index stride
+          recordDirectStreamPosition();
+          getRowIndex().addEntry(base);
+        }
+      }
+    }
+
+    private void recordDirectStreamPosition() throws IOException {
+      directStreamOutput.getPosition(rowIndexPosition);
+      directLengthOutput.getPosition(rowIndexPosition);
+    }
+
+    @Override
+    long estimateMemory() {
+      return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+    }
+  }
+
+  private static class StringTreeWriter extends StringBaseTreeWriter {
+    StringTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(vec.vector[0], vec.start[0],
+                  vec.length[0]);
+              directLengthOutput.write(vec.length[0]);
+            }
+          }
+          indexStatistics.updateString(vec.vector[0], vec.start[0],
+              vec.length[0], length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]));
+            } else {
+              directStreamOutput.write(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+              directLengthOutput.write(vec.length[offset + i]);
+            }
+            indexStatistics.updateString(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i], 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Under the covers, char is written to ORC the same way as string.
+   */
+  private static class CharTreeWriter extends StringBaseTreeWriter {
+    private final int itemLength;
+    private final byte[] padding;
+
+    CharTreeWriter(int columnId,
+        TypeDescription schema,
+        StreamFactory writer,
+        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      itemLength = schema.getMaxLength();
+      padding = new byte[itemLength];
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte[] ptr;
+          int ptrOffset;
+          if (vec.length[0] >= itemLength) {
+            ptr = vec.vector[0];
+            ptrOffset = vec.start[0];
+          } else {
+            ptr = padding;
+            ptrOffset = 0;
+            System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
+                vec.length[0]);
+            Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
+          }
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(ptr, ptrOffset, itemLength);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(ptr, ptrOffset, itemLength);
+              directLengthOutput.write(itemLength);
+            }
+          }
+          indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            byte[] ptr;
+            int ptrOffset;
+            if (vec.length[offset + i] >= itemLength) {
+              ptr = vec.vector[offset + i];
+              ptrOffset = vec.start[offset + i];
+            } else {
+              // it is the wrong length, so copy it
+              ptr = padding;
+              ptrOffset = 0;
+              System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
+                  ptr, 0, vec.length[offset + i]);
+              Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
+            }
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(ptr, ptrOffset, itemLength));
+            } else {
+              directStreamOutput.write(ptr, ptrOffset, itemLength);
+              directLengthOutput.write(itemLength);
+            }
+            indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Under the covers, varchar is written to ORC the same way as string.
+   */
+  private static class VarcharTreeWriter extends StringBaseTreeWriter {
+    private final int maxLength;
+
+    VarcharTreeWriter(int columnId,
+        TypeDescription schema,
+        StreamFactory writer,
+        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      maxLength = schema.getMaxLength();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int itemLength = Math.min(vec.length[0], maxLength);
+          if (useDictionaryEncoding) {
+            int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+            for(int i=0; i < length; ++i) {
+              rows.add(id);
+            }
+          } else {
+            for(int i=0; i < length; ++i) {
+              directStreamOutput.write(vec.vector[0], vec.start[0],
+                  itemLength);
+              directLengthOutput.write(itemLength);
+            }
+          }
+          indexStatistics.updateString(vec.vector[0], vec.start[0],
+              itemLength, length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int itemLength = Math.min(vec.length[offset + i], maxLength);
+            if (useDictionaryEncoding) {
+              rows.add(dictionary.add(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength));
+            } else {
+              directStreamOutput.write(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength);
+              directLengthOutput.write(itemLength);
+            }
+            indexStatistics.updateString(vec.vector[offset + i],
+                vec.start[offset + i], itemLength, 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private static class BinaryTreeWriter extends TreeWriter {
+    private final PositionedOutputStream stream;
+    private final IntegerWriter length;
+    private boolean isDirectV2 = true;
+
+    BinaryTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.stream = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.length = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      BytesColumnVector vec = (BytesColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          for(int i=0; i < length; ++i) {
+            stream.write(vec.vector[0], vec.start[0],
+                  vec.length[0]);
+            this.length.write(vec.length[0]);
+          }
+          indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+              vec.length[0], length);
+          if (createBloomFilter) {
+            bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            stream.write(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i]);
+            this.length.write(vec.length[offset + i]);
+            indexStatistics.updateBinary(vec.vector[offset + i],
+                vec.start[offset + i], vec.length[offset + i], 1);
+            if (createBloomFilter) {
+              bloomFilter.addBytes(vec.vector[offset + i],
+                  vec.start[offset + i], vec.length[offset + i]);
+            }
+          }
+        }
+      }
+    }
+
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      stream.flush();
+      length.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      stream.getPosition(recorder);
+      length.getPosition(recorder);
+    }
+  }
+
+  public static final int MILLIS_PER_SECOND = 1000;
+  static final int NANOS_PER_SECOND = 1000000000;
+  static final int MILLIS_PER_NANO  = 1000000;
+  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+  private static class TimestampTreeWriter extends TreeWriter {
+    private final IntegerWriter seconds;
+    private final IntegerWriter nanos;
+    private final boolean isDirectV2;
+    private final long base_timestamp;
+
+    TimestampTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.seconds = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
+      this.nanos = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+      // for unit tests to set different time zones
+      this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+      writer.useWriterTimeZone(true);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          long value = vec.vector[0];
+          long valueMillis = value / MILLIS_PER_NANO;
+          indexStatistics.updateTimestamp(valueMillis);
+          if (createBloomFilter) {
+            bloomFilter.addLong(valueMillis);
+          }
+          final long secs = value / NANOS_PER_SECOND - base_timestamp;
+          final long nano = formatNanos((int) (value % NANOS_PER_SECOND));
+          for(int i=0; i < length; ++i) {
+            seconds.write(secs);
+            nanos.write(nano);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            long value = vec.vector[i + offset];
+            long valueMillis = value / MILLIS_PER_NANO;
+            long valueSecs = value /NANOS_PER_SECOND - base_timestamp;
+            int valueNanos = (int) (value % NANOS_PER_SECOND);
+            if (valueNanos < 0) {
+              valueNanos += NANOS_PER_SECOND;
+            }
+            seconds.write(valueSecs);
+            nanos.write(formatNanos(valueNanos));
+            indexStatistics.updateTimestamp(valueMillis);
+            if (createBloomFilter) {
+              bloomFilter.addLong(valueMillis);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      seconds.flush();
+      nanos.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    private static long formatNanos(int nanos) {
+      if (nanos == 0) {
+        return 0;
+      } else if (nanos % 100 != 0) {
+        return ((long) nanos) << 3;
+      } else {
+        nanos /= 100;
+        int trailingZeros = 1;
+        while (nanos % 10 == 0 && trailingZeros < 7) {
+          nanos /= 10;
+          trailingZeros += 1;
+        }
+        return ((long) nanos) << 3 | trailingZeros;
+      }
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      seconds.getPosition(recorder);
+      nanos.getPosition(recorder);
+    }
+  }
+
+  private static class DateTreeWriter extends TreeWriter {
+    private final IntegerWriter writer;
+    private final boolean isDirectV2;
+
+    DateTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      OutStream out = writer.createStream(id,
+          OrcProto.Stream.Kind.DATA);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      LongColumnVector vec = (LongColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int value = (int) vec.vector[0];
+          indexStatistics.updateDate(value);
+          if (createBloomFilter) {
+            bloomFilter.addLong(value);
+          }
+          for(int i=0; i < length; ++i) {
+            writer.write(value);
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            int value = (int) vec.vector[i + offset];
+            writer.write(value);
+            indexStatistics.updateDate(value);
+            if (createBloomFilter) {
+              bloomFilter.addLong(value);
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      writer.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      writer.getPosition(recorder);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+  }
+
+  private static class DecimalTreeWriter extends TreeWriter {
+    private final PositionedOutputStream valueStream;
+    private final IntegerWriter scaleStream;
+    private final boolean isDirectV2;
+
+    DecimalTreeWriter(int columnId,
+                        TypeDescription schema,
+                        StreamFactory writer,
+                        boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+      this.scaleStream = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      DecimalColumnVector vec = (DecimalColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          HiveDecimal value = vec.vector[0].getHiveDecimal();
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            bloomFilter.addString(value.toString());
+          }
+          for(int i=0; i < length; ++i) {
+            SerializationUtils.writeBigInteger(valueStream,
+                value.unscaledValue());
+            scaleStream.write(value.scale());
+          }
+        }
+      } else {
+        for(int i=0; i < length; ++i) {
+          if (vec.noNulls || !vec.isNull[i + offset]) {
+            HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
+            SerializationUtils.writeBigInteger(valueStream,
+                value.unscaledValue());
+            scaleStream.write(value.scale());
+            indexStatistics.updateDecimal(value);
+            if (createBloomFilter) {
+              bloomFilter.addString(value.toString());
+            }
+          }
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      valueStream.flush();
+      scaleStream.flush();
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      valueStream.getPosition(recorder);
+      scaleStream.getPosition(recorder);
+    }
+  }
+
+  private static class StructTreeWriter extends TreeWriter {
+    StructTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters = new TreeWriter[children.size()];
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i] = createTreeWriter(
+          children.get(i), writer,
+          true);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException {
+      // update the statistics for the root column
+      indexStatistics.increment(length);
+      // I'm assuming that the root column isn't nullable so that I don't need
+      // to update isPresent.
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+      }
+    }
+
+    private static void writeFields(StructColumnVector vector,
+                                    TreeWriter[] childrenWriters,
+                                    int offset, int length) throws IOException {
+      for(int field=0; field < childrenWriters.length; ++field) {
+        childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+      }
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      StructColumnVector vec = (StructColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          writeFields(vec, childrenWriters, offset, length);
+        }
+      } else if (vector.noNulls) {
+        writeFields(vec, childrenWriters, offset, length);
+      } else {
+        // write the records in runs
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            if (!started) {
+              started = true;
+              currentRun = i;
+            }
+          } else if (started) {
+            started = false;
+            writeFields(vec, childrenWriters, offset + currentRun,
+                i - currentRun);
+          }
+        }
+        if (started) {
+          writeFields(vec, childrenWriters, offset + currentRun,
+              length - currentRun);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  private static class ListTreeWriter extends TreeWriter {
+    private final IntegerWriter lengths;
+    private final boolean isDirectV2;
+
+    ListTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      childrenWriters = new TreeWriter[1];
+      childrenWriters[0] =
+        createTreeWriter(schema.getChildren().get(0), writer, true);
+      lengths = createIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      ListColumnVector vec = (ListColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int childOffset = (int) vec.offsets[0];
+          int childLength = (int) vec.lengths[0];
+          for(int i=0; i < length; ++i) {
+            lengths.write(childLength);
+            childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(childLength);
+          }
+        }
+      } else {
+        // write the elements in runs
+        int currentOffset = 0;
+        int currentLength = 0;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            int nextLength = (int) vec.lengths[offset + i];
+            int nextOffset = (int) vec.offsets[offset + i];
+            lengths.write(nextLength);
+            if (currentLength == 0) {
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else if (currentOffset + currentLength != nextOffset) {
+              childrenWriters[0].writeBatch(vec.child, currentOffset,
+                  currentLength);
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else {
+              currentLength += nextLength;
+            }
+          }
+        }
+        if (currentLength != 0) {
+          childrenWriters[0].writeBatch(vec.child, currentOffset,
+              currentLength);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      lengths.flush();
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      lengths.getPosition(recorder);
+    }
+  }
+
+  private static class MapTreeWriter extends TreeWriter {
+    private final IntegerWriter lengths;
+    private final boolean isDirectV2;
+
+    MapTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      childrenWriters = new TreeWriter[2];
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters[0] =
+        createTreeWriter(children.get(0), writer, true);
+      childrenWriters[1] =
+        createTreeWriter(children.get(1), writer, true);
+      lengths = createIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      if (isDirectV2) {
+        return OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+      }
+      return OrcProto.ColumnEncoding.newBuilder()
+          .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      MapColumnVector vec = (MapColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          int childOffset = (int) vec.offsets[0];
+          int childLength = (int) vec.lengths[0];
+          for(int i=0; i < length; ++i) {
+            lengths.write(childLength);
+            childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
+            childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(childLength);
+          }
+        }
+      } else {
+        // write the elements in runs
+        int currentOffset = 0;
+        int currentLength = 0;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            int nextLength = (int) vec.lengths[offset + i];
+            int nextOffset = (int) vec.offsets[offset + i];
+            lengths.write(nextLength);
+            if (currentLength == 0) {
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else if (currentOffset + currentLength != nextOffset) {
+              childrenWriters[0].writeBatch(vec.keys, currentOffset,
+                  currentLength);
+              childrenWriters[1].writeBatch(vec.values, currentOffset,
+                  currentLength);
+              currentOffset = nextOffset;
+              currentLength = nextLength;
+            } else {
+              currentLength += nextLength;
+            }
+          }
+        }
+        if (currentLength != 0) {
+          childrenWriters[0].writeBatch(vec.keys, currentOffset,
+              currentLength);
+          childrenWriters[1].writeBatch(vec.values, currentOffset,
+              currentLength);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      lengths.flush();
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      lengths.getPosition(recorder);
+    }
+  }
+
+  private static class UnionTreeWriter extends TreeWriter {
+    private final RunLengthByteWriter tags;
+
+    UnionTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+      super(columnId, schema, writer, nullable);
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters = new TreeWriter[children.size()];
+      for(int i=0; i < childrenWriters.length; ++i) {
+        childrenWriters[i] =
+            createTreeWriter(children.get(i), writer, true);
+      }
+      tags =
+        new RunLengthByteWriter(writer.createStream(columnId,
+            OrcProto.Stream.Kind.DATA));
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException {
+      super.writeBatch(vector, offset, length);
+      UnionColumnVector vec = (UnionColumnVector) vector;
+      if (vector.isRepeating) {
+        if (vector.noNulls || !vector.isNull[0]) {
+          byte tag = (byte) vec.tags[0];
+          for(int i=0; i < length; ++i) {
+            tags.write(tag);
+          }
+          if (createBloomFilter) {
+            bloomFilter.addLong(tag);
+          }
+          childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+        }
+      } else {
+        // write the records in runs of the same tag
+        byte prevTag = 0;
+        int currentRun = 0;
+        boolean started = false;
+        for(int i=0; i < length; ++i) {
+          if (!vec.isNull[i + offset]) {
+            byte tag = (byte) vec.tags[offset + i];
+            tags.write(tag);
+            if (!started) {
+              started = true;
+              currentRun = i;
+              prevTag = tag;
+            } else if (tag != prevTag) {
+              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                  offset + currentRun, i - currentRun);
+              currentRun = i;
+              prevTag = tag;
+            }
+          } else if (started) {
+            started = false;
+            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                offset + currentRun, i - currentRun);
+          }
+        }
+        if (started) {
+          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+              offset + currentRun, length - currentRun);
+        }
+      }
+    }
+
+    @Override
+    void writeStripe(OrcProto.StripeFooter.Builder builder,
+                     int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, requiredIndexEntries);
+      tags.flush();
+      for(TreeWriter child: childrenWriters) {
+        child.writeStripe(builder, requiredIndexEntries);
+      }
+      recordPosition(rowIndexPosition);
+    }
+
+    @Override
+    void recordPosition(PositionRecorder recorder) throws IOException {
+      super.recordPosition(recorder);
+      tags.getPosition(recorder);
+    }
+  }
+
+  private static TreeWriter createTreeWriter(TypeDescription schema,
+                                             StreamFactory streamFactory,
+                                             boolean nullable) throws IOException {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case BYTE:
+        return new ByteTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case SHORT:
+      case INT:
+      case LONG:
+        return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case FLOAT:
+        return new FloatTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DOUBLE:
+        return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case STRING:
+        return new StringTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case CHAR:
+        return new CharTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case VARCHAR:
+        return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case BINARY:
+        return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case TIMESTAMP:
+        return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DATE:
+        return new DateTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case DECIMAL:
+        return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory,  nullable);
+      case STRUCT:
+        return new StructTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case MAP:
+        return new MapTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case LIST:
+        return new ListTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      case UNION:
+        return new UnionTreeWriter(streamFactory.getNextColumnId(),
+            schema, streamFactory, nullable);
+      default:
+        throw new IllegalArgumentException("Bad category: " +
+            schema.getCategory());
+    }
+  }
+
+  private static void writeTypes(OrcProto.Footer.Builder builder,
+                                 TypeDescription schema) {
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(OrcProto.Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
+        break;
+      case LIST:
+        type.setKind(OrcProto.Type.Kind.LIST);
+        type.addSubtypes(children.get(0).getId());
+        break;
+      case MAP:
+        type.setKind(OrcProto.Type.Kind.MAP);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
+        break;
+      case STRUCT:
+        type.setKind(OrcProto.Type.Kind.STRUCT);
+        for(TypeD

<TRUNCATED>