You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/06/16 18:29:58 UTC

[1/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.

Repository: orc
Updated Branches:
  refs/heads/master 8b103da92 -> ded204a4a


http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
new file mode 100644
index 0000000..ea4e0e6
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+
+/**
+ * The writers for the specific writers of each type. This provides
+ * the generic API that they must all implement.
+ */
+public interface TreeWriter {
+
+  /**
+   * Estimate the memory currently used to buffer the stripe.
+   * @return the number of bytes
+   */
+  long estimateMemory();
+
+  /**
+   * Estimate the memory used if the file was read into Hive's Writable
+   * types. This is used as an estimate for the query optimizer.
+   * @return the number of bytes
+   */
+  long getRawDataSize();
+
+  /**
+   * Write a VectorizedRowBath to the file. This is called by the WriterImpl
+   * at the top level.
+   * @param batch the list of all of the columns
+   * @param offset the first row from the batch to write
+   * @param length the number of rows to write
+   */
+  void writeRootBatch(VectorizedRowBatch batch, int offset,
+                      int length) throws IOException;
+
+  /**
+   * Write a ColumnVector to the file. This is called recursively by
+   * writeRootBatch.
+   * @param vector the data to write
+   * @param offset the first value offset to write.
+   * @param length the number of values to write
+   */
+  void writeBatch(ColumnVector vector, int offset,
+                  int length) throws IOException;
+
+  /**
+   * Create a row index entry at the current point in the stripe.
+   */
+  void createRowIndexEntry() throws IOException;
+
+  /**
+   * Write the stripe out to the file.
+   * @param stripeFooter the stripe footer that contains the information about the
+   *                layout of the stripe. The TreeWriterBase is required to update
+   *                the footer with its information.
+   * @param stats the stripe statistics information
+   * @param requiredIndexEntries the number of index entries that are
+   *                             required. this is to check to make sure the
+   *                             row index is well formed.
+   */
+  void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
+                   OrcProto.StripeStatistics.Builder stats,
+                   int requiredIndexEntries) throws IOException;
+
+  /**
+   * During a stripe append, we need to update the file statistics.
+   * @param stripeStatistics the statistics for the new stripe
+   */
+  void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
+
+  /**
+   * Add the file statistics to the file footer.
+   * @param footer the file footer builder
+   */
+  void writeFileStatistics(OrcProto.Footer.Builder footer);
+
+  public class Factory {
+    public static TreeWriter create(TypeDescription schema,
+                                    WriterContext streamFactory,
+                                    boolean nullable) throws IOException {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          return new BooleanTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case BYTE:
+          return new ByteTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case SHORT:
+        case INT:
+        case LONG:
+          return new IntegerTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case FLOAT:
+          return new FloatTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case DOUBLE:
+          return new DoubleTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case STRING:
+          return new StringTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case CHAR:
+          return new CharTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case VARCHAR:
+          return new VarcharTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case BINARY:
+          return new BinaryTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case TIMESTAMP:
+          return new TimestampTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case DATE:
+          return new DateTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case DECIMAL:
+          return new DecimalTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case STRUCT:
+          return new StructTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case MAP:
+          return new MapTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case LIST:
+          return new ListTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        case UNION:
+          return new UnionTreeWriter(schema.getId(),
+              schema, streamFactory, nullable);
+        default:
+          throw new IllegalArgumentException("Bad category: " +
+              schema.getCategory());
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
new file mode 100644
index 0000000..5cfde07
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.RunLengthIntegerWriter;
+import org.apache.orc.impl.RunLengthIntegerWriterV2;
+import org.apache.orc.impl.StreamName;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
+import org.apache.orc.util.BloomFilterUtf8;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have children tree writers that write the children
+ * types.
+ */
+public abstract class TreeWriterBase implements TreeWriter {
+  protected final int id;
+  // writer for the PRESENT stream; null when the column isn't nullable
+  protected final BitFieldWriter isPresent;
+  private final boolean isCompressed;
+  // statistics for the current row index entry
+  protected final ColumnStatisticsImpl indexStatistics;
+  // statistics for the current stripe
+  protected final ColumnStatisticsImpl stripeColStatistics;
+  // statistics for the whole file
+  protected final ColumnStatisticsImpl fileStatistics;
+  // the three row index fields are null when the index isn't being built
+  protected final RowIndexPositionRecorder rowIndexPosition;
+  private final OrcProto.RowIndex.Builder rowIndex;
+  private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+  // only set when the ORIGINAL bloom filter version was requested
+  protected final BloomFilter bloomFilter;
+  // set whenever a bloom filter is created for this column
+  protected final BloomFilterUtf8 bloomFilterUtf8;
+  protected final boolean createBloomFilter;
+  private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+  private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
+  protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
+  // were any nulls written to the current stripe?
+  private boolean foundNulls;
+  // the stream behind isPresent; null when the column isn't nullable
+  private final OutStream isPresentOutStream;
+  private final WriterContext streamFactory;
+
+  /**
+   * Create a tree writer.
+   * @param columnId the column id of the column to write
+   * @param schema the row schema
+   * @param streamFactory limited access to the Writer's data.
+   * @param nullable can the value be null?
+   */
+  TreeWriterBase(int columnId,
+                 TypeDescription schema,
+                 WriterContext streamFactory,
+                 boolean nullable) throws IOException {
+    this.streamFactory = streamFactory;
+    this.isCompressed = streamFactory.isCompressed();
+    this.id = columnId;
+    if (nullable) {
+      isPresentOutStream = streamFactory.createStream(id,
+          OrcProto.Stream.Kind.PRESENT);
+      isPresent = new BitFieldWriter(isPresentOutStream, 1);
+    } else {
+      isPresentOutStream = null;
+      isPresent = null;
+    }
+    this.foundNulls = false;
+    createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+    indexStatistics = ColumnStatisticsImpl.create(schema);
+    stripeColStatistics = ColumnStatisticsImpl.create(schema);
+    fileStatistics = ColumnStatisticsImpl.create(schema);
+    if (streamFactory.buildIndex()) {
+      rowIndex = OrcProto.RowIndex.newBuilder();
+      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+    } else {
+      rowIndex = null;
+      rowIndexEntry = null;
+      rowIndexPosition = null;
+    }
+    if (createBloomFilter) {
+      bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+      // the original bloom filter is only kept for the ORIGINAL version
+      if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
+        bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+      } else {
+        bloomFilter = null;
+        bloomFilterIndex = null;
+      }
+      // the UTF-8 bloom filter is always kept
+      bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
+          streamFactory.getBloomFilterFPP());
+      bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
+    } else {
+      bloomFilterEntry = null;
+      bloomFilterIndex = null;
+      bloomFilterIndexUtf8 = null;
+      bloomFilter = null;
+      bloomFilterUtf8 = null;
+    }
+  }
+
+  /**
+   * Get the builder for the row index of the current stripe.
+   * @return the builder or null if the index isn't being built
+   */
+  protected OrcProto.RowIndex.Builder getRowIndex() {
+    return rowIndex;
+  }
+
+  /**
+   * Get the statistics that are accumulated for the current stripe.
+   * @return the stripe level statistics of this column
+   */
+  protected ColumnStatisticsImpl getStripeStatistics() {
+    return stripeColStatistics;
+  }
+
+  /**
+   * Get the builder for the row index entry that is currently being filled.
+   * @return the builder or null if the index isn't being built
+   */
+  protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+    return rowIndexEntry;
+  }
+
+  /**
+   * Create the integer writer for the requested run length encoding.
+   * @param output the stream to write to
+   * @param signed are the values signed?
+   * @param isDirectV2 should the version 2 encoding be used?
+   * @param writer the context that provides the encoding strategy
+   * @return a RunLengthIntegerWriterV2 if isDirectV2 is set and otherwise
+   *         a RunLengthIntegerWriter
+   */
+  IntegerWriter createIntegerWriter(PositionedOutputStream output,
+                                    boolean signed, boolean isDirectV2,
+                                    WriterContext writer) {
+    if (isDirectV2) {
+      // aligned bit packing is only used with the SPEED encoding strategy
+      boolean alignedBitpacking =
+          writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED);
+      return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+    } else {
+      return new RunLengthIntegerWriter(output, signed);
+    }
+  }
+
+  /**
+   * Is the file written in a version other than 0.11?
+   * @param writer the context that provides the file version
+   * @return true unless the version is V_0_11
+   */
+  boolean isNewWriteFormat(WriterContext writer) {
+    return writer.getVersion() != OrcFile.Version.V_0_11;
+  }
+
+  /**
+   * Handle the top level object write.
+   *
+   * This default implementation serves every type except struct, which is
+   * the typical case. VectorizedRowBatch assumes the top level object is a
+   * struct, so we use the first column for all other types.
+   * @param batch the batch to write from
+   * @param offset the row to start on
+   * @param length the number of rows to write
+   */
+  @Override
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    writeBatch(batch.cols[0], offset, length);
+  }
+
+  /**
+   * Write the values from the given vector from offset for length elements.
+   * The base implementation only writes the PRESENT bits (when the column
+   * is nullable) and updates the value count and null flag of the index
+   * statistics.
+   * @param vector the vector to write from
+   * @param offset the first value from the vector to write
+   * @param length the number of values from the vector to write
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    if (vector.noNulls) {
+      indexStatistics.increment(length);
+      if (isPresent != null) {
+        for (int i = 0; i < length; ++i) {
+          isPresent.write(1);
+        }
+      }
+    } else {
+      if (vector.isRepeating) {
+        // a repeating vector keeps its single value in position 0
+        boolean isNull = vector.isNull[0];
+        if (isPresent != null) {
+          for (int i = 0; i < length; ++i) {
+            isPresent.write(isNull ? 0 : 1);
+          }
+        }
+        if (isNull) {
+          foundNulls = true;
+          indexStatistics.setNull();
+        } else {
+          indexStatistics.increment(length);
+        }
+      } else {
+        // count the number of non-null values
+        int nonNullCount = 0;
+        for(int i = 0; i < length; ++i) {
+          boolean isNull = vector.isNull[i + offset];
+          if (!isNull) {
+            nonNullCount += 1;
+          }
+          if (isPresent != null) {
+            isPresent.write(isNull ? 0 : 1);
+          }
+        }
+        indexStatistics.increment(nonNullCount);
+        if (nonNullCount != length) {
+          foundNulls = true;
+          indexStatistics.setNull();
+        }
+      }
+    }
+  }
+
+  /**
+   * Drop the leading positions, which belong to the PRESENT stream, from
+   * every entry of the row index.
+   */
+  private void removeIsPresentPositions() {
+    for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+      OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+      List<Long> positions = entry.getPositionsList();
+      // bit streams use 3 positions if uncompressed, 4 if compressed
+      positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+      entry.clearPositions();
+      entry.addAllPositions(positions);
+    }
+  }
+
+  /**
+   * Finish the stripe for this column: flush or suppress the PRESENT
+   * stream, roll the stripe statistics into the file statistics, add the
+   * column encoding to the stripe footer, and write out the row index and
+   * the bloom filters.
+   * @param builder the stripe footer that receives the column encoding
+   * @param stats the stripe statistics that receive this column's statistics
+   * @param requiredIndexEntries the number of entries the row index must have
+   * @throws IllegalArgumentException if the row index is being built and
+   *         has a different number of entries than required
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    if (isPresent != null) {
+      isPresent.flush();
+
+      // if no nulls are found in a stream, then suppress the stream
+      if(!foundNulls) {
+        isPresentOutStream.suppress();
+        // since isPresent bitstream is suppressed, update the index to
+        // remove the positions of the isPresent stream
+        if (rowIndex != null) {
+          removeIsPresentPositions();
+        }
+      }
+    }
+
+    // merge stripe-level column statistics to file statistics and write it to
+    // stripe statistics
+    fileStatistics.merge(stripeColStatistics);
+    stats.addColStats(stripeColStatistics.serialize());
+    stripeColStatistics.reset();
+
+    // reset the flag for next stripe
+    foundNulls = false;
+
+    builder.addColumns(getEncoding());
+    if (rowIndex != null) {
+      if (rowIndex.getEntryCount() != requiredIndexEntries) {
+        throw new IllegalArgumentException("Column has wrong number of " +
+             "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+             requiredIndexEntries);
+      }
+      streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+      rowIndex.clear();
+      rowIndexEntry.clear();
+    }
+
+    // write the original bloom filter index, if there is one
+    if (bloomFilterIndex != null) {
+      streamFactory.writeBloomFilter(new StreamName(id,
+          OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
+      bloomFilterIndex.clear();
+    }
+    // write the UTF-8 bloom filter index, if there is one
+    if (bloomFilterIndexUtf8 != null) {
+      streamFactory.writeBloomFilter(new StreamName(id,
+          OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
+      bloomFilterIndexUtf8.clear();
+    }
+  }
+
+  /**
+   * Get the encoding for this column.
+   * @return the information about the encoding of this column, which is
+   *         DIRECT here and includes the bloom filter encoding when a
+   *         bloom filter is created
+   */
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder builder =
+        OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    if (createBloomFilter) {
+      builder.setBloomEncoding(BloomFilterIO.Encoding.CURRENT.getId());
+    }
+    return builder;
+  }
+
+  /**
+   * Create a row index entry with the previously recorded positions and
+   * the current index statistics. The index statistics are merged into the
+   * stripe statistics before they are cleared. Finally, the bloom filter
+   * entry is added and the positions for the start of the next entry are
+   * recorded.
+   *
+   * The row index builders are used unconditionally, so this must only be
+   * called when the row index is being built.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    stripeColStatistics.merge(indexStatistics);
+    rowIndexEntry.setStatistics(indexStatistics.serialize());
+    indexStatistics.reset();
+    rowIndex.addEntry(rowIndexEntry);
+    rowIndexEntry.clear();
+    addBloomFilterEntry();
+    recordPosition(rowIndexPosition);
+  }
+
+  /**
+   * Serialize the current bloom filters into their indexes and reset them
+   * for the next row index entry. Does nothing if no bloom filter is
+   * created for this column.
+   */
+  void addBloomFilterEntry() {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+      }
+      if (bloomFilterUtf8 != null) {
+        BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
+        bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
+        bloomFilterUtf8.reset();
+      }
+    }
+  }
+
+  /**
+   * Merge this column's statistics from the given stripe into the file
+   * statistics.
+   * @param stats the stripe statistics, which are indexed by column id
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
+  }
+
+  /**
+   * Record the current position in each of this column's streams. The base
+   * class only records the position of the PRESENT stream.
+   * @param recorder where should the locations be recorded
+   */
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    if (isPresent != null) {
+      isPresent.getPosition(recorder);
+    }
+  }
+
+  /**
+   * Estimate how much memory the writer is consuming. The base class only
+   * accounts for the buffer of the PRESENT stream.
+   * @return the number of bytes, which is 0 if the column isn't nullable
+   */
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    if (isPresent != null) {
+      result = isPresentOutStream.getBufferSize();
+    }
+    return result;
+  }
+
+  /**
+   * Append the serialized file statistics of this column to the footer.
+   * @param footer the file footer builder
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    footer.addStatistics(fileStatistics.serialize());
+  }
+
+  /**
+   * A PositionRecorder that appends each position to a row index entry.
+   */
+  static class RowIndexPositionRecorder implements PositionRecorder {
+    private final OrcProto.RowIndexEntry.Builder builder;
+
+    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void addPosition(long position) {
+      builder.addPositions(position);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
new file mode 100644
index 0000000..5047f01
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The writer for union columns. The tag of each non-null value is written
+ * to the DATA stream and the value itself is handed to the child writer
+ * that is selected by the tag.
+ */
+public class UnionTreeWriter extends TreeWriterBase {
+  private final RunLengthByteWriter tags;
+  private final TreeWriter[] childrenWriters;
+
+  /**
+   * Create the writer for a union column and all of its children.
+   * @param columnId the column id of the union
+   * @param schema the union type
+   * @param writer limited access to the Writer's data
+   * @param nullable can the value be null?
+   */
+  UnionTreeWriter(int columnId,
+                  TypeDescription schema,
+                  WriterContext writer,
+                  boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    List<TypeDescription> children = schema.getChildren();
+    // allocate the array with its declared element type, so that any
+    // TreeWriter from the factory can be stored without an
+    // ArrayStoreException
+    childrenWriters = new TreeWriter[children.size()];
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i] = Factory.create(children.get(i), writer, true);
+    }
+    tags =
+        new RunLengthByteWriter(writer.createStream(columnId,
+            OrcProto.Stream.Kind.DATA));
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Add a tag to the bloom filters, if they are created for this column.
+   * @param tag the tag of a non-null value
+   */
+  private void addTagToBloomFilters(byte tag) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(tag);
+      }
+      bloomFilterUtf8.addLong(tag);
+    }
+  }
+
+  /**
+   * Write the tags of the non-null values and pass the values on to the
+   * matching children. The nulls are handled by the parent class.
+   * @param vector the UnionColumnVector to write from
+   * @param offset the first value from the vector to write
+   * @param length the number of values from the vector to write
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    UnionColumnVector vec = (UnionColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        // the single tag in position 0 applies to every row
+        byte tag = (byte) vec.tags[0];
+        for (int i = 0; i < length; ++i) {
+          tags.write(tag);
+        }
+        addTagToBloomFilters(tag);
+        childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+      }
+    } else {
+      // write the records in runs of the same tag
+      int[] currentStart = new int[vec.fields.length];
+      int[] currentLength = new int[vec.fields.length];
+      for (int i = 0; i < length; ++i) {
+        // only need to deal with the non-nulls, since the nulls were dealt
+        // with in the super method.
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          byte tag = (byte) vec.tags[offset + i];
+          tags.write(tag);
+          if (currentLength[tag] == 0) {
+            // start a new sequence
+            currentStart[tag] = i + offset;
+            currentLength[tag] = 1;
+          } else if (currentStart[tag] + currentLength[tag] == i + offset) {
+            // ok, we are extending the current run for that tag.
+            currentLength[tag] += 1;
+          } else {
+            // otherwise, we need to close off the old run and start a new one
+            childrenWriters[tag].writeBatch(vec.fields[tag],
+                currentStart[tag], currentLength[tag]);
+            currentStart[tag] = i + offset;
+            currentLength[tag] = 1;
+          }
+          addTagToBloomFilters(tag);
+        }
+      }
+      // write out any left over sequences
+      for (int tag = 0; tag < currentStart.length; ++tag) {
+        if (currentLength[tag] != 0) {
+          childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
+              currentLength[tag]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a row index entry for the union and for each of its children.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    for (TreeWriter child : childrenWriters) {
+      child.createRowIndexEntry();
+    }
+  }
+
+  /**
+   * Write the stripe for the union, flush the tags, and write the stripe
+   * for each of the children.
+   * @param builder the stripe footer to update
+   * @param stats the stripe statistics to update
+   * @param requiredIndexEntries the number of entries each row index must have
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    tags.flush();
+    for (TreeWriter child : childrenWriters) {
+      child.writeStripe(builder, stats, requiredIndexEntries);
+    }
+    // the parent cleared the row index entry, so record the current
+    // positions into it again as the constructor did
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Record the positions of the parent's streams followed by the position
+   * of the tags.
+   * @param recorder where should the locations be recorded
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    tags.getPosition(recorder);
+  }
+
+  /**
+   * Update the file statistics of the union and of each of its children.
+   * @param stats the statistics for the new stripe
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    for (TreeWriter child : childrenWriters) {
+      child.updateFileStatistics(stats);
+    }
+  }
+
+  /**
+   * Estimate the memory used by the union, its tags, and its children.
+   * @return the number of bytes
+   */
+  @Override
+  public long estimateMemory() {
+    long children = 0;
+    for (TreeWriter writer : childrenWriters) {
+      children += writer.estimateMemory();
+    }
+    return children + super.estimateMemory() + tags.estimateMemory();
+  }
+
+  /**
+   * Get the raw data size, which is the sum over the children.
+   * @return the number of bytes
+   */
+  @Override
+  public long getRawDataSize() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.getRawDataSize();
+    }
+    return result;
+  }
+
+  /**
+   * Add the file statistics of the union followed by those of each of its
+   * children to the footer.
+   * @param footer the file footer builder
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    for (TreeWriter child : childrenWriters) {
+      child.writeFileStatistics(footer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
new file mode 100644
index 0000000..17d3f61
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Under the covers, varchar is written to ORC the same way as string.
+ */
+public class VarcharTreeWriter extends StringBaseTreeWriter {
+  private final int maxLength;
+
+  VarcharTreeWriter(int columnId,
+                    TypeDescription schema,
+                    WriterContext writer,
+                    boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    maxLength = schema.getMaxLength();
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int itemLength = Math.min(vec.length[0], maxLength);
+        if (useDictionaryEncoding) {
+          int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+          for(int i=0; i < length; ++i) {
+            rows.add(id);
+          }
+        } else {
+          for(int i=0; i < length; ++i) {
+            directStreamOutput.write(vec.vector[0], vec.start[0],
+                itemLength);
+            lengthOutput.write(itemLength);
+          }
+        }
+        indexStatistics.updateString(vec.vector[0], vec.start[0],
+            itemLength, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            // translate from UTF-8 to the default charset
+            bloomFilter.addString(new String(vec.vector[0],
+                vec.start[0], itemLength,
+                StandardCharsets.UTF_8));
+          }
+          bloomFilterUtf8.addBytes(vec.vector[0],
+              vec.start[0], itemLength);
+        }
+      }
+    } else {
+      for(int i=0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int itemLength = Math.min(vec.length[offset + i], maxLength);
+          if (useDictionaryEncoding) {
+            rows.add(dictionary.add(vec.vector[offset + i],
+                vec.start[offset + i], itemLength));
+          } else {
+            directStreamOutput.write(vec.vector[offset + i],
+                vec.start[offset + i], itemLength);
+            lengthOutput.write(itemLength);
+          }
+          indexStatistics.updateString(vec.vector[offset + i],
+              vec.start[offset + i], itemLength, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              // translate from UTF-8 to the default charset
+              bloomFilter.addString(new String(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength,
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[offset + i],
+                vec.start[offset + i], itemLength);
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
new file mode 100644
index 0000000..f11d519
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+
+import java.io.IOException;
+
+/**
+ * The services that a tree writer obtains from the writer that owns it.
+ * The tree writers in this package receive a WriterContext in their
+ * constructors and use it, for example, to create the streams they write to.
+ */
+public interface WriterContext {
+
+    /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return the output stream that the section needs to be written to
+     */
+    OutStream createStream(int column,
+                           OrcProto.Stream.Kind kind
+                           ) throws IOException;
+
+    /**
+     * Get the stride rate of the row index.
+     * @return the row index stride
+     */
+    int getRowIndexStride();
+
+    /**
+     * Should the writers be building the row index?
+     * @return true if we are building the index
+     */
+    boolean buildIndex();
+
+    /**
+     * Is the ORC file compressed?
+     * @return true if the streams are compressed
+     */
+    boolean isCompressed();
+
+    /**
+     * Get the encoding strategy to use.
+     * @return encoding strategy
+     */
+    OrcFile.EncodingStrategy getEncodingStrategy();
+
+    /**
+     * Get the bloom filter columns.
+     * NOTE(review): presumably indexed by column id, with true meaning the
+     * column gets a bloom filter -- confirm against the implementation.
+     * @return bloom filter columns
+     */
+    boolean[] getBloomFilterColumns();
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    double getBloomFilterFPP();
+
+    /**
+     * Get the writer's configuration.
+     * @return configuration
+     */
+    Configuration getConfiguration();
+
+    /**
+     * Get the version of the file to write.
+     * @return the file version
+     */
+    OrcFile.Version getVersion();
+
+    /**
+     * Get the bloom filter version configured for this writer.
+     * @return the bloom filter version
+     */
+    OrcFile.BloomFilterVersion getBloomFilterVersion();
+
+    /**
+     * Hand a row index to the writer.
+     * NOTE(review): presumably name identifies the index stream that the
+     * entries belong to -- confirm against the implementation.
+     * @param name the name of the stream
+     * @param index the row index builder to write
+     */
+    void writeIndex(StreamName name,
+                    OrcProto.RowIndex.Builder index) throws IOException;
+
+    /**
+     * Hand a bloom filter index to the writer.
+     * NOTE(review): presumably name identifies the bloom filter stream that
+     * the entries belong to -- confirm against the implementation.
+     * @param name the name of the stream
+     * @param bloom the bloom filter index builder to write
+     */
+    void writeBloomFilter(StreamName name,
+                          OrcProto.BloomFilterIndex.Builder bloom
+                          ) throws IOException;
+}


[4/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.

Posted by om...@apache.org.
ORC-194. Split TreeWriters out of WriterImpl.

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/ded204a4
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/ded204a4
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/ded204a4

Branch: refs/heads/master
Commit: ded204a4a10bfad1ed739fc98f612a41005640c5
Parents: 8b103da
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 17 16:12:01 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Jun 16 11:29:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/orc/impl/TreeReaderFactory.java  |    6 +-
 .../java/org/apache/orc/impl/WriterImpl.java    | 2639 +-----------------
 .../orc/impl/writer/BinaryTreeWriter.java       |  137 +
 .../orc/impl/writer/BooleanTreeWriter.java      |   99 +
 .../apache/orc/impl/writer/ByteTreeWriter.java  |  109 +
 .../apache/orc/impl/writer/CharTreeWriter.java  |  122 +
 .../apache/orc/impl/writer/DateTreeWriter.java  |  124 +
 .../orc/impl/writer/DecimalTreeWriter.java      |  142 +
 .../orc/impl/writer/DoubleTreeWriter.java       |  112 +
 .../apache/orc/impl/writer/FloatTreeWriter.java |  113 +
 .../orc/impl/writer/IntegerTreeWriter.java      |  127 +
 .../apache/orc/impl/writer/ListTreeWriter.java  |  162 ++
 .../apache/orc/impl/writer/MapTreeWriter.java   |  173 ++
 .../orc/impl/writer/StringBaseTreeWriter.java   |  288 ++
 .../orc/impl/writer/StringTreeWriter.java       |   93 +
 .../orc/impl/writer/StructTreeWriter.java       |  156 ++
 .../orc/impl/writer/TimestampTreeWriter.java    |  165 ++
 .../org/apache/orc/impl/writer/TreeWriter.java  |  160 ++
 .../apache/orc/impl/writer/TreeWriterBase.java  |  374 +++
 .../apache/orc/impl/writer/UnionTreeWriter.java |  176 ++
 .../orc/impl/writer/VarcharTreeWriter.java      |  103 +
 .../apache/orc/impl/writer/WriterContext.java   |   95 +
 22 files changed, 3161 insertions(+), 2514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 4b369af..7e5c452 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.OrcProto;
+import org.apache.orc.impl.writer.TimestampTreeWriter;
 
 /**
  * Factory for creating ORC tree readers.
@@ -938,7 +939,8 @@ public class TreeReaderFactory {
         threadLocalDateFormat.get().setTimeZone(writerTimeZone);
         try {
           long epoch = threadLocalDateFormat.get()
-            .parse(WriterImpl.BASE_TIMESTAMP_STRING).getTime() / WriterImpl.MILLIS_PER_SECOND;
+            .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+              TimestampTreeWriter.MILLIS_PER_SECOND;
           baseTimestampMap.put(timeZoneId, epoch);
           return epoch;
         } catch (ParseException e) {
@@ -977,7 +979,7 @@ public class TreeReaderFactory {
           if (millis < 0 && newNanos != 0) {
             millis -= 1;
           }
-          millis *= WriterImpl.MILLIS_PER_SECOND;
+          millis *= TimestampTreeWriter.MILLIS_PER_SECOND;
           long offset = 0;
           // If reader and writer time zones have different rules, adjust the timezone difference
           // between reader and writer taking day light savings into account.


[2/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
new file mode 100644
index 0000000..5835b5a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for binary columns. The bytes of each value are written to
+ * the DATA stream and the number of bytes to the LENGTH stream.
+ */
+public class BinaryTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final IntegerWriter length;
+  private final boolean isDirectV2;
+
+  /**
+   * Create the DATA and LENGTH streams for the column and, when a row index
+   * position is being tracked, record the starting position.
+   */
+  public BinaryTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    stream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    isDirectV2 = isNewWriteFormat(writer);
+    length = createIntegerWriter(
+        writer.createStream(id, OrcProto.Stream.Kind.LENGTH),
+        false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Report DIRECT_V2 when the new write format is in use, DIRECT otherwise.
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    result.setKind(isDirectV2
+        ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+        : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return result;
+  }
+
+  /**
+   * Write the non-null values of rows offset .. offset + length - 1.
+   * A repeating vector is handled by writing row 0 length times.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector bytes = (BytesColumnVector) vector;
+    if (bytes.isRepeating) {
+      if (bytes.noNulls || !bytes.isNull[0]) {
+        writeValue(bytes, 0, length);
+      }
+      return;
+    }
+    for (int row = offset, end = offset + length; row < end; ++row) {
+      if (bytes.noNulls || !bytes.isNull[row]) {
+        writeValue(bytes, row, 1);
+      }
+    }
+  }
+
+  /**
+   * Write the value stored at the given row the requested number of times,
+   * then update the index statistics and the bloom filters for it.
+   */
+  private void writeValue(BytesColumnVector vec, int row,
+                          int repeats) throws IOException {
+    final byte[] data = vec.vector[row];
+    final int start = vec.start[row];
+    final int size = vec.length[row];
+    for (int i = 0; i < repeats; ++i) {
+      stream.write(data, start, size);
+      length.write(size);
+    }
+    indexStatistics.updateBinary(data, start, size, repeats);
+    if (createBloomFilter) {
+      // bloomFilter may be absent; bloomFilterUtf8 is always updated
+      if (bloomFilter != null) {
+        bloomFilter.addBytes(data, start, size);
+      }
+      bloomFilterUtf8.addBytes(data, start, size);
+    }
+  }
+
+  /**
+   * Flush both streams after the base class has written the stripe and
+   * record the position for the next stripe's row index, if tracked.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    length.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Record the base position followed by the DATA and LENGTH positions.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+    length.getPosition(recorder);
+  }
+
+  /**
+   * Add the DATA buffer size and the LENGTH writer's estimate to the base
+   * estimate.
+   */
+  @Override
+  public long estimateMemory() {
+    long base = super.estimateMemory();
+    return base + stream.getBufferSize() + length.estimateMemory();
+  }
+
+  /**
+   * The raw data size is the sum reported by the binary file statistics.
+   */
+  @Override
+  public long getRawDataSize() {
+    return ((BinaryColumnStatistics) fileStatistics).getSum();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
new file mode 100644
index 0000000..5f572bd
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for boolean columns. Values are written to the DATA stream
+ * through a BitFieldWriter constructed with a bit size of 1.
+ */
+public class BooleanTreeWriter extends TreeWriterBase {
+  private final BitFieldWriter writer;
+
+  /**
+   * Create the DATA stream for the column and, when a row index position
+   * is being tracked, record the starting position.
+   */
+  public BooleanTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    PositionedOutputStream dataStream =
+        writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    this.writer = new BitFieldWriter(dataStream, 1);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Write the non-null values of rows offset .. offset + length - 1.
+   * Any non-zero long is written as 1, zero as 0.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector longs = (LongColumnVector) vector;
+    if (longs.isRepeating) {
+      if (longs.noNulls || !longs.isNull[0]) {
+        final boolean flag = longs.vector[0] != 0;
+        indexStatistics.updateBoolean(flag, length);
+        final int bit = flag ? 1 : 0;
+        for (int remaining = length; remaining > 0; --remaining) {
+          writer.write(bit);
+        }
+      }
+      return;
+    }
+    for (int row = offset, end = offset + length; row < end; ++row) {
+      if (longs.noNulls || !longs.isNull[row]) {
+        final boolean flag = longs.vector[row] != 0;
+        writer.write(flag ? 1 : 0);
+        indexStatistics.updateBoolean(flag, 1);
+      }
+    }
+  }
+
+  /**
+   * Flush the bit writer after the base class has written the stripe and
+   * record the position for the next stripe's row index, if tracked.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Record the base position followed by the bit writer's position.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  /**
+   * Add the bit writer's estimate to the base estimate.
+   */
+  @Override
+  public long estimateMemory() {
+    long base = super.estimateMemory();
+    return base + writer.estimateMemory();
+  }
+
+  /**
+   * The raw data size is the number of values in the file statistics
+   * multiplied by JavaDataModel's primitive1 size.
+   */
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
new file mode 100644
index 0000000..edd6411
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+
+import java.io.IOException;
+
+public class ByteTreeWriter extends TreeWriterBase {
+  private final RunLengthByteWriter writer;
+
+  public ByteTreeWriter(int columnId,
+                        TypeDescription schema,
+                        WriterContext writer,
+                        boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.writer = new RunLengthByteWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.DATA));
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        byte value = (byte) vec.vector[0];
+        indexStatistics.updateInteger(value, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(value);
+          }
+          bloomFilterUtf8.addLong(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          byte value = (byte) vec.vector[i + offset];
+          writer.write(value);
+          indexStatistics.updateInteger(value, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
new file mode 100644
index 0000000..92a6bab
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Under the covers, char is written to ORC the same way as string, except
+ * that every value is stored with exactly the schema's maximum length:
+ * shorter values are padded with spaces and longer values are cut.
+ */
+public class CharTreeWriter extends StringBaseTreeWriter {
+  // the fixed length, taken from the schema, that every value is stored with
+  private final int itemLength;
+  // scratch buffer that short values are copied into and space padded
+  private final byte[] padding;
+
+  CharTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    itemLength = schema.getMaxLength();
+    padding = new byte[itemLength];
+  }
+
+  /**
+   * Write the non-null values of rows offset .. offset + length - 1.
+   * A repeating vector is handled by writing row 0 length times.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        writePadded(vec, 0, length);
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          writePadded(vec, offset + i, 1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Bring the value at the given row to exactly itemLength bytes and write
+   * it the requested number of times.
+   *
+   * The index statistics and the bloom filters are updated with the same
+   * bytes that are stored, so that they describe the stored value rather
+   * than the raw, unpadded input.
+   *
+   * @param vec the vector holding the value
+   * @param row the row of the value within the vector
+   * @param repeats how many times the value is written
+   */
+  private void writePadded(BytesColumnVector vec, int row,
+                           int repeats) throws IOException {
+    final byte[] ptr;
+    final int ptrOffset;
+    if (vec.length[row] >= itemLength) {
+      // long enough: use the first itemLength bytes in place
+      ptr = vec.vector[row];
+      ptrOffset = vec.start[row];
+    } else {
+      // it is the wrong length, so copy it and pad with spaces
+      ptr = padding;
+      ptrOffset = 0;
+      System.arraycopy(vec.vector[row], vec.start[row], ptr, 0,
+          vec.length[row]);
+      Arrays.fill(ptr, vec.length[row], itemLength, (byte) ' ');
+    }
+    if (useDictionaryEncoding) {
+      int dictId = dictionary.add(ptr, ptrOffset, itemLength);
+      for (int i = 0; i < repeats; ++i) {
+        rows.add(dictId);
+      }
+    } else {
+      for (int i = 0; i < repeats; ++i) {
+        directStreamOutput.write(ptr, ptrOffset, itemLength);
+        lengthOutput.write(itemLength);
+      }
+    }
+    indexStatistics.updateString(ptr, ptrOffset, itemLength, repeats);
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        // translate from UTF-8 to the default charset
+        bloomFilter.addString(new String(ptr, ptrOffset, itemLength,
+            StandardCharsets.UTF_8));
+      }
+      bloomFilterUtf8.addBytes(ptr, ptrOffset, itemLength);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
new file mode 100644
index 0000000..d15fb13
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for date columns. Each long in the incoming vector is
+ * narrowed to an int and written to the DATA stream through an
+ * IntegerWriter.
+ */
+public class DateTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private final boolean isDirectV2;
+
+  /**
+   * Create the DATA stream for the column and, when a row index position
+   * is being tracked, record the starting position.
+   */
+  public DateTreeWriter(int columnId,
+                        TypeDescription schema,
+                        WriterContext writer,
+                        boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream dataStream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Write the non-null values of rows offset .. offset + length - 1.
+   * A repeating vector is handled by writing row 0 length times.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector longs = (LongColumnVector) vector;
+    if (longs.isRepeating) {
+      if (longs.noNulls || !longs.isNull[0]) {
+        final int repeated = (int) longs.vector[0];
+        // the statistics and bloom filters see a repeated value only once
+        recordValue(repeated);
+        for (int remaining = length; remaining > 0; --remaining) {
+          writer.write(repeated);
+        }
+      }
+      return;
+    }
+    for (int row = offset, end = offset + length; row < end; ++row) {
+      if (longs.noNulls || !longs.isNull[row]) {
+        final int current = (int) longs.vector[row];
+        writer.write(current);
+        recordValue(current);
+      }
+    }
+  }
+
+  /**
+   * Update the index statistics with the value and add it to the bloom
+   * filters when they are enabled.
+   */
+  private void recordValue(int value) {
+    indexStatistics.updateDate(value);
+    if (createBloomFilter) {
+      // bloomFilter may be absent; bloomFilterUtf8 is always updated
+      if (bloomFilter != null) {
+        bloomFilter.addLong(value);
+      }
+      bloomFilterUtf8.addLong(value);
+    }
+  }
+
+  /**
+   * Flush the integer writer after the base class has written the stripe
+   * and record the position for the next stripe's row index, if tracked.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Record the base position followed by the integer writer's position.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  /**
+   * Report DIRECT_V2 when the new write format is in use, DIRECT otherwise.
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    result.setKind(isDirectV2
+        ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+        : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return result;
+  }
+
+  /**
+   * Add the integer writer's estimate to the base estimate.
+   */
+  @Override
+  public long estimateMemory() {
+    long base = super.estimateMemory();
+    return base + writer.estimateMemory();
+  }
+
+  /**
+   * The raw data size is the number of values in the file statistics
+   * multiplied by JavaDataModel's date length.
+   */
+  @Override
+  public long getRawDataSize() {
+    long count = fileStatistics.getNumberOfValues();
+    return count * JavaDataModel.get().lengthOfDate();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
new file mode 100644
index 0000000..0428253
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class DecimalTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream valueStream;
+
+  // These scratch buffers allow us to serialize decimals much faster.
+  private final long[] scratchLongs;
+  private final byte[] scratchBuffer;
+
+  private final IntegerWriter scaleStream;
+  private final boolean isDirectV2;
+
+  public DecimalTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
+    scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+    this.scaleStream = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DecimalColumnVector vec = (DecimalColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        HiveDecimalWritable value = vec.vector[0];
+        indexStatistics.updateDecimal(value);
+        if (createBloomFilter) {
+          String str = value.toString(scratchBuffer);
+          if (bloomFilter != null) {
+            bloomFilter.addString(str);
+          }
+          bloomFilterUtf8.addString(str);
+        }
+        for (int i = 0; i < length; ++i) {
+          value.serializationUtilsWrite(valueStream,
+              scratchLongs);
+          scaleStream.write(value.scale());
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          HiveDecimalWritable value = vec.vector[i + offset];
+          value.serializationUtilsWrite(valueStream, scratchLongs);
+          scaleStream.write(value.scale());
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            String str = value.toString(scratchBuffer);
+            if (bloomFilter != null) {
+              bloomFilter.addString(str);
+            }
+            bloomFilterUtf8.addString(str);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    valueStream.flush();
+    scaleStream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    valueStream.getPosition(recorder);
+    scaleStream.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + valueStream.getBufferSize() +
+        scaleStream.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().lengthOfDecimal();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
new file mode 100644
index 0000000..d2c0db2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for double columns.
+ *
+ * Every non-null value is written to the column's {@code DATA} stream with
+ * {@link SerializationUtils#writeDouble}. Index statistics are updated once
+ * per written value and, when bloom filters are enabled, the value is added
+ * to them.
+ */
+public class DoubleTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  /**
+   * Creates the writer and its data stream, then records the initial
+   * position when {@code rowIndexPosition} is non-null.
+   *
+   * @param columnId forwarded to the superclass
+   * @param schema forwarded to the superclass
+   * @param writer the context used to create the data stream
+   * @param nullable forwarded to the superclass
+   * @throws IOException if creating the stream or recording the position
+   *         throws
+   */
+  public DoubleTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows of the given vector starting at
+   * {@code offset}.
+   *
+   * For a repeating vector, element 0 is written {@code length} times unless
+   * it is null. Otherwise each non-null element in the range is written.
+   * In both cases the index statistics are updated once per written value,
+   * so that any per-value aggregate the statistics keep (for example a
+   * running sum) is the same whichever way the batch was encoded.
+   *
+   * @param vector the column vector; must be a {@link DoubleColumnVector}
+   * @param offset index of the first row to write (ignored when repeating)
+   * @param length number of rows to write
+   * @throws IOException if writing to the stream throws
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        double value = vec.vector[0];
+        // Adding the same value repeatedly is pointless, so do it once.
+        addToBloomFilters(value);
+        for (int i = 0; i < length; ++i) {
+          utils.writeDouble(stream, value);
+          // Update per repetition, mirroring the non-repeating path below.
+          indexStatistics.updateDouble(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          double value = vec.vector[i + offset];
+          utils.writeDouble(stream, value);
+          indexStatistics.updateDouble(value);
+          addToBloomFilters(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds the value to the bloom filters when they are enabled;
+   * {@code bloomFilter} is optional, {@code bloomFilterUtf8} is not.
+   */
+  private void addToBloomFilters(double value) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addDouble(value);
+      }
+      bloomFilterUtf8.addDouble(value);
+    }
+  }
+
+  /**
+   * Finishes the stripe: delegates to the superclass, flushes the data
+   * stream and records the position when {@code rowIndexPosition} is
+   * non-null.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the superclass's position followed by the data stream's.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  /**
+   * Returns the superclass's estimate plus the data stream's buffer size.
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  /**
+   * Returns the number of values in the file statistics multiplied by
+   * {@code JavaDataModel.primitive2()}.
+   */
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive2();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
new file mode 100644
index 0000000..c825bf1
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for float columns.
+ *
+ * Values arrive in a {@link DoubleColumnVector}, are narrowed to
+ * {@code float} and written to the column's {@code DATA} stream with
+ * {@link SerializationUtils#writeFloat}. Index statistics are updated once
+ * per written value and, when bloom filters are enabled, the narrowed value
+ * is added to them.
+ */
+public class FloatTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  /**
+   * Creates the writer and its data stream, then records the initial
+   * position when {@code rowIndexPosition} is non-null.
+   *
+   * @param columnId forwarded to the superclass
+   * @param schema forwarded to the superclass
+   * @param writer the context used to create the data stream
+   * @param nullable forwarded to the superclass
+   * @throws IOException if creating the stream or recording the position
+   *         throws
+   */
+  public FloatTreeWriter(int columnId,
+                         TypeDescription schema,
+                         WriterContext writer,
+                         boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows of the given vector starting at
+   * {@code offset}.
+   *
+   * For a repeating vector, element 0 is written {@code length} times unless
+   * it is null. Otherwise each non-null element in the range is written.
+   * In both cases the index statistics are updated once per written value,
+   * so that any per-value aggregate the statistics keep (for example a
+   * running sum) is the same whichever way the batch was encoded.
+   *
+   * @param vector the column vector; must be a {@link DoubleColumnVector}
+   * @param offset index of the first row to write (ignored when repeating)
+   * @param length number of rows to write
+   * @throws IOException if writing to the stream throws
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        float value = (float) vec.vector[0];
+        // Adding the same value repeatedly is pointless, so do it once.
+        addToBloomFilters(value);
+        for (int i = 0; i < length; ++i) {
+          utils.writeFloat(stream, value);
+          // Update per repetition, mirroring the non-repeating path below.
+          indexStatistics.updateDouble(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          float value = (float) vec.vector[i + offset];
+          utils.writeFloat(stream, value);
+          indexStatistics.updateDouble(value);
+          addToBloomFilters(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds the (already narrowed) value to the bloom filters when they are
+   * enabled; {@code bloomFilter} is optional, {@code bloomFilterUtf8} is not.
+   */
+  private void addToBloomFilters(float value) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addDouble(value);
+      }
+      bloomFilterUtf8.addDouble(value);
+    }
+  }
+
+
+  /**
+   * Finishes the stripe: delegates to the superclass, flushes the data
+   * stream and records the position when {@code rowIndexPosition} is
+   * non-null.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the superclass's position followed by the data stream's.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  /**
+   * Returns the superclass's estimate plus the data stream's buffer size.
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  /**
+   * Returns the number of values in the file statistics multiplied by
+   * {@code JavaDataModel.primitive1()}.
+   */
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
new file mode 100644
index 0000000..6036ef5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+public class IntegerTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private boolean isDirectV2 = true;
+  private final boolean isLong;
+
+  public IntegerTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream out = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        long value = vec.vector[0];
+        indexStatistics.updateInteger(value, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(value);
+          }
+          bloomFilterUtf8.addLong(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          long value = vec.vector[i + offset];
+          writer.write(value);
+          indexStatistics.updateInteger(value, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long num = fileStatistics.getNumberOfValues();
+    return num * (isLong ? jdm.primitive2() : jdm.primitive1());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
new file mode 100644
index 0000000..2c5bd50
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for list columns.
+ *
+ * The length of every non-null list is written to the column's
+ * {@code LENGTH} stream and the elements are forwarded to a child writer
+ * created from the first child of the schema. Stripe, row-index and file
+ * statistics calls are propagated to the child writer.
+ */
+public class ListTreeWriter extends TreeWriterBase {
+  private final IntegerWriter lengths;
+  private final boolean isDirectV2;
+  private final TreeWriter childWriter;
+
+  /**
+   * Creates the child writer and the length stream, then records the
+   * initial position when {@code rowIndexPosition} is non-null.
+   *
+   * @param columnId forwarded to the superclass and used for the stream
+   * @param schema the list type; its first child describes the elements
+   * @param writer the context used to create the streams and child writer
+   * @param nullable forwarded to the superclass
+   * @throws IOException if creating a writer or stream throws
+   */
+  ListTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    childWriter = Factory.create(schema.getChildren().get(0), writer, true);
+    lengths = createIntegerWriter(writer.createStream(columnId,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Returns the superclass's encoding builder with the kind set to
+   * {@code DIRECT_V2} or {@code DIRECT} depending on {@code isDirectV2}.
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  /**
+   * Creates a row index entry for this column and then for the child.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    childWriter.createRowIndexEntry();
+  }
+
+  /**
+   * Writes {@code length} rows of the given vector starting at
+   * {@code offset}.
+   *
+   * For a repeating vector, list 0 is written {@code length} times unless it
+   * is null. Otherwise the non-null lists in the range are written; child
+   * ranges that are adjacent in the child vector are merged into one run so
+   * the child writer is called once per run rather than once per list.
+   *
+   * @param vector the column vector; must be a {@link ListColumnVector}
+   * @param offset index of the first row to write (ignored when repeating)
+   * @param length number of rows to write
+   * @throws IOException if writing lengths or children throws
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    ListColumnVector vec = (ListColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int childOffset = (int) vec.offsets[0];
+        int childLength = (int) vec.lengths[0];
+        for (int i = 0; i < length; ++i) {
+          lengths.write(childLength);
+          childWriter.writeBatch(vec.child, childOffset, childLength);
+        }
+        addLengthToBloomFilters(childLength);
+      }
+    } else {
+      // write the elements in runs
+      int currentOffset = 0;
+      int currentLength = 0;
+      for (int i = 0; i < length; ++i) {
+        // Only consult isNull when the vector says it may contain nulls,
+        // exactly like the repeating branch and the primitive writers do.
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int nextLength = (int) vec.lengths[offset + i];
+          int nextOffset = (int) vec.offsets[offset + i];
+          lengths.write(nextLength);
+          if (currentLength == 0) {
+            // no pending run: start one at this list
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else if (currentOffset + currentLength != nextOffset) {
+            // not adjacent to the pending run: flush it and start a new one
+            childWriter.writeBatch(vec.child, currentOffset,
+                currentLength);
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else {
+            // adjacent: extend the pending run
+            currentLength += nextLength;
+          }
+          addLengthToBloomFilters(nextLength);
+        }
+      }
+      if (currentLength != 0) {
+        childWriter.writeBatch(vec.child, currentOffset,
+            currentLength);
+      }
+    }
+  }
+
+  /**
+   * Adds a list length to the bloom filters when they are enabled;
+   * {@code bloomFilter} is optional, {@code bloomFilterUtf8} is not.
+   */
+  private void addLengthToBloomFilters(int listLength) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(listLength);
+      }
+      bloomFilterUtf8.addLong(listLength);
+    }
+  }
+
+  /**
+   * Finishes the stripe: delegates to the superclass, flushes the length
+   * writer, finishes the child's stripe and records the position when
+   * {@code rowIndexPosition} is non-null.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    lengths.flush();
+    childWriter.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the superclass's position followed by the length writer's.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    lengths.getPosition(recorder);
+  }
+
+  /**
+   * Updates the file statistics of this column and then of the child.
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    childWriter.updateFileStatistics(stats);
+  }
+
+  /**
+   * Returns the superclass's estimate plus the length writer's and the
+   * child writer's estimates.
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + lengths.estimateMemory() +
+        childWriter.estimateMemory();
+  }
+
+  /**
+   * Returns the child writer's raw data size; the list itself adds nothing.
+   */
+  @Override
+  public long getRawDataSize() {
+    return childWriter.getRawDataSize();
+  }
+
+  /**
+   * Writes the file statistics of this column and then of the child.
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    childWriter.writeFileStatistics(footer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
new file mode 100644
index 0000000..26ace05
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tree writer for map columns.
+ *
+ * The entry count of every non-null map is written to the column's
+ * {@code LENGTH} stream; keys and values are forwarded to two child writers
+ * created from the first and second child of the schema. Stripe, row-index
+ * and file statistics calls are propagated to both children.
+ */
+public class MapTreeWriter extends TreeWriterBase {
+  private final IntegerWriter lengths;
+  private final boolean isDirectV2;
+  private final TreeWriter keyWriter;
+  private final TreeWriter valueWriter;
+
+  /**
+   * Creates the key and value writers and the length stream, then records
+   * the initial position when {@code rowIndexPosition} is non-null.
+   *
+   * @param columnId forwarded to the superclass and used for the stream
+   * @param schema the map type; child 0 describes keys, child 1 values
+   * @param writer the context used to create the streams and child writers
+   * @param nullable forwarded to the superclass
+   * @throws IOException if creating a writer or stream throws
+   */
+  MapTreeWriter(int columnId,
+                TypeDescription schema,
+                WriterContext writer,
+                boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    List<TypeDescription> children = schema.getChildren();
+    keyWriter = Factory.create(children.get(0), writer, true);
+    valueWriter = Factory.create(children.get(1), writer, true);
+    lengths = createIntegerWriter(writer.createStream(columnId,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Returns the superclass's encoding builder with the kind set to
+   * {@code DIRECT_V2} or {@code DIRECT} depending on {@code isDirectV2}.
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  /**
+   * Creates a row index entry for this column, then for keys and values.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    keyWriter.createRowIndexEntry();
+    valueWriter.createRowIndexEntry();
+  }
+
+  /**
+   * Writes {@code length} rows of the given vector starting at
+   * {@code offset}.
+   *
+   * For a repeating vector, map 0 is written {@code length} times unless it
+   * is null. Otherwise the non-null maps in the range are written; child
+   * ranges that are adjacent in the child vectors are merged into one run so
+   * the key and value writers are called once per run rather than once per
+   * map.
+   *
+   * @param vector the column vector; must be a {@link MapColumnVector}
+   * @param offset index of the first row to write (ignored when repeating)
+   * @param length number of rows to write
+   * @throws IOException if writing lengths, keys or values throws
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    MapColumnVector vec = (MapColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int childOffset = (int) vec.offsets[0];
+        int childLength = (int) vec.lengths[0];
+        for (int i = 0; i < length; ++i) {
+          lengths.write(childLength);
+          keyWriter.writeBatch(vec.keys, childOffset, childLength);
+          valueWriter.writeBatch(vec.values, childOffset, childLength);
+        }
+        addLengthToBloomFilters(childLength);
+      }
+    } else {
+      // write the elements in runs
+      int currentOffset = 0;
+      int currentLength = 0;
+      for (int i = 0; i < length; ++i) {
+        // Only consult isNull when the vector says it may contain nulls,
+        // exactly like the repeating branch and the primitive writers do.
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int nextLength = (int) vec.lengths[offset + i];
+          int nextOffset = (int) vec.offsets[offset + i];
+          lengths.write(nextLength);
+          if (currentLength == 0) {
+            // no pending run: start one at this map
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else if (currentOffset + currentLength != nextOffset) {
+            // not adjacent to the pending run: flush it and start a new one
+            keyWriter.writeBatch(vec.keys, currentOffset,
+                currentLength);
+            valueWriter.writeBatch(vec.values, currentOffset,
+                currentLength);
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else {
+            // adjacent: extend the pending run
+            currentLength += nextLength;
+          }
+          addLengthToBloomFilters(nextLength);
+        }
+      }
+      if (currentLength != 0) {
+        keyWriter.writeBatch(vec.keys, currentOffset,
+            currentLength);
+        valueWriter.writeBatch(vec.values, currentOffset,
+            currentLength);
+      }
+    }
+  }
+
+  /**
+   * Adds a map's entry count to the bloom filters when they are enabled;
+   * {@code bloomFilter} is optional, {@code bloomFilterUtf8} is not.
+   */
+  private void addLengthToBloomFilters(int entryCount) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(entryCount);
+      }
+      bloomFilterUtf8.addLong(entryCount);
+    }
+  }
+
+  /**
+   * Finishes the stripe: delegates to the superclass, flushes the length
+   * writer, finishes the key and value stripes and records the position
+   * when {@code rowIndexPosition} is non-null.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    lengths.flush();
+    keyWriter.writeStripe(builder, stats, requiredIndexEntries);
+    valueWriter.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the superclass's position followed by the length writer's.
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    lengths.getPosition(recorder);
+  }
+
+  /**
+   * Updates the file statistics of this column, then of keys and values.
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    keyWriter.updateFileStatistics(stats);
+    valueWriter.updateFileStatistics(stats);
+  }
+
+  /**
+   * Returns the superclass's estimate plus the estimates of the length
+   * writer, the key writer and the value writer.
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + lengths.estimateMemory() +
+        keyWriter.estimateMemory() + valueWriter.estimateMemory();
+  }
+
+  /**
+   * Returns the sum of the key and value writers' raw data sizes; the map
+   * itself adds nothing.
+   */
+  @Override
+  public long getRawDataSize() {
+    return keyWriter.getRawDataSize() + valueWriter.getRawDataSize();
+  }
+
+  /**
+   * Writes the file statistics of this column, then of keys and values.
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    keyWriter.writeFileStatistics(footer);
+    valueWriter.writeFileStatistics(footer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
new file mode 100644
index 0000000..f49cb7f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.DynamicIntArray;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.StringRedBlackTree;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Common base for the tree writers of string-like columns.
+ *
+ * While dictionary encoding is in use, values are collected in a
+ * {@link StringRedBlackTree} and every row stores the id of its dictionary
+ * entry in {@link #rows}. Nothing is written to the streams until
+ * {@link #flushDictionary()} runs, which is why the row index entries are
+ * saved and completed later. Once dictionary encoding has been turned off,
+ * subclasses write directly to {@link #directStreamOutput} and
+ * {@link #lengthOutput}.
+ */
+public abstract class StringBaseTreeWriter extends TreeWriterBase {
+  private static final int INITIAL_DICTIONARY_SIZE = 4096;
+  private final OutStream stringOutput;
+  protected final IntegerWriter lengthOutput;
+  private final IntegerWriter rowOutput;
+  protected final StringRedBlackTree dictionary =
+      new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+  protected final DynamicIntArray rows = new DynamicIntArray();
+  protected final PositionedOutputStream directStreamOutput;
+  // row index entries that still lack their stream positions
+  private final List<OrcProto.RowIndexEntry> savedRowIndex =
+      new ArrayList<>();
+  private final boolean buildIndex;
+  // value of rows.size() at the start of each row index entry; always holds
+  // one more element than savedRowIndex because of the leading 0
+  private final List<Long> rowIndexValueCount = new ArrayList<>();
+  // If the number of keys in a dictionary is greater than this fraction of
+  // the total number of non-null rows, turn off dictionary encoding
+  private final double dictionaryKeySizeThreshold;
+  protected boolean useDictionaryEncoding = true;
+  private final boolean isDirectV2;
+  private boolean doneDictionaryCheck;
+  private final boolean strideDictionaryCheck;
+
+  /**
+   * Creates the DATA, DICTIONARY_DATA and LENGTH streams for the column and
+   * reads the dictionary settings from the writer's configuration.
+   *
+   * @param columnId the id of the column being written
+   * @param schema the type of the column
+   * @param writer the context used to create streams and read settings
+   * @param nullable passed through to the parent class
+   * @throws IOException if a stream cannot be created
+   */
+  StringBaseTreeWriter(int columnId,
+                       TypeDescription schema,
+                       WriterContext writer,
+                       boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    stringOutput = writer.createStream(id,
+        OrcProto.Stream.Kind.DICTIONARY_DATA);
+    lengthOutput = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
+        writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    rowIndexValueCount.add(0L);
+    buildIndex = writer.buildIndex();
+    Configuration conf = writer.getConfiguration();
+    dictionaryKeySizeThreshold =
+        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    strideDictionaryCheck =
+        OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+    doneDictionaryCheck = false;
+  }
+
+  /**
+   * Decides, at most once, whether dictionary encoding stays enabled.
+   * The flag doneDictionaryCheck is never reset in this class, so later
+   * calls are no-ops.
+   */
+  private void checkDictionaryEncoding() {
+    if (!doneDictionaryCheck) {
+      // Set the flag indicating whether or not to use dictionary encoding
+      // based on whether or not the fraction of distinct keys over number of
+      // non-null rows is less than the configured threshold
+      float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+      // the old write format always keeps the dictionary
+      useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+      doneDictionaryCheck = true;
+    }
+  }
+
+  /**
+   * Flushes the buffered values, writes the stripe through the parent class
+   * and resets the per-stripe state.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through to the parent class
+   * @throws IOException if writing to any stream fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    // the dictionary check may not have run yet (for example when the
+    // per-stride check is disabled), so make sure it has run before the
+    // encoding is chosen for this stripe.
+    checkDictionaryEncoding();
+
+    if (useDictionaryEncoding) {
+      flushDictionary();
+    } else {
+      // flush out any left over entries from dictionary
+      if (rows.size() > 0) {
+        flushDictionary();
+      }
+
+      // suppress the stream for every stripe if dictionary is disabled
+      stringOutput.suppress();
+    }
+
+    // we need to build the rowindex before calling super, since it
+    // writes it out.
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    if (useDictionaryEncoding) {
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+    } else {
+      directStreamOutput.flush();
+      lengthOutput.flush();
+    }
+    // reset all of the fields to be ready for the next stripe.
+    dictionary.clear();
+    savedRowIndex.clear();
+    rowIndexValueCount.clear();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    rowIndexValueCount.add(0L);
+
+    if (!useDictionaryEncoding) {
+      // record the start positions of first index stride of next stripe i.e
+      // beginning of the direct streams when dictionary is disabled
+      recordDirectStreamPosition();
+    }
+  }
+
+  /**
+   * Writes every buffered row to the output streams, using the encoding
+   * selected by useDictionaryEncoding, and completes the saved row index
+   * entries with the stream positions at which their rows start.
+   *
+   * @throws IOException if writing to any stream fails
+   */
+  private void flushDictionary() throws IOException {
+    final int[] dumpOrder = new int[dictionary.size()];
+
+    if (useDictionaryEncoding) {
+      // Write the dictionary by traversing the red-black tree writing out
+      // the bytes and lengths; and creating the map from the original order
+      // to the final sorted order.
+
+      dictionary.visit(new StringRedBlackTree.Visitor() {
+        private int currentId = 0;
+
+        @Override
+        public void visit(StringRedBlackTree.VisitorContext context
+        ) throws IOException {
+          context.writeBytes(stringOutput);
+          lengthOutput.write(context.getLength());
+          dumpOrder[context.getOriginalPosition()] = currentId++;
+        }
+      });
+    } else {
+      // for direct encoding, we don't want the dictionary data stream
+      stringOutput.suppress();
+    }
+    int length = rows.size();
+    int rowIndexEntry = 0;
+    OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+    Text text = new Text();
+    // write the values translated into the dump order. The loop runs one
+    // step past the last row so that index entries that start at the very
+    // end of the buffered rows are emitted too.
+    for (int i = 0; i <= length; ++i) {
+      // now that we are writing out the row values, we can finalize the
+      // row index
+      if (buildIndex) {
+        // check the bound first so that the lookup in rowIndexValueCount is
+        // never attempted for an entry that was not saved
+        while (rowIndexEntry < savedRowIndex.size() &&
+            i == rowIndexValueCount.get(rowIndexEntry)) {
+          OrcProto.RowIndexEntry.Builder base =
+              savedRowIndex.get(rowIndexEntry++).toBuilder();
+          if (useDictionaryEncoding) {
+            rowOutput.getPosition(new RowIndexPositionRecorder(base));
+          } else {
+            PositionRecorder posn = new RowIndexPositionRecorder(base);
+            directStreamOutput.getPosition(posn);
+            lengthOutput.getPosition(posn);
+          }
+          rowIndex.addEntry(base.build());
+        }
+      }
+      if (i != length) {
+        if (useDictionaryEncoding) {
+          rowOutput.write(dumpOrder[rows.get(i)]);
+        } else {
+          dictionary.getText(text, rows.get(i));
+          directStreamOutput.write(text.getBytes(), 0, text.getLength());
+          lengthOutput.write(text.getLength());
+        }
+      }
+    }
+    rows.clear();
+  }
+
+  /**
+   * Reports the encoding kind that matches the current mode: one of the
+   * dictionary kinds (together with the dictionary size) or one of the
+   * direct kinds, each in its V2 variant when the new write format is used.
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (useDictionaryEncoding) {
+      result.setDictionarySize(dictionary.size());
+      if (isDirectV2) {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2);
+      } else {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY);
+      }
+    } else {
+      if (isDirectV2) {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+      } else {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * This method doesn't call the super method, because unlike most of the
+   * other TreeWriters, this one can't record the position in the streams
+   * until the stripe is being flushed. Therefore it saves all of the entries
+   * and augments them with the final information as the stripe is written.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    getStripeStatistics().merge(indexStatistics);
+    OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+    rowIndexEntry.setStatistics(indexStatistics.serialize());
+    indexStatistics.reset();
+    OrcProto.RowIndexEntry base = rowIndexEntry.build();
+    savedRowIndex.add(base);
+    rowIndexEntry.clear();
+    addBloomFilterEntry();
+    recordPosition(rowIndexPosition);
+    rowIndexValueCount.add((long) rows.size());
+    if (strideDictionaryCheck) {
+      checkDictionaryEncoding();
+    }
+    if (!useDictionaryEncoding) {
+      if (rows.size() > 0) {
+        // rows are still buffered, so write them out in direct form; this
+        // also emits the saved index entries
+        flushDictionary();
+        // record the start positions of next index stride
+        recordDirectStreamPosition();
+      } else {
+        // nothing is buffered, so the entry that was just built can be
+        // added to the row index right away
+        // record the start positions of next index stride
+        recordDirectStreamPosition();
+        getRowIndex().addEntry(base);
+      }
+    }
+  }
+
+  /**
+   * Adds the current positions of the direct data stream and the length
+   * stream to the row index position recorder, if there is one.
+   */
+  private void recordDirectStreamPosition() throws IOException {
+    if (rowIndexPosition != null) {
+      directStreamOutput.getPosition(rowIndexPosition);
+      lengthOutput.getPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Estimates the memory in use: the dictionary and row ids while
+   * dictionary encoding is on, otherwise the length writer and the buffer
+   * of the direct stream, in both cases on top of the parent's estimate.
+   */
+  @Override
+  public long estimateMemory() {
+    long parent = super.estimateMemory();
+    if (useDictionaryEncoding) {
+      return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
+    } else {
+      return parent + lengthOutput.estimateMemory() +
+          directStreamOutput.getBufferSize();
+    }
+  }
+
+  /**
+   * Estimates the raw data size from the file statistics as the number of
+   * values times the Java size of a string of the average length.
+   */
+  @Override
+  public long getRawDataSize() {
+    // ORC strings are converted to java Strings. so use JavaDataModel to
+    // compute the overall size of strings
+    StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
+    long numVals = fileStatistics.getNumberOfValues();
+    if (numVals == 0) {
+      return 0;
+    } else {
+      int avgSize = (int) (scs.getSum() / numVals);
+      return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
new file mode 100644
index 0000000..ab6f38f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tree writer for string columns whose values arrive in a
+ * {@link BytesColumnVector}. Every non-null value is either added to the
+ * dictionary kept by the parent class or, when dictionary encoding is off,
+ * written straight to the direct data and length streams.
+ */
+public class StringTreeWriter extends StringBaseTreeWriter {
+  StringTreeWriter(int columnId,
+                   TypeDescription schema,
+                   WriterContext writer,
+                   boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+  }
+
+  /**
+   * Writes {@code length} rows of the vector starting at {@code offset}.
+   * Null rows are skipped here; non-null rows update the index statistics
+   * and, when enabled, the bloom filters.
+   *
+   * @param vector the column vector, which must be a BytesColumnVector
+   * @param offset the first row of the vector to write
+   * @param length the number of rows to write
+   * @throws IOException if writing to a stream fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    final BytesColumnVector vec = (BytesColumnVector) vector;
+
+    if (vector.isRepeating) {
+      // for a repeating vector only slot 0 is consulted
+      if (!vector.noNulls && vector.isNull[0]) {
+        return;
+      }
+      final byte[] bytes = vec.vector[0];
+      final int start = vec.start[0];
+      final int size = vec.length[0];
+      if (useDictionaryEncoding) {
+        // one dictionary lookup serves all of the repeated rows
+        final int dictionaryId = dictionary.add(bytes, start, size);
+        for (int n = 0; n < length; ++n) {
+          rows.add(dictionaryId);
+        }
+      } else {
+        for (int n = 0; n < length; ++n) {
+          directStreamOutput.write(bytes, start, size);
+          lengthOutput.write(size);
+        }
+      }
+      indexStatistics.updateString(bytes, start, size, length);
+      recordInBloomFilters(bytes, start, size);
+      return;
+    }
+
+    for (int n = 0; n < length; ++n) {
+      final int row = offset + n;
+      if (!vec.noNulls && vec.isNull[row]) {
+        continue;
+      }
+      final byte[] bytes = vec.vector[row];
+      final int start = vec.start[row];
+      final int size = vec.length[row];
+      if (useDictionaryEncoding) {
+        rows.add(dictionary.add(bytes, start, size));
+      } else {
+        directStreamOutput.write(bytes, start, size);
+        lengthOutput.write(size);
+      }
+      indexStatistics.updateString(bytes, start, size, 1);
+      recordInBloomFilters(bytes, start, size);
+    }
+  }
+
+  /**
+   * Adds one value to the bloom filters when bloom filters are enabled.
+   * The filter held in bloomFilter may be absent; when present it receives
+   * the value decoded from UTF-8 into a Java String, while bloomFilterUtf8
+   * always receives the raw bytes.
+   */
+  private void recordInBloomFilters(byte[] bytes, int start, int size) {
+    if (!createBloomFilter) {
+      return;
+    }
+    if (bloomFilter != null) {
+      bloomFilter.addString(new String(bytes, start, size,
+          StandardCharsets.UTF_8));
+    }
+    bloomFilterUtf8.addBytes(bytes, start, size);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
new file mode 100644
index 0000000..9a1384d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tree writer for struct columns. It owns one child writer per field and
+ * forwards every operation to the children after (or, for the size
+ * estimates, in addition to) handling its own state in the parent class.
+ */
+public class StructTreeWriter extends TreeWriterBase {
+  final TreeWriter[] childrenWriters;
+
+  /**
+   * Creates the struct writer together with a writer for each child type.
+   *
+   * @param columnId the id of the struct column
+   * @param schema the struct type; its children define the child writers
+   * @param writer the context used to create the writers
+   * @param nullable passed through to the parent class
+   * @throws IOException if a child writer cannot be created
+   */
+  public StructTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    List<TypeDescription> children = schema.getChildren();
+    // allocate with the declared element type: an array created as
+    // TreeWriterBase[] would reject, with an ArrayStoreException, any
+    // TreeWriter from the factory that does not extend TreeWriterBase
+    childrenWriters = new TreeWriter[children.size()];
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i] = Factory.create(children.get(i), writer, true);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes rows of a batch when this struct is the root of the schema; the
+   * batch's columns are handed to the child writers by position.
+   *
+   * @param batch the batch whose columns are written
+   * @param offset the first row to write
+   * @param length the number of rows to write
+   * @throws IOException if a child writer fails
+   */
+  @Override
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    // update the statistics for the root column
+    indexStatistics.increment(length);
+    // I'm assuming that the root column isn't nullable so that I don't need
+    // to update isPresent.
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+    }
+  }
+
+  /**
+   * Writes the given row range of every field vector with the matching
+   * child writer.
+   */
+  private static void writeFields(StructColumnVector vector,
+                                  TreeWriter[] childrenWriters,
+                                  int offset, int length) throws IOException {
+    for (int field = 0; field < childrenWriters.length; ++field) {
+      childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+    }
+  }
+
+  /**
+   * Writes rows of a struct vector. Only rows where the struct itself is
+   * not null are forwarded to the children, in contiguous runs.
+   *
+   * @param vector the column vector, which must be a StructColumnVector
+   * @param offset the first row to write
+   * @param length the number of rows to write
+   * @throws IOException if a child writer fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    StructColumnVector vec = (StructColumnVector) vector;
+    if (vector.isRepeating) {
+      // for a repeating vector only slot 0 is consulted for the null flag
+      if (vector.noNulls || !vector.isNull[0]) {
+        writeFields(vec, childrenWriters, offset, length);
+      }
+    } else if (vector.noNulls) {
+      writeFields(vec, childrenWriters, offset, length);
+    } else {
+      // write the records in runs
+      int currentRun = 0;
+      boolean started = false;
+      for (int i = 0; i < length; ++i) {
+        if (!vec.isNull[i + offset]) {
+          if (!started) {
+            started = true;
+            currentRun = i;
+          }
+        } else if (started) {
+          // a null row ends the current run of non-null rows
+          started = false;
+          writeFields(vec, childrenWriters, offset + currentRun,
+              i - currentRun);
+        }
+      }
+      if (started) {
+        // the last run extends to the end of the requested range
+        writeFields(vec, childrenWriters, offset + currentRun,
+            length - currentRun);
+      }
+    }
+  }
+
+  /** Creates the row index entry for this column and for every child. */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    for (TreeWriter child : childrenWriters) {
+      child.createRowIndexEntry();
+    }
+  }
+
+  /**
+   * Writes the stripe for this column and then for every child; the stream
+   * positions are recorded again afterwards when a recorder exists.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    for (TreeWriter child : childrenWriters) {
+      child.writeStripe(builder, stats, requiredIndexEntries);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Passes the stripe statistics to this column and to every child. */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    for (TreeWriter child : childrenWriters) {
+      child.updateFileStatistics(stats);
+    }
+  }
+
+  /** Returns the parent's estimate plus the estimates of all children. */
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.estimateMemory();
+    }
+    return super.estimateMemory() + result;
+  }
+
+  /** Returns the sum of the raw data sizes reported by the children. */
+  @Override
+  public long getRawDataSize() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.getRawDataSize();
+    }
+    return result;
+  }
+
+  /** Writes the file statistics of this column and then of every child. */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    for (TreeWriter child : childrenWriters) {
+      child.writeFileStatistics(footer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
new file mode 100644
index 0000000..fae108e
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+public class TimestampTreeWriter extends TreeWriterBase {
+  public static final int MILLIS_PER_SECOND = 1000;
+  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+  private final IntegerWriter seconds;
+  private final IntegerWriter nanos;
+  private final boolean isDirectV2;
+  private final TimeZone localTimezone;
+  private final long baseEpochSecsLocalTz;
+
+  public TimestampTreeWriter(int columnId,
+                             TypeDescription schema,
+                             WriterContext writer,
+                             boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.seconds = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
+    this.nanos = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    this.localTimezone = TimeZone.getDefault();
+    // for unit tests to set different time zones
+    this.baseEpochSecsLocalTz = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    TimestampColumnVector vec = (TimestampColumnVector) vector;
+    Timestamp val;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        val = vec.asScratchTimestamp(0);
+        long millis = val.getTime();
+        long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+        indexStatistics.updateTimestamp(utc);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(millis);
+          }
+          bloomFilterUtf8.addLong(utc);
+        }
+        final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+        final long nano = formatNanos(val.getNanos());
+        for (int i = 0; i < length; ++i) {
+          seconds.write(secs);
+          nanos.write(nano);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          val = vec.asScratchTimestamp(i + offset);
+          long millis = val.getTime();
+          long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+          long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+          seconds.write(secs);
+          nanos.write(formatNanos(val.getNanos()));
+          indexStatistics.updateTimestamp(utc);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(millis);
+            }
+            bloomFilterUtf8.addLong(utc);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    seconds.flush();
+    nanos.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  private static long formatNanos(int nanos) {
+    if (nanos == 0) {
+      return 0;
+    } else if (nanos % 100 != 0) {
+      return ((long) nanos) << 3;
+    } else {
+      nanos /= 100;
+      int trailingZeros = 1;
+      while (nanos % 10 == 0 && trailingZeros < 7) {
+        nanos /= 10;
+        trailingZeros += 1;
+      }
+      return ((long) nanos) << 3 | trailingZeros;
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    seconds.getPosition(recorder);
+    nanos.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + seconds.estimateMemory() +
+        nanos.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().lengthOfTimestamp();
+  }
+}


[3/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.

Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index cfdddad..a5d65dd 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -20,10 +20,7 @@ package org.apache.orc.impl;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -34,43 +31,25 @@ import io.airlift.compress.lz4.Lz4Compressor;
 import io.airlift.compress.lz4.Lz4Decompressor;
 import io.airlift.compress.lzo.LzoCompressor;
 import io.airlift.compress.lzo.LzoDecompressor;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.orc.BinaryColumnStatistics;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
-import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.PhysicalWriter;
-import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
-import org.apache.orc.util.BloomFilter;
-import org.apache.orc.util.BloomFilterIO;
-import org.apache.orc.util.BloomFilterUtf8;
+import org.apache.orc.impl.writer.TreeWriter;
+import org.apache.orc.impl.writer.WriterContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.Text;
 
 import com.google.protobuf.ByteString;
 
@@ -108,7 +87,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final PhysicalWriter physicalWriter;
   private final OrcFile.WriterVersion writerVersion;
 
-  private int columnCount;
   private long rowCount = 0;
   private long rowsInStripe = 0;
   private long rawDataSize = 0;
@@ -133,7 +111,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final boolean[] bloomFilterColumns;
   private final double bloomFilterFpp;
   private final OrcFile.BloomFilterVersion bloomFilterVersion;
-  private boolean writeTimeZone;
+  private final boolean writeTimeZone;
 
   public WriterImpl(FileSystem fs,
                     Path path,
@@ -155,6 +133,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     } else {
       callbackContext = null;
     }
+    writeTimeZone = hasTimestamp(schema);
     this.adjustedStripeSize = opts.getStripeSize();
     this.version = opts.getVersion();
     this.encodingStrategy = opts.getEncodingStrategy();
@@ -181,7 +160,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.physicalWriter = opts.getPhysicalWriter() == null ?
         new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
     physicalWriter.writeHeader();
-    treeWriter = createTreeWriter(schema, new StreamFactory(), false);
+    treeWriter = TreeWriter.Factory.create(schema, new StreamFactory(), false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
           MIN_ROW_INDEX_STRIDE);
@@ -278,2524 +257,142 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     return false;
   }
 
-  private static class RowIndexPositionRecorder implements PositionRecorder {
-    private final OrcProto.RowIndexEntry.Builder builder;
 
-    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
-      this.builder = builder;
-    }
-
-    @Override
-    public void addPosition(long position) {
-      builder.addPositions(position);
-    }
-  }
-
-  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
-    // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
-    //       but at this point there's no close() for the stream.
-    CompressionCodec result = physicalWriter.getCompressionCodec();
-    if (result != null) {
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-        case BLOOM_FILTER_UTF8:
-          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
-                CompressionCodec.Modifier.TEXT));
-          } else {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
-                CompressionCodec.Modifier.TEXT));
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
-              CompressionCodec.Modifier.BINARY));
-          break;
-        default:
-          LOG.info("Missing ORC compression modifiers for " + kind);
-          break;
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Interface from the Writer to the TreeWriters. This limits the visibility
-   * that the TreeWriters have into the Writer.
-   */
-  private class StreamFactory {
-    /**
-     * Create a stream to store part of a column.
-     * @param column the column id for the stream
-     * @param kind the kind of stream
-     * @return The output outStream that the section needs to be written to.
-     */
-    public OutStream createStream(int column,
-                                  OrcProto.Stream.Kind kind
-                                  ) throws IOException {
-      final StreamName name = new StreamName(column, kind);
-      CompressionCodec codec = getCustomizedCodec(kind);
-
-      return new OutStream(physicalWriter.toString(), bufferSize, codec,
-          physicalWriter.createDataStream(name));
-    }
-
-    /**
-     * Get the next column id.
-     * @return a number from 0 to the number of columns - 1
-     */
-    public int getNextColumnId() {
-      return columnCount++;
-    }
-
-    /**
-     * Get the stride rate of the row index.
-     */
-    public int getRowIndexStride() {
-      return rowIndexStride;
-    }
-
-    /**
-     * Should be building the row index.
-     * @return true if we are building the index
-     */
-    public boolean buildIndex() {
-      return buildIndex;
-    }
-
-    /**
-     * Is the ORC file compressed?
-     * @return are the streams compressed
-     */
-    public boolean isCompressed() {
-      return physicalWriter.getCompressionCodec() != null;
-    }
-
-    /**
-     * Get the encoding strategy to use.
-     * @return encoding strategy
-     */
-    public OrcFile.EncodingStrategy getEncodingStrategy() {
-      return encodingStrategy;
-    }
-
-    /**
-     * Get the bloom filter columns
-     * @return bloom filter columns
-     */
-    public boolean[] getBloomFilterColumns() {
-      return bloomFilterColumns;
-    }
-
-    /**
-     * Get bloom filter false positive percentage.
-     * @return fpp
-     */
-    public double getBloomFilterFPP() {
-      return bloomFilterFpp;
-    }
-
-    /**
-     * Get the writer's configuration.
-     * @return configuration
-     */
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    /**
-     * Get the version of the file to write.
-     */
-    public OrcFile.Version getVersion() {
-      return version;
-    }
-
-    public void useWriterTimeZone(boolean val) {
-      writeTimeZone = val;
-    }
-
-    public boolean hasWriterTimeZone() {
-      return writeTimeZone;
-    }
-
-    public OrcFile.BloomFilterVersion getBloomFilterVersion() {
-      return bloomFilterVersion;
-    }
-
-    public void writeIndex(StreamName name,
-                           OrcProto.RowIndex.Builder index) throws IOException {
-      physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
-    }
-
-    public void writeBloomFilter(StreamName name,
-                                 OrcProto.BloomFilterIndex.Builder bloom
-                                 ) throws IOException {
-      physicalWriter.writeBloomFilter(name, bloom,
-          getCustomizedCodec(name.getKind()));
-    }
-  }
-
-  /**
-   * The writers for the specific writers of each type. This provides
-   * the generic API that they must all implement.
-   */
-  interface TreeWriter {
-
-    /**
-     * Estimate the memory currently used to buffer the stripe.
-     * @return the number of bytes
-     */
-    long estimateMemory();
-
-    /**
-     * Estimate the memory used if the file was read into Hive's Writable
-     * types. This is used as an estimate for the query optimizer.
-     * @return the number of bytes
-     */
-    long getRawDataSize();
-
-    /**
-     * Write a VectorizedRowBath to the file. This is called by the WriterImpl
-     * at the top level.
-     * @param batch the list of all of the columns
-     * @param offset the first row from the batch to write
-     * @param length the number of rows to write
-     */
-    void writeRootBatch(VectorizedRowBatch batch, int offset,
-                        int length) throws IOException;
-
-    /**
-     * Write a ColumnVector to the file. This is called recursively by
-     * writeRootBatch.
-     * @param vector the data to write
-     * @param offset the first value offset to write.
-     * @param length the number of values to write
-     */
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException;
-
-    /**
-     * Create a row index entry at the current point in the stripe.
-     */
-    void createRowIndexEntry() throws IOException;
-
-    /**
-     * Write the stripe out to the file.
-     * @param stripeFooter the stripe footer that contains the information about the
-     *                layout of the stripe. The TreeWriterBase is required to update
-     *                the footer with its information.
-     * @param stats the stripe statistics information
-     * @param requiredIndexEntries the number of index entries that are
-     *                             required. this is to check to make sure the
-     *                             row index is well formed.
-     */
-    void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
-                     OrcProto.StripeStatistics.Builder stats,
-                     int requiredIndexEntries) throws IOException;
-
-    /**
-     * During a stripe append, we need to update the file statistics.
-     * @param stripeStatistics the statistics for the new stripe
-     */
-    void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
-
-    /**
-     * Add the file statistics to the file footer.
-     * @param footer the file footer builder
-     */
-    void writeFileStatistics(OrcProto.Footer.Builder footer);
-  }
-
-  /**
-   * The parent class of all of the writers for each column. Each column
-   * is written by an instance of this class. The compound types (struct,
-   * list, map, and union) have children tree writers that write the children
-   * types.
-   */
-  private abstract static class TreeWriterBase implements TreeWriter {
-    protected final int id;
-    protected final BitFieldWriter isPresent;
-    private final boolean isCompressed;
-    protected final ColumnStatisticsImpl indexStatistics;
-    protected final ColumnStatisticsImpl stripeColStatistics;
-    protected final ColumnStatisticsImpl fileStatistics;
-    protected final RowIndexPositionRecorder rowIndexPosition;
-    private final OrcProto.RowIndex.Builder rowIndex;
-    private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
-    protected final BloomFilter bloomFilter;
-    protected final BloomFilterUtf8 bloomFilterUtf8;
-    protected final boolean createBloomFilter;
-    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
-    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
-    protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
-    private boolean foundNulls;
-    private OutStream isPresentOutStream;
-    private final StreamFactory streamFactory;
-
-    /**
-     * Create a tree writer.
-     * @param columnId the column id of the column to write
-     * @param schema the row schema
-     * @param streamFactory limited access to the Writer's data.
-     * @param nullable can the value be null?
-     */
-    TreeWriterBase(int columnId,
-                   TypeDescription schema,
-                   StreamFactory streamFactory,
-                   boolean nullable) throws IOException {
-      this.streamFactory = streamFactory;
-      this.isCompressed = streamFactory.isCompressed();
-      this.id = columnId;
-      if (nullable) {
-        isPresentOutStream = streamFactory.createStream(id,
-            OrcProto.Stream.Kind.PRESENT);
-        isPresent = new BitFieldWriter(isPresentOutStream, 1);
-      } else {
-        isPresent = null;
-      }
-      this.foundNulls = false;
-      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
-      indexStatistics = ColumnStatisticsImpl.create(schema);
-      stripeColStatistics = ColumnStatisticsImpl.create(schema);
-      fileStatistics = ColumnStatisticsImpl.create(schema);
-      if (streamFactory.buildIndex()) {
-        rowIndex = OrcProto.RowIndex.newBuilder();
-        rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
-        rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
-      } else {
-        rowIndex = null;
-        rowIndexEntry = null;
-        rowIndexPosition = null;
-      }
-      if (createBloomFilter) {
-        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
-        if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
-          bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
-              streamFactory.getBloomFilterFPP());
-          bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
-        } else {
-          bloomFilter = null;
-          bloomFilterIndex = null;
-        }
-        bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
-            streamFactory.getBloomFilterFPP());
-        bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
-      } else {
-        bloomFilterEntry = null;
-        bloomFilterIndex = null;
-        bloomFilterIndexUtf8 = null;
-        bloomFilter = null;
-        bloomFilterUtf8 = null;
-      }
-    }
-
-    protected OrcProto.RowIndex.Builder getRowIndex() {
-      return rowIndex;
-    }
-
-    protected ColumnStatisticsImpl getStripeStatistics() {
-      return stripeColStatistics;
-    }
-
-    protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
-      return rowIndexEntry;
-    }
-
-    IntegerWriter createIntegerWriter(PositionedOutputStream output,
-                                      boolean signed, boolean isDirectV2,
-                                      StreamFactory writer) {
-      if (isDirectV2) {
-        boolean alignedBitpacking = false;
-        if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
-          alignedBitpacking = true;
-        }
-        return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
-      } else {
-        return new RunLengthIntegerWriter(output, signed);
-      }
-    }
-
-    boolean isNewWriteFormat(StreamFactory writer) {
-      return writer.getVersion() != OrcFile.Version.V_0_11;
-    }
-
-    /**
-     * Handle the top level object write.
-     *
-     * This default method is used for all types except structs, which are the
-     * typical case. VectorizedRowBatch assumes the top level object is a
-     * struct, so we use the first column for all other types.
-     * @param batch the batch to write from
-     * @param offset the row to start on
-     * @param length the number of rows to write
-     */
-    public void writeRootBatch(VectorizedRowBatch batch, int offset,
-                               int length) throws IOException {
-      writeBatch(batch.cols[0], offset, length);
-    }
-
-    /**
-     * Write the values from the given vector from offset for length elements.
-     * @param vector the vector to write from
-     * @param offset the first value from the vector to write
-     * @param length the number of values from the vector to write
-     */
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      if (vector.noNulls) {
-        indexStatistics.increment(length);
-        if (isPresent != null) {
-          for (int i = 0; i < length; ++i) {
-            isPresent.write(1);
-          }
-        }
-      } else {
-        if (vector.isRepeating) {
-          boolean isNull = vector.isNull[0];
-          if (isPresent != null) {
-            for (int i = 0; i < length; ++i) {
-              isPresent.write(isNull ? 0 : 1);
-            }
-          }
-          if (isNull) {
-            foundNulls = true;
-            indexStatistics.setNull();
-          } else {
-            indexStatistics.increment(length);
-          }
-        } else {
-          // count the number of non-null values
-          int nonNullCount = 0;
-          for(int i = 0; i < length; ++i) {
-            boolean isNull = vector.isNull[i + offset];
-            if (!isNull) {
-              nonNullCount += 1;
-            }
-            if (isPresent != null) {
-              isPresent.write(isNull ? 0 : 1);
-            }
-          }
-          indexStatistics.increment(nonNullCount);
-          if (nonNullCount != length) {
-            foundNulls = true;
-            indexStatistics.setNull();
-          }
-        }
-      }
-    }
-
-    private void removeIsPresentPositions() {
-      for(int i=0; i < rowIndex.getEntryCount(); ++i) {
-        OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
-        List<Long> positions = entry.getPositionsList();
-        // bit streams use 3 positions if uncompressed, 4 if compressed
-        positions = positions.subList(isCompressed ? 4 : 3, positions.size());
-        entry.clearPositions();
-        entry.addAllPositions(positions);
-      }
-    }
-
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      if (isPresent != null) {
-        isPresent.flush();
-
-        // if no nulls are found in a stream, then suppress the stream
-        if(!foundNulls) {
-          isPresentOutStream.suppress();
-          // since isPresent bitstream is suppressed, update the index to
-          // remove the positions of the isPresent stream
-          if (rowIndex != null) {
-            removeIsPresentPositions();
-          }
-        }
-      }
-
-      // merge stripe-level column statistics to file statistics and write it to
-      // stripe statistics
-      fileStatistics.merge(stripeColStatistics);
-      stats.addColStats(stripeColStatistics.serialize());
-      stripeColStatistics.reset();
-
-      // reset the flag for next stripe
-      foundNulls = false;
-
-      builder.addColumns(getEncoding());
-      if (streamFactory.hasWriterTimeZone()) {
-        builder.setWriterTimezone(TimeZone.getDefault().getID());
-      }
-      if (rowIndex != null) {
-        if (rowIndex.getEntryCount() != requiredIndexEntries) {
-          throw new IllegalArgumentException("Column has wrong number of " +
-               "index entries found: " + rowIndex.getEntryCount() + " expected: " +
-               requiredIndexEntries);
-        }
-        streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
-        rowIndex.clear();
-        rowIndexEntry.clear();
-      }
-
-      // write the bloom filter to out stream
-      if (bloomFilterIndex != null) {
-        streamFactory.writeBloomFilter(new StreamName(id,
-            OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
-        bloomFilterIndex.clear();
-      }
-      // write the bloom filter to out stream
-      if (bloomFilterIndexUtf8 != null) {
-        streamFactory.writeBloomFilter(new StreamName(id,
-            OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
-        bloomFilterIndexUtf8.clear();
-      }
-    }
-
-    /**
-     * Get the encoding for this column.
-     * @return the information about the encoding of this column
-     */
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder builder =
-          OrcProto.ColumnEncoding.newBuilder()
-              .setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      if (createBloomFilter) {
-        builder.setBloomEncoding(BloomFilterIO.Encoding.CURRENT.getId());
-      }
-      return builder;
-    }
-
-    /**
-     * Create a row index entry with the previous location and the current
-     * index statistics. Also merges the index statistics into the file
-     * statistics before they are cleared. Finally, it records the start of the
-     * next index and ensures all of the children columns also create an entry.
-     */
-    public void createRowIndexEntry() throws IOException {
-      stripeColStatistics.merge(indexStatistics);
-      rowIndexEntry.setStatistics(indexStatistics.serialize());
-      indexStatistics.reset();
-      rowIndex.addEntry(rowIndexEntry);
-      rowIndexEntry.clear();
-      addBloomFilterEntry();
-      recordPosition(rowIndexPosition);
-    }
-
-    void addBloomFilterEntry() {
-      if (createBloomFilter) {
-        if (bloomFilter != null) {
-          BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
-          bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
-          bloomFilter.reset();
-        }
-        if (bloomFilterUtf8 != null) {
-          BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
-          bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
-          bloomFilterUtf8.reset();
-        }
-      }
-    }
-
-    @Override
-    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
-      fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
-    }
-
-    /**
-     * Record the current position in each of this column's streams.
-     * @param recorder where should the locations be recorded
-     */
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      if (isPresent != null) {
-        isPresent.getPosition(recorder);
-      }
-    }
-
-    /**
-     * Estimate how much memory the writer is consuming excluding the streams.
-     * @return the number of bytes.
-     */
-    public long estimateMemory() {
-      long result = 0;
-      if (isPresent != null) {
-        result = isPresentOutStream.getBufferSize();
-      }
-      return result;
-    }
-
-    @Override
-    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
-      footer.addStatistics(fileStatistics.serialize());
-    }
-  }
-
-  private static class BooleanTreeWriter extends TreeWriterBase {
-    private final BitFieldWriter writer;
-
-    BooleanTreeWriter(int columnId,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      PositionedOutputStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.writer = new BitFieldWriter(out, 1);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int value = vec.vector[0] == 0 ? 0 : 1;
-          indexStatistics.updateBoolean(value != 0, length);
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int value = vec.vector[i + offset] == 0 ? 0 : 1;
-            writer.write(value);
-            indexStatistics.updateBoolean(value != 0, 1);
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      writer.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + writer.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      long num = fileStatistics.getNumberOfValues();
-      return num * JavaDataModel.get().primitive1();
-    }
-  }
-
-  private static class ByteTreeWriter extends TreeWriterBase {
-    private final RunLengthByteWriter writer;
-
-    ByteTreeWriter(int columnId,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.writer = new RunLengthByteWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA));
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          byte value = (byte) vec.vector[0];
-          indexStatistics.updateInteger(value, length);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addLong(value);
-            }
-            bloomFilterUtf8.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            byte value = (byte) vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateInteger(value, 1);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addLong(value);
-              }
-              bloomFilterUtf8.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      writer.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + writer.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      long num = fileStatistics.getNumberOfValues();
-      return num * JavaDataModel.get().primitive1();
-    }
-  }
-
-  private static class IntegerTreeWriter extends TreeWriterBase {
-    private final IntegerWriter writer;
-    private boolean isDirectV2 = true;
-    private final boolean isLong;
-
-    IntegerTreeWriter(int columnId,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      OutStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-      this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          long value = vec.vector[0];
-          indexStatistics.updateInteger(value, length);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addLong(value);
-            }
-            bloomFilterUtf8.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            long value = vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateInteger(value, 1);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addLong(value);
-              }
-              bloomFilterUtf8.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      writer.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + writer.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      JavaDataModel jdm = JavaDataModel.get();
-      long num = fileStatistics.getNumberOfValues();
-      return num * (isLong ? jdm.primitive2() : jdm.primitive1());
-    }
-  }
-
-  private static class FloatTreeWriter extends TreeWriterBase {
-    private final PositionedOutputStream stream;
-    private final SerializationUtils utils;
-
-    FloatTreeWriter(int columnId,
-                      TypeDescription schema,
-                      StreamFactory writer,
-                      boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.utils = new SerializationUtils();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DoubleColumnVector vec = (DoubleColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          float value = (float) vec.vector[0];
-          indexStatistics.updateDouble(value);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addDouble(value);
-            }
-            bloomFilterUtf8.addDouble(value);
-          }
-          for(int i=0; i < length; ++i) {
-            utils.writeFloat(stream, value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            float value = (float) vec.vector[i + offset];
-            utils.writeFloat(stream, value);
-            indexStatistics.updateDouble(value);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addDouble(value);
-              }
-              bloomFilterUtf8.addDouble(value);
-            }
-          }
-        }
-      }
-    }
-
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      stream.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + stream.getBufferSize();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      long num = fileStatistics.getNumberOfValues();
-      return num * JavaDataModel.get().primitive1();
-    }
-  }
-
-  private static class DoubleTreeWriter extends TreeWriterBase {
-    private final PositionedOutputStream stream;
-    private final SerializationUtils utils;
-
-    DoubleTreeWriter(int columnId,
-                    TypeDescription schema,
-                    StreamFactory writer,
-                    boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.utils = new SerializationUtils();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DoubleColumnVector vec = (DoubleColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          double value = vec.vector[0];
-          indexStatistics.updateDouble(value);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addDouble(value);
-            }
-            bloomFilterUtf8.addDouble(value);
-          }
-          for(int i=0; i < length; ++i) {
-            utils.writeDouble(stream, value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            double value = vec.vector[i + offset];
-            utils.writeDouble(stream, value);
-            indexStatistics.updateDouble(value);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addDouble(value);
-              }
-              bloomFilterUtf8.addDouble(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      stream.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + stream.getBufferSize();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      long num = fileStatistics.getNumberOfValues();
-      return num * JavaDataModel.get().primitive2();
-    }
-  }
-
-  private static abstract class StringBaseTreeWriter extends TreeWriterBase {
-    private static final int INITIAL_DICTIONARY_SIZE = 4096;
-    private final OutStream stringOutput;
-    protected final IntegerWriter lengthOutput;
-    private final IntegerWriter rowOutput;
-    protected final StringRedBlackTree dictionary =
-        new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
-    protected final DynamicIntArray rows = new DynamicIntArray();
-    protected final PositionedOutputStream directStreamOutput;
-    private final List<OrcProto.RowIndexEntry> savedRowIndex =
-        new ArrayList<>();
-    private final boolean buildIndex;
-    private final List<Long> rowIndexValueCount = new ArrayList<>();
-    // If the number of keys in a dictionary is greater than this fraction of
-    //the total number of non-null rows, turn off dictionary encoding
-    private final double dictionaryKeySizeThreshold;
-    protected boolean useDictionaryEncoding = true;
-    private boolean isDirectV2 = true;
-    private boolean doneDictionaryCheck;
-    private final boolean strideDictionaryCheck;
-
-    StringBaseTreeWriter(int columnId,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      stringOutput = writer.createStream(id,
-          OrcProto.Stream.Kind.DICTIONARY_DATA);
-      lengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
-          writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-      rowIndexValueCount.add(0L);
-      buildIndex = writer.buildIndex();
-      Configuration conf = writer.getConfiguration();
-      dictionaryKeySizeThreshold =
-          OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
-      strideDictionaryCheck =
-          OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
-      doneDictionaryCheck = false;
-    }
-
-    private void checkDictionaryEncoding() {
-      if (!doneDictionaryCheck) {
-        // Set the flag indicating whether or not to use dictionary encoding
-        // based on whether or not the fraction of distinct keys over number of
-        // non-null rows is less than the configured threshold
-        float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
-        useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
-        doneDictionaryCheck = true;
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
-      // checking would not have happened. So do it again here.
-      checkDictionaryEncoding();
-
-      if (useDictionaryEncoding) {
-        flushDictionary();
-      } else {
-        // flushout any left over entries from dictionary
-        if (rows.size() > 0) {
-          flushDictionary();
-        }
-
-        // suppress the stream for every stripe if dictionary is disabled
-        stringOutput.suppress();
-      }
-
-      // we need to build the rowindex before calling super, since it
-      // writes it out.
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      if (useDictionaryEncoding) {
-        stringOutput.flush();
-        lengthOutput.flush();
-        rowOutput.flush();
-      } else {
-        directStreamOutput.flush();
-        lengthOutput.flush();
-      }
-      // reset all of the fields to be ready for the next stripe.
-      dictionary.clear();
-      savedRowIndex.clear();
-      rowIndexValueCount.clear();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-      rowIndexValueCount.add(0L);
-
-      if (!useDictionaryEncoding) {
-        // record the start positions of first index stride of next stripe i.e
-        // beginning of the direct streams when dictionary is disabled
-        recordDirectStreamPosition();
-      }
-    }
-
-    private void flushDictionary() throws IOException {
-      final int[] dumpOrder = new int[dictionary.size()];
-
-      if (useDictionaryEncoding) {
-        // Write the dictionary by traversing the red-black tree writing out
-        // the bytes and lengths; and creating the map from the original order
-        // to the final sorted order.
-
-        dictionary.visit(new StringRedBlackTree.Visitor() {
-          private int currentId = 0;
-          @Override
-          public void visit(StringRedBlackTree.VisitorContext context
-                           ) throws IOException {
-            context.writeBytes(stringOutput);
-            lengthOutput.write(context.getLength());
-            dumpOrder[context.getOriginalPosition()] = currentId++;
-          }
-        });
-      } else {
-        // for direct encoding, we don't want the dictionary data stream
-        stringOutput.suppress();
-      }
-      int length = rows.size();
-      int rowIndexEntry = 0;
-      OrcProto.RowIndex.Builder rowIndex = getRowIndex();
-      Text text = new Text();
-      // write the values translated into the dump order.
-      for(int i = 0; i <= length; ++i) {
-        // now that we are writing out the row values, we can finalize the
-        // row index
-        if (buildIndex) {
-          while (i == rowIndexValueCount.get(rowIndexEntry) &&
-              rowIndexEntry < savedRowIndex.size()) {
-            OrcProto.RowIndexEntry.Builder base =
-                savedRowIndex.get(rowIndexEntry++).toBuilder();
-            if (useDictionaryEncoding) {
-              rowOutput.getPosition(new RowIndexPositionRecorder(base));
-            } else {
-              PositionRecorder posn = new RowIndexPositionRecorder(base);
-              directStreamOutput.getPosition(posn);
-              lengthOutput.getPosition(posn);
-            }
-            rowIndex.addEntry(base.build());
-          }
-        }
-        if (i != length) {
-          if (useDictionaryEncoding) {
-            rowOutput.write(dumpOrder[rows.get(i)]);
-          } else {
-            dictionary.getText(text, rows.get(i));
-            directStreamOutput.write(text.getBytes(), 0, text.getLength());
-            lengthOutput.write(text.getLength());
-          }
-        }
-      }
-      rows.clear();
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (useDictionaryEncoding) {
-        result.setDictionarySize(dictionary.size());
-        if(isDirectV2) {
-          result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2);
-        } else {
-          result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY);
-        }
-      } else {
-        if(isDirectV2) {
-          result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-        } else {
-          result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-        }
-      }
-      return result;
-    }
-
-    /**
-     * This method doesn't call the super method, because unlike most of the
-     * other TreeWriters, this one can't record the position in the streams
-     * until the stripe is being flushed. Therefore it saves all of the entries
-     * and augments them with the final information as the stripe is written.
-     */
-    @Override
-    public void createRowIndexEntry() throws IOException {
-      getStripeStatistics().merge(indexStatistics);
-      OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
-      rowIndexEntry.setStatistics(indexStatistics.serialize());
-      indexStatistics.reset();
-      OrcProto.RowIndexEntry base = rowIndexEntry.build();
-      savedRowIndex.add(base);
-      rowIndexEntry.clear();
-      addBloomFilterEntry();
-      recordPosition(rowIndexPosition);
-      rowIndexValueCount.add((long) rows.size());
-      if (strideDictionaryCheck) {
-        checkDictionaryEncoding();
-      }
-      if (!useDictionaryEncoding) {
-        if (rows.size() > 0) {
-          flushDictionary();
-          // just record the start positions of next index stride
-          recordDirectStreamPosition();
-        } else {
-          // record the start positions of next index stride
-          recordDirectStreamPosition();
-          getRowIndex().addEntry(base);
-        }
-      }
-    }
-
-    private void recordDirectStreamPosition() throws IOException {
-      if (rowIndexPosition != null) {
-        directStreamOutput.getPosition(rowIndexPosition);
-        lengthOutput.getPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public long estimateMemory() {
-      long parent = super.estimateMemory();
-      if (useDictionaryEncoding) {
-        return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
-      } else {
-        return parent + lengthOutput.estimateMemory() +
-            directStreamOutput.getBufferSize();
-      }
-    }
-
-    @Override
-    public long getRawDataSize() {
-      // ORC strings are converted to java Strings. so use JavaDataModel to
-      // compute the overall size of strings
-      StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
-      long numVals = fileStatistics.getNumberOfValues();
-      if (numVals == 0) {
-        return 0;
-      } else {
-        int avgSize = (int) (scs.getSum() / numVals);
-        return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
-      }
-    }
-  }
-
-  private static class StringTreeWriter extends StringBaseTreeWriter {
-    StringTreeWriter(int columnId,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(vec.vector[0], vec.start[0],
-                  vec.length[0]);
-              lengthOutput.write(vec.length[0]);
-            }
-          }
-          indexStatistics.updateString(vec.vector[0], vec.start[0],
-              vec.length[0], length);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              // translate from UTF-8 to the default charset
-              bloomFilter.addString(new String(vec.vector[0], vec.start[0],
-                  vec.length[0], StandardCharsets.UTF_8));
-            }
-            bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]));
-            } else {
-              directStreamOutput.write(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-              lengthOutput.write(vec.length[offset + i]);
-            }
-            indexStatistics.updateString(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i], 1);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                // translate from UTF-8 to the default charset
-                bloomFilter.addString(new String(vec.vector[offset + i],
-                    vec.start[offset + i], vec.length[offset + i],
-                    StandardCharsets.UTF_8));
-              }
-              bloomFilterUtf8.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Under the covers, char is written to ORC the same way as string.
-   */
-  private static class CharTreeWriter extends StringBaseTreeWriter {
-    private final int itemLength;
-    private final byte[] padding;
-
-    CharTreeWriter(int columnId,
-        TypeDescription schema,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      itemLength = schema.getMaxLength();
-      padding = new byte[itemLength];
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          byte[] ptr;
-          int ptrOffset;
-          if (vec.length[0] >= itemLength) {
-            ptr = vec.vector[0];
-            ptrOffset = vec.start[0];
-          } else {
-            ptr = padding;
-            ptrOffset = 0;
-            System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
-                vec.length[0]);
-            Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
-          }
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(ptr, ptrOffset, itemLength);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(ptr, ptrOffset, itemLength);
-              lengthOutput.write(itemLength);
-            }
-          }
-          indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              // translate from UTF-8 to the default charset
-              bloomFilter.addString(new String(vec.vector[0], vec.start[0],
-                  vec.length[0], StandardCharsets.UTF_8));
-            }
-            bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            byte[] ptr;
-            int ptrOffset;
-            if (vec.length[offset + i] >= itemLength) {
-              ptr = vec.vector[offset + i];
-              ptrOffset = vec.start[offset + i];
-            } else {
-              // it is the wrong length, so copy it
-              ptr = padding;
-              ptrOffset = 0;
-              System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
-                  ptr, 0, vec.length[offset + i]);
-              Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
-            }
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(ptr, ptrOffset, itemLength));
-            } else {
-              directStreamOutput.write(ptr, ptrOffset, itemLength);
-              lengthOutput.write(itemLength);
-            }
-            indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                // translate from UTF-8 to the default charset
-                bloomFilter.addString(new String(vec.vector[offset + i],
-                    vec.start[offset + i], vec.length[offset + i],
-                    StandardCharsets.UTF_8));
-              }
-              bloomFilterUtf8.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Under the covers, varchar is written to ORC the same way as string.
-   */
-  private static class VarcharTreeWriter extends StringBaseTreeWriter {
-    private final int maxLength;
-
-    VarcharTreeWriter(int columnId,
-        TypeDescription schema,
-        StreamFactory writer,
-        boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      maxLength = schema.getMaxLength();
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int itemLength = Math.min(vec.length[0], maxLength);
-          if (useDictionaryEncoding) {
-            int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
-            for(int i=0; i < length; ++i) {
-              rows.add(id);
-            }
-          } else {
-            for(int i=0; i < length; ++i) {
-              directStreamOutput.write(vec.vector[0], vec.start[0],
-                  itemLength);
-              lengthOutput.write(itemLength);
-            }
-          }
-          indexStatistics.updateString(vec.vector[0], vec.start[0],
-              itemLength, length);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              // translate from UTF-8 to the default charset
-              bloomFilter.addString(new String(vec.vector[0],
-                  vec.start[0], itemLength,
-                  StandardCharsets.UTF_8));
-            }
-            bloomFilterUtf8.addBytes(vec.vector[0],
-                vec.start[0], itemLength);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int itemLength = Math.min(vec.length[offset + i], maxLength);
-            if (useDictionaryEncoding) {
-              rows.add(dictionary.add(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength));
-            } else {
-              directStreamOutput.write(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength);
-              lengthOutput.write(itemLength);
-            }
-            indexStatistics.updateString(vec.vector[offset + i],
-                vec.start[offset + i], itemLength, 1);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                // translate from UTF-8 to the default charset
-                bloomFilter.addString(new String(vec.vector[offset + i],
-                    vec.start[offset + i], itemLength,
-                    StandardCharsets.UTF_8));
-              }
-              bloomFilterUtf8.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], itemLength);
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private static class BinaryTreeWriter extends TreeWriterBase {
-    private final PositionedOutputStream stream;
-    private final IntegerWriter length;
-    private boolean isDirectV2 = true;
-
-    BinaryTreeWriter(int columnId,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.stream = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.length = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      BytesColumnVector vec = (BytesColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          for(int i=0; i < length; ++i) {
-            stream.write(vec.vector[0], vec.start[0],
-                  vec.length[0]);
-            this.length.write(vec.length[0]);
-          }
-          indexStatistics.updateBinary(vec.vector[0], vec.start[0],
-              vec.length[0], length);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-            }
-            bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            stream.write(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i]);
-            this.length.write(vec.length[offset + i]);
-            indexStatistics.updateBinary(vec.vector[offset + i],
-                vec.start[offset + i], vec.length[offset + i], 1);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addBytes(vec.vector[offset + i],
-                    vec.start[offset + i], vec.length[offset + i]);
-              }
-              bloomFilterUtf8.addBytes(vec.vector[offset + i],
-                  vec.start[offset + i], vec.length[offset + i]);
-            }
-          }
-        }
-      }
-    }
-
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      stream.flush();
-      length.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      stream.getPosition(recorder);
-      length.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + stream.getBufferSize() +
-          length.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      // get total length of binary blob
-      BinaryColumnStatistics bcs = (BinaryColumnStatistics) fileStatistics;
-      return bcs.getSum();
-    }
-  }
-
-  public static final int MILLIS_PER_SECOND = 1000;
-  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
-
-  private static class TimestampTreeWriter extends TreeWriterBase {
-    private final IntegerWriter seconds;
-    private final IntegerWriter nanos;
-    private final boolean isDirectV2;
-    private final TimeZone localTimezone;
-    private final long baseEpochSecsLocalTz;
-
-    TimestampTreeWriter(int columnId,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.seconds = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
-      this.nanos = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-      this.localTimezone = TimeZone.getDefault();
-      // for unit tests to set different time zones
-      this.baseEpochSecsLocalTz = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
-      writer.useWriterTimeZone(true);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      TimestampColumnVector vec = (TimestampColumnVector) vector;
-      Timestamp val;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          val = vec.asScratchTimestamp(0);
-          long millis = val.getTime();
-          long utc = SerializationUtils.convertToUtc(localTimezone, millis);
-          indexStatistics.updateTimestamp(utc);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addLong(millis);
-            }
-            bloomFilterUtf8.addLong(utc);
-          }
-          final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
-          final long nano = formatNanos(val.getNanos());
-          for(int i=0; i < length; ++i) {
-            seconds.write(secs);
-            nanos.write(nano);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            val = vec.asScratchTimestamp(i + offset);
-            long millis = val.getTime();
-            long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
-            long utc = SerializationUtils.convertToUtc(localTimezone, millis);
-            seconds.write(secs);
-            nanos.write(formatNanos(val.getNanos()));
-            indexStatistics.updateTimestamp(utc);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addLong(millis);
-              }
-              bloomFilterUtf8.addLong(utc);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      seconds.flush();
-      nanos.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    private static long formatNanos(int nanos) {
-      if (nanos == 0) {
-        return 0;
-      } else if (nanos % 100 != 0) {
-        return ((long) nanos) << 3;
-      } else {
-        nanos /= 100;
-        int trailingZeros = 1;
-        while (nanos % 10 == 0 && trailingZeros < 7) {
-          nanos /= 10;
-          trailingZeros += 1;
-        }
-        return ((long) nanos) << 3 | trailingZeros;
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      seconds.getPosition(recorder);
-      nanos.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + seconds.estimateMemory() +
-          nanos.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      return fileStatistics.getNumberOfValues() *
-          JavaDataModel.get().lengthOfTimestamp();
-    }
-  }
-
-  private static class DateTreeWriter extends TreeWriterBase {
-    private final IntegerWriter writer;
-    private final boolean isDirectV2;
-
-    DateTreeWriter(int columnId,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      OutStream out = writer.createStream(id,
-          OrcProto.Stream.Kind.DATA);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      LongColumnVector vec = (LongColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int value = (int) vec.vector[0];
-          indexStatistics.updateDate(value);
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addLong(value);
-            }
-            bloomFilterUtf8.addLong(value);
-          }
-          for(int i=0; i < length; ++i) {
-            writer.write(value);
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            int value = (int) vec.vector[i + offset];
-            writer.write(value);
-            indexStatistics.updateDate(value);
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addLong(value);
-              }
-              bloomFilterUtf8.addLong(value);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      writer.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      writer.getPosition(recorder);
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + writer.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      return fileStatistics.getNumberOfValues() *
-          JavaDataModel.get().lengthOfDate();
-    }
-  }
-
-  private static class DecimalTreeWriter extends TreeWriterBase {
-    private final PositionedOutputStream valueStream;
-
-    // These scratch buffers allow us to serialize decimals much faster.
-    private final long[] scratchLongs;
-    private final byte[] scratchBuffer;
-
-    private final IntegerWriter scaleStream;
-    private final boolean isDirectV2;
-
-    DecimalTreeWriter(int columnId,
-                        TypeDescription schema,
-                        StreamFactory writer,
-                        boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
-      scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
-      this.scaleStream = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      DecimalColumnVector vec = (DecimalColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          HiveDecimalWritable value = vec.vector[0];
-          indexStatistics.updateDecimal(value);
-          if (createBloomFilter) {
-            String str = value.toString(scratchBuffer);
-            if (bloomFilter != null) {
-              bloomFilter.addString(str);
-            }
-            bloomFilterUtf8.addString(str);
-          }
-          for(int i=0; i < length; ++i) {
-            value.serializationUtilsWrite(valueStream,
-                                          scratchLongs);
-            scaleStream.write(value.scale());
-          }
-        }
-      } else {
-        for(int i=0; i < length; ++i) {
-          if (vec.noNulls || !vec.isNull[i + offset]) {
-            HiveDecimalWritable value = vec.vector[i + offset];
-            value.serializationUtilsWrite(valueStream, scratchLongs);
-            scaleStream.write(value.scale());
-            indexStatistics.updateDecimal(value);
-            if (createBloomFilter) {
-              String str = value.toString(scratchBuffer);
-              if (bloomFilter != null) {
-                bloomFilter.addString(str);
-              }
-              bloomFilterUtf8.addString(str);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      valueStream.flush();
-      scaleStream.flush();
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      valueStream.getPosition(recorder);
-      scaleStream.getPosition(recorder);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + valueStream.getBufferSize() +
-          scaleStream.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      return fileStatistics.getNumberOfValues() *
-          JavaDataModel.get().lengthOfDecimal();
-    }
-  }
-
-  private static class StructTreeWriter extends TreeWriterBase {
-    final TreeWriter[] childrenWriters;
-
-    StructTreeWriter(int columnId,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      List<TypeDescription> children = schema.getChildren();
-      childrenWriters = new TreeWriterBase[children.size()];
-      for(int i=0; i < childrenWriters.length; ++i) {
-        childrenWriters[i] = createTreeWriter(
-          children.get(i), writer,
-          true);
-      }
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void writeRootBatch(VectorizedRowBatch batch, int offset,
-                               int length) throws IOException {
-      // update the statistics for the root column
-      indexStatistics.increment(length);
-      // I'm assuming that the root column isn't nullable so that I don't need
-      // to update isPresent.
-      for(int i=0; i < childrenWriters.length; ++i) {
-        childrenWriters[i].writeBatch(batch.cols[i], offset, length);
-      }
-    }
-
-    private static void writeFields(StructColumnVector vector,
-                                    TreeWriter[] childrenWriters,
-                                    int offset, int length) throws IOException {
-      for(int field=0; field < childrenWriters.length; ++field) {
-        childrenWriters[field].writeBatch(vector.fields[field], offset, length);
-      }
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      StructColumnVector vec = (StructColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          writeFields(vec, childrenWriters, offset, length);
-        }
-      } else if (vector.noNulls) {
-        writeFields(vec, childrenWriters, offset, length);
-      } else {
-        // write the records in runs
-        int currentRun = 0;
-        boolean started = false;
-        for(int i=0; i < length; ++i) {
-          if (!vec.isNull[i + offset]) {
-            if (!started) {
-              started = true;
-              currentRun = i;
-            }
-          } else if (started) {
-            started = false;
-            writeFields(vec, childrenWriters, offset + currentRun,
-                i - currentRun);
-          }
-        }
-        if (started) {
-          writeFields(vec, childrenWriters, offset + currentRun,
-              length - currentRun);
-        }
-      }
-    }
-
-    @Override
-    public void createRowIndexEntry() throws IOException {
-      super.createRowIndexEntry();
-      for(TreeWriter child: childrenWriters) {
-        child.createRowIndexEntry();
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      for(TreeWriter child: childrenWriters) {
-        child.writeStripe(builder, stats, requiredIndexEntries);
-      }
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
-      super.updateFileStatistics(stats);
-      for(TreeWriter child: childrenWriters) {
-        child.updateFileStatistics(stats);
-      }
-    }
-
-    @Override
-    public long estimateMemory() {
-      long result = 0;
-      for(TreeWriter writer: childrenWriters) {
-        result += writer.estimateMemory();
-      }
-      return super.estimateMemory() + result;
-    }
-
-    @Override
-    public long getRawDataSize() {
-      long result = 0;
-      for(TreeWriter writer: childrenWriters) {
-        result += writer.getRawDataSize();
-      }
-      return result;
-    }
-
-    @Override
-    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
-      super.writeFileStatistics(footer);
-      for(TreeWriter child: childrenWriters) {
-        child.writeFileStatistics(footer);
-      }
-    }
-  }
-
-  private static class ListTreeWriter extends TreeWriterBase {
-    private final IntegerWriter lengths;
-    private final boolean isDirectV2;
-    private final TreeWriter childWriter;
-
-    ListTreeWriter(int columnId,
-                   TypeDescription schema,
-                   StreamFactory writer,
-                   boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      childWriter =
-        createTreeWriter(schema.getChildren().get(0), writer, true);
-      lengths = createIntegerWriter(writer.createStream(columnId,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public void createRowIndexEntry() throws IOException {
-      super.createRowIndexEntry();
-      childWriter.createRowIndexEntry();
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      ListColumnVector vec = (ListColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int childOffset = (int) vec.offsets[0];
-          int childLength = (int) vec.lengths[0];
-          for(int i=0; i < length; ++i) {
-            lengths.write(childLength);
-            childWriter.writeBatch(vec.child, childOffset, childLength);
-          }
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addLong(childLength);
-            }
-            bloomFilterUtf8.addLong(childLength);
-          }
-        }
-      } else {
-        // write the elements in runs
-        int currentOffset = 0;
-        int currentLength = 0;
-        for(int i=0; i < length; ++i) {
-          if (!vec.isNull[i + offset]) {
-            int nextLength = (int) vec.lengths[offset + i];
-            int nextOffset = (int) vec.offsets[offset + i];
-            lengths.write(nextLength);
-            if (currentLength == 0) {
-              currentOffset = nextOffset;
-              currentLength = nextLength;
-            } else if (currentOffset + currentLength != nextOffset) {
-              childWriter.writeBatch(vec.child, currentOffset,
-                  currentLength);
-              currentOffset = nextOffset;
-              currentLength = nextLength;
-            } else {
-              currentLength += nextLength;
-            }
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addLong(nextLength);
-              }
-              bloomFilterUtf8.addLong(nextLength);
-            }
+  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
+    // TODO: result.modify() may create a new codec here. We want to end() it when the stream is
+    //       closed, but at this point there's no close() for the stream.
+    CompressionCodec result = physicalWriter.getCompressionCodec();
+    if (result != null) {
+      switch (kind) {
+        case BLOOM_FILTER:
+        case DATA:
+        case DICTIONARY_DATA:
+        case BLOOM_FILTER_UTF8:
+          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
+            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+                CompressionCodec.Modifier.TEXT));
+          } else {
+            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+                CompressionCodec.Modifier.TEXT));
           }
-        }
-        if (currentLength != 0) {
-          childWriter.writeBatch(vec.child, currentOffset,
-              currentLength);
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      lengths.flush();
-      childWriter.writeStripe(builder, stats, requiredIndexEntries);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+              CompressionCodec.Modifier.BINARY));
+          break;
+        default:
+          LOG.info("Missing ORC compression modifiers for " + kind);
+          break;
       }
     }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      lengths.getPosition(recorder);
