Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/20 22:05:22 UTC

[05/30] hive git commit: Revert "HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline)"

Revert "HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline)"

This reverts commit 0dd4621f34f6043071474220a082268cda124b9d.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d559b347
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d559b347
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d559b347

Branch: refs/heads/llap
Commit: d559b34755010b5ed3ecc31fa423d01788e5e875
Parents: 40e0c38
Author: Matt McCline <mm...@hortonworks.com>
Authored: Fri Apr 15 16:00:18 2016 -0700
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Fri Apr 15 16:00:18 2016 -0700

----------------------------------------------------------------------
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   45 +-
 orc/src/java/org/apache/orc/OrcUtils.java       |   75 -
 orc/src/java/org/apache/orc/Reader.java         |    6 -
 orc/src/java/org/apache/orc/RecordReader.java   |    8 +-
 .../java/org/apache/orc/TypeDescription.java    |   62 +-
 .../org/apache/orc/impl/BitFieldReader.java     |    5 +-
 .../java/org/apache/orc/impl/IntegerReader.java |   26 +-
 .../apache/orc/impl/RunLengthByteReader.java    |   36 +-
 .../apache/orc/impl/RunLengthIntegerReader.java |   31 +-
 .../orc/impl/RunLengthIntegerReaderV2.java      |   33 +-
 .../java/org/apache/orc/impl/WriterImpl.java    |   47 +-
 .../ql/exec/vector/VectorizedRowBatchCtx.java   |   13 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   43 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |    3 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   12 +-
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |   50 +-
 .../hadoop/hive/ql/io/orc/SchemaEvolution.java  |  234 ++-
 .../hive/ql/io/orc/TreeReaderFactory.java       |  838 ++++-----
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   32 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |    2 +
 .../hive/ql/io/orc/TestTypeDescription.java     |    4 +-
 .../hive/ql/io/orc/TestVectorOrcFile.java       | 1634 +++++++++---------
 .../hive/ql/io/orc/TestVectorizedORCReader.java |    7 +-
 .../hive/ql/exec/vector/BytesColumnVector.java  |   11 -
 .../ql/exec/vector/TimestampColumnVector.java   |    2 +-
 .../hive/ql/exec/vector/UnionColumnVector.java  |    2 +
 26 files changed, 1476 insertions(+), 1785 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index baaa4d7..7ee263d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -28,12 +27,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
@@ -77,35 +71,6 @@ public class OrcEncodedDataConsumer
     stripes[m.getStripeIx()] = m;
   }
 
-  private static ColumnVector createColumn(OrcProto.Type type,
-                                           int batchSize) {
-    switch (type.getKind()) {
-      case BOOLEAN:
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-      case DATE:
-        return new LongColumnVector(batchSize);
-      case FLOAT:
-      case DOUBLE:
-        return new DoubleColumnVector(batchSize);
-      case BINARY:
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return new BytesColumnVector(batchSize);
-      case TIMESTAMP:
-        return new TimestampColumnVector(batchSize);
-      case DECIMAL:
-        return new DecimalColumnVector(batchSize, type.getPrecision(),
-            type.getScale());
-      default:
-        throw new IllegalArgumentException("LLAP does not support " +
-            type.getKind());
-    }
-  }
-
   @Override
   protected void decodeBatch(OrcEncodedColumnBatch batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
@@ -147,15 +112,9 @@ public class OrcEncodedDataConsumer
         ColumnVectorBatch cvb = cvbPool.take();
         assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
         cvb.size = batchSize;
-        List<OrcProto.Type> types = fileMetadata.getTypes();
-        int[] columnMapping = batch.getColumnIxs();
+
         for (int idx = 0; idx < batch.getColumnIxs().length; idx++) {
-          if (cvb.cols[idx] == null) {
-            // skip over the top level struct, but otherwise assume no complex
-            // types
-            cvb.cols[idx] = createColumn(types.get(columnMapping[idx]), batchSize);
-          }
-          columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize);
+          cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
         }
 
         // we are done reading a batch, send it to consumer for processing

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index 2ebe9a7..2e93254 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -449,79 +449,4 @@ public class OrcUtils {
     return columnId;
   }
 
-  /**
-   * Translate the given rootColumn from the list of types to a TypeDescription.
-   * @param types all of the types
-   * @param rootColumn translate this type
-   * @return a new TypeDescription that matches the given rootColumn
-   */
-  public static
-        TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types,
-                                                int rootColumn) {
-    OrcProto.Type type = types.get(rootColumn);
-    switch (type.getKind()) {
-      case BOOLEAN:
-        return TypeDescription.createBoolean();
-      case BYTE:
-        return TypeDescription.createByte();
-      case SHORT:
-        return TypeDescription.createShort();
-      case INT:
-        return TypeDescription.createInt();
-      case LONG:
-        return TypeDescription.createLong();
-      case FLOAT:
-        return TypeDescription.createFloat();
-      case DOUBLE:
-        return TypeDescription.createDouble();
-      case STRING:
-        return TypeDescription.createString();
-      case CHAR:
-        return TypeDescription.createChar()
-            .withMaxLength(type.getMaximumLength());
-      case VARCHAR:
-        return TypeDescription.createVarchar()
-            .withMaxLength(type.getMaximumLength());
-      case BINARY:
-        return TypeDescription.createBinary();
-      case TIMESTAMP:
-        return TypeDescription.createTimestamp();
-      case DATE:
-        return TypeDescription.createDate();
-      case DECIMAL: {
-        TypeDescription result = TypeDescription.createDecimal();
-        if (type.hasScale()) {
-          result.withScale(type.getScale());
-        }
-        if (type.hasPrecision()) {
-          result.withPrecision(type.getPrecision());
-        }
-        return result;
-      }
-      case LIST:
-        return TypeDescription.createList(
-            convertTypeFromProtobuf(types, type.getSubtypes(0)));
-      case MAP:
-        return TypeDescription.createMap(
-            convertTypeFromProtobuf(types, type.getSubtypes(0)),
-            convertTypeFromProtobuf(types, type.getSubtypes(1)));
-      case STRUCT: {
-        TypeDescription result = TypeDescription.createStruct();
-        for(int f=0; f < type.getSubtypesCount(); ++f) {
-          result.addField(type.getFieldNames(f),
-              convertTypeFromProtobuf(types, type.getSubtypes(f)));
-        }
-        return result;
-      }
-      case UNION: {
-        TypeDescription result = TypeDescription.createUnion();
-        for(int f=0; f < type.getSubtypesCount(); ++f) {
-          result.addUnionChild(
-              convertTypeFromProtobuf(types, type.getSubtypes(f)));
-        }
-        return result;
-      }
-    }
-    throw new IllegalArgumentException("Unknown ORC type " + type.getKind());
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/Reader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/Reader.java b/orc/src/java/org/apache/orc/Reader.java
index 62a05e9..be722b5 100644
--- a/orc/src/java/org/apache/orc/Reader.java
+++ b/orc/src/java/org/apache/orc/Reader.java
@@ -116,15 +116,9 @@ public interface Reader {
   ColumnStatistics[] getStatistics();
 
   /**
-   * Get the type of rows in this ORC file.
-   */
-  TypeDescription getSchema();
-
-  /**
    * Get the list of types contained in the file. The root type is the first
    * type in the list.
    * @return the list of flattened types
-   * @deprecated use getSchema instead
    */
   List<OrcProto.Type> getTypes();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/RecordReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/RecordReader.java b/orc/src/java/org/apache/orc/RecordReader.java
index 09ba0f0..7229dda 100644
--- a/orc/src/java/org/apache/orc/RecordReader.java
+++ b/orc/src/java/org/apache/orc/RecordReader.java
@@ -30,11 +30,13 @@ public interface RecordReader {
    * controlled by the callers. Caller need to look at
    * VectorizedRowBatch.size of the retunred object to know the batch
    * size read.
-   * @param batch a row batch object to read into
-   * @return were more rows available to read?
+   * @param previousBatch a row batch object that can be reused by the reader
+   * @return the row batch that was read. The batch will have a non-zero row
+   *         count if the pointer isn't at the end of the file
    * @throws java.io.IOException
    */
-  boolean nextBatch(VectorizedRowBatch batch) throws IOException;
+  VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch
+			       ) throws IOException;
 
   /**
    * Get the row number of the row that will be returned by the following
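
For context, a minimal caller-side sketch of the contract restored above: the reader hands back the batch it read (possibly reusing the one passed in), and an empty batch signals end of file. This is illustrative only and not part of the commit; countRows is a made-up helper name.

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.RecordReader;

    // Drain a reader with the reverted nextBatch(previous) API.
    static long countRows(RecordReader rows) throws IOException {
      long count = 0;
      VectorizedRowBatch batch = null;
      while (true) {
        batch = rows.nextBatch(batch);       // reader may reuse 'batch' or allocate a new one
        if (batch == null || batch.size == 0) {
          break;                             // zero rows means end of file, per the javadoc above
        }
        count += batch.size;
      }
      return count;
    }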

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index b8e057e..bd900ac 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -61,7 +61,7 @@ public class TypeDescription {
     LIST("array", false),
     MAP("map", false),
     STRUCT("struct", false),
-    UNION("uniontype", false);
+    UNION("union", false);
 
     Category(String name, boolean isPrimitive) {
       this.name = name;
@@ -258,66 +258,6 @@ public class TypeDescription {
     return id;
   }
 
-  public TypeDescription clone() {
-    TypeDescription result = new TypeDescription(category);
-    result.maxLength = maxLength;
-    result.precision = precision;
-    result.scale = scale;
-    if (fieldNames != null) {
-      result.fieldNames.addAll(fieldNames);
-    }
-    if (children != null) {
-      for(TypeDescription child: children) {
-        TypeDescription clone = child.clone();
-        clone.parent = result;
-        result.children.add(clone);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public int hashCode() {
-    return getId();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || other.getClass() != TypeDescription.class) {
-      return false;
-    }
-    if (other == this) {
-      return true;
-    }
-    TypeDescription castOther = (TypeDescription) other;
-    if (category != castOther.category ||
-        getId() != castOther.getId() ||
-        getMaximumId() != castOther.getMaximumId() ||
-        maxLength != castOther.maxLength ||
-        scale != castOther.scale ||
-        precision != castOther.precision) {
-      return false;
-    }
-    if (children != null) {
-      if (children.size() != castOther.children.size()) {
-        return false;
-      }
-      for (int i = 0; i < children.size(); ++i) {
-        if (!children.get(i).equals(castOther.children.get(i))) {
-          return false;
-        }
-      }
-    }
-    if (category == Category.STRUCT) {
-      for(int i=0; i < fieldNames.size(); ++i) {
-        if (!fieldNames.get(i).equals(castOther.fieldNames.get(i))) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   /**
    * Get the maximum id assigned to this type or its children.
    * The first call will cause all of the the ids in tree to be assigned, so

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/BitFieldReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/BitFieldReader.java b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
index dda7355..8d9d3cb 100644
--- a/orc/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -137,7 +137,7 @@ public class BitFieldReader {
                          long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
-      if (previous.noNulls || !previous.isNull[i]) {
+      if (!previous.isNull[i]) {
         previous.vector[i] = next();
       } else {
         // The default value of null for int types in vectorized
@@ -150,8 +150,7 @@ public class BitFieldReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((previous.vector[0] != previous.vector[i]) ||
-          (previous.isNull[0] != previous.isNull[i]))) {
+          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index 8bef0f1..7dfd289 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * Interface for reading integers.
@@ -57,25 +57,9 @@ public interface IntegerReader {
 
   /**
    * Return the next available vector for values.
-   * @param column the column being read
-   * @param data the vector to read into
-   * @param length the number of numbers to read
-   * @throws IOException
-   */
-   void nextVector(ColumnVector column,
-                   long[] data,
-                   int length
-                   ) throws IOException;
-
-  /**
-   * Return the next available vector for values. Does not change the
-   * value of column.isRepeating.
-   * @param column the column being read
-   * @param data the vector to read into
-   * @param length the number of numbers to read
+   * @return
    * @throws IOException
    */
-  void nextVector(ColumnVector column,
-                  int[] data,
-                  int length
-                  ) throws IOException;}
+   void nextVector(LongColumnVector previous, final int previousLen)
+      throws IOException;
+}
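
A minimal sketch of driving the restored single-method interface, assuming the column's null flags have already been filled in (for example from the PRESENT stream) before the values are read; readLongs is a made-up helper, not code from this commit.

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.orc.impl.IntegerReader;

    static LongColumnVector readLongs(IntegerReader values, int batchSize)
        throws IOException {
      LongColumnVector result = new LongColumnVector(batchSize);
      // result.isNull[] is assumed to be set by the caller for nullable columns.
      values.nextVector(result, batchSize);  // fills result.vector and recomputes isRepeating
      return result;
    }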

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 24bd051..380f3391 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * A reader that reads a sequence of bytes. A control byte is read before
@@ -92,16 +92,16 @@ public class RunLengthByteReader {
     return result;
   }
 
-  public void nextVector(ColumnVector previous, long[] data, long size)
+  public void nextVector(LongColumnVector previous, long previousLen)
       throws IOException {
     previous.isRepeating = true;
-    for (int i = 0; i < size; i++) {
+    for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        data[i] = next();
+        previous.vector[i] = next();
       } else {
         // The default value of null for int types in vectorized
         // processing is 1, so set that if the value is null
-        data[i] = 1;
+        previous.vector[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -109,36 +109,12 @@ public class RunLengthByteReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && ((data[0] != data[i]) ||
-              (previous.isNull[0] != previous.isNull[i]))) {
+          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
         previous.isRepeating = false;
       }
     }
   }
 
-  /**
-   * Read the next size bytes into the data array, skipping over any slots
-   * where isNull is true.
-   * @param isNull if non-null, skip any rows where isNull[r] is true
-   * @param data the array to read into
-   * @param size the number of elements to read
-   * @throws IOException
-   */
-  public void nextVector(boolean[] isNull, int[] data,
-                         long size) throws IOException {
-    if (isNull == null) {
-      for(int i=0; i < size; ++i) {
-        data[i] = next();
-      }
-    } else {
-      for(int i=0; i < size; ++i) {
-        if (!isNull[i]) {
-          data[i] = next();
-        }
-      }
-    }
-  }
-
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
index b91a263..0c90cde 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
 import java.io.EOFException;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * A reader that reads a sequence of integers.
@@ -99,17 +99,15 @@ public class RunLengthIntegerReader implements IntegerReader {
   }
 
   @Override
-  public void nextVector(ColumnVector previous,
-                         long[] data,
-                         int previousLen) throws IOException {
+  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        data[i] = next();
+        previous.vector[i] = next();
       } else {
         // The default value of null for int type in vectorized
         // processing is 1, so set that if the value is null
-        data[i] = 1;
+        previous.vector[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -117,32 +115,13 @@ public class RunLengthIntegerReader implements IntegerReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
+          && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
         previous.isRepeating = false;
       }
     }
   }
 
   @Override
-  public void nextVector(ColumnVector vector,
-                         int[] data,
-                         int size) throws IOException {
-    if (vector.noNulls) {
-      for(int r=0; r < data.length && r < size; ++r) {
-        data[r] = (int) next();
-      }
-    } else if (!(vector.isRepeating && vector.isNull[0])) {
-      for(int r=0; r < data.length && r < size; ++r) {
-        if (!vector.isNull[r]) {
-          data[r] = (int) next();
-        } else {
-          data[r] = 1;
-        }
-      }
-    }
-  }
-
-  @Override
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index 610d9b5..c6d685a 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -21,9 +21,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * A reader that reads a sequence of light weight compressed integers. Refer
@@ -360,17 +360,15 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
   }
 
   @Override
-  public void nextVector(ColumnVector previous,
-                         long[] data,
-                         int previousLen) throws IOException {
+  public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
-        data[i] = next();
+        previous.vector[i] = next();
       } else {
         // The default value of null for int type in vectorized
         // processing is 1, so set that if the value is null
-        data[i] = 1;
+        previous.vector[i] = 1;
       }
 
       // The default value for nulls in Vectorization for int types is 1
@@ -378,29 +376,10 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
       // when determining the isRepeating flag.
       if (previous.isRepeating
           && i > 0
-          && (data[0] != data[i] ||
-          previous.isNull[0] != previous.isNull[i])) {
+          && (previous.vector[i - 1] != previous.vector[i] ||
+          previous.isNull[i - 1] != previous.isNull[i])) {
         previous.isRepeating = false;
       }
     }
   }
-
-  @Override
-  public void nextVector(ColumnVector vector,
-                         int[] data,
-                         int size) throws IOException {
-    if (vector.noNulls) {
-      for(int r=0; r < data.length && r < size; ++r) {
-        data[r] = (int) next();
-      }
-    } else if (!(vector.isRepeating && vector.isNull[0])) {
-      for(int r=0; r < data.length && r < size; ++r) {
-        if (!vector.isNull[r]) {
-          data[r] = (int) next();
-        } else {
-          data[r] = 1;
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b2966e0..f8afe06 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -1693,10 +1693,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
-  public static long NANOS_PER_MILLI = 1000000;
   public static final int MILLIS_PER_SECOND = 1000;
   static final int NANOS_PER_SECOND = 1000000000;
+  static final int MILLIS_PER_NANO  = 1000000;
   public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
 
   private static class TimestampTreeWriter extends TreeWriter {
@@ -2262,36 +2261,32 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         }
       } else {
         // write the records in runs of the same tag
-        int[] currentStart = new int[vec.fields.length];
-        int[] currentLength = new int[vec.fields.length];
+        byte prevTag = 0;
+        int currentRun = 0;
+        boolean started = false;
         for(int i=0; i < length; ++i) {
-          // only need to deal with the non-nulls, since the nulls were dealt
-          // with in the super method.
-          if (vec.noNulls || !vec.isNull[i + offset]) {
+          if (!vec.isNull[i + offset]) {
             byte tag = (byte) vec.tags[offset + i];
             tags.write(tag);
-            if (currentLength[tag] == 0) {
-              // start a new sequence
-              currentStart[tag] = i + offset;
-              currentLength[tag] = 1;
-            } else if (currentStart[tag] + currentLength[tag] == i + offset) {
-              // ok, we are extending the current run for that tag.
-              currentLength[tag] += 1;
-            } else {
-              // otherwise, we need to close off the old run and start a new one
-              childrenWriters[tag].writeBatch(vec.fields[tag],
-                  currentStart[tag], currentLength[tag]);
-              currentStart[tag] = i + offset;
-              currentLength[tag] = 1;
+            if (!started) {
+              started = true;
+              currentRun = i;
+              prevTag = tag;
+            } else if (tag != prevTag) {
+              childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                  offset + currentRun, i - currentRun);
+              currentRun = i;
+              prevTag = tag;
             }
+          } else if (started) {
+            started = false;
+            childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                offset + currentRun, i - currentRun);
           }
         }
-        // write out any left over sequences
-        for(int tag=0; tag < currentStart.length; ++tag) {
-          if (currentLength[tag] != 0) {
-            childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
-                currentLength[tag]);
-          }
+        if (started) {
+          childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+              offset + currentRun, length - currentRun);
         }
       }
     }
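
As a worked example of the run-of-same-tag writing restored above (a standalone sketch only, ignoring the null handling and the tag stream that the real writer also maintains): for tags {0, 0, 1, 1, 0} the child writers receive (tag 0, start 0, length 2), (tag 1, start 2, length 2), (tag 0, start 4, length 1).

    // Split a tag sequence into maximal runs of the same tag.
    static void printRuns(byte[] tags) {
      int runStart = 0;
      for (int i = 1; i <= tags.length; i++) {
        if (i == tags.length || tags[i] != tags[i - 1]) {
          System.out.println("tag " + tags[i - 1]
              + ", start " + runStart + ", length " + (i - runStart));
          runStart = i;
        }
      }
    }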

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 82a97e0..0724191 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -215,9 +215,12 @@ public class VectorizedRowBatchCtx {
     LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
     int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
     VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
-    for (int i = 0; i < dataColumnCount; i++) {
-      TypeInfo typeInfo = rowColumnTypeInfos[i];
-      result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+
+    for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
+      if (columnsToIncludeTruncated[i]) {
+        TypeInfo typeInfo = rowColumnTypeInfos[i];
+        result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+      }
     }
 
     for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
@@ -473,8 +476,8 @@ public class VectorizedRowBatchCtx {
             bcv.isNull[0] = true;
             bcv.isRepeating = true;
           } else {
-            bcv.setVal(0, sVal.getBytes());
-            bcv.isRepeating = true;
+            bcv.fill(sVal.getBytes());
+            bcv.isNull[0] = false;
           }
         }
         break;

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fcb8ca4..fe0be7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -301,7 +301,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE);
+    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
 
     Reader.Options options = new Reader.Options().range(offset, length);
     options.schema(schema);
@@ -1743,7 +1743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * Do we have schema on read in the configuration variables?
      */
-    TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+    TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
 
     final Reader reader;
     final int bucket;
@@ -1994,13 +1994,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   /**
    * Convert a Hive type property string that contains separated type names into a list of
    * TypeDescription objects.
-   * @param hiveTypeProperty the desired types from hive
-   * @param maxColumns the maximum number of desired columns
    * @return the list of TypeDescription objects.
    */
-  public static ArrayList<TypeDescription>
-      typeDescriptionsFromHiveTypeProperty(String hiveTypeProperty,
-                                           int maxColumns) {
+  public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+      String hiveTypeProperty) {
 
     // CONSDIER: We need a type name parser for TypeDescription.
 
@@ -2008,9 +2005,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size());
     for (TypeInfo typeInfo : typeInfoList) {
       typeDescrList.add(convertTypeInfo(typeInfo));
-      if (typeDescrList.size() >= maxColumns) {
-        break;
-      }
     }
     return typeDescrList;
   }
@@ -2097,18 +2091,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  /**
-   * Generate the desired schema for reading the file.
-   * @param conf the configuration
-   * @param isAcidRead is this an acid format?
-   * @param dataColumns the desired number of data columns for vectorized read
-   * @return the desired schema or null if schema evolution isn't enabled
-   * @throws IOException
-   */
-  public static TypeDescription getDesiredRowTypeDescr(Configuration conf,
-                                                       boolean isAcidRead,
-                                                       int dataColumns
-                                                       ) throws IOException {
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcidRead)
+      throws IOException {
 
     String columnNameProperty = null;
     String columnTypeProperty = null;
@@ -2131,10 +2115,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           haveSchemaEvolutionProperties = false;
         } else {
           schemaEvolutionTypeDescrs =
-              typeDescriptionsFromHiveTypeProperty(columnTypeProperty,
-                  dataColumns);
-          if (schemaEvolutionTypeDescrs.size() !=
-              Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
+              typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+          if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
             haveSchemaEvolutionProperties = false;
           }
         }
@@ -2165,9 +2147,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         return null;
       }
       schemaEvolutionTypeDescrs =
-          typeDescriptionsFromHiveTypeProperty(columnTypeProperty, dataColumns);
-      if (schemaEvolutionTypeDescrs.size() !=
-          Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
+          typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
         return null;
       }
 
@@ -2181,7 +2162,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         }
         columnNum++;
       }
-      if (virtualColumnClipNum != -1 && virtualColumnClipNum < dataColumns) {
+      if (virtualColumnClipNum != -1) {
         schemaEvolutionColumnNames =
             Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum));
         schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum));
@@ -2198,7 +2179,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     // Desired schema does not include virtual columns or partition columns.
     TypeDescription result = TypeDescription.createStruct();
-    for (int i = 0; i < schemaEvolutionTypeDescrs.size(); i++) {
+    for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
       result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 0dd58b7..1fce282 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -447,8 +447,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.length = options.getLength();
     this.validTxnList = validTxnList;
 
-    TypeDescription typeDescr =
-        OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+    TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
 
     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 0bcf9e3..a031a92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -26,8 +26,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -73,7 +71,6 @@ public class ReaderImpl implements Reader {
   private final List<OrcProto.StripeStatistics> stripeStats;
   private final int metadataSize;
   protected final List<OrcProto.Type> types;
-  private final TypeDescription schema;
   private final List<OrcProto.UserMetadataItem> userMetadata;
   private final List<OrcProto.ColumnStatistics> fileStats;
   private final List<StripeInformation> stripes;
@@ -246,11 +243,6 @@ public class ReaderImpl implements Reader {
     return result;
   }
 
-  @Override
-  public TypeDescription getSchema() {
-    return schema;
-  }
-
   /**
    * Ensure this is an ORC file to prevent users from trying to read text
    * files or RC files as ORC files.
@@ -394,9 +386,7 @@ public class ReaderImpl implements Reader {
       this.writerVersion = footerMetaData.writerVersion;
       this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
     }
-    this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
   }
-
   /**
    * Get the WriterVersion based on the ORC file postscript.
    * @param writerVersion the integer writer version
@@ -678,7 +668,7 @@ public class ReaderImpl implements Reader {
       options.include(include);
     }
     return new RecordReaderImpl(this.getStripes(), fileSystem, path,
-        options, schema, types, codec, bufferSize, rowIndexStride, conf);
+        options, types, codec, bufferSize, rowIndexStride, conf);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index c214658..3975409 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,8 +27,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.TypeDescription;
+import org.apache.orc.OrcUtils;
 import org.apache.orc.impl.BufferChunk;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -96,6 +98,7 @@ public class RecordReaderImpl implements RecordReader {
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
+  private final Configuration conf;
   private final MetadataReader metadata;
   private final DataReader dataReader;
 
@@ -142,33 +145,33 @@ public class RecordReaderImpl implements RecordReader {
                              FileSystem fileSystem,
                              Path path,
                              Reader.Options options,
-                             TypeDescription fileSchema,
                              List<OrcProto.Type> types,
                              CompressionCodec codec,
                              int bufferSize,
                              long strideRate,
                              Configuration conf
                              ) throws IOException {
-    SchemaEvolution treeReaderSchema;
-    this.included = options.getInclude();
-    included[0] = true;
+
+    TreeReaderFactory.TreeReaderSchema treeReaderSchema;
     if (options.getSchema() == null) {
       if (LOG.isInfoEnabled()) {
         LOG.info("Schema on read not provided -- using file schema " + types.toString());
       }
-      treeReaderSchema = new SchemaEvolution(fileSchema, included);
+      treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types);
     } else {
 
       // Now that we are creating a record reader for a file, validate that the schema to read
       // is compatible with the file schema.
       //
-      treeReaderSchema = new SchemaEvolution(fileSchema, options.getSchema(),
-          included);
+      List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
+      treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
     }
     this.path = path;
     this.codec = codec;
     this.types = types;
     this.bufferSize = bufferSize;
+    this.included = options.getInclude();
+    this.conf = conf;
     this.rowIndexStride = strideRate;
     this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
     SearchArgument sarg = options.getSearchArgument();
@@ -207,8 +210,7 @@ public class RecordReaderImpl implements RecordReader {
       skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
     }
 
-    reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
-        treeReaderSchema, included, skipCorrupt);
+    reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
@@ -237,7 +239,7 @@ public class RecordReaderImpl implements RecordReader {
     return metadata.readStripeFooter(stripe);
   }
 
-  enum Location {
+  static enum Location {
     BEFORE, MIN, MIDDLE, MAX, AFTER
   }
 
@@ -1050,27 +1052,31 @@ public class RecordReaderImpl implements RecordReader {
   }
 
   @Override
-  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+  public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
     try {
+      final VectorizedRowBatch result;
       if (rowInStripe >= rowCountInStripe) {
         currentStripe += 1;
-        if (currentStripe >= stripes.size()) {
-          batch.size = 0;
-          return false;
-        }
         readStripe();
       }
 
-      int batchSize = computeBatchSize(batch.getMaxSize());
+      final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
 
       rowInStripe += batchSize;
-      reader.setVectorColumnCount(batch.getDataColumnCount());
-      reader.nextBatch(batch, batchSize);
+      if (previous == null) {
+        ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+        result = new VectorizedRowBatch(cols.length);
+        result.cols = cols;
+      } else {
+        result = previous;
+        result.selectedInUse = false;
+        reader.setVectorColumnCount(result.getDataColumnCount());
+        reader.nextVector(result.cols, batchSize);
+      }
 
-      batch.size = (int) batchSize;
-      batch.selectedInUse = false;
+      result.size = batchSize;
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return batch.size  != 0;
+      return result;
     } catch (IOException e) {
       // Rethrow exception with file name in log message
       throw new IOException("Error reading file: " + path, e);

http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
index 6747691..f28ca13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -20,12 +20,13 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
 import org.apache.orc.TypeDescription;
 
 /**
@@ -33,134 +34,103 @@ import org.apache.orc.TypeDescription;
  * has been schema evolution.
  */
 public class SchemaEvolution {
-  private final Map<TypeDescription, TypeDescription> readerToFile;
-  private final boolean[] included;
-  private final TypeDescription readerSchema;
+
   private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
 
-  public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
-    this.included = included;
-    readerToFile = null;
-    this.readerSchema = readerSchema;
-  }
+  public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
+      List<OrcProto.Type> schemaTypes) throws IOException {
 
-  public SchemaEvolution(TypeDescription fileSchema,
-                         TypeDescription readerSchema,
-                         boolean[] included) throws IOException {
-    readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
-    this.included = included;
-    if (checkAcidSchema(fileSchema)) {
-      this.readerSchema = createEventSchema(readerSchema);
+    // For ACID, the row is the ROW field in the outer STRUCT.
+    final boolean isAcid = checkAcidSchema(fileTypes);
+    final List<OrcProto.Type> rowSchema;
+    int rowSubtype;
+    if (isAcid) {
+      rowSubtype = OrcRecordUpdater.ROW + 1;
+      rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
     } else {
-      this.readerSchema = readerSchema;
+      rowSubtype = 0;
+      rowSchema = fileTypes;
     }
-    buildMapping(fileSchema, this.readerSchema);
-  }
 
-  public TypeDescription getReaderSchema() {
-    return readerSchema;
-  }
+    // Do checking on the overlap.  Additional columns will be defaulted to NULL.
 
-  public TypeDescription getFileType(TypeDescription readerType) {
-    TypeDescription result;
-    if (readerToFile == null) {
-      if (included == null || included[readerType.getId()]) {
-        result = readerType;
-      } else {
-        result = null;
-      }
-    } else {
-      result = readerToFile.get(readerType);
-    }
-    return result;
-  }
+    int numFileColumns = rowSchema.get(0).getSubtypesCount();
+    int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
 
-  void buildMapping(TypeDescription fileType,
-                    TypeDescription readerType) throws IOException {
-    // if the column isn't included, don't map it
-    if (included != null && !included[readerType.getId()]) {
-      return;
-    }
-    boolean isOk = true;
-    // check the easy case first
-    if (fileType.getCategory() == readerType.getCategory()) {
-      switch (readerType.getCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case DOUBLE:
-        case FLOAT:
-        case STRING:
-        case TIMESTAMP:
-        case BINARY:
-        case DATE:
-          // these are always a match
-          break;
-        case CHAR:
-        case VARCHAR:
-          isOk = fileType.getMaxLength() == readerType.getMaxLength();
-          break;
-        case DECIMAL:
-          // TODO we don't enforce scale and precision checks, but probably should
-          break;
-        case UNION:
-        case MAP:
-        case LIST: {
-          // these must be an exact match
-          List<TypeDescription> fileChildren = fileType.getChildren();
-          List<TypeDescription> readerChildren = readerType.getChildren();
-          if (fileChildren.size() == readerChildren.size()) {
-            for(int i=0; i < fileChildren.size(); ++i) {
-              buildMapping(fileChildren.get(i), readerChildren.get(i));
-            }
-          } else {
-            isOk = false;
+    int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
+
+    /**
+     * Check type promotion.
+     *
+     * Currently, we only support integer type promotions that can be done "implicitly".
+     * That is, we know that using a bigger integer tree reader on the original smaller integer
+     * column will "just work".
+     *
+     * In the future, other type promotions might require type conversion.
+     */
+    // short -> int -> bigint as same integer readers are used for the above types.
+
+    for (int i = 0; i < numReadColumns; i++) {
+      OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
+      OrcProto.Type rColType = schemaTypes.get(i);
+      if (!fColType.getKind().equals(rColType.getKind())) {
+
+        boolean ok = false;
+        if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+
+          if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
+              rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+            // type promotion possible, converting SHORT to INT/LONG requested type
+            ok = true;
           }
-          break;
-        }
-        case STRUCT: {
-          // allow either side to have fewer fields than the other
-          List<TypeDescription> fileChildren = fileType.getChildren();
-          List<TypeDescription> readerChildren = readerType.getChildren();
-          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
-          for(int i=0; i < jointSize; ++i) {
-            buildMapping(fileChildren.get(i), readerChildren.get(i));
+        } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+
+          if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+            // type promotion possible, converting INT to LONG requested type
+            ok = true;
           }
-          break;
         }
-        default:
-          throw new IllegalArgumentException("Unknown type " + readerType);
-      }
-    } else {
-      switch (fileType.getCategory()) {
-        case SHORT:
-          if (readerType.getCategory() != TypeDescription.Category.INT &&
-              readerType.getCategory() != TypeDescription.Category.LONG) {
-            isOk = false;
-          }
-          break;
-        case INT:
-          if (readerType.getCategory() != TypeDescription.Category.LONG) {
-            isOk = false;
-          }
-          break;
-        default:
-          isOk = false;
+
+        if (!ok) {
+          throw new IOException("ORC does not support type conversion from " +
+              fColType.getKind().name() + " to " + rColType.getKind().name());
+        }
       }
     }
-    if (isOk) {
-      readerToFile.put(readerType, fileType);
+
+    List<OrcProto.Type> fullSchemaTypes;
+
+    if (isAcid) {
+      fullSchemaTypes = new ArrayList<OrcProto.Type>();
+
+      // This copies the ACID struct type which is subtype = 0.
+      // It has field names "operation" through "row".
+      // And we copy the types for all fields EXCEPT ROW (which must be last!).
+
+      for (int i = 0; i < rowSubtype; i++) {
+        fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+      }
+
+      // Add the row struct type.
+      OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
     } else {
-      throw new IOException("ORC does not support type conversion from " +
-          fileType + " to " + readerType);
+      fullSchemaTypes = schemaTypes;
     }
+
+    int innerStructSubtype = rowSubtype;
+
+    // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
+    //     " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
+
+    return new TreeReaderSchema().
+        fileTypes(fileTypes).
+        schemaTypes(fullSchemaTypes).
+        innerStructSubtype(innerStructSubtype);
   }
 
-  private static boolean checkAcidSchema(TypeDescription type) {
-    if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
-      List<String> rootFields = type.getFieldNames();
+  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
+    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
+      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
       if (acidEventFieldNames.equals(rootFields)) {
         return true;
       }
@@ -172,14 +142,26 @@ public class SchemaEvolution {
    * @param typeDescr
    * @return ORC types for the ACID event based on the row's type description
    */
-  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
-    TypeDescription result = TypeDescription.createStruct()
-        .addField("operation", TypeDescription.createInt())
-        .addField("originalTransaction", TypeDescription.createLong())
-        .addField("bucket", TypeDescription.createInt())
-        .addField("rowId", TypeDescription.createLong())
-        .addField("currentTransaction", TypeDescription.createLong())
-        .addField("row", typeDescr.clone());
+  public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) {
+
+    List<OrcProto.Type> result = new ArrayList<OrcProto.Type>();
+
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    type.setKind(OrcProto.Type.Kind.STRUCT);
+    type.addAllFieldNames(acidEventFieldNames);
+    for (int i = 0; i < acidEventFieldNames.size(); i++) {
+      type.addSubtypes(i + 1);
+    }
+    result.add(type.build());
+
+    // Automatically add all fields except the last (ROW).
+    for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
+      type.clear();
+      type.setKind(acidEventOrcTypeKinds.get(i));
+      result.add(type.build());
+    }
+
+    OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
     return result;
   }
 
@@ -192,4 +174,14 @@ public class SchemaEvolution {
     acidEventFieldNames.add("currentTransaction");
     acidEventFieldNames.add("row");
   }
+  public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
+      new ArrayList<OrcProto.Type.Kind>();
+  static {
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
+  }
 }
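
For reference, an illustrative sketch of how the restored validateAndCreate() is wired up, mirroring the RecordReaderImpl change earlier in this diff; resolve is a made-up helper name and the surrounding class is assumed to live in the same package as SchemaEvolution.

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
    import org.apache.orc.OrcProto;
    import org.apache.orc.OrcUtils;
    import org.apache.orc.TypeDescription;

    // fileTypes: flattened types from the file footer; readerSchema: schema on read.
    // Only implicit integer promotions (SHORT -> INT/LONG, INT -> LONG) are accepted;
    // other mismatches make validateAndCreate() throw IOException.
    static TreeReaderSchema resolve(List<OrcProto.Type> fileTypes,
                                    TypeDescription readerSchema) throws IOException {
      List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(readerSchema);
      return SchemaEvolution.validateAndCreate(fileTypes, schemaTypes);
    }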