You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:47 UTC

[16/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachan

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 0adbea1..e1c2f31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
 /**
@@ -73,7 +75,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
   private static final Log LOG = LogFactory.getLog(VectorMapJoinGenerateResultOperator.class.getName());
   private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName();
 
-  private transient PrimitiveTypeInfo[] bigTablePrimitiveTypeInfos;
+  //------------------------------------------------------------------------------------------------
+
+  private transient TypeInfo[] bigTableTypeInfos;
 
   private transient VectorSerializeRow bigTableVectorSerializeRow;
 
@@ -394,14 +398,14 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
   private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException {
 
-    PrimitiveTypeInfo[] inputObjInspectorsTypeInfos =
-        VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+    TypeInfo[] inputObjInspectorsTypeInfos =
+        VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                (StructObjectInspector) inputObjInspectors[posBigTable]);
 
     List<Integer> projectedColumns = vContext.getProjectedColumns();
     int projectionSize = vContext.getProjectedColumns().size();
 
-    List<PrimitiveTypeInfo> typeInfoList = new ArrayList<PrimitiveTypeInfo>();
+    List<TypeInfo> typeInfoList = new ArrayList<TypeInfo>();
     List<Integer> noNullsProjectionList = new ArrayList<Integer>();
     for (int i = 0; i < projectionSize; i++) {
       int projectedColumn = projectedColumns.get(i);
@@ -413,17 +417,19 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
     int[] noNullsProjection = ArrayUtils.toPrimitive(noNullsProjectionList.toArray(new Integer[0]));
     int noNullsProjectionSize = noNullsProjection.length;
-    bigTablePrimitiveTypeInfos = typeInfoList.toArray(new PrimitiveTypeInfo[0]);
+    bigTableTypeInfos = typeInfoList.toArray(new TypeInfo[0]);
 
     bigTableVectorSerializeRow =
-            new VectorSerializeRow(new LazyBinarySerializeWrite(noNullsProjectionSize));
+            new VectorSerializeRow(
+                new LazyBinarySerializeWrite(noNullsProjectionSize));
 
     bigTableVectorSerializeRow.init(
-                bigTablePrimitiveTypeInfos,
-                noNullsProjectionList);
+                bigTableTypeInfos,
+                noNullsProjection);
 
-    bigTableVectorDeserializeRow = new VectorDeserializeRow(
-            new LazyBinaryDeserializeRead(bigTablePrimitiveTypeInfos));
+    bigTableVectorDeserializeRow =
+        new VectorDeserializeRow(
+            new LazyBinaryDeserializeRead(bigTableTypeInfos));
 
     bigTableVectorDeserializeRow.init(noNullsProjection);
   }
@@ -833,4 +839,4 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     sb.append("]");
     return sb.toString();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 8f60e9d..6d4c198 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
@@ -33,6 +34,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
 /**
@@ -572,4 +575,20 @@ public class AcidUtils {
       original.add(stat);
     }
   }
+
+  public static boolean isTablePropertyTransactional(Properties props) {
+    String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (resultStr == null) {
+      resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return resultStr != null && resultStr.equalsIgnoreCase("true");
+  }
+
+  public static boolean isTablePropertyTransactional(Map<String, String> parameters) {
+    String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (resultStr == null) {
+      resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return resultStr != null && resultStr.equalsIgnoreCase("true");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 2d6e752..181ce84 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -490,11 +490,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     // ensure filters are not set from previous pushFilters
     jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
     jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+
+    Utilities.unsetSchemaEvolution(jobConf);
+
     TableScanDesc scanDesc = tableScan.getConf();
     if (scanDesc == null) {
       return;
     }
 
+    Utilities.addTableSchemaToConf(jobConf, tableScan);
+
     // construct column name list and types for reference by filter push down
     Utilities.setColumnNameList(jobConf, tableScan);
     Utilities.setColumnTypeList(jobConf, tableScan);

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
index 9879dfe..8d94da8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
@@ -36,6 +36,17 @@ public final class IOConstants {
   public static final String AVRO = "AVRO";
   public static final String AVROFILE = "AVROFILE";
 
+  /**
+   * The desired TABLE column names and types for input format schema evolution.
+   * This is different than COLUMNS and COLUMNS_TYPES, which are based on individual partition
+   * metadata.
+   *
+   * Virtual columns and partition columns are not included
+   *
+   */
+  public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
+  public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";
+
   @VisibleForTesting
   public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
new file mode 100644
index 0000000..6c455bd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * Marker interface to indicate a given input format is self-describing and
+ * can perform schema evolution itself.
+ */
+public interface SelfDescribingInputFormatInterface {
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
deleted file mode 100644
index e9e1d5a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * A MapReduce/Hive Vectorized input format for RC files.
- */
-public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-  implements InputFormatChecker {
-
-  public VectorizedRCFileInputFormat() {
-    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job,
-      Reporter reporter) throws IOException {
-
-    reporter.setStatus(split.toString());
-
-    return new VectorizedRCFileRecordReader(job, (FileSplit) split);
-  }
-
-  @Override
-  public boolean validateInput(FileSystem fs, HiveConf conf,
-      List<FileStatus> files) throws IOException {
-    if (files.size() <= 0) {
-      return false;
-    }
-    for (int fileId = 0; fileId < files.size(); fileId++) {
-      RCFile.Reader reader = null;
-      try {
-        reader = new RCFile.Reader(fs, files.get(fileId)
-            .getPath(), conf);
-        reader.close();
-        reader = null;
-      } catch (IOException e) {
-        return false;
-      } finally {
-        if (null != reader) {
-          reader.close();
-        }
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
deleted file mode 100644
index 4cc1c2f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * RCFileRecordReader.
- */
-public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
-
-  private final Reader in;
-  private final long start;
-  private final long end;
-  private boolean more = true;
-  protected Configuration conf;
-  private final FileSplit split;
-  private final boolean useCache;
-  private VectorizedRowBatchCtx rbCtx;
-  private final LongWritable keyCache = new LongWritable();
-  private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
-  private boolean addPartitionCols = true;
-  private final DataOutputBuffer buffer = new DataOutputBuffer();
-
-  private static RCFileSyncCache syncCache = new RCFileSyncCache();
-
-  private static final class RCFileSyncEntry {
-    long end;
-    long endSync;
-  }
-
-  private static final class RCFileSyncCache {
-
-    private final Map<String, RCFileSyncEntry> cache;
-
-    public RCFileSyncCache() {
-      cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
-    }
-
-    public void put(FileSplit split, long endSync) {
-      Path path = split.getPath();
-      long end = split.getStart() + split.getLength();
-      String key = path.toString() + "+" + String.format("%d", end);
-
-      RCFileSyncEntry entry = new RCFileSyncEntry();
-      entry.end = end;
-      entry.endSync = endSync;
-      if (entry.endSync >= entry.end) {
-        cache.put(key, entry);
-      }
-    }
-
-    public long get(FileSplit split) {
-      Path path = split.getPath();
-      long start = split.getStart();
-      String key = path.toString() + "+" + String.format("%d", start);
-      RCFileSyncEntry entry = cache.get(key);
-      if (entry != null) {
-        return entry.endSync;
-      }
-      return -1;
-    }
-  }
-
-  public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
-      throws IOException {
-
-    Path path = split.getPath();
-    FileSystem fs = path.getFileSystem(conf);
-    this.in = new RCFile.Reader(fs, path, conf);
-    this.end = split.getStart() + split.getLength();
-    this.conf = conf;
-    this.split = split;
-
-    useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
-
-    if (split.getStart() > in.getPosition()) {
-      long oldSync = useCache ? syncCache.get(split) : -1;
-      if (oldSync == -1) {
-        in.sync(split.getStart()); // sync to start
-      } else {
-        in.seek(oldSync);
-      }
-    }
-
-    this.start = in.getPosition();
-
-    more = start < end;
-    try {
-      rbCtx = new VectorizedRowBatchCtx();
-      rbCtx.init(conf, split);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public Class<?> getKeyClass() {
-    return LongWritable.class;
-  }
-
-  public Class<?> getValueClass() {
-    return BytesRefArrayWritable.class;
-  }
-
-  @Override
-  public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public VectorizedRowBatch createValue() {
-    VectorizedRowBatch result;
-    try {
-      result = rbCtx.createVectorizedRowBatch();
-    } catch (HiveException e) {
-      throw new RuntimeException("Error creating a batch", e);
-    }
-    return result;
-  }
-
-  public boolean nextBlock() throws IOException {
-    return in.nextBlock();
-  }
-
-  @Override
-  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-
-    // Reset column fields noNull values to true
-    VectorizedBatchUtil.setNoNullFields(value);
-    buffer.reset();
-    value.selectedInUse = false;
-    for (int i = 0; i < value.numCols; i++) {
-      value.cols[i].isRepeating = false;
-    }
-
-    int i = 0;
-    try {
-
-      for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
-        more = next(keyCache);
-        if (more) {
-          // Check and update partition cols if necessary. Ideally this should be done
-          // in CreateValue() as the partition is constant per split. But since Hive uses
-          // CombineHiveRecordReader and as this does not call CreateValue() for
-          // each new RecordReader it creates, this check is required in next()
-          if (addPartitionCols) {
-            rbCtx.addPartitionColsToBatch(value);
-            addPartitionCols = false;
-          }
-          in.getCurrentRow(colsCache);
-          // Currently RCFile reader does not support reading vectorized
-          // data. Populating the batch by adding one row at a time.
-          rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer);
-        } else {
-          break;
-        }
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Error while getting next row", e);
-    }
-    value.size = i;
-    return more;
-  }
-
-  protected boolean next(LongWritable key) throws IOException {
-    if (!more) {
-      return false;
-    }
-
-    more = in.next(key);
-
-    long lastSeenSyncPos = in.lastSeenSyncPos();
-
-    if (lastSeenSyncPos >= end) {
-      if (useCache) {
-        syncCache.put(split, lastSeenSyncPos);
-      }
-      more = false;
-      return more;
-    }
-    return more;
-  }
-
-  /**
-   * Return the progress within the input split.
-   *
-   * @return 0.0 to 1.0 of the input byte range
-   */
-  public float getProgress() throws IOException {
-    if (end == start) {
-      return 0.0f;
-    } else {
-      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
-    }
-  }
-
-  public long getPos() throws IOException {
-    return in.getPosition();
-  }
-
-  public KeyBuffer getKeyBuffer() {
-    return in.getCurrentKeyBufferObj();
-  }
-
-  protected void seek(long pos) throws IOException {
-    in.seek(pos);
-  }
-
-  public void sync(long pos) throws IOException {
-    in.sync(pos);
-  }
-
-  public void resetBuffer() {
-    in.resetBuffer();
-  }
-
-  public long getStart() {
-    return start;
-  }
-
-  public void close() throws IOException {
-    in.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
deleted file mode 100644
index aaf4eb4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Factory for creating ORC tree readers. These tree readers can handle type promotions and type
- * conversions.
- */
-public class ConversionTreeReaderFactory extends TreeReaderFactory {
-
-  // TODO: This is currently only a place holder for type conversions.
-
-  public static TreeReader createTreeReader(int columnId,
-      List<OrcProto.Type> types,
-      boolean[] included,
-      boolean skipCorrupt
-  ) throws IOException {
-    return TreeReaderFactory.createTreeReader(columnId, types, included, skipCorrupt);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index e3e6893..d81a12d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -53,7 +54,9 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -103,7 +106,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
-    AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
+  SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>,
+  CombineHiveInputFormat.AvoidSplitCombination {
 
   static enum SplitStrategyKind{
     HYBRID,
@@ -222,7 +226,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                                   Configuration conf,
                                                   long offset, long length
                                                   ) throws IOException {
+
+    /**
+     * Do we have schema on read in the configuration variables?
+     *
+     * NOTE: This code path is NOT used by ACID.  OrcInputFormat.getRecordReader intercepts for
+     * ACID tables creates raw record merger, etc.
+     */
+    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false);
+
     Reader.Options options = new Reader.Options().range(offset, length);
+    options.schema(schema);
     boolean isOriginal = isOriginal(file);
     List<OrcProto.Type> types = file.getTypes();
     options.include(genIncludedColumns(types, conf, isOriginal));
@@ -1167,7 +1181,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     if (vectorMode) {
       return (org.apache.hadoop.mapred.RecordReader)
-          new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
+          new VectorizedOrcAcidRowReader(inner, conf,
+              Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
     }
     return new NullKeyRecordReader(inner, conf);
   }
@@ -1218,10 +1233,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
+  // The schema type description does not include the ACID fields (i.e. it is the
+  // non-ACID original schema).
+  private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
 
   @Override
   public RowReader<OrcStruct> getReader(InputSplit inputSplit,
-                                        Options options) throws IOException {
+                                        Options options)
+                                            throws IOException {
     final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
     Path root;
@@ -1236,36 +1255,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
     final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
     final Configuration conf = options.getConfiguration();
+
+
+    /**
+     * Do we have schema on read in the configuration variables?
+     */
+    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
+    if (schema == null) {
+      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+    }
+
     final Reader reader;
     final int bucket;
-    Reader.Options readOptions = new Reader.Options();
+    Reader.Options readOptions = new Reader.Options().schema(schema);
     readOptions.range(split.getStart(), split.getLength());
+
+    // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+    final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+    readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+    setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+
     if (split.hasBase()) {
       bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
           .getBucket();
       reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
-      final List<OrcProto.Type> types = reader.getTypes();
-      readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
-      setSearchArgument(readOptions, types, conf, split.isOriginal());
     } else {
       bucket = (int) split.getStart();
       reader = null;
-      if(deltas != null && deltas.length > 0) {
-        Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
-        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
-        FileSystem fs = readerOptions.getFilesystem();
-        if(fs == null) {
-          fs = path.getFileSystem(options.getConfiguration());
-        }
-        if(fs.exists(bucketPath)) {
-        /* w/o schema evolution (which ACID doesn't support yet) all delta
-        files have the same schema, so choosing the 1st one*/
-          final List<OrcProto.Type> types =
-            OrcFile.createReader(bucketPath, readerOptions).getTypes();
-          readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
-          setSearchArgument(readOptions, types, conf, split.isOriginal());
-        }
-      }
     }
     String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
                                 Long.MAX_VALUE + ":");
@@ -1278,9 +1294,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
       @Override
       public ObjectInspector getObjectInspector() {
-        return ((StructObjectInspector) records.getObjectInspector())
-            .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
-            .getFieldObjectInspector();
+        return OrcStruct.createObjectInspector(0, schemaTypes);
       }
 
       @Override
@@ -1367,5 +1381,4 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         bucket, validTxnList, new Reader.Options(), deltaDirectory);
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index ab0c364..bad2a4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,30 +18,25 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -57,6 +52,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   private final Configuration conf;
   private final boolean collapse;
   private final RecordReader baseReader;
+  private final ObjectInspector objectInspector;
   private final long offset;
   private final long length;
   private final ValidTxnList validTxnList;
@@ -443,6 +439,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.offset = options.getOffset();
     this.length = options.getLength();
     this.validTxnList = validTxnList;
+
+    TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true);
+    if (typeDescr == null) {
+      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+    }
+
+    objectInspector = OrcRecordUpdater.createEventSchema
+        (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+
     // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
     if (reader == null) {
@@ -672,46 +677,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
   @Override
   public ObjectInspector getObjectInspector() {
-    // Read the configuration parameters
-    String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
-    // NOTE: if "columns.types" is missing, all columns will be of String type
-    String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
-
-    // Parse the configuration parameters
-    ArrayList<String> columnNames = new ArrayList<String>();
-    Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
-    if (columnNameProperty != null && columnNameProperty.length() > 0) {
-      String[] colNames = columnNameProperty.split(",");
-      for (int i = 0; i < colNames.length; i++) {
-        if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) {
-          virtualColumns.addLast(i);
-        } else {
-          columnNames.add(colNames[i]);
-        }
-      }
-    }
-    if (columnTypeProperty == null) {
-      // Default type: all string
-      StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < columnNames.size(); i++) {
-        if (i > 0) {
-          sb.append(":");
-        }
-        sb.append("string");
-      }
-      columnTypeProperty = sb.toString();
-    }
-
-    ArrayList<TypeInfo> fieldTypes =
-        TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    while (virtualColumns.size() > 0) {
-      fieldTypes.remove(virtualColumns.removeLast());
-    }
-    StructTypeInfo rowType = new StructTypeInfo();
-    rowType.setAllStructFieldNames(columnNames);
-    rowType.setAllStructFieldTypeInfos(fieldTypes);
-    return OrcRecordUpdater.createEventSchema
-        (OrcStruct.createObjectInspector(rowType));
+    return objectInspector;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index db2ca15..ad4a9e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -30,6 +31,21 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 
 import com.google.common.collect.Lists;
 
@@ -204,4 +220,550 @@ public class OrcUtils {
     return numWriters;
   }
 
+  /**
+   * Convert a Hive type property string that contains separated type names into a list of
+   * TypeDescription objects.
+   * @return the list of TypeDescription objects.
+   */
+  public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+      String hiveTypeProperty) {
+
+    // CONSDIER: We need a type name parser for TypeDescription.
+
+    ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty);
+    ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size());
+    for (TypeInfo typeInfo : typeInfoList) {
+      typeDescrList.add(convertTypeInfo(typeInfo));
+    }
+    return typeDescrList;
+  }
+
+  public static TypeDescription convertTypeInfo(TypeInfo info) {
+    switch (info.getCategory()) {
+      case PRIMITIVE: {
+        PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+        switch (pinfo.getPrimitiveCategory()) {
+          case BOOLEAN:
+            return TypeDescription.createBoolean();
+          case BYTE:
+            return TypeDescription.createByte();
+          case SHORT:
+            return TypeDescription.createShort();
+          case INT:
+            return TypeDescription.createInt();
+          case LONG:
+            return TypeDescription.createLong();
+          case FLOAT:
+            return TypeDescription.createFloat();
+          case DOUBLE:
+            return TypeDescription.createDouble();
+          case STRING:
+            return TypeDescription.createString();
+          case DATE:
+            return TypeDescription.createDate();
+          case TIMESTAMP:
+            return TypeDescription.createTimestamp();
+          case BINARY:
+            return TypeDescription.createBinary();
+          case DECIMAL: {
+            DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+            return TypeDescription.createDecimal()
+                .withScale(dinfo.getScale())
+                .withPrecision(dinfo.getPrecision());
+          }
+          case VARCHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createVarchar()
+                .withMaxLength(cinfo.getLength());
+          }
+          case CHAR: {
+            BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+            return TypeDescription.createChar()
+                .withMaxLength(cinfo.getLength());
+          }
+          default:
+            throw new IllegalArgumentException("ORC doesn't handle primitive" +
+                " category " + pinfo.getPrimitiveCategory());
+        }
+      }
+      case LIST: {
+        ListTypeInfo linfo = (ListTypeInfo) info;
+        return TypeDescription.createList
+            (convertTypeInfo(linfo.getListElementTypeInfo()));
+      }
+      case MAP: {
+        MapTypeInfo minfo = (MapTypeInfo) info;
+        return TypeDescription.createMap
+            (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+                convertTypeInfo(minfo.getMapValueTypeInfo()));
+      }
+      case UNION: {
+        UnionTypeInfo minfo = (UnionTypeInfo) info;
+        TypeDescription result = TypeDescription.createUnion();
+        for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+          result.addUnionChild(convertTypeInfo(child));
+        }
+        return result;
+      }
+      case STRUCT: {
+        StructTypeInfo sinfo = (StructTypeInfo) info;
+        TypeDescription result = TypeDescription.createStruct();
+        for(String fieldName: sinfo.getAllStructFieldNames()) {
+          result.addField(fieldName,
+              convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+        }
+        return result;
+      }
+      default:
+        throw new IllegalArgumentException("ORC doesn't handle " +
+            info.getCategory());
+    }
+  }
+
+  public static List<OrcProto.Type> getOrcTypes(TypeDescription typeDescr) {
+    List<OrcProto.Type> result = Lists.newArrayList();
+    appendOrcTypes(result, typeDescr);
+    return result;
+  }
+
+  private static void appendOrcTypes(List<OrcProto.Type> result, TypeDescription typeDescr) {
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    List<TypeDescription> children = typeDescr.getChildren();
+    switch (typeDescr.getCategory()) {
+    case BOOLEAN:
+      type.setKind(OrcProto.Type.Kind.BOOLEAN);
+      break;
+    case BYTE:
+      type.setKind(OrcProto.Type.Kind.BYTE);
+      break;
+    case SHORT:
+      type.setKind(OrcProto.Type.Kind.SHORT);
+      break;
+    case INT:
+      type.setKind(OrcProto.Type.Kind.INT);
+      break;
+    case LONG:
+      type.setKind(OrcProto.Type.Kind.LONG);
+      break;
+    case FLOAT:
+      type.setKind(OrcProto.Type.Kind.FLOAT);
+      break;
+    case DOUBLE:
+      type.setKind(OrcProto.Type.Kind.DOUBLE);
+      break;
+    case STRING:
+      type.setKind(OrcProto.Type.Kind.STRING);
+      break;
+    case CHAR:
+      type.setKind(OrcProto.Type.Kind.CHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case VARCHAR:
+      type.setKind(Type.Kind.VARCHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case BINARY:
+      type.setKind(OrcProto.Type.Kind.BINARY);
+      break;
+    case TIMESTAMP:
+      type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+      break;
+    case DATE:
+      type.setKind(OrcProto.Type.Kind.DATE);
+      break;
+    case DECIMAL:
+      type.setKind(OrcProto.Type.Kind.DECIMAL);
+      type.setPrecision(typeDescr.getPrecision());
+      type.setScale(typeDescr.getScale());
+      break;
+    case LIST:
+      type.setKind(OrcProto.Type.Kind.LIST);
+      type.addSubtypes(children.get(0).getId());
+      break;
+    case MAP:
+      type.setKind(OrcProto.Type.Kind.MAP);
+      for(TypeDescription t: children) {
+        type.addSubtypes(t.getId());
+      }
+      break;
+    case STRUCT:
+      type.setKind(OrcProto.Type.Kind.STRUCT);
+      for(TypeDescription t: children) {
+        type.addSubtypes(t.getId());
+      }
+      for(String field: typeDescr.getFieldNames()) {
+        type.addFieldNames(field);
+      }
+      break;
+    case UNION:
+      type.setKind(OrcProto.Type.Kind.UNION);
+      for(TypeDescription t: children) {
+        type.addSubtypes(t.getId());
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown category: " +
+          typeDescr.getCategory());
+    }
+    result.add(type.build());
+    if (children != null) {
+      for(TypeDescription child: children) {
+        appendOrcTypes(result, child);
+      }
+    }
+  }
+
+  /**
+   * NOTE: This method ignores the subtype numbers in the TypeDescription rebuilds the subtype
+   * numbers based on the length of the result list being appended.
+   *
+   * @param result
+   * @param typeInfo
+   */
+  public static void appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+      TypeDescription typeDescr) {
+
+    int subtype = result.size();
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    boolean needsAdd = true;
+    List<TypeDescription> children = typeDescr.getChildren();
+    switch (typeDescr.getCategory()) {
+    case BOOLEAN:
+      type.setKind(OrcProto.Type.Kind.BOOLEAN);
+      break;
+    case BYTE:
+      type.setKind(OrcProto.Type.Kind.BYTE);
+      break;
+    case SHORT:
+      type.setKind(OrcProto.Type.Kind.SHORT);
+      break;
+    case INT:
+      type.setKind(OrcProto.Type.Kind.INT);
+      break;
+    case LONG:
+      type.setKind(OrcProto.Type.Kind.LONG);
+      break;
+    case FLOAT:
+      type.setKind(OrcProto.Type.Kind.FLOAT);
+      break;
+    case DOUBLE:
+      type.setKind(OrcProto.Type.Kind.DOUBLE);
+      break;
+    case STRING:
+      type.setKind(OrcProto.Type.Kind.STRING);
+      break;
+    case CHAR:
+      type.setKind(OrcProto.Type.Kind.CHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case VARCHAR:
+      type.setKind(Type.Kind.VARCHAR);
+      type.setMaximumLength(typeDescr.getMaxLength());
+      break;
+    case BINARY:
+      type.setKind(OrcProto.Type.Kind.BINARY);
+      break;
+    case TIMESTAMP:
+      type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+      break;
+    case DATE:
+      type.setKind(OrcProto.Type.Kind.DATE);
+      break;
+    case DECIMAL:
+      type.setKind(OrcProto.Type.Kind.DECIMAL);
+      type.setPrecision(typeDescr.getPrecision());
+      type.setScale(typeDescr.getScale());
+      break;
+    case LIST:
+      type.setKind(OrcProto.Type.Kind.LIST);
+      type.addSubtypes(++subtype);
+      result.add(type.build());
+      needsAdd = false;
+      appendOrcTypesRebuildSubtypes(result, children.get(0));
+      break;
+    case MAP:
+      {
+        // Make room for MAP type.
+        result.add(null);
+  
+        // Add MAP type pair in order to determine their subtype values.
+        appendOrcTypesRebuildSubtypes(result, children.get(0));
+        int subtype2 = result.size();
+        appendOrcTypesRebuildSubtypes(result, children.get(1));
+        type.setKind(OrcProto.Type.Kind.MAP);
+        type.addSubtypes(subtype + 1);
+        type.addSubtypes(subtype2);
+        result.set(subtype, type.build());
+        needsAdd = false;
+      }
+      break;
+    case STRUCT:
+      {
+        List<String> fieldNames = typeDescr.getFieldNames();
+
+        // Make room for STRUCT type.
+        result.add(null);
+
+        List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+        for(TypeDescription child: children) {
+          int fieldSubtype = result.size();
+          fieldSubtypes.add(fieldSubtype);
+          appendOrcTypesRebuildSubtypes(result, child);
+        }
+
+        type.setKind(OrcProto.Type.Kind.STRUCT);
+
+        for (int i = 0 ; i < fieldNames.size(); i++) {
+          type.addSubtypes(fieldSubtypes.get(i));
+          type.addFieldNames(fieldNames.get(i));
+        }
+        result.set(subtype, type.build());
+        needsAdd = false;
+      }
+      break;
+    case UNION:
+      {
+        // Make room for UNION type.
+        result.add(null);
+
+        List<Integer> unionSubtypes = new ArrayList<Integer>(children.size());
+        for(TypeDescription child: children) {
+          int unionSubtype = result.size();
+          unionSubtypes.add(unionSubtype);
+          appendOrcTypesRebuildSubtypes(result, child);
+        }
+
+        type.setKind(OrcProto.Type.Kind.UNION);
+        for (int i = 0 ; i < children.size(); i++) {
+          type.addSubtypes(unionSubtypes.get(i));
+        }
+        result.set(subtype, type.build());
+        needsAdd = false;
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown category: " + typeDescr.getCategory());
+    }
+    if (needsAdd) {
+      result.add(type.build());
+    }
+  }
+
+  /**
+   * NOTE: This method ignores the subtype numbers in the OrcProto.Type rebuilds the subtype
+   * numbers based on the length of the result list being appended.
+   *
+   * @param result
+   * @param typeInfo
+   */
+  public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+      List<OrcProto.Type> types, int columnId) {
+
+    OrcProto.Type oldType = types.get(columnId++);
+
+    int subtype = result.size();
+    OrcProto.Type.Builder builder = OrcProto.Type.newBuilder();
+    boolean needsAdd = true;
+    switch (oldType.getKind()) {
+    case BOOLEAN:
+      builder.setKind(OrcProto.Type.Kind.BOOLEAN);
+      break;
+    case BYTE:
+      builder.setKind(OrcProto.Type.Kind.BYTE);
+      break;
+    case SHORT:
+      builder.setKind(OrcProto.Type.Kind.SHORT);
+      break;
+    case INT:
+      builder.setKind(OrcProto.Type.Kind.INT);
+      break;
+    case LONG:
+      builder.setKind(OrcProto.Type.Kind.LONG);
+      break;
+    case FLOAT:
+      builder.setKind(OrcProto.Type.Kind.FLOAT);
+      break;
+    case DOUBLE:
+      builder.setKind(OrcProto.Type.Kind.DOUBLE);
+      break;
+    case STRING:
+      builder.setKind(OrcProto.Type.Kind.STRING);
+      break;
+    case CHAR:
+      builder.setKind(OrcProto.Type.Kind.CHAR);
+      builder.setMaximumLength(oldType.getMaximumLength());
+      break;
+    case VARCHAR:
+      builder.setKind(Type.Kind.VARCHAR);
+      builder.setMaximumLength(oldType.getMaximumLength());
+      break;
+    case BINARY:
+      builder.setKind(OrcProto.Type.Kind.BINARY);
+      break;
+    case TIMESTAMP:
+      builder.setKind(OrcProto.Type.Kind.TIMESTAMP);
+      break;
+    case DATE:
+      builder.setKind(OrcProto.Type.Kind.DATE);
+      break;
+    case DECIMAL:
+      builder.setKind(OrcProto.Type.Kind.DECIMAL);
+      builder.setPrecision(oldType.getPrecision());
+      builder.setScale(oldType.getScale());
+      break;
+    case LIST:
+      builder.setKind(OrcProto.Type.Kind.LIST);
+      builder.addSubtypes(++subtype);
+      result.add(builder.build());
+      needsAdd = false;
+      columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+      break;
+    case MAP:
+      {
+        // Make room for MAP type.
+        result.add(null);
+  
+        // Add MAP type pair in order to determine their subtype values.
+        columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        int subtype2 = result.size();
+        columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        builder.setKind(OrcProto.Type.Kind.MAP);
+        builder.addSubtypes(subtype + 1);
+        builder.addSubtypes(subtype2);
+        result.set(subtype, builder.build());
+        needsAdd = false;
+      }
+      break;
+    case STRUCT:
+      {
+        List<String> fieldNames = oldType.getFieldNamesList();
+
+        // Make room for STRUCT type.
+        result.add(null);
+
+        List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+        for(int i = 0 ; i < fieldNames.size(); i++) {
+          int fieldSubtype = result.size();
+          fieldSubtypes.add(fieldSubtype);
+          columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        }
+
+        builder.setKind(OrcProto.Type.Kind.STRUCT);
+
+        for (int i = 0 ; i < fieldNames.size(); i++) {
+          builder.addSubtypes(fieldSubtypes.get(i));
+          builder.addFieldNames(fieldNames.get(i));
+        }
+        result.set(subtype, builder.build());
+        needsAdd = false;
+      }
+      break;
+    case UNION:
+      {
+        int subtypeCount = oldType.getSubtypesCount();
+
+        // Make room for UNION type.
+        result.add(null);
+
+        List<Integer> unionSubtypes = new ArrayList<Integer>(subtypeCount);
+        for(int i = 0 ; i < subtypeCount; i++) {
+          int unionSubtype = result.size();
+          unionSubtypes.add(unionSubtype);
+          columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+        }
+
+        builder.setKind(OrcProto.Type.Kind.UNION);
+        for (int i = 0 ; i < subtypeCount; i++) {
+          builder.addSubtypes(unionSubtypes.get(i));
+        }
+        result.set(subtype, builder.build());
+        needsAdd = false;
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown category: " + oldType.getKind());
+    }
+    if (needsAdd) {
+      result.add(builder.build());
+    }
+    return columnId;
+  }
+
+  public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid) {
+
+    String columnNameProperty = null;
+    String columnTypeProperty = null;
+
+    ArrayList<String> schemaEvolutionColumnNames = null;
+    ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null;
+
+    boolean haveSchemaEvolutionProperties = false;
+    if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+
+      columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+      columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+
+      haveSchemaEvolutionProperties =
+          (columnNameProperty != null && columnTypeProperty != null);
+
+      if (haveSchemaEvolutionProperties) {
+        schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+        if (schemaEvolutionColumnNames.size() == 0) {
+          haveSchemaEvolutionProperties = false;
+        } else {
+          schemaEvolutionTypeDescrs =
+              OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+          if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+            haveSchemaEvolutionProperties = false;
+          }
+        }
+      }
+    }
+
+    if (haveSchemaEvolutionProperties) {
+      LOG.info("Using schema evolution configuration variables " +
+          "schema.evolution.columns " +
+          schemaEvolutionColumnNames.toString() +
+          " / schema.evolution.columns.types " +
+          schemaEvolutionTypeDescrs.toString() +
+          " (isAcid " +
+          isAcid +
+          ")");
+
+    } else {
+
+      // Try regular properties;
+      columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+      columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+      if (columnTypeProperty == null || columnNameProperty == null) {
+        return null;
+      }
+
+      schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+      if (schemaEvolutionColumnNames.size() == 0) {
+        return null;
+      }
+      schemaEvolutionTypeDescrs =
+          OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+      if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+        return null;
+      }
+      LOG.info("Using column configuration variables " +
+          "columns " +
+          schemaEvolutionColumnNames.toString() +
+          " / columns.types " +
+          schemaEvolutionTypeDescrs.toString() +
+          " (isAcid " +
+          isAcid +
+          ")");
+    }
+
+    // Desired schema does not include virtual columns or partition columns.
+    TypeDescription result = TypeDescription.createStruct();
+    for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+      result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
+    }
+
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 8558592..6dbe461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
@@ -152,6 +153,7 @@ public interface Reader {
     private boolean[] include;
     private long offset = 0;
     private long length = Long.MAX_VALUE;
+    private TypeDescription schema;
     private SearchArgument sarg = null;
     private String[] columnNames = null;
 
@@ -178,6 +180,14 @@ public interface Reader {
     }
 
     /**
+     * Set the schema on read type description.
+     */
+    public Options schema(TypeDescription schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    /**
      * Set search argument for predicate push down.
      * @param sarg the search argument
      * @param columnNames the column names for
@@ -201,6 +211,10 @@ public interface Reader {
       return length;
     }
 
+    public TypeDescription getSchema() {
+      return schema;
+    }
+
     public SearchArgument getSearchArgument() {
       return sarg;
     }
@@ -222,6 +236,7 @@ public interface Reader {
       result.include = include;
       result.offset = offset;
       result.length = length;
+      result.schema = schema;
       result.sarg = sarg;
       result.columnNames = columnNames;
       return result;

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
deleted file mode 100644
index 8740ee6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-
-import com.google.common.collect.Lists;
-
-/**
- * Factory to create ORC tree readers. It also compares file schema with schema specified on read
- * to see if type promotions are possible.
- */
-public class RecordReaderFactory {
-  static final Log LOG = LogFactory.getLog(RecordReaderFactory.class);
-  private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
-
-  public static TreeReaderFactory.TreeReader createTreeReader(int colId,
-      Configuration conf,
-      List<OrcProto.Type> fileSchema,
-      boolean[] included,
-      boolean skipCorrupt) throws IOException {
-    final boolean isAcid = checkAcidSchema(fileSchema);
-    final List<OrcProto.Type> originalFileSchema;
-    if (isAcid) {
-      originalFileSchema = fileSchema.subList(fileSchema.get(0).getSubtypesCount(),
-          fileSchema.size());
-    } else {
-      originalFileSchema = fileSchema;
-    }
-    final int numCols = originalFileSchema.get(0).getSubtypesCount();
-    List<OrcProto.Type> schemaOnRead = getSchemaOnRead(numCols, conf);
-    List<OrcProto.Type> schemaUsed = getMatchingSchema(fileSchema, schemaOnRead);
-    if (schemaUsed == null) {
-      return TreeReaderFactory.createTreeReader(colId, fileSchema, included, skipCorrupt);
-    } else {
-      return ConversionTreeReaderFactory.createTreeReader(colId, schemaUsed, included, skipCorrupt);
-    }
-  }
-
-  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
-    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
-      List<String> acidFields = OrcRecordUpdater.getAcidEventFields();
-      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
-      if (acidFields.equals(rootFields)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static List<OrcProto.Type> getMatchingSchema(List<OrcProto.Type> fileSchema,
-      List<OrcProto.Type> schemaOnRead) {
-    if (schemaOnRead == null) {
-      if (isLogInfoEnabled) {
-        LOG.info("Schema is not specified on read. Using file schema.");
-      }
-      return null;
-    }
-
-    if (fileSchema.size() != schemaOnRead.size()) {
-      if (isLogInfoEnabled) {
-        LOG.info("Schema on read column count does not match file schema's column count." +
-            " Falling back to using file schema.");
-      }
-      return null;
-    } else {
-      List<OrcProto.Type> result = Lists.newArrayList(fileSchema);
-      // check type promotion. ORC can only support type promotions for integer types
-      // short -> int -> bigint as same integer readers are used for the above types.
-      boolean canPromoteType = false;
-      for (int i = 0; i < fileSchema.size(); i++) {
-        OrcProto.Type fColType = fileSchema.get(i);
-        OrcProto.Type rColType = schemaOnRead.get(i);
-        if (!fColType.getKind().equals(rColType.getKind())) {
-
-          if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
-
-            if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
-                rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-              // type promotion possible, converting SHORT to INT/LONG requested type
-              result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
-              canPromoteType = true;
-            } else {
-              canPromoteType = false;
-            }
-
-          } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
-
-            if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
-              // type promotion possible, converting INT to LONG requested type
-              result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
-              canPromoteType = true;
-            } else {
-              canPromoteType = false;
-            }
-
-          } else {
-            canPromoteType = false;
-          }
-        }
-      }
-
-      if (canPromoteType) {
-        if (isLogInfoEnabled) {
-          LOG.info("Integer type promotion happened in ORC record reader. Using promoted schema.");
-        }
-        return result;
-      }
-    }
-
-    return null;
-  }
-
-  private static List<OrcProto.Type> getSchemaOnRead(int numCols, Configuration conf) {
-    String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
-    final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
-    if (columnTypeProperty == null || columnNameProperty == null) {
-      return null;
-    }
-
-    ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(","));
-    ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
-    StructTypeInfo structTypeInfo = new StructTypeInfo();
-    // Column types from conf includes virtual and partition columns at the end. We consider only
-    // the actual columns in the file.
-    structTypeInfo.setAllStructFieldNames(Lists.newArrayList(columnNames.subList(0, numCols)));
-    structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(fieldTypes.subList(0, numCols)));
-    ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(structTypeInfo);
-    return getOrcTypes(oi);
-  }
-
-  private static List<OrcProto.Type> getOrcTypes(ObjectInspector inspector) {
-    List<OrcProto.Type> result = Lists.newArrayList();
-    getOrcTypesImpl(result, inspector);
-    return result;
-  }
-
-  private static void getOrcTypesImpl(List<OrcProto.Type> result, ObjectInspector inspector) {
-    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
-          case BOOLEAN:
-            type.setKind(OrcProto.Type.Kind.BOOLEAN);
-            break;
-          case BYTE:
-            type.setKind(OrcProto.Type.Kind.BYTE);
-            break;
-          case SHORT:
-            type.setKind(OrcProto.Type.Kind.SHORT);
-            break;
-          case INT:
-            type.setKind(OrcProto.Type.Kind.INT);
-            break;
-          case LONG:
-            type.setKind(OrcProto.Type.Kind.LONG);
-            break;
-          case FLOAT:
-            type.setKind(OrcProto.Type.Kind.FLOAT);
-            break;
-          case DOUBLE:
-            type.setKind(OrcProto.Type.Kind.DOUBLE);
-            break;
-          case STRING:
-            type.setKind(OrcProto.Type.Kind.STRING);
-            break;
-          case CHAR:
-            // The char length needs to be written to file and should be available
-            // from the object inspector
-            CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) inspector)
-                .getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.CHAR);
-            type.setMaximumLength(charTypeInfo.getLength());
-            break;
-          case VARCHAR:
-            // The varchar length needs to be written to file and should be available
-            // from the object inspector
-            VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) inspector)
-                .getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.VARCHAR);
-            type.setMaximumLength(typeInfo.getLength());
-            break;
-          case BINARY:
-            type.setKind(OrcProto.Type.Kind.BINARY);
-            break;
-          case TIMESTAMP:
-            type.setKind(OrcProto.Type.Kind.TIMESTAMP);
-            break;
-          case DATE:
-            type.setKind(OrcProto.Type.Kind.DATE);
-            break;
-          case DECIMAL:
-            DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) inspector)
-                .getTypeInfo();
-            type.setKind(OrcProto.Type.Kind.DECIMAL);
-            type.setPrecision(decTypeInfo.precision());
-            type.setScale(decTypeInfo.scale());
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown primitive category: " +
-                ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
-        }
-        result.add(type.build());
-        break;
-      case LIST:
-        type.setKind(OrcProto.Type.Kind.LIST);
-        result.add(type.build());
-        getOrcTypesImpl(result, ((ListObjectInspector) inspector).getListElementObjectInspector());
-        break;
-      case MAP:
-        type.setKind(OrcProto.Type.Kind.MAP);
-        result.add(type.build());
-        getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapKeyObjectInspector());
-        getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapValueObjectInspector());
-        break;
-      case STRUCT:
-        type.setKind(OrcProto.Type.Kind.STRUCT);
-        result.add(type.build());
-        for (StructField field : ((StructObjectInspector) inspector).getAllStructFieldRefs()) {
-          getOrcTypesImpl(result, field.getFieldObjectInspector());
-        }
-        break;
-      case UNION:
-        type.setKind(OrcProto.Type.Kind.UNION);
-        result.add(type.build());
-        for (ObjectInspector oi : ((UnionObjectInspector) inspector).getObjectInspectors()) {
-          getOrcTypesImpl(result, oi);
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown category: " + inspector.getCategory());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index c2d280d..24834a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
 import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -154,15 +156,27 @@ class RecordReaderImpl implements RecordReader {
   }
 
   protected RecordReaderImpl(List<StripeInformation> stripes,
-                   FileSystem fileSystem,
-                   Path path,
-                   Reader.Options options,
-                   List<OrcProto.Type> types,
-                   CompressionCodec codec,
-                   int bufferSize,
-                   long strideRate,
-                   Configuration conf
-                   ) throws IOException {
+                             FileSystem fileSystem,
+                             Path path,
+                             Reader.Options options,
+                             List<OrcProto.Type> types,
+                             CompressionCodec codec,
+                             int bufferSize,
+                             long strideRate,
+                             Configuration conf
+                             ) throws IOException {
+
+    TreeReaderSchema treeReaderSchema;
+    if (options.getSchema() == null) {
+      treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types);
+    } else {
+
+      // Now that we are creating a record reader for a file, validate that the schema to read
+      // is compatible with the file schema.
+      //
+      List<Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
+      treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
+    }
     this.path = path;
     this.file = fileSystem.open(path);
     this.codec = codec;
@@ -200,7 +214,7 @@ class RecordReaderImpl implements RecordReader {
     firstRow = skippedRows;
     totalRowCount = rows;
     boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
-    reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt);
+    reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
@@ -1085,6 +1099,7 @@ class RecordReaderImpl implements RecordReader {
       } else {
         result = (VectorizedRowBatch) previous;
         result.selectedInUse = false;
+        reader.setVectorColumnCount(result.getDataColumnCount());
         reader.nextVector(result.cols, (int) batchSize);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
new file mode 100644
index 0000000..9d00eb2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
+
+/**
+ * Take the file types and the (optional) configuration column names/types and see if there
+ * has been schema evolution.
+ */
+public class SchemaEvolution {
+
+  private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
+
+  public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
+      List<OrcProto.Type> schemaTypes) throws IOException {
+
+    // For ACID, the row is the ROW field in the outer STRUCT.
+    final boolean isAcid = checkAcidSchema(fileTypes);
+    final List<OrcProto.Type> rowSchema;
+    int rowSubtype;
+    if (isAcid) {
+      rowSubtype = OrcRecordUpdater.ROW + 1;
+      rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+    } else {
+      rowSubtype = 0;
+      rowSchema = fileTypes;
+    }
+
+    // Do checking on the overlap.  Additional columns will be defaulted to NULL.
+
+    int numFileColumns = rowSchema.get(0).getSubtypesCount();
+    int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
+
+    int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
+
+    /**
+     * Check type promotion.
+     *
+     * Currently, we only support integer type promotions that can be done "implicitly".
+     * That is, we know that using a bigger integer tree reader on the original smaller integer
+     * column will "just work".
+     *
+     * In the future, other type promotions might require type conversion.
+     */
+    // short -> int -> bigint as same integer readers are used for the above types.
+
+    for (int i = 0; i < numReadColumns; i++) {
+      OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
+      OrcProto.Type rColType = schemaTypes.get(i);
+      if (!fColType.getKind().equals(rColType.getKind())) {
+
+        boolean ok = false;
+        if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+
+          if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
+              rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+            // type promotion possible, converting SHORT to INT/LONG requested type
+            ok = true;
+          }
+        } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+
+          if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+            // type promotion possible, converting INT to LONG requested type
+            ok = true;
+          }
+        }
+
+        if (!ok) {
+          throw new IOException("ORC does not support type conversion from " +
+              fColType.getKind().name() + " to " + rColType.getKind().name());
+        }
+      }
+    }
+
+    List<Type> fullSchemaTypes;
+
+    if (isAcid) {
+      fullSchemaTypes = new ArrayList<OrcProto.Type>();
+
+      // This copies the ACID struct type which is subtype = 0.
+      // It has field names "operation" through "row".
+      // And we copy the types for all fields EXCEPT ROW (which must be last!).
+
+      for (int i = 0; i < rowSubtype; i++) {
+        fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+      }
+
+      // Add the row struct type.
+      OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+    } else {
+      fullSchemaTypes = schemaTypes;
+    }
+
+    int innerStructSubtype = rowSubtype;
+
+    // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
+    //     " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
+
+    return new TreeReaderSchema().
+        fileTypes(fileTypes).
+        schemaTypes(fullSchemaTypes).
+        innerStructSubtype(innerStructSubtype);
+  }
+
+  private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
+    if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
+      List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+      if (acidEventFieldNames.equals(rootFields)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param typeDescr
+   * @return ORC types for the ACID event based on the row's type description
+   */
+  public static List<Type> createEventSchema(TypeDescription typeDescr) {
+
+    List<Type> result = new ArrayList<Type>();
+
+    OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+    type.setKind(OrcProto.Type.Kind.STRUCT);
+    type.addAllFieldNames(acidEventFieldNames);
+    for (int i = 0; i < acidEventFieldNames.size(); i++) {
+      type.addSubtypes(i + 1);
+    }
+    result.add(type.build());
+
+    // Automatically add all fields except the last (ROW).
+    for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
+      type.clear();
+      type.setKind(acidEventOrcTypeKinds.get(i));
+      result.add(type.build());
+    }
+
+    OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+    return result;
+  }
+
+  public static final List<String> acidEventFieldNames= new ArrayList<String>();
+  static {
+    acidEventFieldNames.add("operation");
+    acidEventFieldNames.add("originalTransaction");
+    acidEventFieldNames.add("bucket");
+    acidEventFieldNames.add("rowId");
+    acidEventFieldNames.add("currentTransaction");
+    acidEventFieldNames.add("row");
+  }
+  public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
+      new ArrayList<OrcProto.Type.Kind>();
+  static {
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+    acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
+  }
+}
\ No newline at end of file