You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/02 04:37:47 UTC

[12/22] hive git commit: HIVE-4243. Fix column names in ORC metadata.

HIVE-4243. Fix column names in ORC metadata.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7b1ed3d3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7b1ed3d3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7b1ed3d3

Branch: refs/heads/llap
Commit: 7b1ed3d3037860e2b7fc24b760a993f5e928b816
Parents: 99fa337
Author: Owen O'Malley <om...@apache.org>
Authored: Fri Sep 4 16:11:13 2015 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Oct 1 13:07:03 2015 +0200

----------------------------------------------------------------------
 .../hive/ql/io/orc/ColumnStatisticsImpl.java    |  55 +-
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java   |  33 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  | 145 ++++-
 .../apache/hadoop/hive/ql/io/orc/OrcUtils.java  | 177 +-----
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   2 +-
 .../hadoop/hive/ql/io/orc/TypeDescription.java  | 466 ++++++++++++++++
 .../apache/hadoop/hive/ql/io/orc/Writer.java    |   9 +
 .../hadoop/hive/ql/io/orc/WriterImpl.java       | 550 +++++++++----------
 .../hadoop/hive/ql/io/orc/orc_proto.proto       |   1 +
 .../hive/ql/io/orc/TestColumnStatistics.java    |  43 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  15 +-
 .../hadoop/hive/ql/io/orc/TestOrcFile.java      |  41 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   2 +-
 .../hadoop/hive/ql/io/orc/TestOrcWideTable.java | 224 +-------
 .../hive/ql/io/orc/TestTypeDescription.java     |  67 +++
 .../resources/orc-file-dump-bloomfilter.out     |   2 +-
 .../resources/orc-file-dump-bloomfilter2.out    |   2 +-
 .../orc-file-dump-dictionary-threshold.out      |   2 +-
 ql/src/test/resources/orc-file-dump.json        |   2 +-
 ql/src/test/resources/orc-file-dump.out         |   2 +-
 ql/src/test/resources/orc-file-has-null.out     |   2 +-
 .../clientpositive/annotate_stats_part.q.out    |   6 +-
 .../clientpositive/annotate_stats_table.q.out   |   4 +-
 .../dynpart_sort_opt_vectorization.q.out        |  16 +-
 .../dynpart_sort_optimization2.q.out            |   8 +-
 .../extrapolate_part_stats_full.q.out           |  24 +-
 .../extrapolate_part_stats_partial.q.out        |  76 +--
 .../extrapolate_part_stats_partial_ndv.q.out    |  38 +-
 .../results/clientpositive/orc_analyze.q.out    |  46 +-
 .../results/clientpositive/orc_file_dump.q.out  |  18 +-
 .../clientpositive/orc_int_type_promotion.q.out |   6 +-
 .../clientpositive/spark/vectorized_ptf.q.out   | 108 ++--
 .../tez/dynpart_sort_opt_vectorization.q.out    |  16 +-
 .../tez/dynpart_sort_optimization2.q.out        |   8 +-
 .../clientpositive/tez/orc_analyze.q.out        |  46 +-
 .../clientpositive/tez/union_fast_stats.q.out   |  16 +-
 .../clientpositive/tez/vector_outer_join1.q.out |  48 +-
 .../clientpositive/tez/vector_outer_join4.q.out |  48 +-
 .../clientpositive/tez/vectorized_ptf.q.out     | 108 ++--
 .../clientpositive/union_fast_stats.q.out       |  16 +-
 .../results/clientpositive/vectorized_ptf.q.out | 104 ++--
 41 files changed, 1468 insertions(+), 1134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
index 15a3e2c..f39d3e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
@@ -22,8 +22,6 @@ import java.sql.Timestamp;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
@@ -964,35 +962,30 @@ class ColumnStatisticsImpl implements ColumnStatistics {
     return builder;
   }
 
