Posted to commits@hive.apache.org by mm...@apache.org on 2015/11/18 15:52:19 UTC

[17/17] hive git commit: HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)

HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/249c4ef1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/249c4ef1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/249c4ef1

Branch: refs/heads/master
Commit: 249c4ef1377860eb6ec9c3aedb5c1f930479a6ec
Parents: 7f65e36
Author: Matt McCline <mm...@hortonworks.com>
Authored: Wed Nov 18 05:11:35 2015 -0800
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Wed Nov 18 05:11:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +
 .../mapreduce/FosterStorageHandler.java         |   37 +
 .../hive/hcatalog/mapreduce/InputJobInfo.java   |    8 +-
 .../hive/hcatalog/streaming/TestStreaming.java  |    2 +
 .../streaming/mutate/StreamingAssert.java       |    2 +
 .../hive/ql/txn/compactor/TestCompactor.java    |  246 ++-
 .../test/resources/testconfiguration.properties |   16 +
 .../hive/llap/io/api/impl/LlapInputFormat.java  |   40 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |    7 +-
 .../hadoop/hive/ql/exec/FetchOperator.java      |    7 +
 .../apache/hadoop/hive/ql/exec/MapOperator.java |   22 +-
 .../hadoop/hive/ql/exec/TableScanOperator.java  |   20 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  104 +-
 .../ql/exec/spark/SparkReduceRecordHandler.java |   11 +-
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |    2 +-
 .../hive/ql/exec/tez/ReduceRecordSource.java    |   18 +-
 .../hive/ql/exec/vector/VectorExtractRow.java   |    9 +-
 .../ql/exec/vector/VectorGroupByOperator.java   |    2 +-
 .../exec/vector/VectorMapJoinBaseOperator.java  |    2 +-
 .../exec/vector/VectorSMBMapJoinOperator.java   |    2 +-
 .../ql/exec/vector/VectorizationContext.java    |   35 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     |  204 +-
 .../ql/exec/vector/VectorizedColumnarSerDe.java |  277 ---
 .../ql/exec/vector/VectorizedRowBatchCtx.java   |  509 ++---
 .../mapjoin/VectorMapJoinCommonOperator.java    |   66 +-
 .../VectorMapJoinGenerateResultOperator.java    |    5 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |    5 +
 .../apache/hadoop/hive/ql/io/IOConstants.java   |   11 +
 .../io/SelfDescribingInputFormatInterface.java  |   27 +
 .../hive/ql/io/VectorizedRCFileInputFormat.java |   81 -
 .../ql/io/VectorizedRCFileRecordReader.java     |  261 ---
 .../ql/io/orc/ConversionTreeReaderFactory.java  |   38 -
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java   |    2 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   60 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  |   84 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |   61 +-
 .../apache/hadoop/hive/ql/io/orc/OrcUtils.java  |  547 ++++++
 .../apache/hadoop/hive/ql/io/orc/Reader.java    |   15 +
 .../hive/ql/io/orc/RecordReaderFactory.java     |  274 ---
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |   17 +-
 .../hadoop/hive/ql/io/orc/SchemaEvolution.java  |  185 ++
 .../hive/ql/io/orc/TreeReaderFactory.java       |  182 +-
 .../ql/io/orc/VectorizedOrcAcidRowReader.java   |   45 +-
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   45 +-
 .../parquet/VectorizedParquetInputFormat.java   |   25 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |   18 +
 .../hive/ql/optimizer/SimpleFetchOptimizer.java |    1 +
 .../hive/ql/optimizer/physical/Vectorizer.java  |  502 +++--
 .../ql/optimizer/physical/Vectorizer.java.orig  | 1744 ------------------
 .../apache/hadoop/hive/ql/plan/BaseWork.java    |   30 +-
 .../hadoop/hive/ql/plan/PartitionDesc.java      |   15 +-
 .../hive/ql/plan/VectorPartitionConversion.java |  166 ++
 .../hive/ql/plan/VectorPartitionDesc.java       |  110 ++
 .../ql/exec/vector/TestVectorRowObject.java     |    5 +-
 .../hive/ql/exec/vector/TestVectorSerDeRow.java |   10 +-
 .../exec/vector/TestVectorizedRowBatchCtx.java  |  357 ----
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   34 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   42 +
 .../ql/io/orc/TestOrcRawRecordMerger.java.orig  | 1150 ++++++++++++
 .../schema_evol_orc_acid_mapwork_part.q         |  171 ++
 .../schema_evol_orc_acid_mapwork_table.q        |  129 ++
 .../schema_evol_orc_acidvec_mapwork_part.q      |  171 ++
 .../schema_evol_orc_acidvec_mapwork_table.q     |  129 ++
 .../schema_evol_orc_nonvec_fetchwork_part.q     |   96 +
 .../schema_evol_orc_nonvec_fetchwork_table.q    |   56 +
 .../schema_evol_orc_nonvec_mapwork_part.q       |   96 +
 .../schema_evol_orc_nonvec_mapwork_table.q      |   56 +
 .../schema_evol_orc_vec_mapwork_part.q          |   96 +
 .../schema_evol_orc_vec_mapwork_table.q         |   56 +
 .../schema_evol_text_fetchwork_table.q          |   56 +
 .../schema_evol_text_mapwork_table.q            |   56 +
 .../schema_evol_text_nonvec_fetchwork_part.q    |   96 +
 .../schema_evol_text_nonvec_fetchwork_table.q   |   66 +
 .../schema_evol_text_nonvec_mapwork_part.q      |   96 +
 .../schema_evol_text_nonvec_mapwork_table.q     |   66 +
 .../schema_evol_orc_acid_mapwork_part.q.out     | 1035 +++++++++++
 .../schema_evol_orc_acid_mapwork_table.q.out    |  649 +++++++
 .../schema_evol_orc_acidvec_mapwork_part.q.out  | 1035 +++++++++++
 .../schema_evol_orc_acidvec_mapwork_table.q.out |  649 +++++++
 .../schema_evol_orc_nonvec_fetchwork_part.q.out |  642 +++++++
 ...schema_evol_orc_nonvec_fetchwork_table.q.out |  298 +++
 .../schema_evol_orc_nonvec_mapwork_part.q.out   |  642 +++++++
 .../schema_evol_orc_nonvec_mapwork_table.q.out  |  298 +++
 .../schema_evol_orc_vec_mapwork_part.q.out      |  642 +++++++
 .../schema_evol_orc_vec_mapwork_table.q.out     |  298 +++
 .../schema_evol_text_fetchwork_table.q.out      |  298 +++
 .../schema_evol_text_mapwork_table.q.out        |  298 +++
 ...schema_evol_text_nonvec_fetchwork_part.q.out |  642 +++++++
 ...chema_evol_text_nonvec_fetchwork_table.q.out |  297 +++
 .../schema_evol_text_nonvec_mapwork_part.q.out  |  642 +++++++
 .../schema_evol_text_nonvec_mapwork_table.q.out |  297 +++
 .../tez/schema_evol_orc_acid_mapwork_part.q.out | 1035 +++++++++++
 .../schema_evol_orc_acid_mapwork_table.q.out    |  649 +++++++
 .../schema_evol_orc_acidvec_mapwork_part.q.out  | 1035 +++++++++++
 .../schema_evol_orc_acidvec_mapwork_table.q.out |  649 +++++++
 .../schema_evol_orc_nonvec_fetchwork_part.q.out |  642 +++++++
 ...schema_evol_orc_nonvec_fetchwork_table.q.out |  298 +++
 .../schema_evol_orc_nonvec_mapwork_part.q.out   |  642 +++++++
 .../schema_evol_orc_nonvec_mapwork_table.q.out  |  298 +++
 .../tez/schema_evol_orc_vec_mapwork_part.q.out  |  642 +++++++
 .../tez/schema_evol_orc_vec_mapwork_table.q.out |  298 +++
 .../tez/schema_evol_text_fetchwork_table.q.out  |  298 +++
 .../tez/schema_evol_text_mapwork_table.q.out    |  298 +++
 ...schema_evol_text_nonvec_fetchwork_part.q.out |  642 +++++++
 ...chema_evol_text_nonvec_fetchwork_table.q.out |  297 +++
 .../schema_evol_text_nonvec_mapwork_part.q.out  |  642 +++++++
 .../schema_evol_text_nonvec_mapwork_table.q.out |  297 +++
 .../tez/vector_partition_diff_num_cols.q.out    |    1 +
 .../vector_partition_diff_num_cols.q.out        |    1 +
 .../hive/serde2/typeinfo/TypeInfoUtils.java     |   36 +
 .../hive/ql/exec/vector/VectorizedRowBatch.java |   20 +
 111 files changed, 22617 insertions(+), 4063 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 953e52c..7cab9ae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -969,6 +969,10 @@ public class HiveConf extends Configuration {
         "The threshold for the input file size of the small tables; if the file size is smaller \n" +
         "than this threshold, it will try to convert the common join into map join"),
 
+
+    HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", true,
+        "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."),
+
     HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
         "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
 
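For context, a minimal sketch (illustrative class name, not part of this commit) of toggling the new flag programmatically through the ConfVars entry added above; the equivalent in a Hive session is "SET hive.exec.schema.evolution=true;":

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SchemaEvolutionFlagSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Enable schema evolution for self-describing file formats (default is true).
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION, true);
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION));
      }
    }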

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 5a95467..bc56d77 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -35,6 +36,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,6 +53,8 @@ import java.util.Map;
  */
 public class FosterStorageHandler extends DefaultStorageHandler {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FosterStorageHandler.class);
+
   public Configuration conf;
   /** The directory under which data is initially written for a non partitioned table */
   protected static final String TEMP_DIR_NAME = "_TEMP";
@@ -98,6 +105,36 @@ public class FosterStorageHandler extends DefaultStorageHandler {
   @Override
   public void configureInputJobProperties(TableDesc tableDesc,
                       Map<String, String> jobProperties) {
+
+    try {
+      Map<String, String> tableProperties = tableDesc.getJobProperties();
+
+      String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+      if (jobInfoProperty != null) {
+
+        InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty);
+
+        HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+        HCatSchema dataColumns = tableInfo.getDataColumns();
+        List<HCatFieldSchema> dataFields = dataColumns.getFields();
+        StringBuilder columnNamesSb = new StringBuilder();
+        StringBuilder typeNamesSb = new StringBuilder();
+        for (HCatFieldSchema dataField : dataFields) {
+          if (columnNamesSb.length() > 0) {
+            columnNamesSb.append(",");
+            typeNamesSb.append(":");
+          }
+          columnNamesSb.append(dataField.getName());
+          typeNamesSb.append(dataField.getTypeString());
+        }
+        jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
+        jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());
+
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to set schema evolution job properties from HCatalog job info", e);
+    }
+
   }
 
   @Override
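A minimal sketch (illustrative class and values, not from the patch) of the string shapes the loop above produces for SCHEMA_EVOLUTION_COLUMNS and SCHEMA_EVOLUTION_COLUMNS_TYPES — names comma-separated, types colon-separated:

    import java.util.Arrays;
    import java.util.List;

    public class SchemaStringsSketch {
      public static void main(String[] args) {
        List<String> names = Arrays.asList("id", "msg");
        List<String> types = Arrays.asList("bigint", "string");
        String columns = String.join(",", names);       // "id,msg"
        String columnTypes = String.join(":", types);   // "bigint:string"
        System.out.println(columns + " | " + columnTypes);
      }
    }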

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 1f23f3f..7ec6ae3 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,9 +182,11 @@ public class InputJobInfo implements Serializable {
     ObjectInputStream partInfoReader =
       new ObjectInputStream(new InflaterInputStream(ois));
     partitions = (List<PartInfo>)partInfoReader.readObject();
-    for (PartInfo partInfo : partitions) {
-      if (partInfo.getTableInfo() == null) {
-        partInfo.setTableInfo(this.tableInfo);
+    if (partitions != null) {
+      for (PartInfo partInfo : partitions) {
+        if (partInfo.getTableInfo() == null) {
+          partInfo.setTableInfo(this.tableInfo);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 806dbdb..1723ff1 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -451,6 +451,8 @@ public class TestStreaming {
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionPath.toString());
     job.set("bucket_count", Integer.toString(buckets));
+    job.set("columns", "id,msg");
+    job.set("columns.types", "bigint:string");
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(buckets, splits.length);

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 477ed8c..339e9ef 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -128,6 +128,8 @@ public class StreamingAssert {
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionLocation.toString());
     job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+    job.set("columns", "id,msg");
+    job.set("columns.types", "bigint:string");
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
     assertEquals(1, splits.length);

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index e2910dd..dabe434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -128,7 +128,194 @@ public class TestCompactor {
       driver.close();
     }
   }
-  
+
+  /**
+   * Simple schema evolution: add columns with dynamic partitioning.
+   * @throws Exception
+   */
+  @Test
+  public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception {
+    String tblName = "dpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+
+    // First INSERT round.
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    // ALTER TABLE ... ADD COLUMNS
+    executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+    // Validate there is an added NULL for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+    // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+        "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+        "(5, 'doc', 1902, 'yesterday')",
+        driver);
+
+    // Validate the new insertions for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(4, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=last_century", names.get(0));
+    Assert.assertEquals("ds=soon", names.get(1));
+    Assert.assertEquals("ds=today", names.get(2));
+    Assert.assertEquals("ds=yesterday", names.get(3));
+
+    // Validate after compaction.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+  }
+
+  @Test
+  public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception {
+    String tblName = "udpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver);
+
+    // Validate the update.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\tyesterday", valuesReadFromHiveDriver.get(1));
+
+    // ALTER TABLE ... ADD COLUMNS
+    executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+    // Validate there is an added NULL for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+    // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+        "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+        "(5, 'doc', 1902, 'yesterday')",
+        driver);
+
+    // Validate the new insertions for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+    executeStatementOnDriver("update " + tblName + " set c = 2000", driver);
+
+    // Validate the update of new column c, even in old rows.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    // Set to 1 so insert doesn't set it off but update does
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(4, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=last_century", names.get(0));
+    Assert.assertEquals("ds=soon", names.get(1));
+    Assert.assertEquals("ds=today", names.get(2));
+    Assert.assertEquals("ds=yesterday", names.get(3));
+
+    // Validate after compaction.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+  }
+
   /**
    * After each major compaction, stats need to be updated on each column of the
    * table/partition which previously had stats.
@@ -255,7 +442,9 @@ public class TestCompactor {
     t.run();
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
+    if (1 != compacts.size()) {
+      Assert.fail("Expecting 1 file and found " + compacts.size() + " files " + compacts.toString());
+    }
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
     stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
@@ -409,6 +598,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -452,9 +643,12 @@ public class TestCompactor {
         }
       }
       Arrays.sort(names);
-      Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"});
-      checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L);
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+      if (!Arrays.deepEquals(expected, names)) {
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+      }
+      checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
 
     } finally {
       connection.close();
@@ -466,6 +660,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -500,10 +696,12 @@ public class TestCompactor {
       FileSystem fs = FileSystem.get(conf);
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
-      Assert.assertEquals(1, stat.length);
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+      }
       String name = stat[0].getPath().getName();
       Assert.assertEquals(name, "base_0000004");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
       connection.close();
     }
@@ -514,6 +712,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -561,9 +761,12 @@ public class TestCompactor {
         }
       }
       Arrays.sort(names);
-      Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"});
-      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L);
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"};
+      if (!Arrays.deepEquals(expected, names)) {
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+      }
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
       connection.close();
     }
@@ -574,6 +777,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -613,10 +818,14 @@
       FileSystem fs = FileSystem.get(conf);
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
-      Assert.assertEquals(1, stat.length);
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+      }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+      if (!name.equals("base_0000006")) {
+        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
+      }
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
       connection.close();
     }
@@ -642,7 +854,8 @@ public class TestCompactor {
     }
   }
 
-  private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max)
+  private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
+      String columnTypesProperty, int bucket, long min, long max)
       throws IOException {
     ValidTxnList txnList = new ValidTxnList() {
       @Override
@@ -678,8 +891,11 @@ public class TestCompactor {
 
     OrcInputFormat aif = new OrcInputFormat();
 
+    Configuration conf = new Configuration();
+    conf.set("columns", columnNamesProperty);
+    conf.set("columns.types", columnTypesProperty);
     AcidInputFormat.RawReader<OrcStruct> reader =
-        aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas);
+        aif.getRawReader(conf, false, bucket, txnList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
     OrcStruct value = reader.createValue();
     long currentTxn = min;

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 402914c..ece43cc 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -162,6 +162,22 @@ minitez.query.files.shared=alter_merge_2_orc.q,\
   ptf_matchpath.q,\
   ptf_streaming.q,\
   sample1.q,\
+  schema_evol_text_nonvec_mapwork_table.q,\
+  schema_evol_text_nonvec_fetchwork_table.q,\
+  schema_evol_orc_nonvec_fetchwork_part.q,\
+  schema_evol_orc_nonvec_mapwork_part.q,\
+  schema_evol_text_nonvec_fetchwork_part.q,\
+  schema_evol_text_nonvec_mapwork_part.q,\
+  schema_evol_orc_acid_mapwork_part.q,\
+  schema_evol_orc_acid_mapwork_table.q,\
+  schema_evol_orc_acidvec_mapwork_table.q,\
+  schema_evol_orc_acidvec_mapwork_part.q,\
+  schema_evol_orc_vec_mapwork_part.q,\
+  schema_evol_text_fetchwork_table.q,\
+  schema_evol_text_mapwork_table.q,\
+  schema_evol_orc_vec_mapwork_table.q,\
+  schema_evol_orc_nonvec_mapwork_table.q,\
+  schema_evol_orc_nonvec_fetchwork_table.q,\
   selectDistinctStar.q,\
   script_env_var1.q,\
   script_env_var2.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 51f4c8e..b57366c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -33,11 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
@@ -54,7 +56,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
 public class LlapInputFormat
-  implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface {
+  implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface,
+  SelfDescribingInputFormatInterface {
   @SuppressWarnings("rawtypes")
   private final InputFormat sourceInputFormat;
   private final ColumnVectorProducer cvp;
@@ -104,6 +107,8 @@ public class LlapInputFormat
     private final SearchArgument sarg;
     private final String[] columnNames;
     private final VectorizedRowBatchCtx rbCtx;
+    private final boolean[] columnsToIncludeTruncated;
+    private final Object[] partitionValues;
 
     private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
     private ColumnVectorBatch lastCvb = null;
@@ -118,19 +123,28 @@ public class LlapInputFormat
     private long firstReturnTime;
 
     public LlapRecordReader(
-        JobConf job, FileSplit split, List<Integer> includedCols, String hostName) {
+        JobConf job, FileSplit split, List<Integer> includedCols, String hostName)
+            throws IOException {
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
       this.counters = new QueryFragmentCounters(job);
       this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
-      try {
-        rbCtx = new VectorizedRowBatchCtx();
-        rbCtx.init(job, split);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
+
+      MapWork mapWork = Utilities.getMapWork(job);
+      rbCtx = mapWork.getVectorizedRowBatchCtx();
+
+      columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(job);
+
+      int partitionColumnCount = rbCtx.getPartitionColumnCount();
+      if (partitionColumnCount > 0) {
+        partitionValues = new Object[partitionColumnCount];
+        rbCtx.getPartitionValues(rbCtx, job, split, partitionValues);
+      } else {
+        partitionValues = null;
       }
+
       startRead();
     }
 
@@ -143,10 +157,8 @@ public class LlapInputFormat
       // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
       boolean wasFirst = isFirst;
       if (isFirst) {
-        try {
-          rbCtx.addPartitionColsToBatch(value);
-        } catch (HiveException e) {
-          throw new IOException(e);
+        if (partitionValues != null) {
+          rbCtx.addPartitionColsToBatch(value, partitionValues);
         }
         isFirst = false;
       }
@@ -244,11 +256,7 @@ public class LlapInputFormat
 
     @Override
     public VectorizedRowBatch createValue() {
-      try {
-        return rbCtx.createVectorizedRowBatch();
-      } catch (HiveException e) {
-        throw new RuntimeException("Error creating a batch", e);
-      }
+      return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 39a881a..892587a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -484,8 +484,13 @@ public enum ErrorMsg {
 
 
   INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" +
-      " file format of the destination table.")
+      " file format of the destination table."),
 
+  SCHEMA_REQUIRED_TO_READ_ACID_TABLES(30020, "Neither the configuration variables " +
+          "schema.evolution.columns / schema.evolution.columns.types " +
+          "nor the " +
+          "columns / columns.types " +
+          "are set.  Table schema information is required to read ACID tables")
   ;
 
   private int errorCode;
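A minimal sketch (illustrative helper, not from the patch) of satisfying this requirement the way the TestCompactor and TestStreaming changes above do, by putting the table schema on the reader configuration before opening an ACID reader:

    import org.apache.hadoop.conf.Configuration;

    public class AcidReaderSchemaSketch {
      public static Configuration withTableSchema(Configuration conf) {
        // Names are comma-separated, types are colon-separated.
        conf.set("columns", "a,b");
        conf.set("columns.types", "int:string");
        return conf;
      }
    }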

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 157115b..ad36093 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -132,6 +132,10 @@ public class FetchOperator implements Serializable {
     this.job = job;
     this.work = work;
     this.operator = operator;
+    if (operator instanceof TableScanOperator) {
+      Utilities.addTableSchemaToConf(job,
+          (TableScanOperator) operator);
+    }
     this.vcCols = vcCols;
     this.hasVC = vcCols != null && !vcCols.isEmpty();
     this.isStatReader = work.getTblDesc() == null;
@@ -599,6 +603,9 @@ public class FetchOperator implements Serializable {
   }
 
   private boolean needConversion(PartitionDesc partitionDesc) {
+    if (Utilities.isInputFileFormatSelfDescribing(partitionDesc)) {
+      return false;
+    }
     return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index c2a5726..99724c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
@@ -202,8 +204,13 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
     opCtx.partName = String.valueOf(partSpec);
     opCtx.deserializer = pd.getDeserializer(hconf);
 
-    StructObjectInspector partRawRowObjectInspector =
-        (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+    StructObjectInspector partRawRowObjectInspector;
+    if (Utilities.isInputFileFormatSelfDescribing(pd)) {
+      partRawRowObjectInspector = tableRowOI;
+    } else {
+      partRawRowObjectInspector =
+          (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+    }
 
     opCtx.partTblObjectInspectorConverter =
         ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
@@ -304,8 +311,15 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
         PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
         TableDesc tableDesc = pd.getTableDesc();
         Deserializer partDeserializer = pd.getDeserializer(hconf);
-        StructObjectInspector partRawRowObjectInspector =
-            (StructObjectInspector) partDeserializer.getObjectInspector();
+
+        StructObjectInspector partRawRowObjectInspector;
+        if (Utilities.isInputFileFormatSelfDescribing(pd)) {
+          Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
+          partRawRowObjectInspector = (StructObjectInspector) tblDeserializer.getObjectInspector();
+        } else {
+          partRawRowObjectInspector =
+              (StructObjectInspector) partDeserializer.getObjectInspector();
+        }
 
         StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
         if ((tblRawRowObjectInspector == null) ||

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 6e4f474..90c83e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -67,6 +67,13 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
 
   private String defaultPartitionName;
 
+  /**
+   * These values are saved during MapWork, FetchWork, etc. preparation and later added to the
+   * JobConf of each task.
+   */
+  private String schemaEvolutionColumns;
+  private String schemaEvolutionColumnsTypes;
+
   public TableDesc getTableDesc() {
     return tableDesc;
   }
@@ -75,6 +82,19 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
     this.tableDesc = tableDesc;
   }
 
+  public void setSchemaEvolution(String schemaEvolutionColumns, String schemaEvolutionColumnsTypes) {
+    this.schemaEvolutionColumns = schemaEvolutionColumns;
+    this.schemaEvolutionColumnsTypes = schemaEvolutionColumnsTypes;
+  }
+
+  public String getSchemaEvolutionColumns() {
+    return schemaEvolutionColumns;
+  }
+
+  public String getSchemaEvolutionColumnsTypes() {
+    return schemaEvolutionColumnsTypes;
+  }
+
   /**
    * Other than gathering statistics for the ANALYZE command, the table scan operator
    * does not do anything special other than just forwarding the row. Since the table
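A minimal sketch (illustrative method and values) of the flow the new comment describes, wiring the saved schema strings into a task JobConf via Utilities.addTableSchemaToConf from this patch:

    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.mapred.JobConf;

    public class SchemaEvolutionConfSketch {
      static void pushSchema(TableScanOperator tableScanOp, JobConf job) {
        // Saved at plan time (MapWork/FetchWork preparation)...
        tableScanOp.setSchemaEvolution("a,b,c", "int:string:int");
        // ...and later copied into the task configuration as
        // schema.evolution.columns / schema.evolution.columns.types.
        Utilities.addTableSchemaToConf(job, tableScanOp);
      }
    }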

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index de2eb98..0700e2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -106,6 +106,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -121,6 +122,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -128,11 +131,14 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -173,6 +179,12 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -473,11 +485,6 @@ public final class Utilities {
     }
   }
 
-  public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) {
-    MapWork mapWork = getMapWork(hiveConf);
-    return mapWork.getVectorScratchColumnTypeMap();
-  }
-
   public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
     try {
       Graph stageGraph = plan.getQueryPlan().getStageGraph();
@@ -3782,6 +3789,22 @@ public final class Utilities {
     return false;
   }
 
+  /**
+   * @param conf
+   * @return the configured VectorizedRowBatchCtx for a MapWork task.
+   */
+  public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) {
+    VectorizedRowBatchCtx result = null;
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+        Utilities.getPlanPath(conf) != null) {
+      MapWork mapWork = Utilities.getMapWork(conf);
+      if (mapWork != null && mapWork.getVectorMode()) {
+        result = mapWork.getVectorizedRowBatchCtx();
+      }
+    }
+    return result;
+  }
+
   public static boolean isVectorMode(Configuration conf, MapWork mapWork) {
     return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
         && mapWork.getVectorMode();
@@ -3993,6 +4016,7 @@ public final class Utilities {
     }
 
   }
+
   public static List<String> getStatsTmpDirs(BaseWork work, Configuration conf) {
 
     List<String> statsTmpDirs = new ArrayList<>();
@@ -4020,4 +4044,74 @@ public final class Utilities {
     }
     return statsTmpDirs;
   }
+
+  public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) {
+    Class<?> inputFormatClass = pd.getInputFileFormatClass();
+    return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+  }
+
+  public static boolean isInputFileFormatVectorized(PartitionDesc pd) {
+    Class<?> inputFormatClass = pd.getInputFileFormatClass();
+    return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+  }
+
+  public static void addSchemaEvolutionToTableScanOperator(Table table,
+      TableScanOperator tableScanOp) {
+    String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols());
+    String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols());
+    tableScanOp.setSchemaEvolution(colNames, colTypes);
+  }
+
+  public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI,
+      TableScanOperator tableScanOp) {
+    String colNames = ObjectInspectorUtils.getFieldNames(structOI);
+    String colTypes = ObjectInspectorUtils.getFieldTypes(structOI);
+    tableScanOp.setSchemaEvolution(colNames, colTypes);
+  }
+
+  public static void unsetSchemaEvolution(Configuration conf) {
+    conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+    conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+  }
+
+  public static void addTableSchemaToConf(Configuration conf,
+      TableScanOperator tableScanOp) {
+    String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns();
+    if (schemaEvolutionColumns != null) {
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes());
+    } else {
+      LOG.info("schema.evolution.columns and schema.evolution.columns.types not available");
+    }
+  }
+
+  /**
+   * Create row key and value object inspectors for reduce vectorization.
+   * The row object inspector used by ReduceWork needs to be a **standard**
+   * struct object inspector, not just any struct object inspector.
+   * @param keyInspector
+   * @param valueInspector
+   * @return OI
+   * @throws HiveException
+   */
+  public static StandardStructObjectInspector constructVectorizedReduceRowOI(
+      StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+          throws HiveException {
+
+    ArrayList<String> colNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    fields = valueInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+    return rowObjectInspector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 5fbefec..439e0df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -153,10 +154,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           /* vectorization only works with struct object inspectors */
           valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
 
-          ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
-              constructVectorizedRowBatch(keyStructInspector,
-              valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
-          batches[tag] = pair.getFirst();
           final int totalColumns = keysColumnOffset
               + valueStructInspectors[tag].getAllStructFieldRefs().size();
           valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -165,7 +162,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
               .genVectorStructExpressionWritables(valueStructInspectors[tag])));
 
-          rowObjectInspector[tag] = pair.getSecond();
+          rowObjectInspector[tag] = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+              valueStructInspectors[tag]);
+          batches[tag] = gWork.getVectorizedRowBatchCtx().createVectorizedRowBatch();
+
+
         } else {
           ois.add(keyObjectInspector);
           ois.add(valueObjectInspector[tag]);

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 8768847..efcf88c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -244,7 +244,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
     sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
         valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorScratchColumnTypeMap());
+        redWork.getVectorizedRowBatchCtx());
     ois[tag] = sources[tag].getObjectInspector();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index aff5765..1f75d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -22,11 +22,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -35,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -124,7 +121,7 @@ public class ReduceRecordSource implements RecordSource {
 
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
       TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
-      Map<Integer, String> vectorScratchColumnTypeMap)
+      VectorizedRowBatchCtx batchContext)
       throws Exception {
 
     ObjectInspector keyObjectInspector;
@@ -175,10 +172,9 @@ public class ReduceRecordSource implements RecordSource {
             .asList(VectorExpressionWriterFactory
                 .genVectorStructExpressionWritables(valueStructInspectors)));
 
-        ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
-            VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
-        rowObjectInspector = pair.getSecond();
-        batch = pair.getFirst();
+        rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+            valueStructInspectors);
+        batch = batchContext.createVectorizedRowBatch();
 
         // Setup vectorized deserialization for the key and value.
         BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -186,7 +182,7 @@ public class ReduceRecordSource implements RecordSource {
         keyBinarySortableDeserializeToRow =
                   new VectorDeserializeRow<BinarySortableDeserializeRead>(
                         new BinarySortableDeserializeRead(
-                                  VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+                                  VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                                       keyStructInspector),
                                   binarySortableSerDe.getSortOrders()));
         keyBinarySortableDeserializeToRow.init(0);
@@ -196,7 +192,7 @@ public class ReduceRecordSource implements RecordSource {
           valueLazyBinaryDeserializeToRow =
                   new VectorDeserializeRow<LazyBinaryDeserializeRead>(
                         new LazyBinaryDeserializeRead(
-                                  VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                                        valueStructInspectors)));
           valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset);
 

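As a small illustrative sketch (not part of this patch), the deserialize-to-row setup above is now driven by generic TypeInfo arrays taken from the key and value struct object inspectors, so non-primitive columns can be described as well; keyStructInspector is a placeholder name:

import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class ReduceTypeInfoSketch {
  // Returns one TypeInfo per reduce key column; complex types are allowed,
  // which is why the result is TypeInfo[] and no longer PrimitiveTypeInfo[].
  public static TypeInfo[] keyTypeInfos(StructObjectInspector keyStructInspector) {
    return VectorizedBatchUtil.typeInfosFromStructObjectInspector(keyStructInspector);
  }

  // The same shapes can also be obtained from Hive type name strings.
  public static TypeInfo[] fromNames(String[] typeNames) throws Exception {
    return VectorizedBatchUtil.typeInfosFromTypeNames(typeNames);
  }
}
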
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index 94a60be..4100bc5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -468,6 +468,9 @@ public abstract class VectorExtractRow {
         int start = colVector.start[adjustedIndex];
         int length = colVector.length[adjustedIndex];
 
+        if (value == null) {
+          LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex);
+        }
         // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
         text.set(value, start, length);
 
@@ -727,9 +730,9 @@ public abstract class VectorExtractRow {
   }
 
   public void extractRow(int batchIndex, Object[] objects) {
-    int i = 0;
-    for (Extractor extracter : extracters) {
-      objects[i++] = extracter.extract(batchIndex);
+    for (int i = 0; i < extracters.length; i++) {
+      Extractor extracter = extracters[i];
+      objects[i] = extracter.extract(batchIndex);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 35bbaef..0524c08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -813,7 +813,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
           outputFieldNames, objectInspectors);
       if (isVectorOutput) {
         vrbCtx = new VectorizedRowBatchCtx();
-        vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector);
+        vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
         outputBatch = vrbCtx.createVectorizedRowBatch();
         vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
         vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index e378d0d..4b1d9ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -90,7 +90,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
     super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index dcd2d57..9ff9b77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -146,7 +146,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
     super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 

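For reference, a minimal sketch (not part of this patch) of the initialization pattern now shared by VectorGroupByOperator, VectorMapJoinBaseOperator and VectorSMBMapJoinOperator above; outputOI and scratchTypeNames are placeholder names for the operator's output object inspector and the scratch column type names from its VectorizationContext:

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class OutputBatchSketch {
  public static VectorizedRowBatch makeOutputBatch(StructObjectInspector outputOI,
      String[] scratchTypeNames) throws Exception {
    VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
    // New argument order: row object inspector first, then the scratch column
    // type names as Hive type name strings (e.g. "bigint", "string").
    vrbCtx.init(outputOI, scratchTypeNames);
    return vrbCtx.createVectorizedRowBatch();
  }
}
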
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index e7a829e..95a4b9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -144,6 +144,8 @@ public class VectorizationContext {
 
   VectorExpressionDescriptor vMap;
 
+  private List<String> initialColumnNames;
+
   private List<Integer> projectedColumns;
   private List<String> projectionColumnNames;
   private Map<String, Integer> projectionColumnMap;
@@ -161,6 +163,7 @@ public class VectorizationContext {
       LOG.debug("VectorizationContext consructor contextName " + contextName + " level "
           + level + " initialColumnNames " + initialColumnNames);
     }
+    this.initialColumnNames = initialColumnNames;
     this.projectionColumnNames = initialColumnNames;
 
     projectedColumns = new ArrayList<Integer>();
@@ -183,6 +186,7 @@ public class VectorizationContext {
     if (LOG.isDebugEnabled()) {
       LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
     }
+    initialColumnNames = new ArrayList<String>();
     projectedColumns = new ArrayList<Integer>();
     projectionColumnNames = new ArrayList<String>();
     projectionColumnMap = new HashMap<String, Integer>();
@@ -198,6 +202,7 @@ public class VectorizationContext {
     this.contextName = contextName;
     level = vContext.level + 1;
     LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
+    this.initialColumnNames = vContext.initialColumnNames;
     this.projectedColumns = new ArrayList<Integer>();
     this.projectionColumnNames = new ArrayList<String>();
     this.projectionColumnMap = new HashMap<String, Integer>();
@@ -210,6 +215,7 @@ public class VectorizationContext {
   // Add an initial column to a vectorization context when
   // a vectorized row batch is being created.
   public void addInitialColumn(String columnName) {
+    initialColumnNames.add(columnName);
     int index = projectedColumns.size();
     projectedColumns.add(index);
     projectionColumnNames.add(columnName);
@@ -238,6 +244,10 @@ public class VectorizationContext {
     projectionColumnMap.put(columnName, vectorBatchColIndex);
   }
 
+  public List<String> getInitialColumnNames() {
+    return initialColumnNames;
+  }
+
   public List<Integer> getProjectedColumns() {
     return projectedColumns;
   }
@@ -1038,7 +1048,9 @@ public class VectorizationContext {
     VectorExpressionDescriptor.Descriptor descriptor = builder.build();
     Class<?> vclass = this.vMap.getVectorExpressionClass(udfClass, descriptor);
     if (vclass == null) {
-      LOG.info("No vector udf found for " + udfClass.getSimpleName() + ", descriptor: " + descriptor);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No vector udf found for "+udfClass.getSimpleName() + ", descriptor: "+descriptor);
+      }
       return null;
     }
     Mode childrenMode = getChildrenMode(mode, udfClass);
@@ -2337,11 +2349,11 @@ public class VectorizationContext {
             return ColumnVector.Type.DECIMAL;
 
           default:
-            throw new HiveException("Unexpected primitive type category " + primitiveCategory);
+            throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
         }
       }
       default:
-        throw new HiveException("Unexpected type category " +
+        throw new RuntimeException("Unexpected type category " +
             typeInfo.getCategory());
     }
   }
@@ -2452,13 +2464,16 @@ public class VectorizationContext {
     return firstOutputColumnIndex;
   }
 
-  public Map<Integer, String> getScratchColumnTypeMap() {
-    Map<Integer, String> map = new HashMap<Integer, String>();
+  public String[] getScratchColumnTypeNames() {
+    String[] result = new String[ocm.outputColCount];
     for (int i = 0; i < ocm.outputColCount; i++) {
-      String type = ocm.outputColumnsTypes[i];
-      map.put(i+this.firstOutputColumnIndex, type);
+      String typeName = ocm.outputColumnsTypes[i];
+      if (typeName.equalsIgnoreCase("long")) {
+        typeName = "bigint";   // Convert our synonym to a real Hive type name.
+      }
+      result[i] =  typeName;
     }
-    return map;
+    return result;
   }
 
   @Override
@@ -2478,9 +2493,7 @@ public class VectorizationContext {
     }
     sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", ");
 
-    Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
-    sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap());
-    sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap);
+    sb.append("scratchColumnTypeNames ").append(java.util.Arrays.toString(getScratchColumnTypeNames()));
 
     return sb.toString();
   }
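
A small illustrative usage sketch (not part of this patch) of the new accessor, assuming a VectorizationContext instance named vContext: the returned entries are plain Hive type name strings, with the internal "long" synonym already rewritten to "bigint", so they can be passed straight to VectorizedRowBatchCtx.init:

import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;

public class ScratchTypeNamesSketch {
  public static void dump(VectorizationContext vContext) {
    String[] scratchTypeNames = vContext.getScratchColumnTypeNames();
    // Arrays.toString is needed to see the contents; calling toString() on a
    // String[] would only print the array's identity hash.
    System.out.println("scratch column types: " + Arrays.toString(scratchTypeNames));
  }
}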

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 3d6d6e0..d75d185 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -56,9 +56,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -114,16 +118,24 @@ public class VectorizedBatchUtil {
     batch.size = size;
   }
 
-  /**
-   * Convert an ObjectInspector into a ColumnVector of the appropriate
-   * type.
-   */
-  public static ColumnVector createColumnVector(ObjectInspector inspector
-                                                ) throws HiveException {
-    switch(inspector.getCategory()) {
-      case PRIMITIVE:
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
-        switch(poi.getPrimitiveCategory()) {
+  public static ColumnVector createColumnVector(String typeName) {
+    typeName = typeName.toLowerCase();
+
+    // Allow undecorated CHAR and VARCHAR to support scratch column type names.
+    if (typeName.equals("char") || typeName.equals("varchar")) {
+      return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    }
+
+    TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+    return createColumnVector(typeInfo);
+  }
+
+  public static ColumnVector createColumnVector(TypeInfo typeInfo) {
+    switch(typeInfo.getCategory()) {
+    case PRIMITIVE:
+      {
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        switch(primitiveTypeInfo.getPrimitiveCategory()) {
           case BOOLEAN:
           case BYTE:
           case SHORT:
@@ -143,142 +155,53 @@ public class VectorizedBatchUtil {
           case VARCHAR:
             return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
           case DECIMAL:
-            DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
+            DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
             return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
                 tInfo.precision(), tInfo.scale());
           default:
-            throw new HiveException("Vectorizaton is not supported for datatype:"
-                + poi.getPrimitiveCategory());
+            throw new RuntimeException("Vectorization is not supported for datatype: "
+                + primitiveTypeInfo.getPrimitiveCategory());
         }
-      case STRUCT: {
-        StructObjectInspector soi = (StructObjectInspector) inspector;
-        List<? extends StructField> fieldList = soi.getAllStructFieldRefs();
-        ColumnVector[] children = new ColumnVector[fieldList.size()];
+      }
+    case STRUCT:
+      {
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<TypeInfo> typeInfoList = structTypeInfo.getAllStructFieldTypeInfos();
+        ColumnVector[] children = new ColumnVector[typeInfoList.size()];
         for(int i=0; i < children.length; ++i) {
           children[i] =
-              createColumnVector(fieldList.get(i).getFieldObjectInspector());
+              createColumnVector(typeInfoList.get(i));
         }
         return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
             children);
       }
-      case UNION: {
-        UnionObjectInspector uoi = (UnionObjectInspector) inspector;
-        List<ObjectInspector> fieldList = uoi.getObjectInspectors();
-        ColumnVector[] children = new ColumnVector[fieldList.size()];
+    case UNION:
+      {
+        UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+        List<TypeInfo> typeInfoList = unionTypeInfo.getAllUnionObjectTypeInfos();
+        ColumnVector[] children = new ColumnVector[typeInfoList.size()];
         for(int i=0; i < children.length; ++i) {
-          children[i] = createColumnVector(fieldList.get(i));
+          children[i] = createColumnVector(typeInfoList.get(i));
         }
         return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE, children);
       }
-      case LIST: {
-        ListObjectInspector loi = (ListObjectInspector) inspector;
+    case LIST:
+      {
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
         return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            createColumnVector(loi.getListElementObjectInspector()));
+            createColumnVector(listTypeInfo.getListElementTypeInfo()));
       }
-      case MAP: {
-        MapObjectInspector moi = (MapObjectInspector) inspector;
+    case MAP:
+      {
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
         return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            createColumnVector(moi.getMapKeyObjectInspector()),
-            createColumnVector(moi.getMapValueObjectInspector()));
+            createColumnVector(mapTypeInfo.getMapKeyTypeInfo()),
+            createColumnVector(mapTypeInfo.getMapValueTypeInfo()));
       }
-      default:
-        throw new HiveException("Vectorization is not supported for datatype:"
-            + inspector.getCategory());
-    }
-
-  }
-
-  /**
-   * Walk through the object inspector and add column vectors
-   *
-   * @param oi
-   * @param cvList
-   *          ColumnVectors are populated in this list
-   */
-  private static void allocateColumnVector(StructObjectInspector oi,
-      List<ColumnVector> cvList) throws HiveException {
-    if (cvList == null) {
-      throw new HiveException("Null columnvector list");
-    }
-    if (oi == null) {
-      return;
-    }
-    final List<? extends StructField> fields = oi.getAllStructFieldRefs();
-    for(StructField field : fields) {
-      ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
-      cvList.add(createColumnVector(fieldObjectInspector));
-    }
-  }
-
-
-  /**
-   * Create VectorizedRowBatch from ObjectInspector
-   *
-   * @param oi
-   * @return
-   * @throws HiveException
-   */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector oi) throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(oi, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    int i = 0;
-    for(ColumnVector cv : cvList) {
-      result.cols[i++] = cv;
-    }
-    return result;
-  }
-
-  /**
-   * Create VectorizedRowBatch from key and value object inspectors
-   * The row object inspector used by ReduceWork needs to be a **standard**
-   * struct object inspector, not just any struct object inspector.
-   * @param keyInspector
-   * @param valueInspector
-   * @param vectorScratchColumnTypeMap
-   * @return VectorizedRowBatch, OI
-   * @throws HiveException
-   */
-  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
-          throws HiveException {
-
-    ArrayList<String> colNames = new ArrayList<String>();
-    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
-    }
-    fields = valueInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
+    default:
+      throw new RuntimeException("Vectorization is not supported for datatype:"
+          + typeInfo.getCategory());
     }
-    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
-    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
-    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
-  }
-
-  /**
-   * Iterates through all columns in a given row and populates the batch
-   *
-   * @param row
-   * @param oi
-   * @param rowIndex
-   * @param batch
-   * @param buffer
-   * @throws HiveException
-   */
-  public static void addRowToBatch(Object row, StructObjectInspector oi,
-          int rowIndex,
-          VectorizedRowBatch batch,
-          DataOutputBuffer buffer
-          ) throws HiveException {
-    addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
   }
 
   /**
@@ -621,31 +544,30 @@ public class VectorizedBatchUtil {
     return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
   }
 
-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
+  public static String[] columnNamesFromStructObjectInspector(
       StructObjectInspector structObjectInspector) throws HiveException {
 
     List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
+    String[] result = new String[fields.size()];
 
     int i = 0;
     for(StructField field : fields) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
-          field.getFieldObjectInspector().getTypeName());
-      result[i++] =  (PrimitiveTypeInfo) typeInfo;
+      result[i++] =  field.getFieldName();
     }
     return result;
   }
 
-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
-      String[] typeNames) throws HiveException {
-
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
+  public static TypeInfo[] typeInfosFromTypeNames(String[] typeNames) throws HiveException {
+    ArrayList<TypeInfo> typeInfoList =
+        TypeInfoUtils.typeInfosFromTypeNames(Arrays.asList(typeNames));
+    return typeInfoList.toArray(new TypeInfo[0]);
+  }
 
-    for(int i = 0; i < typeNames.length; i++) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
-      result[i] =  (PrimitiveTypeInfo) typeInfo;
-    }
-    return result;
+  public static TypeInfo[] typeInfosFromStructObjectInspector(
+      StructObjectInspector structObjectInspector) {
+    ArrayList<TypeInfo> typeInfoList =
+        TypeInfoUtils.typeInfosFromStructObjectInspector(structObjectInspector);
+    return typeInfoList.toArray(new TypeInfo[0]);
   }
 
   static ColumnVector cloneColumnVector(ColumnVector source