You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/06/16 18:29:59 UTC

[2/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
new file mode 100644
index 0000000..5835b5a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+/** Writes a binary column: value bytes go to the DATA stream, byte counts to LENGTH. */
+public class BinaryTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final IntegerWriter length;
+  private final boolean isDirectV2;
+
+  public BinaryTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.length = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Reports DIRECT_V2 or DIRECT, matching the flag computed in the constructor. */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    result.setKind(isDirectV2 ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+                              : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return result;
+  }
+
+  /**
+   * Writes the non-null values; a repeating batch takes every value from entry 0.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset, int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        final byte[] bytes = vec.vector[0];
+        final int start = vec.start[0];
+        final int size = vec.length[0];
+        for (int i = 0; i < length; ++i) {
+          stream.write(bytes, start, size);
+          this.length.write(size);
+        }
+        indexStatistics.updateBinary(bytes, start, size, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addBytes(bytes, start, size);
+          }
+          bloomFilterUtf8.addBytes(bytes, start, size);
+        }
+      }
+    } else {
+      for (int row = offset; row < offset + length; ++row) {
+        if (vec.noNulls || !vec.isNull[row]) {
+          final byte[] bytes = vec.vector[row];
+          final int start = vec.start[row];
+          final int size = vec.length[row];
+          stream.write(bytes, start, size);
+          this.length.write(size);
+          indexStatistics.updateBinary(bytes, start, size, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addBytes(bytes, start, size);
+            }
+            bloomFilterUtf8.addBytes(bytes, start, size);
+          }
+        }
+      }
+    }
+  }
+
+  /** Calls the base class, flushes both streams, then records positions if tracked. */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    length.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Records the DATA stream position followed by the LENGTH writer position. */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+    length.getPosition(recorder);
+  }
+
+  /** Adds the DATA buffer size and the LENGTH writer's estimate to the base estimate. */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize() + length.estimateMemory();
+  }
+
+  /** Returns the total length of the binary blobs, read from the file statistics sum. */
+  @Override
+  public long getRawDataSize() {
+    return ((BinaryColumnStatistics) fileStatistics).getSum();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
new file mode 100644
index 0000000..5f572bd
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class BooleanTreeWriter extends TreeWriterBase {
+  private final BitFieldWriter writer;
+
+  public BooleanTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    PositionedOutputStream out = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.writer = new BitFieldWriter(out, 1);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int value = vec.vector[0] == 0 ? 0 : 1;
+        indexStatistics.updateBoolean(value != 0, length);
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int value = vec.vector[i + offset] == 0 ? 0 : 1;
+          writer.write(value);
+          indexStatistics.updateBoolean(value != 0, 1);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
new file mode 100644
index 0000000..edd6411
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+
+import java.io.IOException;
+
+/** Writes a byte column through a RunLengthByteWriter over the DATA stream. */
+public class ByteTreeWriter extends TreeWriterBase {
+  private final RunLengthByteWriter writer;
+
+  public ByteTreeWriter(int columnId, TypeDescription schema,
+                        WriterContext writer, boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.writer = new RunLengthByteWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA));
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Writes the non-null values, narrowed to bytes; a repeating batch repeats entry 0. */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset, int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        final byte repeated = (byte) vec.vector[0];
+        indexStatistics.updateInteger(repeated, length);
+        recordInBloomFilters(repeated);
+        for (int i = 0; i < length; ++i) {
+          writer.write(repeated);
+        }
+      }
+    } else {
+      for (int row = offset; row < offset + length; ++row) {
+        if (vec.noNulls || !vec.isNull[row]) {
+          final byte current = (byte) vec.vector[row];
+          writer.write(current);
+          indexStatistics.updateInteger(current, 1);
+          recordInBloomFilters(current);
+        }
+      }
+    }
+  }
+
+  /** Adds a value to the bloom filters when createBloomFilter is set. */
+  private void recordInBloomFilters(byte value) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(value);
+      }
+      bloomFilterUtf8.addLong(value);
+    }
+  }
+
+  /** Calls the base class, flushes the byte writer, then records positions if tracked. */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+      OrcProto.StripeStatistics.Builder stats, int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Records the base class positions followed by the byte writer position. */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  /** Adds the byte writer's estimate to the base estimate. */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  /** Estimates the raw size as the value count times JavaDataModel's primitive1 size. */
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
new file mode 100644
index 0000000..92a6bab
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Under the covers, char is written to ORC the same way as string. Every value is
+ * stored with exactly itemLength bytes: shorter values are padded with spaces and
+ * longer ones are cut off.
+ */
+public class CharTreeWriter extends StringBaseTreeWriter {
+  // Number of bytes written for every value (the schema's maximum length).
+  private final int itemLength;
+  // Scratch buffer reused for each value that has to be padded.
+  private final byte[] padding;
+
+  /** Sizes the padding buffer from the schema's maximum length. */
+  CharTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    itemLength = schema.getMaxLength();
+    padding = new byte[itemLength];
+  }
+
+  /**
+   * Writes the non-null values of a batch. A repeating batch writes entry 0 once
+   * for each of its rows.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        // entry 0, length times
+        writePadded(vec, 0, length);
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          // entry offset + i, once
+          writePadded(vec, i + offset, 1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes one value, padded or cut to itemLength bytes, the given number of times.
+   * The statistics and bloom filters are fed the same bytes that are stored, so
+   * they describe the value a reader gets back rather than the raw input.
+   *
+   * @param vec the batch column holding the value
+   * @param row the index of the value within vec
+   * @param repeats how many times the value is written
+   * @throws IOException if writing to the output streams fails
+   */
+  private void writePadded(BytesColumnVector vec, int row, int repeats) throws IOException {
+    final byte[] ptr;
+    final int ptrOffset;
+    if (vec.length[row] >= itemLength) {
+      // long enough: use the first itemLength bytes in place
+      ptr = vec.vector[row];
+      ptrOffset = vec.start[row];
+    } else {
+      // it is the wrong length, so copy it
+      ptr = padding;
+      ptrOffset = 0;
+      System.arraycopy(vec.vector[row], vec.start[row], ptr, 0,
+          vec.length[row]);
+      Arrays.fill(ptr, vec.length[row], itemLength, (byte) ' ');
+    }
+    if (useDictionaryEncoding) {
+      // one dictionary lookup serves every repetition
+      int entry = dictionary.add(ptr, ptrOffset, itemLength);
+      for (int i = 0; i < repeats; ++i) {
+        rows.add(entry);
+      }
+    } else {
+      for (int i = 0; i < repeats; ++i) {
+        directStreamOutput.write(ptr, ptrOffset, itemLength);
+        lengthOutput.write(itemLength);
+      }
+    }
+    // statistics and bloom filters describe the bytes that were stored
+    indexStatistics.updateString(ptr, ptrOffset, itemLength, repeats);
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        // translate from UTF-8 to the default charset
+        bloomFilter.addString(new String(ptr, ptrOffset, itemLength,
+            StandardCharsets.UTF_8));
+      }
+      bloomFilterUtf8.addBytes(ptr, ptrOffset, itemLength);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
new file mode 100644
index 0000000..d15fb13
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+public class DateTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private final boolean isDirectV2;
+
+  public DateTreeWriter(int columnId,
+                        TypeDescription schema,
+                        WriterContext writer,
+                        boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream out = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int value = (int) vec.vector[0];
+        indexStatistics.updateDate(value);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            bloomFilter.addLong(value);
+          }
+          bloomFilterUtf8.addLong(value);
+        }
+        for (int i = 0; i < length; ++i) {
+          writer.write(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int value = (int) vec.vector[i + offset];
+          writer.write(value);
+          indexStatistics.updateDate(value);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              bloomFilter.addLong(value);
+            }
+            bloomFilterUtf8.addLong(value);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + writer.estimateMemory();
+  }
+
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().lengthOfDate();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
new file mode 100644
index 0000000..0428253
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+/** Writes a decimal column: serialized values go to DATA and scales to SECONDARY. */
+public class DecimalTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream valueStream;
+  private final IntegerWriter scaleStream;
+  private final boolean isDirectV2;
+
+  // These scratch buffers allow us to serialize decimals much faster.
+  private final long[] scratchLongs;
+  private final byte[] scratchBuffer;
+
+  public DecimalTreeWriter(int columnId, TypeDescription schema,
+                           WriterContext writer, boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
+    scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+    this.scaleStream = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Reports DIRECT_V2 or DIRECT, matching the flag computed in the constructor. */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    result.setKind(isDirectV2 ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+                              : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return result;
+  }
+
+  /**
+   * Writes the non-null values of a batch. A repeating batch takes every value from
+   * entry 0; updateDecimal takes no repetition count, so it runs once per written row.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset, int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DecimalColumnVector vec = (DecimalColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        HiveDecimalWritable value = vec.vector[0];
+        if (createBloomFilter) {
+          String str = value.toString(scratchBuffer);
+          if (bloomFilter != null) {
+            bloomFilter.addString(str);
+          }
+          bloomFilterUtf8.addString(str);
+        }
+        for (int i = 0; i < length; ++i) {
+          value.serializationUtilsWrite(valueStream, scratchLongs);
+          scaleStream.write(value.scale());
+          // Per row, so aggregates over the values see every repetition.
+          indexStatistics.updateDecimal(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          HiveDecimalWritable value = vec.vector[i + offset];
+          value.serializationUtilsWrite(valueStream, scratchLongs);
+          scaleStream.write(value.scale());
+          indexStatistics.updateDecimal(value);
+          if (createBloomFilter) {
+            String str = value.toString(scratchBuffer);
+            if (bloomFilter != null) {
+              bloomFilter.addString(str);
+            }
+            bloomFilterUtf8.addString(str);
+          }
+        }
+      }
+    }
+  }
+
+  /** Calls the base class, flushes both streams, then records positions if tracked. */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+      OrcProto.StripeStatistics.Builder stats, int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    valueStream.flush();
+    scaleStream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Records the DATA stream position followed by the scale writer position. */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    valueStream.getPosition(recorder);
+    scaleStream.getPosition(recorder);
+  }
+
+  /** Adds the DATA buffer size and the scale writer's estimate to the base estimate. */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + valueStream.getBufferSize() + scaleStream.estimateMemory();
+  }
+
+  /** Estimates the raw size as the value count times JavaDataModel's decimal length. */
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() * JavaDataModel.get().lengthOfDecimal();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
new file mode 100644
index 0000000..d2c0db2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+/**
+ * Tree writer that serializes the values of a {@link DoubleColumnVector}
+ * as doubles into a single DATA stream, while maintaining the index
+ * statistics and, when enabled, the bloom filters of the column.
+ */
+public class DoubleTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  /**
+   * Creates the writer and its DATA stream.
+   *
+   * @param columnId the id of the column to write
+   * @param schema the type description of the column
+   * @param writer the context used to create the output stream
+   * @param nullable passed through unchanged to the base class
+   * @throws IOException if recording the initial stream position fails
+   */
+  public DoubleTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows of the vector starting at {@code offset}.
+   * Null rows are skipped here; only non-null values reach the DATA stream,
+   * the statistics and the bloom filters.
+   *
+   * @param vector the batch column, which must be a {@link DoubleColumnVector}
+   * @param offset the index of the first row to write
+   * @param length the number of rows to write
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        double value = vec.vector[0];
+        // One bloom filter entry is enough for any number of repetitions.
+        addToBloomFilters(value);
+        for (int i = 0; i < length; ++i) {
+          utils.writeDouble(stream, value);
+          // updateDouble takes no repetition count, so it is invoked once
+          // per written row, exactly as the non-repeating branch does;
+          // a single call would account for only one of the rows.
+          indexStatistics.updateDouble(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          double value = vec.vector[i + offset];
+          utils.writeDouble(stream, value);
+          indexStatistics.updateDouble(value);
+          addToBloomFilters(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds the value to the bloom filters when bloom filters are enabled.
+   * The first filter is optional; the UTF-8 one is always updated.
+   */
+  private void addToBloomFilters(double value) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addDouble(value);
+      }
+      bloomFilterUtf8.addDouble(value);
+    }
+  }
+
+  /**
+   * Completes the current stripe: lets the base class do its part, flushes
+   * the DATA stream and, when a row index position is tracked, records the
+   * stream position again.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through unchanged to the base class
+   * @throws IOException if the base class or the flush fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the positions of the base class followed by the position of the
+   * DATA stream.
+   *
+   * @param recorder the recorder that receives the positions
+   * @throws IOException if a position cannot be obtained
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  /**
+   * @return the base class estimate plus the buffer size of the DATA stream
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  /**
+   * @return the number of values reported by the file statistics multiplied
+   *         by {@code JavaDataModel.get().primitive2()}
+   */
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive2();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
new file mode 100644
index 0000000..c825bf1
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+/**
+ * Tree writer that narrows the values of a {@link DoubleColumnVector} to
+ * {@code float} and serializes them into a single DATA stream, while
+ * maintaining the index statistics and, when enabled, the bloom filters of
+ * the column.
+ */
+public class FloatTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  /**
+   * Creates the writer and its DATA stream.
+   *
+   * @param columnId the id of the column to write
+   * @param schema the type description of the column
+   * @param writer the context used to create the output stream
+   * @param nullable passed through unchanged to the base class
+   * @throws IOException if recording the initial stream position fails
+   */
+  public FloatTreeWriter(int columnId,
+                         TypeDescription schema,
+                         WriterContext writer,
+                         boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows of the vector starting at {@code offset}.
+   * Each non-null value is cast to {@code float} before it is written and
+   * before it is fed to the statistics and the bloom filters. Null rows are
+   * skipped here.
+   *
+   * @param vector the batch column, which must be a {@link DoubleColumnVector}
+   * @param offset the index of the first row to write
+   * @param length the number of rows to write
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        float value = (float) vec.vector[0];
+        // One bloom filter entry is enough for any number of repetitions.
+        addToBloomFilters(value);
+        for (int i = 0; i < length; ++i) {
+          utils.writeFloat(stream, value);
+          // updateDouble takes no repetition count, so it is invoked once
+          // per written row, exactly as the non-repeating branch does;
+          // a single call would account for only one of the rows.
+          indexStatistics.updateDouble(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          float value = (float) vec.vector[i + offset];
+          utils.writeFloat(stream, value);
+          indexStatistics.updateDouble(value);
+          addToBloomFilters(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds the value to the bloom filters when bloom filters are enabled.
+   * The value is widened to {@code double} for the filters, as the filter
+   * methods used here are the double variants. The first filter is optional;
+   * the UTF-8 one is always updated.
+   */
+  private void addToBloomFilters(float value) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addDouble(value);
+      }
+      bloomFilterUtf8.addDouble(value);
+    }
+  }
+
+
+  /**
+   * Completes the current stripe: lets the base class do its part, flushes
+   * the DATA stream and, when a row index position is tracked, records the
+   * stream position again.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through unchanged to the base class
+   * @throws IOException if the base class or the flush fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the positions of the base class followed by the position of the
+   * DATA stream.
+   *
+   * @param recorder the recorder that receives the positions
+   * @throws IOException if a position cannot be obtained
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  /**
+   * @return the base class estimate plus the buffer size of the DATA stream
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  /**
+   * @return the number of values reported by the file statistics multiplied
+   *         by {@code JavaDataModel.get().primitive1()}
+   */
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
new file mode 100644
index 0000000..6036ef5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+/**
+ * Tree writer that sends the values of a {@link LongColumnVector} through an
+ * {@link IntegerWriter} backed by a DATA stream, while maintaining the index
+ * statistics and, when enabled, the bloom filters of the column.
+ */
+public class IntegerTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private final boolean isDirectV2;
+  private final boolean isLong;
+
+  /**
+   * Creates the writer, its DATA stream and the integer encoder on top of it.
+   *
+   * @param columnId the id of the column to write
+   * @param schema the type description of the column
+   * @param writer the context used to create the output stream
+   * @param nullable passed through unchanged to the base class
+   * @throws IOException if recording the initial stream position fails
+   */
+  public IntegerTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream dataStream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
+  }
+
+  /**
+   * @return the base class encoding with its kind set to DIRECT_V2 when the
+   *         new write format is in use, and to DIRECT otherwise
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder encoding = super.getEncoding();
+    encoding.setKind(isDirectV2
+        ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+        : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return encoding;
+  }
+
+  /**
+   * Writes {@code length} rows of the vector starting at {@code offset}.
+   * Null rows are skipped here; only non-null values reach the encoder, the
+   * statistics and the bloom filters.
+   *
+   * @param vector the batch column, which must be a {@link LongColumnVector}
+   * @param offset the index of the first row to write
+   * @param length the number of rows to write
+   * @throws IOException if the encoder fails to write
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector vec = (LongColumnVector) vector;
+
+    if (!vector.isRepeating) {
+      final int end = offset + length;
+      for (int row = offset; row < end; ++row) {
+        if (!vec.noNulls && vec.isNull[row]) {
+          continue;
+        }
+        long current = vec.vector[row];
+        writer.write(current);
+        indexStatistics.updateInteger(current, 1);
+        recordInBloomFilters(current);
+      }
+      return;
+    }
+
+    // Repeating vector: slot 0 stands for every row of the batch.
+    if (!vector.noNulls && vector.isNull[0]) {
+      return;
+    }
+    long repeated = vec.vector[0];
+    indexStatistics.updateInteger(repeated, length);
+    recordInBloomFilters(repeated);
+    for (int written = 0; written < length; ++written) {
+      writer.write(repeated);
+    }
+  }
+
+  /**
+   * Adds the value to the bloom filters when bloom filters are enabled.
+   * The first filter is optional; the UTF-8 one is always updated.
+   */
+  private void recordInBloomFilters(long value) {
+    if (!createBloomFilter) {
+      return;
+    }
+    if (bloomFilter != null) {
+      bloomFilter.addLong(value);
+    }
+    bloomFilterUtf8.addLong(value);
+  }
+
+  /**
+   * Completes the current stripe: lets the base class do its part, flushes
+   * the encoder and, when a row index position is tracked, records the
+   * stream position again.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through unchanged to the base class
+   * @throws IOException if the base class or the flush fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition == null) {
+      return;
+    }
+    recordPosition(rowIndexPosition);
+  }
+
+  /**
+   * Records the positions of the base class followed by the position of the
+   * integer encoder.
+   *
+   * @param recorder the recorder that receives the positions
+   * @throws IOException if a position cannot be obtained
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  /**
+   * @return the base class estimate plus the encoder's own estimate
+   */
+  @Override
+  public long estimateMemory() {
+    long total = super.estimateMemory();
+    total += writer.estimateMemory();
+    return total;
+  }
+
+  /**
+   * @return the number of values reported by the file statistics multiplied
+   *         by {@code primitive2()} for LONG columns and by
+   *         {@code primitive1()} for every other category
+   */
+  @Override
+  public long getRawDataSize() {
+    JavaDataModel model = JavaDataModel.get();
+    long valueCount = fileStatistics.getNumberOfValues();
+    if (isLong) {
+      return valueCount * model.primitive2();
+    }
+    return valueCount * model.primitive1();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
new file mode 100644
index 0000000..2c5bd50
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+
+/**
+ * Tree writer for list columns. The number of elements of every non-null
+ * list goes to a LENGTH stream through an {@link IntegerWriter}; the elements
+ * themselves are delegated to a child writer built from the first child of
+ * the schema.
+ */
+public class ListTreeWriter extends TreeWriterBase {
+  private final IntegerWriter lengths;
+  private final boolean isDirectV2;
+  private final TreeWriter childWriter;
+
+  /**
+   * Creates the writer, the child writer and the LENGTH stream.
+   *
+   * @param columnId the id of the column to write
+   * @param schema the type description of the list column
+   * @param writer the context used to create streams and the child writer
+   * @param nullable passed through unchanged to the base class
+   * @throws IOException if creating the child writer or recording the
+   *         initial stream position fails
+   */
+  ListTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    childWriter = Factory.create(schema.getChildren().get(0), writer, true);
+    lengths = createIntegerWriter(writer.createStream(columnId,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * @return the base class encoding with its kind set to DIRECT_V2 when the
+   *         new write format is in use, and to DIRECT otherwise
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  /**
+   * Creates a row index entry for this column and for the child column.
+   *
+   * @throws IOException if either writer fails
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    childWriter.createRowIndexEntry();
+  }
+
+  /**
+   * Writes {@code length} lists of the vector starting at {@code offset}.
+   * For every non-null list its length is written to the LENGTH stream and
+   * its elements are handed to the child writer. In the non-repeating case,
+   * lists whose elements are adjacent in the child vector are merged into a
+   * single child {@code writeBatch} call.
+   *
+   * @param vector the batch column, which must be a {@link ListColumnVector}
+   * @param offset the index of the first list to write
+   * @param length the number of lists to write
+   * @throws IOException if a stream or the child writer fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    ListColumnVector vec = (ListColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int childOffset = (int) vec.offsets[0];
+        int childLength = (int) vec.lengths[0];
+        for (int i = 0; i < length; ++i) {
+          lengths.write(childLength);
+          childWriter.writeBatch(vec.child, childOffset, childLength);
+        }
+        addLengthToBloomFilters(childLength);
+      }
+    } else {
+      // write the elements in runs
+      int currentOffset = 0;
+      int currentLength = 0;
+      for (int i = 0; i < length; ++i) {
+        // Consult noNulls before isNull, like the repeating branch above:
+        // when noNulls is set the isNull entries must not decide whether a
+        // row is written, otherwise a stale flag would drop the row.
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int nextLength = (int) vec.lengths[offset + i];
+          int nextOffset = (int) vec.offsets[offset + i];
+          lengths.write(nextLength);
+          if (currentLength == 0) {
+            // Nothing pending: start a new run at this list.
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else if (currentOffset + currentLength != nextOffset) {
+            // Not adjacent to the pending run: flush it and start over.
+            childWriter.writeBatch(vec.child, currentOffset,
+                currentLength);
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else {
+            // Adjacent: extend the pending run.
+            currentLength += nextLength;
+          }
+          addLengthToBloomFilters(nextLength);
+        }
+      }
+      if (currentLength != 0) {
+        childWriter.writeBatch(vec.child, currentOffset,
+            currentLength);
+      }
+    }
+  }
+
+  /**
+   * Adds a list length to the bloom filters when bloom filters are enabled.
+   * The first filter is optional; the UTF-8 one is always updated.
+   */
+  private void addLengthToBloomFilters(int listLength) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(listLength);
+      }
+      bloomFilterUtf8.addLong(listLength);
+    }
+  }
+
+  /**
+   * Completes the current stripe: lets the base class do its part, flushes
+   * the LENGTH stream, completes the stripe of the child writer and, when a
+   * row index position is tracked, records the stream position again.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through unchanged to the base class
+   *        and to the child writer
+   * @throws IOException if any of the steps fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    lengths.flush();
+    childWriter.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the positions of the base class followed by the position of the
+   * LENGTH encoder.
+   *
+   * @param recorder the recorder that receives the positions
+   * @throws IOException if a position cannot be obtained
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    lengths.getPosition(recorder);
+  }
+
+  /**
+   * Updates the file statistics of this column and of the child column.
+   *
+   * @param stats the stripe statistics to merge in
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    childWriter.updateFileStatistics(stats);
+  }
+
+  /**
+   * @return the base class estimate plus the estimates of the LENGTH encoder
+   *         and of the child writer
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + lengths.estimateMemory() +
+        childWriter.estimateMemory();
+  }
+
+  /**
+   * @return the raw data size reported by the child writer
+   */
+  @Override
+  public long getRawDataSize() {
+    return childWriter.getRawDataSize();
+  }
+
+  /**
+   * Writes the file statistics of this column and of the child column.
+   *
+   * @param footer the file footer being assembled
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    childWriter.writeFileStatistics(footer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
new file mode 100644
index 0000000..26ace05
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tree writer for map columns. The number of entries of every non-null map
+ * goes to a LENGTH stream through an {@link IntegerWriter}; keys and values
+ * are delegated to two child writers built from the first and second child
+ * of the schema.
+ */
+public class MapTreeWriter extends TreeWriterBase {
+  private final IntegerWriter lengths;
+  private final boolean isDirectV2;
+  private final TreeWriter keyWriter;
+  private final TreeWriter valueWriter;
+
+  /**
+   * Creates the writer, the key and value writers and the LENGTH stream.
+   *
+   * @param columnId the id of the column to write
+   * @param schema the type description of the map column
+   * @param writer the context used to create streams and child writers
+   * @param nullable passed through unchanged to the base class
+   * @throws IOException if creating a child writer or recording the initial
+   *         stream position fails
+   */
+  MapTreeWriter(int columnId,
+                TypeDescription schema,
+                WriterContext writer,
+                boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    List<TypeDescription> children = schema.getChildren();
+    keyWriter = Factory.create(children.get(0), writer, true);
+    valueWriter = Factory.create(children.get(1), writer, true);
+    lengths = createIntegerWriter(writer.createStream(columnId,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * @return the base class encoding with its kind set to DIRECT_V2 when the
+   *         new write format is in use, and to DIRECT otherwise
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (isDirectV2) {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+    } else {
+      result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    }
+    return result;
+  }
+
+  /**
+   * Creates a row index entry for this column and for the key and value
+   * columns.
+   *
+   * @throws IOException if any of the writers fails
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    keyWriter.createRowIndexEntry();
+    valueWriter.createRowIndexEntry();
+  }
+
+  /**
+   * Writes {@code length} maps of the vector starting at {@code offset}.
+   * For every non-null map its entry count is written to the LENGTH stream
+   * and its keys and values are handed to the child writers. In the
+   * non-repeating case, maps whose entries are adjacent in the child vectors
+   * are merged into a single {@code writeBatch} call per child.
+   *
+   * @param vector the batch column, which must be a {@link MapColumnVector}
+   * @param offset the index of the first map to write
+   * @param length the number of maps to write
+   * @throws IOException if a stream or a child writer fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    MapColumnVector vec = (MapColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int childOffset = (int) vec.offsets[0];
+        int childLength = (int) vec.lengths[0];
+        for (int i = 0; i < length; ++i) {
+          lengths.write(childLength);
+          keyWriter.writeBatch(vec.keys, childOffset, childLength);
+          valueWriter.writeBatch(vec.values, childOffset, childLength);
+        }
+        addLengthToBloomFilters(childLength);
+      }
+    } else {
+      // write the elements in runs
+      int currentOffset = 0;
+      int currentLength = 0;
+      for (int i = 0; i < length; ++i) {
+        // Consult noNulls before isNull, like the repeating branch above:
+        // when noNulls is set the isNull entries must not decide whether a
+        // row is written, otherwise a stale flag would drop the row.
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int nextLength = (int) vec.lengths[offset + i];
+          int nextOffset = (int) vec.offsets[offset + i];
+          lengths.write(nextLength);
+          if (currentLength == 0) {
+            // Nothing pending: start a new run at this map.
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else if (currentOffset + currentLength != nextOffset) {
+            // Not adjacent to the pending run: flush it and start over.
+            keyWriter.writeBatch(vec.keys, currentOffset,
+                currentLength);
+            valueWriter.writeBatch(vec.values, currentOffset,
+                currentLength);
+            currentOffset = nextOffset;
+            currentLength = nextLength;
+          } else {
+            // Adjacent: extend the pending run.
+            currentLength += nextLength;
+          }
+          addLengthToBloomFilters(nextLength);
+        }
+      }
+      if (currentLength != 0) {
+        keyWriter.writeBatch(vec.keys, currentOffset,
+            currentLength);
+        valueWriter.writeBatch(vec.values, currentOffset,
+            currentLength);
+      }
+    }
+  }
+
+  /**
+   * Adds a map length to the bloom filters when bloom filters are enabled.
+   * The first filter is optional; the UTF-8 one is always updated.
+   */
+  private void addLengthToBloomFilters(int mapLength) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(mapLength);
+      }
+      bloomFilterUtf8.addLong(mapLength);
+    }
+  }
+
+  /**
+   * Completes the current stripe: lets the base class do its part, flushes
+   * the LENGTH stream, completes the stripe of the key and value writers
+   * and, when a row index position is tracked, records the stream position
+   * again.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through unchanged to the base class
+   *        and to the child writers
+   * @throws IOException if any of the steps fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    lengths.flush();
+    keyWriter.writeStripe(builder, stats, requiredIndexEntries);
+    valueWriter.writeStripe(builder, stats, requiredIndexEntries);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Records the positions of the base class followed by the position of the
+   * LENGTH encoder.
+   *
+   * @param recorder the recorder that receives the positions
+   * @throws IOException if a position cannot be obtained
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    lengths.getPosition(recorder);
+  }
+
+  /**
+   * Updates the file statistics of this column and of the key and value
+   * columns.
+   *
+   * @param stats the stripe statistics to merge in
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    keyWriter.updateFileStatistics(stats);
+    valueWriter.updateFileStatistics(stats);
+  }
+
+  /**
+   * @return the base class estimate plus the estimates of the LENGTH encoder
+   *         and of the key and value writers
+   */
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + lengths.estimateMemory() +
+        keyWriter.estimateMemory() + valueWriter.estimateMemory();
+  }
+
+  /**
+   * @return the sum of the raw data sizes reported by the key and value
+   *         writers
+   */
+  @Override
+  public long getRawDataSize() {
+    return keyWriter.getRawDataSize() + valueWriter.getRawDataSize();
+  }
+
+  /**
+   * Writes the file statistics of this column and of the key and value
+   * columns.
+   *
+   * @param footer the file footer being assembled
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    keyWriter.writeFileStatistics(footer);
+    valueWriter.writeFileStatistics(footer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
new file mode 100644
index 0000000..f49cb7f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.DynamicIntArray;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.StringRedBlackTree;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Common base for the tree writers of string-like columns.
+ *
+ * <p>Values are first collected in an in-memory dictionary together with the
+ * dictionary id of every row. Once the dictionary check has run, the column
+ * is written either dictionary encoded or, when the ratio of distinct keys
+ * to rows exceeds the configured threshold, directly into the DATA and
+ * LENGTH streams.
+ */
+public abstract class StringBaseTreeWriter extends TreeWriterBase {
+  private static final int INITIAL_DICTIONARY_SIZE = 4096;
+  // DICTIONARY_DATA stream: the bytes of the dictionary entries
+  private final OutStream stringOutput;
+  // LENGTH stream: entry lengths (dictionary) or value lengths (direct)
+  protected final IntegerWriter lengthOutput;
+  // integer writer over the DATA stream, used for dictionary ids
+  private final IntegerWriter rowOutput;
+  protected final StringRedBlackTree dictionary =
+      new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+  // dictionary id of every buffered row, in insertion order
+  protected final DynamicIntArray rows = new DynamicIntArray();
+  // DATA stream: raw value bytes when writing with direct encoding
+  protected final PositionedOutputStream directStreamOutput;
+  // row index entries that still lack their final stream positions
+  private final List<OrcProto.RowIndexEntry> savedRowIndex =
+      new ArrayList<>();
+  private final boolean buildIndex;
+  // rowIndexValueCount.get(k) is the number of buffered rows at the start of
+  // index entry k; it always holds one element more than savedRowIndex
+  private final List<Long> rowIndexValueCount = new ArrayList<>();
+  // If the number of keys in a dictionary is greater than this fraction of
+  // the total number of non-null rows, turn off dictionary encoding
+  private final double dictionaryKeySizeThreshold;
+  protected boolean useDictionaryEncoding = true;
+  private final boolean isDirectV2;
+  // stays false until checkDictionaryEncoding() has made its decision
+  private boolean doneDictionaryCheck;
+  private final boolean strideDictionaryCheck;
+
+  /**
+   * Creates the DATA, DICTIONARY_DATA and LENGTH streams for the column and
+   * reads the dictionary related settings from the writer's configuration.
+   *
+   * @param columnId passed through to the parent constructor
+   * @param schema passed through to the parent constructor
+   * @param writer the context used to create streams and read configuration
+   * @param nullable passed through to the parent constructor
+   * @throws IOException if a stream cannot be created or the initial
+   *                     position cannot be recorded
+   */
+  StringBaseTreeWriter(int columnId,
+                       TypeDescription schema,
+                       WriterContext writer,
+                       boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    stringOutput = writer.createStream(id,
+        OrcProto.Stream.Kind.DICTIONARY_DATA);
+    lengthOutput = createIntegerWriter(writer.createStream(id,
+        OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+    rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
+        writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    rowIndexValueCount.add(0L);
+    buildIndex = writer.buildIndex();
+    Configuration conf = writer.getConfiguration();
+    dictionaryKeySizeThreshold =
+        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    strideDictionaryCheck =
+        OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+  }
+
+  /**
+   * Decides, at most once, whether the column keeps dictionary encoding.
+   * Subsequent calls are no-ops.
+   */
+  private void checkDictionaryEncoding() {
+    if (!doneDictionaryCheck) {
+      // Set the flag indicating whether or not to use dictionary encoding
+      // based on whether or not the fraction of distinct keys over number of
+      // non-null rows is less than the configured threshold
+      // NOTE(review): the ratio is a float but the threshold is a double, so
+      // a ratio that equals the threshold mathematically can compare as
+      // larger after widening - confirm that this is acceptable.
+      float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+      useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+      doneDictionaryCheck = true;
+    }
+  }
+
+  /**
+   * Writes out everything buffered for the current stripe and resets the
+   * per-stripe state (dictionary, saved row index entries, value counts).
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through to the parent implementation
+   * @throws IOException if writing or flushing a stream fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    // the check only runs from createRowIndexEntry() when the stride
+    // dictionary check is enabled, so it may not have happened yet for this
+    // writer. Make sure the decision is taken before anything is flushed.
+    checkDictionaryEncoding();
+
+    if (useDictionaryEncoding) {
+      flushDictionary();
+    } else {
+      // flushout any left over entries from dictionary
+      if (rows.size() > 0) {
+        flushDictionary();
+      }
+
+      // suppress the stream for every stripe if dictionary is disabled
+      stringOutput.suppress();
+    }
+
+    // we need to build the rowindex before calling super, since it
+    // writes it out.
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    if (useDictionaryEncoding) {
+      stringOutput.flush();
+      lengthOutput.flush();
+      rowOutput.flush();
+    } else {
+      directStreamOutput.flush();
+      lengthOutput.flush();
+    }
+    // reset all of the fields to be ready for the next stripe.
+    dictionary.clear();
+    savedRowIndex.clear();
+    rowIndexValueCount.clear();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    rowIndexValueCount.add(0L);
+
+    if (!useDictionaryEncoding) {
+      // record the start positions of first index stride of next stripe i.e
+      // beginning of the direct streams when dictionary is disabled
+      recordDirectStreamPosition();
+    }
+  }
+
+  /**
+   * Writes the buffered rows to the output streams and completes the saved
+   * row index entries with the stream positions. With dictionary encoding
+   * the sorted dictionary is written first and each row becomes its id in
+   * that sorted order; with direct encoding each row's bytes and length are
+   * written instead. The buffered rows are cleared afterwards.
+   *
+   * @throws IOException if writing to one of the streams fails
+   */
+  private void flushDictionary() throws IOException {
+    final int[] dumpOrder = new int[dictionary.size()];
+
+    if (useDictionaryEncoding) {
+      // Write the dictionary by traversing the red-black tree writing out
+      // the bytes and lengths; and creating the map from the original order
+      // to the final sorted order.
+
+      dictionary.visit(new StringRedBlackTree.Visitor() {
+        private int currentId = 0;
+
+        @Override
+        public void visit(StringRedBlackTree.VisitorContext context
+        ) throws IOException {
+          context.writeBytes(stringOutput);
+          lengthOutput.write(context.getLength());
+          dumpOrder[context.getOriginalPosition()] = currentId++;
+        }
+      });
+    } else {
+      // for direct encoding, we don't want the dictionary data stream
+      stringOutput.suppress();
+    }
+    int length = rows.size();
+    int rowIndexEntry = 0;
+    OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+    Text text = new Text();
+    // write the values translated into the dump order. The loop runs one
+    // step past the last row so that index entries that start exactly at
+    // the end of the buffered rows are finalized as well.
+    for (int i = 0; i <= length; ++i) {
+      // now that we are writing out the row values, we can finalize the
+      // row index
+      if (buildIndex) {
+        // check the bound first so the lookup never depends on
+        // rowIndexValueCount being longer than savedRowIndex
+        while (rowIndexEntry < savedRowIndex.size() &&
+            i == rowIndexValueCount.get(rowIndexEntry)) {
+          OrcProto.RowIndexEntry.Builder base =
+              savedRowIndex.get(rowIndexEntry++).toBuilder();
+          if (useDictionaryEncoding) {
+            rowOutput.getPosition(new RowIndexPositionRecorder(base));
+          } else {
+            PositionRecorder posn = new RowIndexPositionRecorder(base);
+            directStreamOutput.getPosition(posn);
+            lengthOutput.getPosition(posn);
+          }
+          rowIndex.addEntry(base.build());
+        }
+      }
+      if (i != length) {
+        if (useDictionaryEncoding) {
+          rowOutput.write(dumpOrder[rows.get(i)]);
+        } else {
+          dictionary.getText(text, rows.get(i));
+          directStreamOutput.write(text.getBytes(), 0, text.getLength());
+          lengthOutput.write(text.getLength());
+        }
+      }
+    }
+    rows.clear();
+  }
+
+  /**
+   * Extends the parent's encoding with the kind that matches the current
+   * mode: DICTIONARY or DICTIONARY_V2 (plus the dictionary size) while
+   * dictionary encoding is in use, DIRECT or DIRECT_V2 otherwise.
+   *
+   * @return the encoding builder obtained from the parent, with the kind set
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    if (useDictionaryEncoding) {
+      result.setDictionarySize(dictionary.size());
+      if (isDirectV2) {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2);
+      } else {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY);
+      }
+    } else {
+      if (isDirectV2) {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+      } else {
+        result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * This method doesn't call the super method, because unlike most of the
+   * other TreeWriters, this one can't record the position in the streams
+   * until the stripe is being flushed. Therefore it saves all of the entries
+   * and augments them with the final information as the stripe is written.
+   *
+   * @throws IOException if recording a position or flushing rows fails
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    getStripeStatistics().merge(indexStatistics);
+    OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+    rowIndexEntry.setStatistics(indexStatistics.serialize());
+    indexStatistics.reset();
+    OrcProto.RowIndexEntry base = rowIndexEntry.build();
+    savedRowIndex.add(base);
+    rowIndexEntry.clear();
+    addBloomFilterEntry();
+    recordPosition(rowIndexPosition);
+    rowIndexValueCount.add((long) rows.size());
+    if (strideDictionaryCheck) {
+      checkDictionaryEncoding();
+    }
+    if (!useDictionaryEncoding) {
+      if (rows.size() > 0) {
+        // rows buffered before the switch to direct encoding: write them
+        // out now, which also emits the saved index entries
+        flushDictionary();
+        // just record the start positions of next index stride
+        recordDirectStreamPosition();
+      } else {
+        // record the start positions of next index stride
+        recordDirectStreamPosition();
+        getRowIndex().addEntry(base);
+      }
+    }
+  }
+
+  /**
+   * Records the current positions of the direct DATA stream and of the
+   * length writer, provided a row index position recorder exists.
+   *
+   * @throws IOException if a position cannot be obtained
+   */
+  private void recordDirectStreamPosition() throws IOException {
+    if (rowIndexPosition != null) {
+      directStreamOutput.getPosition(rowIndexPosition);
+      lengthOutput.getPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Estimates the memory held by this writer: the parent's estimate plus
+   * the dictionary and row buffers while dictionary encoding is in use, or
+   * plus the length writer and direct stream buffers otherwise.
+   *
+   * @return the estimated number of bytes
+   */
+  @Override
+  public long estimateMemory() {
+    long parent = super.estimateMemory();
+    if (useDictionaryEncoding) {
+      return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
+    } else {
+      return parent + lengthOutput.estimateMemory() +
+          directStreamOutput.getBufferSize();
+    }
+  }
+
+  /**
+   * Estimates the raw size of the column from the file statistics: the
+   * number of values times the size the JavaDataModel reports for a string
+   * of the average value length.
+   *
+   * @return the estimate, or 0 when no values have been counted
+   */
+  @Override
+  public long getRawDataSize() {
+    // ORC strings are converted to java Strings. so use JavaDataModel to
+    // compute the overall size of strings
+    StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
+    long numVals = fileStatistics.getNumberOfValues();
+    if (numVals == 0) {
+      return 0;
+    } else {
+      int avgSize = (int) (scs.getSum() / numVals);
+      return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
new file mode 100644
index 0000000..ab6f38f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tree writer for string columns. Every non-null value of the incoming
+ * {@link BytesColumnVector} is added to the dictionary or written to the
+ * direct streams, depending on the encoding currently selected by the base
+ * class, and is also fed into the index statistics and the bloom filters.
+ */
+public class StringTreeWriter extends StringBaseTreeWriter {
+  StringTreeWriter(int columnId,
+                   TypeDescription schema,
+                   WriterContext writer,
+                   boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+  }
+
+  /**
+   * Writes {@code length} rows of the vector starting at {@code offset}.
+   * A repeating vector contributes its single value {@code length} times;
+   * null rows contribute nothing here.
+   *
+   * @param vector the column vector, cast to {@link BytesColumnVector}
+   * @param offset index of the first row to write
+   * @param length number of rows to write
+   * @throws IOException if writing to an output stream fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+
+    if (!vector.isRepeating) {
+      for (int i = 0; i < length; ++i) {
+        final int row = offset + i;
+        if (!vec.noNulls && vec.isNull[row]) {
+          continue;
+        }
+        final byte[] bytes = vec.vector[row];
+        final int start = vec.start[row];
+        final int len = vec.length[row];
+        if (useDictionaryEncoding) {
+          rows.add(dictionary.add(bytes, start, len));
+        } else {
+          directStreamOutput.write(bytes, start, len);
+          lengthOutput.write(len);
+        }
+        indexStatistics.updateString(bytes, start, len, 1);
+        recordInBloomFilters(bytes, start, len);
+      }
+      return;
+    }
+
+    // repeating vector: only entry 0 is meaningful
+    if (!vector.noNulls && vector.isNull[0]) {
+      return;
+    }
+    final byte[] bytes = vec.vector[0];
+    final int start = vec.start[0];
+    final int len = vec.length[0];
+    if (useDictionaryEncoding) {
+      // one dictionary lookup, then the same id for every row
+      final int dictionaryId = dictionary.add(bytes, start, len);
+      for (int i = 0; i < length; ++i) {
+        rows.add(dictionaryId);
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        directStreamOutput.write(bytes, start, len);
+        lengthOutput.write(len);
+      }
+    }
+    indexStatistics.updateString(bytes, start, len, length);
+    recordInBloomFilters(bytes, start, len);
+  }
+
+  /**
+   * Adds one value to the bloom filters when bloom filters are enabled: the
+   * UTF-8 decoded string goes into {@code bloomFilter} if that filter
+   * exists, and the raw bytes always go into {@code bloomFilterUtf8}.
+   */
+  private void recordInBloomFilters(byte[] bytes, int start, int len) {
+    if (!createBloomFilter) {
+      return;
+    }
+    if (bloomFilter != null) {
+      // this filter takes a String, so decode the UTF-8 bytes first
+      bloomFilter.addString(new String(bytes, start, len,
+          StandardCharsets.UTF_8));
+    }
+    bloomFilterUtf8.addBytes(bytes, start, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
new file mode 100644
index 0000000..9a1384d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tree writer for struct columns. It owns one child writer per child of the
+ * struct schema and forwards every operation to those children.
+ */
+public class StructTreeWriter extends TreeWriterBase {
+  // one writer per child of the schema, in schema order
+  final TreeWriter[] childrenWriters;
+
+  /**
+   * Creates the writer together with one child writer for each child type
+   * of the schema.
+   *
+   * @param columnId passed through to the parent constructor
+   * @param schema the struct schema whose children get their own writers
+   * @param writer the context handed to the parent and to the factory
+   * @param nullable passed through to the parent constructor
+   * @throws IOException if creating a child writer or recording the initial
+   *                     position fails
+   */
+  public StructTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    List<TypeDescription> children = schema.getChildren();
+    // Allocate the array with the element type of the field. The factory
+    // results are stored as TreeWriter; an array created as TreeWriterBase[]
+    // would throw ArrayStoreException for any TreeWriter that is not a
+    // TreeWriterBase.
+    childrenWriters = new TreeWriter[children.size()];
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i] = Factory.create(children.get(i), writer, true);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes rows of a whole batch when this struct is the root column: each
+   * child writer receives the batch column with the same index.
+   *
+   * @param batch the batch whose columns are written
+   * @param offset index of the first row to write
+   * @param length number of rows to write
+   * @throws IOException if a child writer fails
+   */
+  @Override
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    // update the statistics for the root column
+    indexStatistics.increment(length);
+    // I'm assuming that the root column isn't nullable so that I don't need
+    // to update isPresent.
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+    }
+  }
+
+  /**
+   * Hands the given row range of every struct field to the child writer
+   * with the same index.
+   */
+  private static void writeFields(StructColumnVector vector,
+                                  TreeWriter[] childrenWriters,
+                                  int offset, int length) throws IOException {
+    for (int field = 0; field < childrenWriters.length; ++field) {
+      childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows of a struct vector starting at
+   * {@code offset}. Rows where the struct itself is null are not forwarded
+   * to the children; the remaining rows are forwarded in contiguous runs.
+   *
+   * @param vector the column vector, cast to {@link StructColumnVector}
+   * @param offset index of the first row to write
+   * @param length number of rows to write
+   * @throws IOException if the parent or a child writer fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    StructColumnVector vec = (StructColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        writeFields(vec, childrenWriters, offset, length);
+      }
+    } else if (vector.noNulls) {
+      writeFields(vec, childrenWriters, offset, length);
+    } else {
+      // write the records in runs
+      int currentRun = 0;
+      boolean started = false;
+      for (int i = 0; i < length; ++i) {
+        if (!vec.isNull[i + offset]) {
+          if (!started) {
+            started = true;
+            currentRun = i;
+          }
+        } else if (started) {
+          // a null row ends the current run of non-null rows
+          started = false;
+          writeFields(vec, childrenWriters, offset + currentRun,
+              i - currentRun);
+        }
+      }
+      if (started) {
+        // the last run reaches the end of the range
+        writeFields(vec, childrenWriters, offset + currentRun,
+            length - currentRun);
+      }
+    }
+  }
+
+  /**
+   * Creates a row index entry for this column and then for every child.
+   *
+   * @throws IOException if the parent or a child writer fails
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    for (TreeWriter child : childrenWriters) {
+      child.createRowIndexEntry();
+    }
+  }
+
+  /**
+   * Finishes the stripe for this column and then for every child, and
+   * records the positions again when a row index position recorder exists.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through to the parent and children
+   * @throws IOException if the parent or a child writer fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    for (TreeWriter child : childrenWriters) {
+      child.writeStripe(builder, stats, requiredIndexEntries);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Forwards the stripe statistics to the parent and then to every child.
+   *
+   * @param stats the stripe statistics to forward
+   */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    for (TreeWriter child : childrenWriters) {
+      child.updateFileStatistics(stats);
+    }
+  }
+
+  /**
+   * @return the parent's memory estimate plus the estimates of all children
+   */
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.estimateMemory();
+    }
+    return super.estimateMemory() + result;
+  }
+
+  /**
+   * @return the sum of the raw data sizes reported by the children
+   */
+  @Override
+  public long getRawDataSize() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.getRawDataSize();
+    }
+    return result;
+  }
+
+  /**
+   * Writes the file statistics of this column and then of every child into
+   * the footer.
+   *
+   * @param footer the footer builder handed to the parent and the children
+   */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    for (TreeWriter child : childrenWriters) {
+      child.writeFileStatistics(footer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
new file mode 100644
index 0000000..fae108e
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+/**
+ * Tree writer for timestamp columns. Each non-null value is split into a
+ * seconds part, written to the DATA stream relative to
+ * {@link #BASE_TIMESTAMP_STRING}, and an encoded nanoseconds part, written
+ * to the SECONDARY stream.
+ */
+public class TimestampTreeWriter extends TreeWriterBase {
+  public static final int MILLIS_PER_SECOND = 1000;
+  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+  private final IntegerWriter seconds;
+  private final IntegerWriter nanos;
+  private final boolean isDirectV2;
+  private final TimeZone localTimezone;
+  private final long baseEpochSecsLocalTz;
+
+  /**
+   * Creates the DATA (signed) and SECONDARY (unsigned) integer writers and
+   * captures the default time zone and the base epoch for this instance.
+   *
+   * @param columnId passed through to the parent constructor
+   * @param schema passed through to the parent constructor
+   * @param writer the context used to create the streams
+   * @param nullable passed through to the parent constructor
+   * @throws IOException if a stream cannot be created or the initial
+   *                     position cannot be recorded
+   */
+  public TimestampTreeWriter(int columnId,
+                             TypeDescription schema,
+                             WriterContext writer,
+                             boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.seconds = createIntegerWriter(
+        writer.createStream(id, OrcProto.Stream.Kind.DATA),
+        true, isDirectV2, writer);
+    this.nanos = createIntegerWriter(
+        writer.createStream(id, OrcProto.Stream.Kind.SECONDARY),
+        false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    this.localTimezone = TimeZone.getDefault();
+    // for unit tests to set different time zones: the base is derived here,
+    // per instance, from the base timestamp string
+    final long baseMillis = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime();
+    this.baseEpochSecsLocalTz = baseMillis / MILLIS_PER_SECOND;
+  }
+
+  /**
+   * @return the parent's encoding with the kind set to DIRECT_V2 or DIRECT,
+   *         depending on the write format
+   */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    final OrcProto.ColumnEncoding.Builder encoding = super.getEncoding();
+    encoding.setKind(isDirectV2
+        ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+        : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return encoding;
+  }
+
+  /**
+   * Writes {@code length} rows of the vector starting at {@code offset}.
+   * A repeating vector contributes its single value {@code length} times;
+   * null rows contribute nothing here.
+   *
+   * @param vector the column vector, cast to {@link TimestampColumnVector}
+   * @param offset index of the first row to write
+   * @param length number of rows to write
+   * @throws IOException if writing to an output stream fails
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    TimestampColumnVector vec = (TimestampColumnVector) vector;
+
+    if (!vector.isRepeating) {
+      for (int i = 0; i < length; ++i) {
+        final int row = i + offset;
+        if (!vec.noNulls && vec.isNull[row]) {
+          continue;
+        }
+        final Timestamp value = vec.asScratchTimestamp(row);
+        final long millis = value.getTime();
+        final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+        final long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+        seconds.write(secs);
+        nanos.write(formatNanos(value.getNanos()));
+        indexStatistics.updateTimestamp(utc);
+        recordInBloomFilters(millis, utc);
+      }
+      return;
+    }
+
+    // repeating vector: only entry 0 is meaningful
+    if (!vector.noNulls && vector.isNull[0]) {
+      return;
+    }
+    final Timestamp value = vec.asScratchTimestamp(0);
+    final long millis = value.getTime();
+    final long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+    indexStatistics.updateTimestamp(utc);
+    recordInBloomFilters(millis, utc);
+    // the encoded parts are identical for every row, so compute them once
+    final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+    final long nano = formatNanos(value.getNanos());
+    for (int i = 0; i < length; ++i) {
+      seconds.write(secs);
+      nanos.write(nano);
+    }
+  }
+
+  /**
+   * Adds one value to the bloom filters when bloom filters are enabled: the
+   * local milliseconds go into {@code bloomFilter} if that filter exists,
+   * and the UTC value always goes into {@code bloomFilterUtf8}.
+   */
+  private void recordInBloomFilters(long millis, long utc) {
+    if (!createBloomFilter) {
+      return;
+    }
+    if (bloomFilter != null) {
+      bloomFilter.addLong(millis);
+    }
+    bloomFilterUtf8.addLong(utc);
+  }
+
+  /**
+   * Finishes the stripe: delegates to the parent, flushes both integer
+   * writers and records the positions again when a row index position
+   * recorder exists.
+   *
+   * @param builder the stripe footer being assembled
+   * @param stats the stripe statistics being assembled
+   * @param requiredIndexEntries passed through to the parent implementation
+   * @throws IOException if the parent or a flush fails
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    seconds.flush();
+    nanos.flush();
+    if (rowIndexPosition == null) {
+      return;
+    }
+    recordPosition(rowIndexPosition);
+  }
+
+  /**
+   * Encodes a nanosecond value for the SECONDARY stream. The low three bits
+   * of the result hold a trailing-zero code and the remaining bits hold the
+   * value with those zeros removed. Zero encodes as 0. Values that are not
+   * a multiple of 100 are stored unscaled with code 0. Otherwise two to
+   * eight trailing decimal zeros are stripped and the code is that number
+   * of stripped zeros minus one.
+   */
+  private static long formatNanos(int nanos) {
+    if (nanos == 0) {
+      return 0;
+    }
+    if (nanos % 100 != 0) {
+      return ((long) nanos) << 3;
+    }
+    int scaled = nanos / 100;
+    int trailingZeros = 1;
+    for (; scaled % 10 == 0 && trailingZeros < 7; ++trailingZeros) {
+      scaled /= 10;
+    }
+    return ((long) scaled) << 3 | trailingZeros;
+  }
+
+  /**
+   * Records the parent's positions followed by the positions of the seconds
+   * and the nanos writers.
+   *
+   * @param recorder the recorder that receives the positions
+   * @throws IOException if a position cannot be obtained
+   */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    seconds.getPosition(recorder);
+    nanos.getPosition(recorder);
+  }
+
+  /**
+   * @return the parent's memory estimate plus the estimates of the seconds
+   *         and the nanos writers
+   */
+  @Override
+  public long estimateMemory() {
+    long total = super.estimateMemory();
+    total += seconds.estimateMemory();
+    total += nanos.estimateMemory();
+    return total;
+  }
+
+  /**
+   * @return the number of values in the file statistics multiplied by the
+   *         timestamp length reported by the JavaDataModel
+   */
+  @Override
+  public long getRawDataSize() {
+    final long valueCount = fileStatistics.getNumberOfValues();
+    return valueCount * JavaDataModel.get().lengthOfTimestamp();
+  }
+}