You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/05/18 15:20:42 UTC

orc git commit: ORC-193. Refactor TreeWriter API in WriterImpl.

Repository: orc
Updated Branches:
  refs/heads/master 3ec52c0da -> cea20ac5d


ORC-193. Refactor TreeWriter API in WriterImpl.

Fixes #124

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/cea20ac5
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/cea20ac5
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/cea20ac5

Branch: refs/heads/master
Commit: cea20ac5db4bb6ec42df20af55e1edb4e23e4b1b
Parents: 3ec52c0
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 17 13:52:12 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu May 18 08:19:49 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/orc/impl/WriterImpl.java    | 859 ++++++++++---------
 .../test/org/apache/orc/tools/TestFileDump.java |   4 +-
 2 files changed, 473 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/cea20ac5/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 32820e1..cfdddad 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -100,12 +100,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
   private final Path path;
-  private final long defaultStripeSize;
   private long adjustedStripeSize;
   private final int rowIndexStride;
   private final CompressionKind compress;
   private int bufferSize;
-  private final long blockSize;
   private final TypeDescription schema;
   private final PhysicalWriter physicalWriter;
   private final OrcFile.WriterVersion writerVersion;
@@ -118,10 +116,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private long lastFlushOffset = 0;
   private int stripesAtLastFlush = -1;
   private final List<OrcProto.StripeInformation> stripes =
-    new ArrayList<OrcProto.StripeInformation>();
+    new ArrayList<>();
+  private final OrcProto.Metadata.Builder fileMetadata =
+      OrcProto.Metadata.newBuilder();
   private final Map<String, ByteString> userMetadata =
-    new TreeMap<String, ByteString>();
-  private final StreamFactory streamFactory = new StreamFactory();
+    new TreeMap<>();
   private final TreeWriter treeWriter;
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
@@ -157,11 +156,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       callbackContext = null;
     }
     this.adjustedStripeSize = opts.getStripeSize();
-    this.defaultStripeSize = opts.getStripeSize();
     this.version = opts.getVersion();
     this.encodingStrategy = opts.getEncodingStrategy();
     this.compressionStrategy = opts.getCompressionStrategy();
-    this.blockSize = opts.getBlockSize();
     this.compress = opts.getCompress();
     this.rowIndexStride = opts.getRowIndexStride();
     this.memoryManager = opts.getMemoryManager();
@@ -170,7 +167,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     if (opts.isEnforceBufferSize()) {
       this.bufferSize = opts.getBufferSize();
     } else {
-      this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
+      this.bufferSize = getEstimatedBufferSize(adjustedStripeSize,
           numColumns, opts.getBufferSize());
     }
     if (version == OrcFile.Version.V_0_11) {
@@ -184,7 +181,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.physicalWriter = opts.getPhysicalWriter() == null ?
         new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
     physicalWriter.writeHeader();
-    treeWriter = createTreeWriter(schema, streamFactory, false);
+    treeWriter = createTreeWriter(schema, new StreamFactory(), false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
           MIN_ROW_INDEX_STRIDE);
@@ -193,7 +190,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // ensure that we are able to handle callbacks before we register ourselves
     memoryManager.addWriter(path, opts.getStripeSize(), this);
     LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
-        " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+        " compression: {} bufferSize: {}", path, adjustedStripeSize, opts.getBlockSize(),
         compress, bufferSize);
   }
 