-  static ColumnStatisticsImpl create(ObjectInspector inspector) {
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
-          case BOOLEAN:
-            return new BooleanStatisticsImpl();
-          case BYTE:
-          case SHORT:
-          case INT:
-          case LONG:
-            return new IntegerStatisticsImpl();
-          case FLOAT:
-          case DOUBLE:
-            return new DoubleStatisticsImpl();
-          case STRING:
-          case CHAR:
-          case VARCHAR:
-            return new StringStatisticsImpl();
-          case DECIMAL:
-            return new DecimalStatisticsImpl();
-          case DATE:
-            return new DateStatisticsImpl();
-          case TIMESTAMP:
-            return new TimestampStatisticsImpl();
-          case BINARY:
-            return new BinaryStatisticsImpl();
-          default:
-            return new ColumnStatisticsImpl();
-        }
+  static ColumnStatisticsImpl create(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanStatisticsImpl();
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new IntegerStatisticsImpl();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleStatisticsImpl();
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new StringStatisticsImpl();
+      case DECIMAL:
+        return new DecimalStatisticsImpl();
+      case DATE:
+        return new DateStatisticsImpl();
+      case TIMESTAMP:
+        return new TimestampStatisticsImpl();
+      case BINARY:
+        return new BinaryStatisticsImpl();
       default:
         return new ColumnStatisticsImpl();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index a60ebb4..23dec4a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * Contains factory methods to read or write ORC files.
@@ -102,7 +103,9 @@ public final class OrcFile {
    */
   public enum WriterVersion {
     ORIGINAL(0),
-      HIVE_8732(1); // corrupted stripe/file maximum column statistics
+      HIVE_8732(1), // corrupted stripe/file maximum column statistics
+      HIVE_4243(2), // use real column names from Hive tables
+      FUTURE(Integer.MAX_VALUE); // a version from a future writer
 
     private final int id;
 
@@ -205,7 +208,9 @@ public final class OrcFile {
   public static class WriterOptions {
     private final Configuration configuration;
     private FileSystem fileSystemValue = null;
-    private ObjectInspector inspectorValue = null;
+    private boolean explicitSchema = false;
+    private TypeDescription schema = null;
+    private ObjectInspector inspector = null;
     private long stripeSizeValue;
     private long blockSizeValue;
     private int rowIndexStrideValue;
@@ -355,11 +360,26 @@ public final class OrcFile {
     }
 
     /**
-     * A required option that sets the object inspector for the rows. Used
-     * to determine the schema for the file.
+     * A required option that sets the object inspector for the rows. If
+     * setSchema is not called, it also defines the schema.
      */
     public WriterOptions inspector(ObjectInspector value) {
-      inspectorValue = value;
+      this.inspector = value;
+      if (!explicitSchema) {
+        schema = OrcOutputFormat.convertTypeInfo(
+            TypeInfoUtils.getTypeInfoFromObjectInspector(value));
+      }
+      return this;
+    }
+
+    /**
+     * Set the schema for the file. This is a required parameter.
+     * @param schema the schema for the file.
+     * @return this
+     */
+    public WriterOptions setSchema(TypeDescription schema) {
+      this.explicitSchema = true;
+      this.schema = schema;
       return this;
     }
 
@@ -426,7 +446,8 @@ public final class OrcFile {
     FileSystem fs = opts.fileSystemValue == null ?
       path.getFileSystem(opts.configuration) : opts.fileSystemValue;
 
-    return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+    return new WriterImpl(fs, path, opts.configuration, opts.inspector,
+                          opts.schema,
                           opts.stripeSizeValue, opts.compressValue,
                           opts.bufferSizeValue, opts.rowIndexStrideValue,
                           opts.memoryManagerValue, opts.blockPaddingValue,

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index ea4ebb4..ad24c58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -20,12 +20,17 @@ package org.apache.hadoop.hive.ql.io.orc;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
@@ -36,6 +41,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -52,6 +66,90 @@ import org.apache.hadoop.util.Progressable;
 public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
                         implements AcidOutputFormat<NullWritable, OrcSerdeRow> {
 
+  private static final Log LOG = LogFactory.getLog(OrcOutputFormat.class);
+
+  static TypeDescription convertTypeInfo(TypeInfo info) {
+    switch (info.getCategory()) {
+      case PRIMITIVE: {
+        PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+        switch (pinfo.getPrimitiveCategory()) {
+          case BOOLEAN:
+            return TypeDescription.createBoolean();
+          case BYTE:
+            return TypeDescription.createByte();
+          case SHORT:
+            return TypeDescription.createShort();
+          case INT:
+            return TypeDescription.createInt();
+          case LONG:
+            return TypeDescription.createLong();
+          case FLOAT:
+            return TypeDescription.createFloat();
+          case DOUBLE:
+            return TypeDescription.createDouble();
+          case STRING:
+            return TypeDescription.createString();
+          case DATE:
+            return TypeDescription.createDate();
+          case TIMESTAMP:
+            return TypeDescription.createTimestamp();
+          case BINARY:
+            return TypeDescription.createBinary();
+          case DECIMAL: {
+            DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+            return TypeDescription.createDecimal()
+                .withScale(dinfo.getScale())
+                .withPrecision(dinfo.getPrecision());
+          }
+          case VARCHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createVarchar()
+                .withMaxLength(cinfo.getLength());
+          }
+          case CHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createChar()
+                .withMaxLength(cinfo.getLength());
+          }
+          default:
+            throw new IllegalArgumentException("ORC doesn't handle primitive" +
+                " category " + pinfo.getPrimitiveCategory());
+        }
+      }
+      case LIST: {
+        ListTypeInfo linfo = (ListTypeInfo) info;
+        return TypeDescription.createList
+            (convertTypeInfo(linfo.getListElementTypeInfo()));
+      }
+      case MAP: {
+        MapTypeInfo minfo = (MapTypeInfo) info;
+        return TypeDescription.createMap
+            (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+                convertTypeInfo(minfo.getMapValueTypeInfo()));
+      }
+      case UNION: {
+        UnionTypeInfo minfo = (UnionTypeInfo) info;
+        TypeDescription result = TypeDescription.createUnion();
+        for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+          result.addUnionChild(convertTypeInfo(child));
+        }
+        return result;
+      }
+      case STRUCT: {
+        StructTypeInfo sinfo = (StructTypeInfo) info;
+        TypeDescription result = TypeDescription.createStruct();
+        for(String fieldName: sinfo.getAllStructFieldNames()) {
+          result.addField(fieldName,
+              convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+        }
+        return result;
+      }
+      default:
+        throw new IllegalArgumentException("ORC doesn't handle " +
+            info.getCategory());
+    }
+  }
+
   private static class OrcRecordWriter
       implements RecordWriter<NullWritable, OrcSerdeRow>,
                  StatsProvidingRecordWriter {
@@ -115,7 +213,44 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
   }
 
   private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
-    return OrcFile.writerOptions(props, conf);
+    OrcFile.WriterOptions result = OrcFile.writerOptions(props, conf);
+    if (props != null) {
+      final String columnNameProperty =
+          props.getProperty(IOConstants.COLUMNS);
+      final String columnTypeProperty =
+          props.getProperty(IOConstants.COLUMNS_TYPES);
+      if (columnNameProperty != null &&
+          !columnNameProperty.isEmpty() &&
+          columnTypeProperty != null &&
+          !columnTypeProperty.isEmpty()) {
+        List<String> columnNames;
+        List<TypeInfo> columnTypes;
+
+        if (columnNameProperty.length() == 0) {
+          columnNames = new ArrayList<String>();
+        } else {
+          columnNames = Arrays.asList(columnNameProperty.split(","));
+        }
+
+        if (columnTypeProperty.length() == 0) {
+          columnTypes = new ArrayList<TypeInfo>();
+        } else {
+          columnTypes =
+              TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+        }
+
+        TypeDescription schema = TypeDescription.createStruct();
+        for (int i = 0; i < columnNames.size(); ++i) {
+          schema.addField(columnNames.get(i),
+              convertTypeInfo(columnTypes.get(i)));
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("ORC schema = " + schema);
+        }
+        result.setSchema(schema);
+      }
+    }
+    return result;
   }
 
   @Override
@@ -123,7 +258,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
   getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
                   Progressable reporter) throws IOException {
     return new
-        OrcRecordWriter(new Path(name), getOptions(conf,null));
+        OrcRecordWriter(new Path(name), getOptions(conf, null));
   }
 
 
@@ -135,7 +270,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
                          boolean isCompressed,
                          Properties tableProperties,
                          Progressable reporter) throws IOException {
-    return new OrcRecordWriter(path, getOptions(conf,tableProperties));
+    return new OrcRecordWriter(path, getOptions(conf, tableProperties));
   }
 
   private class DummyOrcRecordUpdater implements RecordUpdater {
@@ -229,8 +364,8 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
   }
 
   @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path,
-                                           Options options) throws IOException {
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter
+        getRawRecordWriter(Path path, Options options) throws IOException {
     final Path filename = AcidUtils.createFilename(path, options);
     final OrcFile.WriterOptions opts =
         OrcFile.writerOptions(options.getConfiguration());

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index db2ca15..3e2af23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -18,20 +18,10 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-
-import com.google.common.collect.Lists;
 
 public class OrcUtils {
   private static final Log LOG = LogFactory.getLog(OrcUtils.class);
@@ -49,159 +39,44 @@ public class OrcUtils {
    * index 5 correspond to column d. After flattening list<string> gets 2 columns.
    *
    * @param selectedColumns - comma separated list of selected column names
-   * @param allColumns      - comma separated list of all column names
-   * @param inspector       - object inspector
+   * @param schema       - object schema
    * @return - boolean array with true value set for the specified column names
    */
-  public static boolean[] includeColumns(String selectedColumns, String allColumns,
-      ObjectInspector inspector) {
-    int numFlattenedCols = getFlattenedColumnsCount(inspector);
-    boolean[] results = new boolean[numFlattenedCols];
+  public static boolean[] includeColumns(String selectedColumns,
+                                         TypeDescription schema) {
+    int numFlattenedCols = schema.getMaximumId();
+    boolean[] results = new boolean[numFlattenedCols + 1];
     if ("*".equals(selectedColumns)) {
       Arrays.fill(results, true);
       return results;
     }
-    if (selectedColumns != null && !selectedColumns.isEmpty()) {
-      includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector);
-    }
-    return results;
-  }
-
-  private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns,
-      String allColumns,
-      ObjectInspector inspector) {
-      Map<String, List<Integer>> columnSpanMap = getColumnSpan(allColumns, inspector);
-      LOG.info("columnSpanMap: " + columnSpanMap);
-
-      String[] selCols = selectedColumns.split(",");
-      for (String sc : selCols) {
-        if (columnSpanMap.containsKey(sc)) {
-          List<Integer> colSpan = columnSpanMap.get(sc);
-          int start = colSpan.get(0);
-          int end = colSpan.get(1);
-          for (int i = start; i <= end; i++) {
-            includeColumns[i] = true;
+    if (selectedColumns != null &&
+        schema.getCategory() == TypeDescription.Category.STRUCT) {
+      List<String> fieldNames = schema.getFieldNames();
+      List<TypeDescription> fields = schema.getChildren();
+      for (String column: selectedColumns.split((","))) {
+        TypeDescription col = findColumn(column, fieldNames, fields);
+        if (col != null) {
+          for(int i=col.getId(); i <= col.getMaximumId(); ++i) {
+            results[i] = true;
           }
         }
       }
-
-      LOG.info("includeColumns: " + Arrays.toString(includeColumns));
     }
-
-  private static Map<String, List<Integer>> getColumnSpan(String allColumns,
-      ObjectInspector inspector) {
-    // map that contains the column span for each column. Column span is the number of columns
-    // required after flattening. For a given object inspector this map contains the start column
-    // id and end column id (both inclusive) after flattening.
-    // EXAMPLE:
-    // schema: struct<a:int, b:float, c:map<string,int>>
-    // column span map for the above struct will be
-    // a => [1,1], b => [2,2], c => [3,5]
-    Map<String, List<Integer>> columnSpanMap = new HashMap<String, List<Integer>>();
-    if (allColumns != null) {
-      String[] columns = allColumns.split(",");
-      int startIdx = 0;
-      int endIdx = 0;
-      if (inspector instanceof StructObjectInspector) {
-        StructObjectInspector soi = (StructObjectInspector) inspector;
-        List<? extends StructField> fields = soi.getAllStructFieldRefs();
-        for (int i = 0; i < fields.size(); i++) {
-          StructField sf = fields.get(i);
-
-          // we get the type (category) from object inspector but column name from the argument.
-          // The reason for this is hive (FileSinkOperator) does not pass the actual column names,
-          // instead it passes the internal column names (_col1,_col2).
-          ObjectInspector sfOI = sf.getFieldObjectInspector();
-          String colName = columns[i];
-
-          startIdx = endIdx + 1;
-          switch (sfOI.getCategory()) {
-            case PRIMITIVE:
-              endIdx += 1;
-              break;
-            case STRUCT:
-              endIdx += 1;
-              StructObjectInspector structInsp = (StructObjectInspector) sfOI;
-              List<? extends StructField> structFields = structInsp.getAllStructFieldRefs();
-              for (int j = 0; j < structFields.size(); ++j) {
-                endIdx += getFlattenedColumnsCount(structFields.get(j).getFieldObjectInspector());
-              }
-              break;
-            case MAP:
-              endIdx += 1;
-              MapObjectInspector mapInsp = (MapObjectInspector) sfOI;
-              endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
-              endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
-              break;
-            case LIST:
-              endIdx += 1;
-              ListObjectInspector listInsp = (ListObjectInspector) sfOI;
-              endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
-              break;
-            case UNION:
-              endIdx += 1;
-              UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI;
-              List<ObjectInspector> choices = unionInsp.getObjectInspectors();
-              for (int j = 0; j < choices.size(); ++j) {
-                endIdx += getFlattenedColumnsCount(choices.get(j));
-              }
-              break;
-            default:
-              throw new IllegalArgumentException("Bad category: " +
-                  inspector.getCategory());
-          }
-
-          columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx));
-        }
-      }
-    }
-    return columnSpanMap;
+    return results;
   }
 
-  /**
-   * Returns the number of columns after flatting complex types.
-   *
-   * @param inspector - object inspector
-   * @return
-   */
-  public static int getFlattenedColumnsCount(ObjectInspector inspector) {
-    int numWriters = 0;
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        numWriters += 1;
-        break;
-      case STRUCT:
-        numWriters += 1;
-        StructObjectInspector structInsp = (StructObjectInspector) inspector;
-        List<? extends StructField> fields = structInsp.getAllStructFieldRefs();
-        for (int i = 0; i < fields.size(); ++i) {
-          numWriters += getFlattenedColumnsCount(fields.get(i).getFieldObjectInspector());
-        }
-        break;
-      case MAP:
-        numWriters += 1;
-        MapObjectInspector mapInsp = (MapObjectInspector) inspector;
-        numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector());
-        numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector());
-        break;
-      case LIST:
-        numWriters += 1;
-        ListObjectInspector listInsp = (ListObjectInspector) inspector;
-        numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector());
-        break;
-      case UNION:
-        numWriters += 1;
-        UnionObjectInspector unionInsp = (UnionObjectInspector) inspector;
-        List<ObjectInspector> choices = unionInsp.getObjectInspectors();
-        for (int i = 0; i < choices.size(); ++i) {
-          numWriters += getFlattenedColumnsCount(choices.get(i));
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Bad category: " +
-            inspector.getCategory());
+  private static TypeDescription findColumn(String columnName,
+                                            List<String> fieldNames,
+                                            List<TypeDescription> fields) {
+    int i = 0;
+    for(String fieldName: fieldNames) {
+      if (fieldName.equalsIgnoreCase(columnName)) {
+        return fields.get(i);
+      } else {
+        i += 1;
+      }
     }
-    return numWriters;
+    return null;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 23b3b55..36fb858 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -347,7 +347,7 @@ public class ReaderImpl implements Reader {
         return version;
       }
     }
-    return OrcFile.WriterVersion.ORIGINAL;
+    return OrcFile.WriterVersion.FUTURE;
   }
 
   /** Extracts the necessary metadata from an externally store buffer (fullFooterBuffer). */

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
new file mode 100644
index 0000000..3481bb3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This is the description of the types in an ORC file.
+ */
+public class TypeDescription {
+  private static final int MAX_PRECISION = 38;
+  private static final int MAX_SCALE = 38;
+  private static final int DEFAULT_PRECISION = 38;
+  private static final int DEFAULT_SCALE = 10;
+  private static final int DEFAULT_LENGTH = 256;
+  public enum Category {
+    BOOLEAN("boolean", true),
+    BYTE("tinyint", true),
+    SHORT("smallint", true),
+    INT("int", true),
+    LONG("bigint", true),
+    FLOAT("float", true),
+    DOUBLE("double", true),
+    STRING("string", true),
+    DATE("date", true),
+    TIMESTAMP("timestamp", true),
+    BINARY("binary", true),
+    DECIMAL("decimal", true),
+    VARCHAR("varchar", true),
+    CHAR("char", true),
+    LIST("array", false),
+    MAP("map", false),
+    STRUCT("struct", false),
+    UNION("union", false);
+
+    Category(String name, boolean isPrimitive) {
+      this.name = name;
+      this.isPrimitive = isPrimitive;
+    }
+
+    final boolean isPrimitive;
+    final String name;
+
+    public boolean isPrimitive() {
+      return isPrimitive;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  public static TypeDescription createBoolean() {
+    return new TypeDescription(Category.BOOLEAN);
+  }
+
+  public static TypeDescription createByte() {
+    return new TypeDescription(Category.BYTE);
+  }
+
+  public static TypeDescription createShort() {
+    return new TypeDescription(Category.SHORT);
+  }
+
+  public static TypeDescription createInt() {
+    return new TypeDescription(Category.INT);
+  }
+
+  public static TypeDescription createLong() {
+    return new TypeDescription(Category.LONG);
+  }
+
+  public static TypeDescription createFloat() {
+    return new TypeDescription(Category.FLOAT);
+  }
+
+  public static TypeDescription createDouble() {
+    return new TypeDescription(Category.DOUBLE);
+  }
+
+  public static TypeDescription createString() {
+    return new TypeDescription(Category.STRING);
+  }
+
+  public static TypeDescription createDate() {
+    return new TypeDescription(Category.DATE);
+  }
+
+  public static TypeDescription createTimestamp() {
+    return new TypeDescription(Category.TIMESTAMP);
+  }
+
+  public static TypeDescription createBinary() {
+    return new TypeDescription(Category.BINARY);
+  }
+
+  public static TypeDescription createDecimal() {
+    return new TypeDescription(Category.DECIMAL);
+  }
+
+  /**
+   * For decimal types, set the precision.
+   * @param precision the new precision
+   * @return this
+   */
+  public TypeDescription withPrecision(int precision) {
+    if (category != Category.DECIMAL) {
+      throw new IllegalArgumentException("precision is only allowed on decimal"+
+         " and not " + category.name);
+    } else if (precision < 1 || precision > MAX_PRECISION || scale > precision){
+      throw new IllegalArgumentException("precision " + precision +
+          " is out of range 1 .. " + scale);
+    }
+    this.precision = precision;
+    return this;
+  }
+
+  /**
+   * For decimal types, set the scale.
+   * @param scale the new scale
+   * @return this
+   */
+  public TypeDescription withScale(int scale) {
+    if (category != Category.DECIMAL) {
+      throw new IllegalArgumentException("scale is only allowed on decimal"+
+          " and not " + category.name);
+    } else if (scale < 0 || scale > MAX_SCALE || scale > precision) {
+      throw new IllegalArgumentException("scale is out of range at " + scale);
+    }
+    this.scale = scale;
+    return this;
+  }
+
+  public static TypeDescription createVarchar() {
+    return new TypeDescription(Category.VARCHAR);
+  }
+
+  public static TypeDescription createChar() {
+    return new TypeDescription(Category.CHAR);
+  }
+
+  /**
+   * Set the maximum length for char and varchar types.
+   * @param maxLength the maximum value
+   * @return this
+   */
+  public TypeDescription withMaxLength(int maxLength) {
+    if (category != Category.VARCHAR && category != Category.CHAR) {
+      throw new IllegalArgumentException("maxLength is only allowed on char" +
+                   " and varchar and not " + category.name);
+    }
+    this.maxLength = maxLength;
+    return this;
+  }
+
+  public static TypeDescription createList(TypeDescription childType) {
+    TypeDescription result = new TypeDescription(Category.LIST);
+    result.children.add(childType);
+    childType.parent = result;
+    return result;
+  }
+
+  public static TypeDescription createMap(TypeDescription keyType,
+                                          TypeDescription valueType) {
+    TypeDescription result = new TypeDescription(Category.MAP);
+    result.children.add(keyType);
+    result.children.add(valueType);
+    keyType.parent = result;
+    valueType.parent = result;
+    return result;
+  }
+
+  public static TypeDescription createUnion() {
+    return new TypeDescription(Category.UNION);
+  }
+
+  public static TypeDescription createStruct() {
+    return new TypeDescription(Category.STRUCT);
+  }
+
+  /**
+   * Add a child to a union type.
+   * @param child a new child type to add
+   * @return the union type.
+   */
+  public TypeDescription addUnionChild(TypeDescription child) {
+    if (category != Category.UNION) {
+      throw new IllegalArgumentException("Can only add types to union type" +
+          " and not " + category);
+    }
+    children.add(child);
+    child.parent = this;
+    return this;
+  }
+
+  /**
+   * Add a field to a struct type as it is built.
+   * @param field the field name
+   * @param fieldType the type of the field
+   * @return the struct type
+   */
+  public TypeDescription addField(String field, TypeDescription fieldType) {
+    if (category != Category.STRUCT) {
+      throw new IllegalArgumentException("Can only add fields to struct type" +
+          " and not " + category);
+    }
+    fieldNames.add(field);
+    children.add(fieldType);
+    fieldType.parent = this;
+    return this;
+  }
+
+  /**
+   * Get the id for this type.
+   * The first call will cause all of the the ids in tree to be assigned, so
+   * it should not be called before the type is completely built.
+   * @return the sequential id
+   */
+  public int getId() {
+    // if the id hasn't been assigned, assign all of the ids from the root
+    if (id == -1) {
+      TypeDescription root = this;
+      while (root.parent != null) {
+        root = root.parent;
+      }
+      root.assignIds(0);
+    }
+    return id;
+  }
+
+  /**
+   * Get the maximum id assigned to this type or its children.
+   * The first call will cause all of the the ids in tree to be assigned, so
+   * it should not be called before the type is completely built.
+   * @return the maximum id assigned under this type
+   */
+  public int getMaximumId() {
+    // if the id hasn't been assigned, assign all of the ids from the root
+    if (maxId == -1) {
+      TypeDescription root = this;
+      while (root.parent != null) {
+        root = root.parent;
+      }
+      root.assignIds(0);
+    }
+    return maxId;
+  }
+
+  /**
+   * Get the kind of this type.
+   * @return get the category for this type.
+   */
+  public Category getCategory() {
+    return category;
+  }
+
+  /**
+   * Get the maximum length of the type. Only used for char and varchar types.
+   * @return the maximum length of the string type
+   */
+  public int getMaxLength() {
+    return maxLength;
+  }
+
+  /**
+   * Get the precision of the decimal type.
+   * @return the number of digits for the precision.
+   */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /**
+   * Get the scale of the decimal type.
+   * @return the number of digits for the scale.
+   */
+  public int getScale() {
+    return scale;
+  }
+
+  /**
+   * For struct types, get the list of field names.
+   * @return the list of field names.
+   */
+  public List<String> getFieldNames() {
+    return Collections.unmodifiableList(fieldNames);
+  }
+
+  /**
+   * Get the subtypes of this type.
+   * @return the list of children types
+   */
+  public List<TypeDescription> getChildren() {
+    return children == null ? null : Collections.unmodifiableList(children);
+  }
+
+  /**
+   * Assign ids to all of the nodes under this one.
+   * @param startId the lowest id to assign
+   * @return the next available id
+   */
+  private int assignIds(int startId) {
+    id = startId++;
+    if (children != null) {
+      for (TypeDescription child : children) {
+        startId = child.assignIds(startId);
+      }
+    }
+    maxId = startId - 1;
+    return startId;
+  }
+
+  private TypeDescription(Category category) {
+    this.category = category;
+    if (category.isPrimitive) {
+      children = null;
+    } else {
+      children = new ArrayList<>();
+    }
+    if (category == Category.STRUCT) {
+      fieldNames = new ArrayList<>();
+    } else {
+      fieldNames = null;
+    }
+  }
+
+  private int id = -1;
+  private int maxId = -1;
+  private TypeDescription parent;
+  private final Category category;
+  private final List<TypeDescription> children;
+  private final List<String> fieldNames;
+  private int maxLength = DEFAULT_LENGTH;
+  private int precision = DEFAULT_PRECISION;
+  private int scale = DEFAULT_SCALE;
+
+  public void printToBuffer(StringBuilder buffer) {
+    buffer.append(category.name);
+    switch (category) {
+      case DECIMAL:
+        buffer.append('(');
+        buffer.append(precision);
+        buffer.append(',');
+        buffer.append(scale);
+        buffer.append(')');
+        break;
+      case CHAR:
+      case VARCHAR:
+        buffer.append('(');
+        buffer.append(maxLength);
+        buffer.append(')');
+        break;
+      case LIST:
+      case MAP:
+      case UNION:
+        buffer.append('<');
+        for(int i=0; i < children.size(); ++i) {
+          if (i != 0) {
+            buffer.append(',');
+          }
+          children.get(i).printToBuffer(buffer);
+        }
+        buffer.append('>');
+        break;
+      case STRUCT:
+        buffer.append('<');
+        for(int i=0; i < children.size(); ++i) {
+          if (i != 0) {
+            buffer.append(',');
+          }
+          buffer.append(fieldNames.get(i));
+          buffer.append(':');
+          children.get(i).printToBuffer(buffer);
+        }
+        buffer.append('>');
+        break;
+      default:
+        break;
+    }
+  }
+
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    printToBuffer(buffer);
+    return buffer.toString();
+  }
+
+  private void printJsonToBuffer(String prefix, StringBuilder buffer,
+                                 int indent) {
+    for(int i=0; i < indent; ++i) {
+      buffer.append(' ');
+    }
+    buffer.append(prefix);
+    buffer.append("{\"category\": \"");
+    buffer.append(category.name);
+    buffer.append("\", \"id\": ");
+    buffer.append(getId());
+    buffer.append(", \"max\": ");
+    buffer.append(maxId);
+    switch (category) {
+      case DECIMAL:
+        buffer.append(", \"precision\": ");
+        buffer.append(precision);
+        buffer.append(", \"scale\": ");
+        buffer.append(scale);
+        break;
+      case CHAR:
+      case VARCHAR:
+        buffer.append(", \"length\": ");
+        buffer.append(maxLength);
+        break;
+      case LIST:
+      case MAP:
+      case UNION:
+        buffer.append(", \"children\": [");
+        for(int i=0; i < children.size(); ++i) {
+          buffer.append('\n');
+          children.get(i).printJsonToBuffer("", buffer, indent + 2);
+          if (i != children.size() - 1) {
+            buffer.append(',');
+          }
+        }
+        buffer.append("]");
+        break;
+      case STRUCT:
+        buffer.append(", \"fields\": [");
+        for(int i=0; i < children.size(); ++i) {
+          buffer.append('\n');
+          children.get(i).printJsonToBuffer("\"" + fieldNames.get(i) + "\": ",
+              buffer, indent + 2);
+          if (i != children.size() - 1) {
+            buffer.append(',');
+          }
+        }
+        buffer.append(']');
+        break;
+      default:
+        break;
+    }
+    buffer.append('}');
+  }
+
+  public String toJson() {
+    StringBuilder buffer = new StringBuilder();
+    printJsonToBuffer("", buffer, 0);
+    return buffer.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 6411e3f..8991f2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -26,6 +28,13 @@ import java.util.List;
  * The interface for writing ORC files.
  */
 public interface Writer {
+
+  /**
+   * Get the schema for this writer
+   * @return the file schema
+   */
+  TypeDescription getSchema();
+
   /**
    * Add arbitrary meta-data to the ORC file. This may be called at any point
    * until the Writer is closed. If the same key is passed a second time, the

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 7aa8d65..767d3f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
@@ -54,7 +53,6 @@ import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
@@ -72,9 +70,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
@@ -127,6 +122,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final int bufferSize;
   private final long blockSize;
   private final double paddingTolerance;
+  private final TypeDescription schema;
+
   // the streams that make up the current stripe
   private final Map<StreamName, BufferedStream> streams =
     new TreeMap<StreamName, BufferedStream>();
@@ -165,6 +162,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       Path path,
       Configuration conf,
       ObjectInspector inspector,
+      TypeDescription schema,
       long stripeSize,
       CompressionKind compress,
       int bufferSize,
@@ -183,6 +181,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.path = path;
     this.conf = conf;
     this.callback = callback;
+    this.schema = schema;
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
 
@@ -207,21 +206,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.memoryManager = memoryManager;
     buildIndex = rowIndexStride > 0;
     codec = createCodec(compress);
-    String allColumns = conf.get(IOConstants.COLUMNS);
-    if (allColumns == null) {
-      allColumns = getColumnNamesFromInspector(inspector);
-    }
-    this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
+    int numColumns = schema.getMaximumId() + 1;
+    this.bufferSize = getEstimatedBufferSize(getMemoryAvailableForORC(),
+        codec != null, numColumns, bufferSize);
     if (version == OrcFile.Version.V_0_11) {
       /* do not write bloom filters for ORC v11 */
-      this.bloomFilterColumns =
-          OrcUtils.includeColumns(null, allColumns, inspector);
+      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
     } else {
       this.bloomFilterColumns =
-          OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector);
+          OrcUtils.includeColumns(bloomFilterColumnNames, schema);
     }
     this.bloomFilterFpp = bloomFilterFpp;
-    treeWriter = createTreeWriter(inspector, streamFactory, false);
+    treeWriter = createTreeWriter(inspector, schema, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
           MIN_ROW_INDEX_STRIDE);
@@ -231,62 +227,42 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     memoryManager.addWriter(path, stripeSize, this);
   }
 
-  private String getColumnNamesFromInspector(ObjectInspector inspector) {
-    List<String> fieldNames = Lists.newArrayList();
-    Joiner joiner = Joiner.on(",");
-    if (inspector instanceof StructObjectInspector) {
-      StructObjectInspector soi = (StructObjectInspector) inspector;
-      List<? extends StructField> fields = soi.getAllStructFieldRefs();
-      for(StructField sf : fields) {
-        fieldNames.add(sf.getFieldName());
-      }
-    }
-    return joiner.join(fieldNames);
-  }
+  static int getEstimatedBufferSize(long availableMem,
+                                    boolean isCompressed,
+                                    int columnCount, int bs) {
+    if (columnCount > COLUMN_COUNT_THRESHOLD) {
+      // In BufferedStream, there are 3 outstream buffers (compressed,
+      // uncompressed and overflow) and list of previously compressed buffers.
+      // Since overflow buffer is rarely used, lets consider only 2 allocation.
+      // Also, initially, the list of compression buffers will be empty.
+      final int outStreamBuffers = isCompressed ? 2 : 1;
 
-  @VisibleForTesting
-  int getEstimatedBufferSize(int bs) {
-      return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
-  }
+      // max possible streams per column is 5. For string columns, there is
+      // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams.
+      final int maxStreams = 5;
 
-  int getEstimatedBufferSize(String colNames, int bs) {
-    long availableMem = getMemoryAvailableForORC();
-    if (colNames != null) {
-      final int numCols = colNames.split(",").length;
-      if (numCols > COLUMN_COUNT_THRESHOLD) {
-        // In BufferedStream, there are 3 outstream buffers (compressed,
-        // uncompressed and overflow) and list of previously compressed buffers.
-        // Since overflow buffer is rarely used, lets consider only 2 allocation.
-        // Also, initially, the list of compression buffers will be empty.
-        final int outStreamBuffers = codec == null ? 1 : 2;
-
-        // max possible streams per column is 5. For string columns, there is
-        // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams.
-        final int maxStreams = 5;
-
-        // Lets assume 10% memory for holding dictionary in memory and other
-        // object allocations
-        final long miscAllocation = (long) (0.1f * availableMem);
-
-        // compute the available memory
-        final long remainingMem = availableMem - miscAllocation;
-
-        int estBufferSize = (int) (remainingMem /
-            (maxStreams * outStreamBuffers * numCols));
-        estBufferSize = getClosestBufferSize(estBufferSize, bs);
-        if (estBufferSize > bs) {
-          estBufferSize = bs;
-        }
+      // Lets assume 10% memory for holding dictionary in memory and other
+      // object allocations
+      final long miscAllocation = (long) (0.1f * availableMem);
 
-        LOG.info("WIDE TABLE - Number of columns: " + numCols +
-            " Chosen compression buffer size: " + estBufferSize);
-        return estBufferSize;
+      // compute the available memory
+      final long remainingMem = availableMem - miscAllocation;
+
+      int estBufferSize = (int) (remainingMem /
+          (maxStreams * outStreamBuffers * columnCount));
+      estBufferSize = getClosestBufferSize(estBufferSize);
+      if (estBufferSize > bs) {
+        estBufferSize = bs;
       }
+
+      LOG.info("WIDE TABLE - Number of columns: " + columnCount +
+          " Chosen compression buffer size: " + estBufferSize);
+      return estBufferSize;
     }
     return bs;
   }
 
-  private int getClosestBufferSize(int estBufferSize, int bs) {
+  private static int getClosestBufferSize(int estBufferSize) {
     final int kb4 = 4 * 1024;
     final int kb8 = 8 * 1024;
     final int kb16 = 16 * 1024;
@@ -546,15 +522,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     /**
-     * Get the current column id. After creating all tree writers this count should tell how many
-     * columns (including columns within nested complex objects) are created in total.
-     * @return current column id
-     */
-    public int getCurrentColumnId() {
-      return columnCount;
-    }
-
-    /**
      * Get the stride rate of the row index.
      */
     public int getRowIndexStride() {
@@ -666,11 +633,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * Create a tree writer.
      * @param columnId the column id of the column to write
      * @param inspector the object inspector to use
+     * @param schema the row schema
      * @param streamFactory limited access to the Writer's data.
      * @param nullable can the value be null?
      * @throws IOException
      */
     TreeWriter(int columnId, ObjectInspector inspector,
+               TypeDescription schema,
                StreamFactory streamFactory,
                boolean nullable) throws IOException {
       this.streamFactory = streamFactory;
@@ -686,9 +655,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       }
       this.foundNulls = false;
       createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
-      indexStatistics = ColumnStatisticsImpl.create(inspector);
-      stripeColStatistics = ColumnStatisticsImpl.create(inspector);
-      fileStatistics = ColumnStatisticsImpl.create(inspector);
+      indexStatistics = ColumnStatisticsImpl.create(schema);
+      stripeColStatistics = ColumnStatisticsImpl.create(schema);
+      fileStatistics = ColumnStatisticsImpl.create(schema);
       childrenWriters = new TreeWriter[0];
       rowIndex = OrcProto.RowIndex.newBuilder();
       rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
@@ -749,7 +718,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     /**
      * Add a new value to the column.
-     * @param obj
+     * @param obj the object to write
      * @throws IOException
      */
     void write(Object obj) throws IOException {
@@ -919,9 +888,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     BooleanTreeWriter(int columnId,
                       ObjectInspector inspector,
+                      TypeDescription schema,
                       StreamFactory writer,
                       boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.writer = new BitFieldWriter(out, 1);
@@ -958,9 +928,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     ByteTreeWriter(int columnId,
                       ObjectInspector inspector,
+                      TypeDescription schema,
                       StreamFactory writer,
                       boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.writer = new RunLengthByteWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA));
       recordPosition(rowIndexPosition);
@@ -1003,9 +974,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     IntegerTreeWriter(int columnId,
                       ObjectInspector inspector,
+                      TypeDescription schema,
                       StreamFactory writer,
                       boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       OutStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
@@ -1079,9 +1051,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     FloatTreeWriter(int columnId,
                       ObjectInspector inspector,
+                      TypeDescription schema,
                       StreamFactory writer,
                       boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.utils = new SerializationUtils();
@@ -1123,9 +1096,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     DoubleTreeWriter(int columnId,
                     ObjectInspector inspector,
+                    TypeDescription schema,
                     StreamFactory writer,
                     boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.utils = new SerializationUtils();
@@ -1184,9 +1158,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     StringTreeWriter(int columnId,
                      ObjectInspector inspector,
+                     TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       stringOutput = writer.createStream(id,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
@@ -1423,9 +1398,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     CharTreeWriter(int columnId,
         ObjectInspector inspector,
+        TypeDescription schema,
         StreamFactory writer,
         boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
     }
 
     /**
@@ -1445,9 +1421,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     VarcharTreeWriter(int columnId,
         ObjectInspector inspector,
+        TypeDescription schema,
         StreamFactory writer,
         boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
     }
 
     /**
@@ -1467,9 +1444,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     BinaryTreeWriter(int columnId,
                      ObjectInspector inspector,
+                     TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
@@ -1531,9 +1509,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     TimestampTreeWriter(int columnId,
                      ObjectInspector inspector,
+                     TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.seconds = createIntegerWriter(writer.createStream(id,
           OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
@@ -1610,9 +1589,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     DateTreeWriter(int columnId,
                    ObjectInspector inspector,
+                   TypeDescription schema,
                    StreamFactory writer,
                    boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       OutStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
@@ -1666,9 +1646,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     DecimalTreeWriter(int columnId,
                         ObjectInspector inspector,
+                        TypeDescription schema,
                         StreamFactory writer,
                         boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
       this.scaleStream = createIntegerWriter(writer.createStream(id,
@@ -1726,16 +1707,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     private final List<? extends StructField> fields;
     StructTreeWriter(int columnId,
                      ObjectInspector inspector,
+                     TypeDescription schema,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
+      List<TypeDescription> children = schema.getChildren();
       StructObjectInspector structObjectInspector =
         (StructObjectInspector) inspector;
       fields = structObjectInspector.getAllStructFieldRefs();
-      childrenWriters = new TreeWriter[fields.size()];
+      childrenWriters = new TreeWriter[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
+        ObjectInspector childOI = i < fields.size() ?
+            fields.get(i).getFieldObjectInspector() : null;
         childrenWriters[i] = createTreeWriter(
-          fields.get(i).getFieldObjectInspector(), writer, true);
+          childOI, children.get(i), writer,
+          true);
       }
       recordPosition(rowIndexPosition);
     }
@@ -1770,15 +1756,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     ListTreeWriter(int columnId,
                    ObjectInspector inspector,
+                   TypeDescription schema,
                    StreamFactory writer,
                    boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
-      ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
+      ObjectInspector childOI =
+        ((ListObjectInspector) inspector).getListElementObjectInspector();
       childrenWriters = new TreeWriter[1];
       childrenWriters[0] =
-        createTreeWriter(listObjectInspector.getListElementObjectInspector(),
-          writer, true);
+        createTreeWriter(childOI, schema.getChildren().get(0), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
@@ -1834,16 +1821,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     MapTreeWriter(int columnId,
                   ObjectInspector inspector,
+                  TypeDescription schema,
                   StreamFactory writer,
                   boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       MapObjectInspector insp = (MapObjectInspector) inspector;
       childrenWriters = new TreeWriter[2];
+      List<TypeDescription> children = schema.getChildren();
       childrenWriters[0] =
-        createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
+        createTreeWriter(insp.getMapKeyObjectInspector(), children.get(0),
+                         writer, true);
       childrenWriters[1] =
-        createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
+        createTreeWriter(insp.getMapValueObjectInspector(), children.get(1),
+                         writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
           OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
@@ -1901,14 +1892,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     UnionTreeWriter(int columnId,
                   ObjectInspector inspector,
+                  TypeDescription schema,
                   StreamFactory writer,
                   boolean nullable) throws IOException {
-      super(columnId, inspector, writer, nullable);
+      super(columnId, inspector, schema, writer, nullable);
       UnionObjectInspector insp = (UnionObjectInspector) inspector;
       List<ObjectInspector> choices = insp.getObjectInspectors();
-      childrenWriters = new TreeWriter[choices.size()];
+      List<TypeDescription> children = schema.getChildren();
+      childrenWriters = new TreeWriter[children.size()];
       for(int i=0; i < childrenWriters.length; ++i) {
-        childrenWriters[i] = createTreeWriter(choices.get(i), writer, true);
+        childrenWriters[i] = createTreeWriter(choices.get(i),
+                                              children.get(i), writer, true);
       }
       tags =
         new RunLengthByteWriter(writer.createStream(columnId,
@@ -1949,168 +1943,151 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private static TreeWriter createTreeWriter(ObjectInspector inspector,
+                                             TypeDescription schema,
                                              StreamFactory streamFactory,
                                              boolean nullable) throws IOException {
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
-          case BOOLEAN:
-            return new BooleanTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case BYTE:
-            return new ByteTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case SHORT:
-          case INT:
-          case LONG:
-            return new IntegerTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case FLOAT:
-            return new FloatTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DOUBLE:
-            return new DoubleTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case STRING:
-            return new StringTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case CHAR:
-            return new CharTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case VARCHAR:
-            return new VarcharTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case BINARY:
-            return new BinaryTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case TIMESTAMP:
-            return new TimestampTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DATE:
-            return new DateTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory, nullable);
-          case DECIMAL:
-            return new DecimalTreeWriter(streamFactory.getNextColumnId(),
-                inspector, streamFactory,  nullable);
-          default:
-            throw new IllegalArgumentException("Bad primitive category " +
-              ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
-        }
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case BYTE:
+        return new ByteTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case SHORT:
+      case INT:
+      case LONG:
+        return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case FLOAT:
+        return new FloatTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case DOUBLE:
+        return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case STRING:
+        return new StringTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case CHAR:
+        return new CharTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case VARCHAR:
+        return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case BINARY:
+        return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case TIMESTAMP:
+        return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case DATE:
+        return new DateTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
+      case DECIMAL:
+        return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory,  nullable);
       case STRUCT:
-        return new StructTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+        return new StructTreeWriter(streamFactory.getNextColumnId(),
+            inspector, schema, streamFactory, nullable);
       case MAP:
         return new MapTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+            schema, streamFactory, nullable);
       case LIST:
         return new ListTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+            schema, streamFactory, nullable);
       case UNION:
         return new UnionTreeWriter(streamFactory.getNextColumnId(), inspector,
-            streamFactory, nullable);
+            schema, streamFactory, nullable);
       default:
         throw new IllegalArgumentException("Bad category: " +
-          inspector.getCategory());
+            schema.getCategory());
     }
   }
 
   private static void writeTypes(OrcProto.Footer.Builder builder,
-                                 TreeWriter treeWriter) {
+                                 TypeDescription schema) {
     OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    switch (treeWriter.inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) treeWriter.inspector).
-                 getPrimitiveCategory()) {
-          case BOOLEAN:
-            type.setKind(OrcProto.Type.Kind.BOOLEAN);
-            break;
-          case BYTE:
-            type.setKind(OrcProto.Type.Kind.BYTE);
-            break;
-          case SHORT:
-            type.setKind(OrcProto.Type.Kind.SHORT);
-            break;
-          case INT:
-            type.setKind(OrcProto.Type.Kind.INT);
-            break;
-          case LONG:
-            type.setKind(OrcProto.Type.Kind.LONG);
-            break;
-          case FLOAT:
-            type.setKind(OrcProto.Type.Kind.FLOAT);
-            break;
-          case DOUBLE:
-            type.setKind(OrcProto.Type.Kind.DOUBLE);
-            break;
-          case STRING:
-            type.setKind(OrcProto.Type.Kind.STRING);
-            break;
-          case CHAR:
-            // The char length needs to be written to file and should be available
-            // from the object inspector
-            CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
-            type.setKind(Type.Kind.CHAR);
-            type.setMaximumLength(charTypeInfo.getLength());
-            break;
-          case VARCHAR:
-            // The varchar length needs to be written to file and should be available
-            // from the object inspector
-            VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo();
-            type.setKind(Type.Kind.VARCHAR);
-            type.setMaximumLength(typeInfo.getLength());
-            break;
-          case BINARY:
-            type.setKind(OrcProto.Type.Kind.BINARY);
-            break;
-          case TIMESTAMP:
-            type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-            break;
-          case DATE:
-            type.setKind(OrcProto.Type.Kind.DATE);
-            break;
-          case DECIMAL:
-            DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.DECIMAL);
-            type.setPrecision(decTypeInfo.precision());
-            type.setScale(decTypeInfo.scale());
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown primitive category: " +
-              ((PrimitiveObjectInspector) treeWriter.inspector).
-                getPrimitiveCategory());
-        }
+    List<TypeDescription> children = schema.getChildren();
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        type.setKind(OrcProto.Type.Kind.BOOLEAN);
+        break;
+      case BYTE:
+        type.setKind(OrcProto.Type.Kind.BYTE);
+        break;
+      case SHORT:
+        type.setKind(OrcProto.Type.Kind.SHORT);
+        break;
+      case INT:
+        type.setKind(OrcProto.Type.Kind.INT);
+        break;
+      case LONG:
+        type.setKind(OrcProto.Type.Kind.LONG);
+        break;
+      case FLOAT:
+        type.setKind(OrcProto.Type.Kind.FLOAT);
+        break;
+      case DOUBLE:
+        type.setKind(OrcProto.Type.Kind.DOUBLE);
+        break;
+      case STRING:
+        type.setKind(OrcProto.Type.Kind.STRING);
+        break;
+      case CHAR:
+        type.setKind(OrcProto.Type.Kind.CHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case VARCHAR:
+        type.setKind(Type.Kind.VARCHAR);
+        type.setMaximumLength(schema.getMaxLength());
+        break;
+      case BINARY:
+        type.setKind(OrcProto.Type.Kind.BINARY);
+        break;
+      case TIMESTAMP:
+        type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+        break;
+      case DATE:
+        type.setKind(OrcProto.Type.Kind.DATE);
+        break;
+      case DECIMAL:
+        type.setKind(OrcProto.Type.Kind.DECIMAL);
+        type.setPrecision(schema.getPrecision());
+        type.setScale(schema.getScale());
         break;
       case LIST:
         type.setKind(OrcProto.Type.Kind.LIST);
-        type.addSubtypes(treeWriter.childrenWriters[0].id);
+        type.addSubtypes(children.get(0).getId());
         break;
       case MAP:
         type.setKind(OrcProto.Type.Kind.MAP);
-        type.addSubtypes(treeWriter.childrenWriters[0].id);
-        type.addSubtypes(treeWriter.childrenWriters[1].id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
+        }
         break;
       case STRUCT:
         type.setKind(OrcProto.Type.Kind.STRUCT);
-        for(TreeWriter child: treeWriter.childrenWriters) {
-          type.addSubtypes(child.id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
         }
-        for(StructField field: ((StructTreeWriter) treeWriter).fields) {
-          type.addFieldNames(field.getFieldName());
+        for(String field: schema.getFieldNames()) {
+          type.addFieldNames(field);
         }
         break;
       case UNION:
         type.setKind(OrcProto.Type.Kind.UNION);
-        for(TreeWriter child: treeWriter.childrenWriters) {
-          type.addSubtypes(child.id);
+        for(TypeDescription t: children) {
+          type.addSubtypes(t.getId());
         }
         break;
       default:
         throw new IllegalArgumentException("Unknown category: " +
-          treeWriter.inspector.getCategory());
+          schema.getCategory());
     }
     builder.addTypes(type);
-    for(TreeWriter child: treeWriter.childrenWriters) {
-      writeTypes(builder, child);
+    if (children != null) {
+      for(TypeDescription child: children) {
+        writeTypes(builder, child);
+      }
     }
   }
 
@@ -2243,73 +2220,58 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private long computeRawDataSize() {
-    long result = 0;
-    for (TreeWriter child : treeWriter.getChildrenWriters()) {
-      result += getRawDataSizeFromInspectors(child, child.inspector);
-    }
-    return result;
+    return getRawDataSize(treeWriter, schema);
   }
 
-  private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) {
+  private long getRawDataSize(TreeWriter child,
+                              TypeDescription schema) {
     long total = 0;
-    switch (oi.getCategory()) {
-    case PRIMITIVE:
-      total += getRawDataSizeFromPrimitives(child, oi);
-      break;
-    case LIST:
-    case MAP:
-    case UNION:
-    case STRUCT:
-      for (TreeWriter tw : child.childrenWriters) {
-        total += getRawDataSizeFromInspectors(tw, tw.inspector);
-      }
-      break;
-    default:
-      LOG.debug("Unknown object inspector category.");
-      break;
-    }
-    return total;
-  }
-
-  private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) {
-    long result = 0;
     long numVals = child.fileStatistics.getNumberOfValues();
-    switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) {
-    case BOOLEAN:
-    case BYTE:
-    case SHORT:
-    case INT:
-    case FLOAT:
-      return numVals * JavaDataModel.get().primitive1();
-    case LONG:
-    case DOUBLE:
-      return numVals * JavaDataModel.get().primitive2();
-    case STRING:
-    case VARCHAR:
-    case CHAR:
-      // ORC strings are converted to java Strings. so use JavaDataModel to
-      // compute the overall size of strings
-      child = (StringTreeWriter) child;
-      StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
-      numVals = numVals == 0 ? 1 : numVals;
-      int avgStringLen = (int) (scs.getSum() / numVals);
-      return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
-    case DECIMAL:
-      return numVals * JavaDataModel.get().lengthOfDecimal();
-    case DATE:
-      return numVals * JavaDataModel.get().lengthOfDate();
-    case BINARY:
-      // get total length of binary blob
-      BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
-      return bcs.getSum();
-    case TIMESTAMP:
-      return numVals * JavaDataModel.get().lengthOfTimestamp();
-    default:
-      LOG.debug("Unknown primitive category.");
-      break;
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case FLOAT:
+        return numVals * JavaDataModel.get().primitive1();
+      case LONG:
+      case DOUBLE:
+        return numVals * JavaDataModel.get().primitive2();
+      case STRING:
+      case VARCHAR:
+      case CHAR:
+        // ORC strings are converted to java Strings. so use JavaDataModel to
+        // compute the overall size of strings
+        StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
+        numVals = numVals == 0 ? 1 : numVals;
+        int avgStringLen = (int) (scs.getSum() / numVals);
+        return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
+      case DECIMAL:
+        return numVals * JavaDataModel.get().lengthOfDecimal();
+      case DATE:
+        return numVals * JavaDataModel.get().lengthOfDate();
+      case BINARY:
+        // get total length of binary blob
+        BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
+        return bcs.getSum();
+      case TIMESTAMP:
+        return numVals * JavaDataModel.get().lengthOfTimestamp();
+      case LIST:
+      case MAP:
+      case UNION:
+      case STRUCT: {
+        TreeWriter[] childWriters = child.getChildrenWriters();
+        List<TypeDescription> childTypes = schema.getChildren();
+        for (int i=0; i < childWriters.length; ++i) {
+          total += getRawDataSize(childWriters[i], childTypes.get(i));
+        }
+        break;
+      }
+      default:
+        LOG.debug("Unknown object inspector category.");
+        break;
     }
-
-    return result;
+    return total;
   }
 
   private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
@@ -2356,7 +2318,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     // populate raw data size
     rawDataSize = computeRawDataSize();
     // serialize the types
-    writeTypes(builder, treeWriter);
+    writeTypes(builder, schema);
     // add the stripe information
     for(OrcProto.StripeInformation stripe: stripes) {
       builder.addStripes(stripe);
@@ -2385,7 +2347,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         .setMagic(OrcFile.MAGIC)
         .addVersion(version.getMajor())
         .addVersion(version.getMinor())
-        .setWriterVersion(OrcFile.WriterVersion.HIVE_8732.getId());
+        .setWriterVersion(OrcFile.WriterVersion.HIVE_4243.getId());
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }
@@ -2410,6 +2372,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @Override
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
+  @Override
   public void addUserMetadata(String name, ByteBuffer value) {
     userMetadata.put(name, ByteString.copyFrom(value));
   }
@@ -2493,12 +2460,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     getStream();
     long start = rawWriter.getPos();
-    long stripeLen = length;
     long availBlockSpace = blockSize - (start % blockSize);
 
     // see if stripe can fit in the current hdfs block, else pad the remaining
     // space in the block
-    if (stripeLen < blockSize && stripeLen > availBlockSpace &&
+    if (length < blockSize && length > availBlockSpace &&
         addBlockPadding) {
       byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
       LOG.info(String.format("Padding ORC by %d bytes while merging..",

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
----------------------------------------------------------------------
diff --git a/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto b/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
index 3b7a9b3..acadef9 100644
--- a/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
+++ b/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
@@ -213,6 +213,7 @@ message PostScript {
   // Version of the writer:
   //   0 (or missing) = original
   //   1 = HIVE-8732 fixed
+  //   2 = HIVE-4243 fixed
   optional uint32 writerVersion = 6;
   // Leave this last in the record
   optional string magic = 8000;

http://git-wip-us.apache.org/repos/asf/hive/blob/7b1ed3d3/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
index 4d30377..4e3bc90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
@@ -48,11 +48,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testLongMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaIntObjectInspector;
+    TypeDescription schema = TypeDescription.createInt();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateInteger(10);
     stats1.updateInteger(10);
     stats2.updateInteger(1);
@@ -71,11 +70,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testDoubleMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
+    TypeDescription schema = TypeDescription.createDouble();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateDouble(10.0);
     stats1.updateDouble(100.0);
     stats2.updateDouble(1.0);
@@ -95,11 +93,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testStringMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+    TypeDescription schema = TypeDescription.createString();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateString(new Text("bob"));
     stats1.updateString(new Text("david"));
     stats1.updateString(new Text("charles"));
@@ -119,11 +116,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testDateMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaDateObjectInspector;
+    TypeDescription schema = TypeDescription.createDate();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateDate(new DateWritable(1000));
     stats1.updateDate(new DateWritable(100));
     stats2.updateDate(new DateWritable(10));
@@ -142,11 +138,10 @@ public class TestColumnStatistics {
 
   @Test
   public void testTimestampMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+    TypeDescription schema = TypeDescription.createTimestamp();
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateTimestamp(new Timestamp(10));
     stats1.updateTimestamp(new Timestamp(100));
     stats2.updateTimestamp(new Timestamp(1));
@@ -165,11 +160,11 @@ public class TestColumnStatistics {
 
   @Test
   public void testDecimalMerge() throws Exception {
-    ObjectInspector inspector =
-        PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
+    TypeDescription schema = TypeDescription.createDecimal()
+        .withPrecision(38).withScale(16);
 
-    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector);
-    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector);
+    ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
+    ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
     stats1.updateDecimal(HiveDecimal.create(10));
     stats1.updateDecimal(HiveDecimal.create(100));
     stats2.updateDecimal(HiveDecimal.create(1));