-    }
-
-    @Override
-    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
-      super.updateFileStatistics(stats);
-      childWriter.updateFileStatistics(stats);
-    }
-
-    @Override
-    public long estimateMemory() {
-      return super.estimateMemory() + lengths.estimateMemory() +
-          childWriter.estimateMemory();
-    }
-
-    @Override
-    public long getRawDataSize() {
-      return childWriter.getRawDataSize();
-    }
-
-    @Override
-    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
-      super.writeFileStatistics(footer);
-      childWriter.writeFileStatistics(footer);
-    }
+    return result;
   }
 
-  private static class MapTreeWriter extends TreeWriterBase {
-    private final IntegerWriter lengths;
-    private final boolean isDirectV2;
-    private final TreeWriter keyWriter;
-    private final TreeWriter valueWriter;
-
-    MapTreeWriter(int columnId,
-                  TypeDescription schema,
-                  StreamFactory writer,
-                  boolean nullable) throws IOException {
-      super(columnId, schema, writer, nullable);
-      this.isDirectV2 = isNewWriteFormat(writer);
-      List<TypeDescription> children = schema.getChildren();
-      keyWriter = createTreeWriter(children.get(0), writer, true);
-      valueWriter = createTreeWriter(children.get(1), writer, true);
-      lengths = createIntegerWriter(writer.createStream(columnId,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    OrcProto.ColumnEncoding.Builder getEncoding() {
-      OrcProto.ColumnEncoding.Builder result = super.getEncoding();
-      if (isDirectV2) {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
-      } else {
-        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
-      }
-      return result;
-    }
-
-    @Override
-    public void createRowIndexEntry() throws IOException {
-      super.createRowIndexEntry();
-      keyWriter.createRowIndexEntry();
-      valueWriter.createRowIndexEntry();
-    }
-
-    @Override
-    public void writeBatch(ColumnVector vector, int offset,
-                           int length) throws IOException {
-      super.writeBatch(vector, offset, length);
-      MapColumnVector vec = (MapColumnVector) vector;
-      if (vector.isRepeating) {
-        if (vector.noNulls || !vector.isNull[0]) {
-          int childOffset = (int) vec.offsets[0];
-          int childLength = (int) vec.lengths[0];
-          for(int i=0; i < length; ++i) {
-            lengths.write(childLength);
-            keyWriter.writeBatch(vec.keys, childOffset, childLength);
-            valueWriter.writeBatch(vec.values, childOffset, childLength);
-          }
-          if (createBloomFilter) {
-            if (bloomFilter != null) {
-              bloomFilter.addLong(childLength);
-            }
-            bloomFilterUtf8.addLong(childLength);
-          }
-        }
-      } else {
-        // write the elements in runs
-        int currentOffset = 0;
-        int currentLength = 0;
-        for(int i=0; i < length; ++i) {
-          if (!vec.isNull[i + offset]) {
-            int nextLength = (int) vec.lengths[offset + i];
-            int nextOffset = (int) vec.offsets[offset + i];
-            lengths.write(nextLength);
-            if (currentLength == 0) {
-              currentOffset = nextOffset;
-              currentLength = nextLength;
-            } else if (currentOffset + currentLength != nextOffset) {
-              keyWriter.writeBatch(vec.keys, currentOffset,
-                  currentLength);
-              valueWriter.writeBatch(vec.values, currentOffset,
-                  currentLength);
-              currentOffset = nextOffset;
-              currentLength = nextLength;
-            } else {
-              currentLength += nextLength;
-            }
-            if (createBloomFilter) {
-              if (bloomFilter != null) {
-                bloomFilter.addLong(nextLength);
-              }
-              bloomFilterUtf8.addLong(nextLength);
-            }
-          }
-        }
-        if (currentLength != 0) {
-          keyWriter.writeBatch(vec.keys, currentOffset,
-              currentLength);
-          valueWriter.writeBatch(vec.values, currentOffset,
-              currentLength);
-        }
-      }
-    }
-
-    @Override
-    public void writeStripe(OrcProto.StripeFooter.Builder builder,
-                            OrcProto.StripeStatistics.Builder stats,
-                            int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, stats, requiredIndexEntries);
-      lengths.flush();
-      keyWriter.writeStripe(builder, stats, requiredIndexEntries);
-      valueWriter.writeStripe(builder, stats, requiredIndexEntries);
-      if (rowIndexPosition != null) {
-        recordPosition(rowIndexPosition);
-      }
-    }
-
-    @Override
-    void recordPosition(PositionRecorder recorder) throws IOException {
-      super.recordPosition(recorder);
-      lengths.getPosition(recorder);
-    }
-
-    @Override
-    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
-      super.updateFileStatistics(stats);
-      keyWriter.updateFileStatistics(stats);
-      valueWriter.updateFileStatistics(stats);
-    }
+  /**
+   * Interface from the Writer to the TreeWriters. This limits the visibility
+   * that the TreeWriters have into the Writer.
+   */
+  private class StreamFactory implements WriterContext {
+    /**
+     * Create a stream to store part of a column.
+     * @param column the column id for the stream
+     * @param kind the kind of stream
+     * @return the OutStream that the section needs to be written to
+     */
+    public OutStream createStream(int column,
+                                  OrcProto.Stream.Kind kind
+                                  ) throws IOException {
+      final StreamName name = new StreamName(column, kind);
+      CompressionCodec codec = getCustomizedCodec(kind);
 
-    @Ov

<TRUNCATED>