@@ -268,7 +265,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
   @Override
   public boolean checkMemory(double newScale) throws IOException {
-    long limit = (long) Math.round(adjustedStripeSize * newScale);
+    long limit = Math.round(adjustedStripeSize * newScale);
     long size = treeWriter.estimateMemory();
     if (LOG.isDebugEnabled()) {
       LOG.debug("ORC writer " + physicalWriter + " size = " + size +
@@ -339,7 +336,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * @param column the column id for the stream
      * @param kind the kind of stream
      * @return The output outStream that the section needs to be written to.
-     * @throws IOException
      */
     public OutStream createStream(int column,
                                   OrcProto.Stream.Kind kind
@@ -391,14 +387,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     /**
-     * Get the compression strategy to use.
-     * @return compression strategy
-     */
-    public OrcFile.CompressionStrategy getCompressionStrategy() {
-      return compressionStrategy;
-    }
-
-    /**
      * Get the bloom filter columns
      * @return bloom filter columns
      */
@@ -455,19 +443,89 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   /**
+   * The common interface for the type-specific column writers. This provides
+   * the generic API that they must all implement.
+   */
+  interface TreeWriter {
+
+    /**
+     * Estimate the memory currently used to buffer the stripe.
+     * @return the number of bytes
+     */
+    long estimateMemory();
+
+    /**
+     * Estimate the memory used if the file was read into Hive's Writable
+     * types. This is used as an estimate for the query optimizer.
+     * @return the number of bytes
+     */
+    long getRawDataSize();
+
+    /**
+     * Write a VectorizedRowBatch to the file. This is called by the WriterImpl
+     * at the top level.
+     * @param batch the list of all of the columns
+     * @param offset the first row from the batch to write
+     * @param length the number of rows to write
+     */
+    void writeRootBatch(VectorizedRowBatch batch, int offset,
+                        int length) throws IOException;
+
+    /**
+     * Write a ColumnVector to the file. This is called recursively by
+     * writeRootBatch.
+     * @param vector the data to write
+     * @param offset the first value offset to write.
+     * @param length the number of values to write
+     */
+    void writeBatch(ColumnVector vector, int offset,
+                    int length) throws IOException;
+
+    /**
+     * Create a row index entry at the current point in the stripe.
+     */
+    void createRowIndexEntry() throws IOException;
+
+    /**
+     * Write the stripe out to the file.
+     * @param stripeFooter the stripe footer that contains the information about the
+     *                layout of the stripe. The TreeWriter is required to update
+     *                the footer with its information.
+     * @param stats the stripe statistics information
+     * @param requiredIndexEntries the number of index entries that are
+     *                             required. This is to check to make sure the
+     *                             row index is well formed.
+     */
+    void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
+                     OrcProto.StripeStatistics.Builder stats,
+                     int requiredIndexEntries) throws IOException;
+
+    /**
+     * During a stripe append, we need to update the file statistics.
+     * @param stripeStatistics the statistics for the new stripe
+     */
+    void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
+
+    /**
+     * Add the file statistics to the file footer.
+     * @param footer the file footer builder
+     */
+    void writeFileStatistics(OrcProto.Footer.Builder footer);
+  }
+
+  /**
    * The parent class of all of the writers for each column. Each column
    * is written by an instance of this class. The compound types (struct,
    * list, map, and union) have children tree writers that write the children
    * types.
    */
-  private abstract static class TreeWriter {
+  private abstract static class TreeWriterBase implements TreeWriter {
     protected final int id;
     protected final BitFieldWriter isPresent;
     private final boolean isCompressed;
     protected final ColumnStatisticsImpl indexStatistics;
     protected final ColumnStatisticsImpl stripeColStatistics;
-    private final ColumnStatisticsImpl fileStatistics;
-    protected TreeWriter[] childrenWriters;
+    protected final ColumnStatisticsImpl fileStatistics;
     protected final RowIndexPositionRecorder rowIndexPosition;
     private final OrcProto.RowIndex.Builder rowIndex;
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
@@ -479,7 +537,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
-    private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
     private final StreamFactory streamFactory;
 
     /**
@@ -488,12 +545,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * @param schema the row schema
      * @param streamFactory limited access to the Writer's data.
      * @param nullable can the value be null?
-     * @throws IOException
      */
-    TreeWriter(int columnId,
-               TypeDescription schema,
-               StreamFactory streamFactory,
-               boolean nullable) throws IOException {
+    TreeWriterBase(int columnId,
+                   TypeDescription schema,
+                   StreamFactory streamFactory,
+                   boolean nullable) throws IOException {
       this.streamFactory = streamFactory;
       this.isCompressed = streamFactory.isCompressed();
       this.id = columnId;
@@ -509,7 +565,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       indexStatistics = ColumnStatisticsImpl.create(schema);
       stripeColStatistics = ColumnStatisticsImpl.create(schema);
       fileStatistics = ColumnStatisticsImpl.create(schema);
-      childrenWriters = new TreeWriter[0];
       if (streamFactory.buildIndex()) {
         rowIndex = OrcProto.RowIndex.newBuilder();
         rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
@@ -519,7 +574,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         rowIndexEntry = null;
         rowIndexPosition = null;
       }
-      stripeStatsBuilders = new ArrayList<>();
       if (createBloomFilter) {
         bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
         if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
@@ -581,10 +635,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * @param batch the batch to write from
      * @param offset the row to start on
      * @param length the number of rows to write
-     * @throws IOException
      */
-    void writeRootBatch(VectorizedRowBatch batch, int offset,
-                        int length) throws IOException {
+    public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                               int length) throws IOException {
       writeBatch(batch.cols[0], offset, length);
     }
 
@@ -593,10 +646,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * @param vector the vector to write from
      * @param offset the first value from the vector to write
      * @param length the number of values from the vector to write
-     * @throws IOException
      */
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    @Override
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       if (vector.noNulls) {
         indexStatistics.increment(length);
         if (isPresent != null) {
@@ -650,18 +703,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
     }
 
-    /**
-     * Write the stripe out to the file.
-     * @param builder the stripe footer that contains the information about the
-     *                layout of the stripe. The TreeWriter is required to update
-     *                the footer with its information.
-     * @param requiredIndexEntries the number of index entries that are
-     *                             required. this is to check to make sure the
-     *                             row index is well formed.
-     * @throws IOException
-     */
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
       if (isPresent != null) {
         isPresent.flush();
 
@@ -678,9 +722,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
       // merge stripe-level column statistics to file statistics and write it to
       // stripe statistics
-      OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
-      writeStripeStatistics(stripeStatsBuilder, this);
-      stripeStatsBuilders.add(stripeStatsBuilder);
+      fileStatistics.merge(stripeColStatistics);
+      stats.addColStats(stripeColStatistics.serialize());
+      stripeColStatistics.reset();
 
       // reset the flag for next stripe
       foundNulls = false;
@@ -714,20 +758,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
     }
 
-    private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
-        TreeWriter treeWriter) {
-      treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
-      builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
-      treeWriter.stripeColStatistics.reset();
-      for (TreeWriter child : treeWriter.getChildrenWriters()) {
-        writeStripeStatistics(builder, child);
-      }
-    }
-
-    TreeWriter[] getChildrenWriters() {
-      return childrenWriters;
-    }
-
     /**
      * Get the encoding for this column.
      * @return the information about the encoding of this column
@@ -747,9 +777,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * index statistics. Also merges the index statistics into the file
      * statistics before they are cleared. Finally, it records the start of the
      * next index and ensures all of the children columns also create an entry.
-     * @throws IOException
      */
-    void createRowIndexEntry() throws IOException {
+    public void createRowIndexEntry() throws IOException {
       stripeColStatistics.merge(indexStatistics);
       rowIndexEntry.setStatistics(indexStatistics.serialize());
       indexStatistics.reset();
@@ -757,9 +786,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       rowIndexEntry.clear();
       addBloomFilterEntry();
       recordPosition(rowIndexPosition);
-      for(TreeWriter child: childrenWriters) {
-        child.createRowIndexEntry();
-      }
     }
 
     void addBloomFilterEntry() {
@@ -777,10 +803,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
     }
 
+    @Override
+    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+      fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
+    }
+
     /**
      * Record the current position in each of this column's streams.
      * @param recorder where should the locations be recorded
-     * @throws IOException
      */
     void recordPosition(PositionRecorder recorder) throws IOException {
       if (isPresent != null) {
@@ -792,19 +822,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * Estimate how much memory the writer is consuming excluding the streams.
      * @return the number of bytes.
      */
-    long estimateMemory() {
+    public long estimateMemory() {
       long result = 0;
       if (isPresent != null) {
         result = isPresentOutStream.getBufferSize();
       }
-      for (TreeWriter child: childrenWriters) {
-        result += child.estimateMemory();
-      }
       return result;
     }
+
+    @Override
+    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+      footer.addStatistics(fileStatistics.serialize());
+    }
   }
 
-  private static class BooleanTreeWriter extends TreeWriter {
+  private static class BooleanTreeWriter extends TreeWriterBase {
     private final BitFieldWriter writer;
 
     BooleanTreeWriter(int columnId,
@@ -821,8 +853,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       LongColumnVector vec = (LongColumnVector) vector;
       if (vector.isRepeating) {
@@ -845,9 +877,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       writer.flush();
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -861,12 +894,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + writer.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      long num = fileStatistics.getNumberOfValues();
+      return num * JavaDataModel.get().primitive1();
+    }
   }
 
-  private static class ByteTreeWriter extends TreeWriter {
+  private static class ByteTreeWriter extends TreeWriterBase {
     private final RunLengthByteWriter writer;
 
     ByteTreeWriter(int columnId,
@@ -882,8 +921,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       LongColumnVector vec = (LongColumnVector) vector;
       if (vector.isRepeating) {
@@ -918,9 +957,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       writer.flush();
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -934,14 +974,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + writer.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      long num = fileStatistics.getNumberOfValues();
+      return num * JavaDataModel.get().primitive1();
+    }
   }
 
-  private static class IntegerTreeWriter extends TreeWriter {
+  private static class IntegerTreeWriter extends TreeWriterBase {
     private final IntegerWriter writer;
     private boolean isDirectV2 = true;
+    private final boolean isLong;
 
     IntegerTreeWriter(int columnId,
                       TypeDescription schema,
@@ -955,6 +1002,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
       }
+      this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
     }
 
     @Override
@@ -969,8 +1017,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       LongColumnVector vec = (LongColumnVector) vector;
       if (vector.isRepeating) {
@@ -1005,9 +1053,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       writer.flush();
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -1021,12 +1070,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + writer.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long num = fileStatistics.getNumberOfValues();
+      return num * (isLong ? jdm.primitive2() : jdm.primitive1());
+    }
   }
 
-  private static class FloatTreeWriter extends TreeWriter {
+  private static class FloatTreeWriter extends TreeWriterBase {
     private final PositionedOutputStream stream;
     private final SerializationUtils utils;
 
@@ -1044,8 +1100,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       DoubleColumnVector vec = (DoubleColumnVector) vector;
       if (vector.isRepeating) {
@@ -1081,9 +1137,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       stream.flush();
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -1097,12 +1154,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + stream.getBufferSize();
     }
+
+    @Override
+    public long getRawDataSize() {
+      long num = fileStatistics.getNumberOfValues();
+      return num * JavaDataModel.get().primitive1();
+    }
   }
 
-  private static class DoubleTreeWriter extends TreeWriter {
+  private static class DoubleTreeWriter extends TreeWriterBase {
     private final PositionedOutputStream stream;
     private final SerializationUtils utils;
 
@@ -1120,8 +1183,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       DoubleColumnVector vec = (DoubleColumnVector) vector;
       if (vector.isRepeating) {
@@ -1156,9 +1219,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       stream.flush();
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -1172,12 +1236,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + stream.getBufferSize();
     }
+
+    @Override
+    public long getRawDataSize() {
+      long num = fileStatistics.getNumberOfValues();
+      return num * JavaDataModel.get().primitive2();
+    }
   }
 
-  private static abstract class StringBaseTreeWriter extends TreeWriter {
+  private static abstract class StringBaseTreeWriter extends TreeWriterBase {
     private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final OutStream stringOutput;
     protected final IntegerWriter lengthOutput;
@@ -1187,9 +1257,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     protected final DynamicIntArray rows = new DynamicIntArray();
     protected final PositionedOutputStream directStreamOutput;
     private final List<OrcProto.RowIndexEntry> savedRowIndex =
-        new ArrayList<OrcProto.RowIndexEntry>();
+        new ArrayList<>();
     private final boolean buildIndex;
-    private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+    private final List<Long> rowIndexValueCount = new ArrayList<>();
     // If the number of keys in a dictionary is greater than this fraction of
     //the total number of non-null rows, turn off dictionary encoding
     private final double dictionaryKeySizeThreshold;
@@ -1224,7 +1294,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       doneDictionaryCheck = false;
     }
 
-    private boolean checkDictionaryEncoding() {
+    private void checkDictionaryEncoding() {
       if (!doneDictionaryCheck) {
         // Set the flag indicating whether or not to use dictionary encoding
         // based on whether or not the fraction of distinct keys over number of
@@ -1233,12 +1303,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
         doneDictionaryCheck = true;
       }
-      return useDictionaryEncoding;
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
       // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
       // checking would not have happened. So do it again here.
       checkDictionaryEncoding();
@@ -1257,7 +1327,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
       // we need to build the rowindex before calling super, since it
       // writes it out.
-      super.writeStripe(builder, requiredIndexEntries);
+      super.writeStripe(builder, stats, requiredIndexEntries);
       if (useDictionaryEncoding) {
         stringOutput.flush();
         lengthOutput.flush();
@@ -1365,10 +1435,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * other TreeWriters, this one can't record the position in the streams
      * until the stripe is being flushed. Therefore it saves all of the entries
      * and augments them with the final information as the stripe is written.
-     * @throws IOException
      */
     @Override
-    void createRowIndexEntry() throws IOException {
+    public void createRowIndexEntry() throws IOException {
       getStripeStatistics().merge(indexStatistics);
       OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
       rowIndexEntry.setStatistics(indexStatistics.serialize());
@@ -1378,7 +1447,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       rowIndexEntry.clear();
       addBloomFilterEntry();
       recordPosition(rowIndexPosition);
-      rowIndexValueCount.add(Long.valueOf(rows.size()));
+      rowIndexValueCount.add((long) rows.size());
       if (strideDictionaryCheck) {
         checkDictionaryEncoding();
       }
@@ -1403,7 +1472,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       long parent = super.estimateMemory();
       if (useDictionaryEncoding) {
         return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
@@ -1412,6 +1481,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             directStreamOutput.getBufferSize();
       }
     }
+
+    @Override
+    public long getRawDataSize() {
+      // ORC strings are converted to Java Strings, so use JavaDataModel to
+      // compute the overall size of strings.
+      StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
+      long numVals = fileStatistics.getNumberOfValues();
+      if (numVals == 0) {
+        return 0;
+      } else {
+        int avgSize = (int) (scs.getSum() / numVals);
+        return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
+      }
+    }
   }
 
   private static class StringTreeWriter extends StringBaseTreeWriter {
@@ -1423,8 +1506,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       BytesColumnVector vec = (BytesColumnVector) vector;
       if (vector.isRepeating) {
@@ -1498,8 +1581,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       BytesColumnVector vec = (BytesColumnVector) vector;
       if (vector.isRepeating) {
@@ -1591,8 +1674,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       BytesColumnVector vec = (BytesColumnVector) vector;
       if (vector.isRepeating) {
@@ -1653,7 +1736,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private static class BinaryTreeWriter extends TreeWriter {
+  private static class BinaryTreeWriter extends TreeWriterBase {
     private final PositionedOutputStream stream;
     private final IntegerWriter length;
     private boolean isDirectV2 = true;
@@ -1685,8 +1768,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       BytesColumnVector vec = (BytesColumnVector) vector;
       if (vector.isRepeating) {
@@ -1728,9 +1811,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       stream.flush();
       length.flush();
       if (rowIndexPosition != null) {
@@ -1746,16 +1830,23 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + stream.getBufferSize() +
           length.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      // get total length of binary blob
+      BinaryColumnStatistics bcs = (BinaryColumnStatistics) fileStatistics;
+      return bcs.getSum();
+    }
   }
 
   public static final int MILLIS_PER_SECOND = 1000;
   public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
 
-  private static class TimestampTreeWriter extends TreeWriter {
+  private static class TimestampTreeWriter extends TreeWriterBase {
     private final IntegerWriter seconds;
     private final IntegerWriter nanos;
     private final boolean isDirectV2;
@@ -1793,8 +1884,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       TimestampColumnVector vec = (TimestampColumnVector) vector;
       Timestamp val;
@@ -1839,9 +1930,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       seconds.flush();
       nanos.flush();
       if (rowIndexPosition != null) {
@@ -1873,13 +1965,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + seconds.estimateMemory() +
           nanos.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      return fileStatistics.getNumberOfValues() *
+          JavaDataModel.get().lengthOfTimestamp();
+    }
   }
 
-  private static class DateTreeWriter extends TreeWriter {
+  private static class DateTreeWriter extends TreeWriterBase {
     private final IntegerWriter writer;
     private final boolean isDirectV2;
 
@@ -1898,8 +1996,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       LongColumnVector vec = (LongColumnVector) vector;
       if (vector.isRepeating) {
@@ -1934,9 +2032,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       writer.flush();
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -1961,12 +2060,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + writer.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      return fileStatistics.getNumberOfValues() *
+          JavaDataModel.get().lengthOfDate();
+    }
   }
 
-  private static class DecimalTreeWriter extends TreeWriter {
+  private static class DecimalTreeWriter extends TreeWriterBase {
     private final PositionedOutputStream valueStream;
 
     // These scratch buffers allow us to serialize decimals much faster.
@@ -2004,8 +2109,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       DecimalColumnVector vec = (DecimalColumnVector) vector;
       if (vector.isRepeating) {
@@ -2045,9 +2150,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       valueStream.flush();
       scaleStream.flush();
       if (rowIndexPosition != null) {
@@ -2063,20 +2169,28 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
+    public long estimateMemory() {
       return super.estimateMemory() + valueStream.getBufferSize() +
           scaleStream.estimateMemory();
     }
+
+    @Override
+    public long getRawDataSize() {
+      return fileStatistics.getNumberOfValues() *
+          JavaDataModel.get().lengthOfDecimal();
+    }
   }
 
-  private static class StructTreeWriter extends TreeWriter {
+  private static class StructTreeWriter extends TreeWriterBase {
+    final TreeWriter[] childrenWriters;
+
     StructTreeWriter(int columnId,
                      TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
       super(columnId, schema, writer, nullable);
       List<TypeDescription> children = schema.getChildren();
-      childrenWriters = new TreeWriter[children.size()];
+      childrenWriters = new TreeWriterBase[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
         childrenWriters[i] = createTreeWriter(
           children.get(i), writer,
@@ -2088,8 +2202,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeRootBatch(VectorizedRowBatch batch, int offset,
-                        int length) throws IOException {
+    public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                               int length) throws IOException {
       // update the statistics for the root column
       indexStatistics.increment(length);
       // I'm assuming that the root column isn't nullable so that I don't need
@@ -2108,8 +2222,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       StructColumnVector vec = (StructColumnVector) vector;
       if (vector.isRepeating) {
@@ -2142,21 +2256,65 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void createRowIndexEntry() throws IOException {
+      super.createRowIndexEntry();
+      for(TreeWriter child: childrenWriters) {
+        child.createRowIndexEntry();
+      }
+    }
+
+    @Override
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       for(TreeWriter child: childrenWriters) {
-        child.writeStripe(builder, requiredIndexEntries);
+        child.writeStripe(builder, stats, requiredIndexEntries);
       }
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
       }
     }
+
+    @Override
+    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+      super.updateFileStatistics(stats);
+      for(TreeWriter child: childrenWriters) {
+        child.updateFileStatistics(stats);
+      }
+    }
+
+    @Override
+    public long estimateMemory() {
+      long result = 0;
+      for(TreeWriter writer: childrenWriters) {
+        result += writer.estimateMemory();
+      }
+      return super.estimateMemory() + result;
+    }
+
+    @Override
+    public long getRawDataSize() {
+      long result = 0;
+      for(TreeWriter writer: childrenWriters) {
+        result += writer.getRawDataSize();
+      }
+      return result;
+    }
+
+    @Override
+    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+      super.writeFileStatistics(footer);
+      for(TreeWriter child: childrenWriters) {
+        child.writeFileStatistics(footer);
+      }
+    }
   }
 
-  private static class ListTreeWriter extends TreeWriter {
+  private static class ListTreeWriter extends TreeWriterBase {
     private final IntegerWriter lengths;
     private final boolean isDirectV2;
+    private final TreeWriter childWriter;
 
     ListTreeWriter(int columnId,
                    TypeDescription schema,
@@ -2164,8 +2322,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                    boolean nullable) throws IOException {
       super(columnId, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
-      childrenWriters = new TreeWriter[1];
-      childrenWriters[0] =
+      childWriter =
         createTreeWriter(schema.getChildren().get(0), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
@@ -2186,8 +2343,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void createRowIndexEntry() throws IOException {
+      super.createRowIndexEntry();
+      childWriter.createRowIndexEntry();
+    }
+
+    @Override
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       ListColumnVector vec = (ListColumnVector) vector;
       if (vector.isRepeating) {
@@ -2196,7 +2359,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           int childLength = (int) vec.lengths[0];
           for(int i=0; i < length; ++i) {
             lengths.write(childLength);
-            childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+            childWriter.writeBatch(vec.child, childOffset, childLength);
           }
           if (createBloomFilter) {
             if (bloomFilter != null) {
@@ -2218,7 +2381,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
               currentOffset = nextOffset;
               currentLength = nextLength;
             } else if (currentOffset + currentLength != nextOffset) {
-              childrenWriters[0].writeBatch(vec.child, currentOffset,
+              childWriter.writeBatch(vec.child, currentOffset,
                   currentLength);
               currentOffset = nextOffset;
               currentLength = nextLength;
@@ -2234,20 +2397,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           }
         }
         if (currentLength != 0) {
-          childrenWriters[0].writeBatch(vec.child, currentOffset,
+          childWriter.writeBatch(vec.child, currentOffset,
               currentLength);
         }
       }
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       lengths.flush();
-      for(TreeWriter child: childrenWriters) {
-        child.writeStripe(builder, requiredIndexEntries);
-      }
+      childWriter.writeStripe(builder, stats, requiredIndexEntries);
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
       }
@@ -2260,14 +2422,34 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
-      return super.estimateMemory() + lengths.estimateMemory();
+    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+      super.updateFileStatistics(stats);
+      childWriter.updateFileStatistics(stats);
+    }
+
+    @Override
+    public long estimateMemory() {
+      return super.estimateMemory() + lengths.estimateMemory() +
+          childWriter.estimateMemory();
+    }
+
+    @Override
+    public long getRawDataSize() {
+      return childWriter.getRawDataSize();
+    }
+
+    @Override
+    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+      super.writeFileStatistics(footer);
+      childWriter.writeFileStatistics(footer);
     }
   }
 
-  private static class MapTreeWriter extends TreeWriter {
+  private static class MapTreeWriter extends TreeWriterBase {
     private final IntegerWriter lengths;
     private final boolean isDirectV2;
+    private final TreeWriter keyWriter;
+    private final TreeWriter valueWriter;
 
     MapTreeWriter(int columnId,
                   TypeDescription schema,
@@ -2275,12 +2457,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                   boolean nullable) throws IOException {
       super(columnId, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
-      childrenWriters = new TreeWriter[2];
       List<TypeDescription> children = schema.getChildren();
-      childrenWriters[0] =
-        createTreeWriter(children.get(0), writer, true);
-      childrenWriters[1] =
-        createTreeWriter(children.get(1), writer, true);
+      keyWriter = createTreeWriter(children.get(0), writer, true);
+      valueWriter = createTreeWriter(children.get(1), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       if (rowIndexPosition != null) {
@@ -2300,8 +2479,15 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void createRowIndexEntry() throws IOException {
+      super.createRowIndexEntry();
+      keyWriter.createRowIndexEntry();
+      valueWriter.createRowIndexEntry();
+    }
+
+    @Override
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       MapColumnVector vec = (MapColumnVector) vector;
       if (vector.isRepeating) {
@@ -2310,8 +2496,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           int childLength = (int) vec.lengths[0];
           for(int i=0; i < length; ++i) {
             lengths.write(childLength);
-            childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
-            childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+            keyWriter.writeBatch(vec.keys, childOffset, childLength);
+            valueWriter.writeBatch(vec.values, childOffset, childLength);
           }
           if (createBloomFilter) {
             if (bloomFilter != null) {
@@ -2333,9 +2519,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
               currentOffset = nextOffset;
               currentLength = nextLength;
             } else if (currentOffset + currentLength != nextOffset) {
-              childrenWriters[0].writeBatch(vec.keys, currentOffset,
+              keyWriter.writeBatch(vec.keys, currentOffset,
                   currentLength);
-              childrenWriters[1].writeBatch(vec.values, currentOffset,
+              valueWriter.writeBatch(vec.values, currentOffset,
                   currentLength);
               currentOffset = nextOffset;
               currentLength = nextLength;
@@ -2351,22 +2537,22 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           }
         }
         if (currentLength != 0) {
-          childrenWriters[0].writeBatch(vec.keys, currentOffset,
+          keyWriter.writeBatch(vec.keys, currentOffset,
               currentLength);
-          childrenWriters[1].writeBatch(vec.values, currentOffset,
+          valueWriter.writeBatch(vec.values, currentOffset,
               currentLength);
         }
       }
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       lengths.flush();
-      for(TreeWriter child: childrenWriters) {
-        child.writeStripe(builder, requiredIndexEntries);
-      }
+      keyWriter.writeStripe(builder, stats, requiredIndexEntries);
+      valueWriter.writeStripe(builder, stats, requiredIndexEntries);
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
       }
@@ -2379,13 +2565,34 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
-      return super.estimateMemory() + lengths.estimateMemory();
+    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+      super.updateFileStatistics(stats);
+      keyWriter.updateFileStatistics(stats);
+      valueWriter.updateFileStatistics(stats);
+    }
+
+    @Override
+    public long estimateMemory() {
+      return super.estimateMemory() + lengths.estimateMemory() +
+          keyWriter.estimateMemory() + valueWriter.estimateMemory();
+    }
+
+    @Override
+    public long getRawDataSize() {
+      return keyWriter.getRawDataSize() + valueWriter.getRawDataSize();
+    }
+
+    @Override
+    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+      super.writeFileStatistics(footer);
+      keyWriter.writeFileStatistics(footer);
+      valueWriter.writeFileStatistics(footer);
     }
   }
 
-  private static class UnionTreeWriter extends TreeWriter {
+  private static class UnionTreeWriter extends TreeWriterBase {
     private final RunLengthByteWriter tags;
+    private final TreeWriter[] childrenWriters;
 
     UnionTreeWriter(int columnId,
                   TypeDescription schema,
@@ -2393,7 +2600,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                   boolean nullable) throws IOException {
       super(columnId, schema, writer, nullable);
       List<TypeDescription> children = schema.getChildren();
-      childrenWriters = new TreeWriter[children.size()];
+      childrenWriters = new TreeWriterBase[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
         childrenWriters[i] =
             createTreeWriter(children.get(i), writer, true);
@@ -2407,8 +2614,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeBatch(ColumnVector vector, int offset,
-                    int length) throws IOException {
+    public void writeBatch(ColumnVector vector, int offset,
+                           int length) throws IOException {
       super.writeBatch(vector, offset, length);
       UnionColumnVector vec = (UnionColumnVector) vector;
       if (vector.isRepeating) {
@@ -2468,12 +2675,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    void writeStripe(OrcProto.StripeFooter.Builder builder,
-                     int requiredIndexEntries) throws IOException {
-      super.writeStripe(builder, requiredIndexEntries);
+    public void createRowIndexEntry() throws IOException {
+      super.createRowIndexEntry();
+      for(TreeWriter child: childrenWriters) {
+        child.createRowIndexEntry();
+      }
+    }
+
+    @Override
+    public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                            OrcProto.StripeStatistics.Builder stats,
+                            int requiredIndexEntries) throws IOException {
+      super.writeStripe(builder, stats, requiredIndexEntries);
       tags.flush();
       for(TreeWriter child: childrenWriters) {
-        child.writeStripe(builder, requiredIndexEntries);
+        child.writeStripe(builder, stats, requiredIndexEntries);
       }
       if (rowIndexPosition != null) {
         recordPosition(rowIndexPosition);
@@ -2487,8 +2703,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     @Override
-    long estimateMemory() {
-      return super.estimateMemory() + tags.estimateMemory();
+    public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+      super.updateFileStatistics(stats);
+      for(TreeWriter child: childrenWriters) {
+        child.updateFileStatistics(stats);
+      }
+    }
+
+    @Override
+    public long estimateMemory() {
+      long children = 0;
+      for(TreeWriter writer: childrenWriters) {
+        children += writer.estimateMemory();
+      }
+      return children + super.estimateMemory() + tags.estimateMemory();
+    }
+
+    @Override
+    public long getRawDataSize() {
+      long result = 0;
+      for(TreeWriter writer: childrenWriters) {
+        result += writer.getRawDataSize();
+      }
+      return result;
+    }
+
+    @Override
+    public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+      super.writeFileStatistics(footer);
+      for(TreeWriter child: childrenWriters) {
+        child.writeFileStatistics(footer);
+      }
     }
   }
 
@@ -2554,90 +2799,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
   private static void writeTypes(OrcProto.Footer.Builder builder,
                                  TypeDescription schema) {
-    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    List<TypeDescription> children = schema.getChildren();
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        type.setKind(OrcProto.Type.Kind.BOOLEAN);
-        break;
-      case BYTE:
-        type.setKind(OrcProto.Type.Kind.BYTE);
-        break;
-      case SHORT:
-        type.setKind(OrcProto.Type.Kind.SHORT);
-        break;
-      case INT:
-        type.setKind(OrcProto.Type.Kind.INT);
-        break;
-      case LONG:
-        type.setKind(OrcProto.Type.Kind.LONG);
-        break;
-      case FLOAT:
-        type.setKind(OrcProto.Type.Kind.FLOAT);
-        break;
-      case DOUBLE:
-        type.setKind(OrcProto.Type.Kind.DOUBLE);
-        break;
-      case STRING:
-        type.setKind(OrcProto.Type.Kind.STRING);
-        break;
-      case CHAR:
-        type.setKind(OrcProto.Type.Kind.CHAR);
-        type.setMaximumLength(schema.getMaxLength());
-        break;
-      case VARCHAR:
-        type.setKind(OrcProto.Type.Kind.VARCHAR);
-        type.setMaximumLength(schema.getMaxLength());
-        break;
-      case BINARY:
-        type.setKind(OrcProto.Type.Kind.BINARY);
-        break;
-      case TIMESTAMP:
-        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-        break;
-      case DATE:
-        type.setKind(OrcProto.Type.Kind.DATE);
-        break;
-      case DECIMAL:
-        type.setKind(OrcProto.Type.Kind.DECIMAL);
-        type.setPrecision(schema.getPrecision());
-        type.setScale(schema.getScale());
-        break;
-      case LIST:
-        type.setKind(OrcProto.Type.Kind.LIST);
-        type.addSubtypes(children.get(0).getId());
-        break;
-      case MAP:
-        type.setKind(OrcProto.Type.Kind.MAP);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        break;
-      case STRUCT:
-        type.setKind(OrcProto.Type.Kind.STRUCT);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        for(String field: schema.getFieldNames()) {
-          type.addFieldNames(field);
-        }
-        break;
-      case UNION:
-        type.setKind(OrcProto.Type.Kind.UNION);
-        for(TypeDescription t: children) {
-          type.addSubtypes(t.getId());
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown category: " +
-          schema.getCategory());
-    }
-    builder.addTypes(type);
-    if (children != null) {
-      for(TypeDescription child: children) {
-        writeTypes(builder, child);
-      }
-    }
+    builder.addAllTypes(OrcUtils.getOrcTypes(schema));
   }
 
   private void createRowIndexEntry() throws IOException {
@@ -2658,7 +2820,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
       OrcProto.StripeFooter.Builder builder =
           OrcProto.StripeFooter.newBuilder();
-      treeWriter.writeStripe(builder, requiredIndexEntries);
+      OrcProto.StripeStatistics.Builder stats =
+          OrcProto.StripeStatistics.newBuilder();
+      treeWriter.writeStripe(builder, stats, requiredIndexEntries);
+      fileMetadata.addStripeStats(stats.build());
       OrcProto.StripeInformation.Builder dirEntry =
           OrcProto.StripeInformation.newBuilder()
               .setNumberOfRows(rowsInStripe);
@@ -2670,58 +2835,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private long computeRawDataSize() {
-    return getRawDataSize(treeWriter, schema);
-  }
-
-  private long getRawDataSize(TreeWriter child,
-                              TypeDescription schema) {
-    long total = 0;
-    long numVals = child.fileStatistics.getNumberOfValues();
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-      case BYTE:
-      case SHORT:
-      case INT:
-      case FLOAT:
-        return numVals * JavaDataModel.get().primitive1();
-      case LONG:
-      case DOUBLE:
-        return numVals * JavaDataModel.get().primitive2();
-      case STRING:
-      case VARCHAR:
-      case CHAR:
-        // ORC strings are converted to java Strings. so use JavaDataModel to
-        // compute the overall size of strings
-        StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
-        numVals = numVals == 0 ? 1 : numVals;
-        int avgStringLen = (int) (scs.getSum() / numVals);
-        return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
-      case DECIMAL:
-        return numVals * JavaDataModel.get().lengthOfDecimal();
-      case DATE:
-        return numVals * JavaDataModel.get().lengthOfDate();
-      case BINARY:
-        // get total length of binary blob
-        BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
-        return bcs.getSum();
-      case TIMESTAMP:
-        return numVals * JavaDataModel.get().lengthOfTimestamp();
-      case LIST:
-      case MAP:
-      case UNION:
-      case STRUCT: {
-        TreeWriter[] childWriters = child.getChildrenWriters();
-        List<TypeDescription> childTypes = schema.getChildren();
-        for (int i=0; i < childWriters.length; ++i) {
-          total += getRawDataSize(childWriters[i], childTypes.get(i));
-        }
-        break;
-      }
-      default:
-        LOG.debug("Unknown object inspector category.");
-        break;
-    }
-    return total;
+    return treeWriter.getRawDataSize();
   }
 
   private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
@@ -2738,18 +2852,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
   private void writeFileStatistics(OrcProto.Footer.Builder builder,
                                    TreeWriter writer) throws IOException {
-    builder.addStatistics(writer.fileStatistics.serialize());
-    for(TreeWriter child: writer.getChildrenWriters()) {
-      writeFileStatistics(builder, child);
-    }
+    writer.writeFileStatistics(builder);
   }
 
   private void writeMetadata() throws IOException {
-    OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
-    for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
-      builder.addStripeStats(ssb.build());
-    }
-    physicalWriter.writeFileMetadata(builder);
+    physicalWriter.writeFileMetadata(fileMetadata);
   }
 
   private long writePostScript() throws IOException {
@@ -2899,10 +3006,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         dirEntry);
 
     // since we have already written the stripe, just update stripe statistics
-    treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
-
-    // update file level statistics
-    updateFileStatistics(stripeStatistics);
+    treeWriter.updateFileStatistics(stripeStatistics);
+    fileMetadata.addStripeStats(stripeStatistics);
 
     stripes.add(dirEntry.build());
 
@@ -2911,28 +3016,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     rowsInStripe = 0;
   }
 
-  private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) {
-    List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList();
-    List<TreeWriter> allWriters = getAllColumnTreeWriters(treeWriter);
-    for (int i = 0; i < allWriters.size(); i++) {
-      allWriters.get(i).fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(i)));
-    }
-  }
-
-  private List<TreeWriter> getAllColumnTreeWriters(TreeWriter rootTreeWriter) {
-    List<TreeWriter> result = new ArrayList<>();
-    getAllColumnTreeWritersImpl(rootTreeWriter, result);
-    return result;
-  }
-
-  private void getAllColumnTreeWritersImpl(TreeWriter tw,
-      List<TreeWriter> result) {
-    result.add(tw);
-    for (TreeWriter child : tw.childrenWriters) {
-      getAllColumnTreeWritersImpl(child, result);
-    }
-  }
-
   @Override
   public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
     if (userMetadata != null) {

http://git-wip-us.apache.org/repos/asf/orc/blob/cea20ac5/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/org/apache/orc/tools/TestFileDump.java b/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
index 1556ab4..ecdc5fc 100644
--- a/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
+++ b/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
@@ -308,7 +308,7 @@ public class TestFileDump {
     writer.addRowBatch(batch);
 
     writer.close();
-    assertEquals(1564, 0, writer.getRawDataSize());
+    assertEquals(1564, writer.getRawDataSize());
     assertEquals(2, writer.getNumberOfRows());
     PrintStream origOut = System.out;
     ByteArrayOutputStream myOut = new ByteArrayOutputStream();
@@ -322,7 +322,7 @@ public class TestFileDump {
     Assert.assertEquals("{\"b\":true,\"bt\":10,\"s\":100,\"i\":1000,\"l\":10000,\"f\":4,\"d\":20,\"de\":\"4.2222\",\"t\":\"2014-11-25 18:09:24.0\",\"dt\":\"2014-11-25\",\"str\":\"string\",\"c\":\"hello\",\"vc\":\"hello\",\"m\":[{\"_key\":\"k1\",\"_value\":\"v1\"}],\"a\":[100,200],\"st\":{\"i\":10,\"s\":\"foo\"}}", lines[0]);
     Assert.assertEquals("{\"b\":false,\"bt\":20,\"s\":200,\"i\":2000,\"l\":20000,\"f\":8,\"d\":40,\"de\":\"2.2222\",\"t\":\"2014-11-25 18:02:44.0\",\"dt\":\"2014-09-28\",\"str\":\"abcd\",\"c\":\"world\",\"vc\":\"world\",\"m\":[{\"_key\":\"k3\",\"_value\":\"v3\"}],\"a\":[200,300],\"st\":{\"i\":20,\"s\":\"bar\"}}", lines[1]);
   }
-  
+
   // Test that if the fraction of rows that have distinct strings is greater than the configured
   // threshold dictionary encoding is turned off.  If dictionary encoding is turned off the length
   // of the dictionary stream for the column will be 0 in the ORC file dump.