You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:49 UTC

[18/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachan

HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized)  (Matt McCline, reviewed by Prasanth J)
HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachandran reviewed by Matt McCline and Gunther Hagleitner)
HIVE-12799: Always use Schema Evolution for ACID (Matt McCline, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0fd9069e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0fd9069e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0fd9069e

Branch: refs/heads/branch-1
Commit: 0fd9069e9b8e6c97d2068cc449aa14c9773027ca
Parents: 9b5f1ff
Author: Matt McCline <mm...@hortonworks.com>
Authored: Tue Jan 12 09:55:57 2016 -0800
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Tue Jan 12 09:55:57 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +
 .../mapreduce/FosterStorageHandler.java         |   37 +
 .../hive/hcatalog/mapreduce/InputJobInfo.java   |    8 +-
 .../hive/hcatalog/streaming/TestStreaming.java  |    2 +
 .../hive/ql/txn/compactor/TestCompactor.java    |  246 ++-
 .../test/resources/testconfiguration.properties |   16 +
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   13 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  102 +-
 .../hadoop/hive/ql/exec/FetchOperator.java      |    8 +
 .../apache/hadoop/hive/ql/exec/MapOperator.java |   25 +-
 .../hadoop/hive/ql/exec/TableScanOperator.java  |   20 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  110 +-
 .../ql/exec/spark/SparkReduceRecordHandler.java |   11 +-
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |    2 +-
 .../hive/ql/exec/tez/ReduceRecordSource.java    |   15 +-
 .../hive/ql/exec/vector/BytesColumnVector.java  |   99 +-
 .../hive/ql/exec/vector/ColumnVector.java       |   92 +-
 .../ql/exec/vector/DecimalColumnVector.java     |   82 +-
 .../hive/ql/exec/vector/DoubleColumnVector.java |   67 +-
 .../hive/ql/exec/vector/LongColumnVector.java   |   73 +-
 .../ql/exec/vector/VectorDeserializeRow.java    |   37 +-
 .../hive/ql/exec/vector/VectorExtractRow.java   |   11 +-
 .../ql/exec/vector/VectorGroupByOperator.java   |    2 +-
 .../exec/vector/VectorMapJoinBaseOperator.java  |    4 +-
 .../exec/vector/VectorSMBMapJoinOperator.java   |    2 +-
 .../hive/ql/exec/vector/VectorSerializeRow.java |   12 +-
 .../ql/exec/vector/VectorizationContext.java    |  113 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     |  201 +-
 .../ql/exec/vector/VectorizedColumnarSerDe.java |  277 ---
 .../hive/ql/exec/vector/VectorizedRowBatch.java |   20 +
 .../ql/exec/vector/VectorizedRowBatchCtx.java   |  556 ++----
 .../mapjoin/VectorMapJoinCommonOperator.java    |   65 +-
 .../VectorMapJoinGenerateResultOperator.java    |   28 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   19 +
 .../hadoop/hive/ql/io/HiveInputFormat.java      |    5 +
 .../apache/hadoop/hive/ql/io/IOConstants.java   |   11 +
 .../io/SelfDescribingInputFormatInterface.java  |   27 +
 .../hive/ql/io/VectorizedRCFileInputFormat.java |   81 -
 .../ql/io/VectorizedRCFileRecordReader.java     |  261 ---
 .../ql/io/orc/ConversionTreeReaderFactory.java  |   38 -
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   67 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |   62 +-
 .../apache/hadoop/hive/ql/io/orc/OrcUtils.java  |  562 ++++++
 .../apache/hadoop/hive/ql/io/orc/Reader.java    |   15 +
 .../hive/ql/io/orc/RecordReaderFactory.java     |  269 ---
 .../hadoop/hive/ql/io/orc/RecordReaderImpl.java |   35 +-
 .../hadoop/hive/ql/io/orc/SchemaEvolution.java  |  185 ++
 .../hive/ql/io/orc/TreeReaderFactory.java       |  188 +-
 .../hadoop/hive/ql/io/orc/TypeDescription.java  |  514 ++++++
 .../ql/io/orc/VectorizedOrcAcidRowReader.java   |   45 +-
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   53 +-
 .../parquet/VectorizedParquetInputFormat.java   |   26 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |   18 +
 .../hive/ql/optimizer/SimpleFetchOptimizer.java |    1 +
 .../hive/ql/optimizer/physical/Vectorizer.java  |  501 +++--
 .../ql/optimizer/physical/Vectorizer.java.orig  | 1744 ------------------
 .../apache/hadoop/hive/ql/plan/BaseWork.java    |   30 +-
 .../hadoop/hive/ql/plan/PartitionDesc.java      |   15 +-
 .../hive/ql/plan/VectorPartitionConversion.java |  166 ++
 .../hive/ql/plan/VectorPartitionDesc.java       |  110 ++
 .../ql/exec/vector/TestVectorRowObject.java     |    5 +-
 .../hive/ql/exec/vector/TestVectorSerDeRow.java |    9 +-
 .../exec/vector/TestVectorizedRowBatchCtx.java  |  355 ----
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   49 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   42 +
 .../ql/io/orc/TestOrcRawRecordMerger.java.orig  | 1150 ++++++++++++
 .../hive/ql/io/orc/TestVectorizedORCReader.java |   75 +-
 .../clientnegative/orc_change_fileformat.q      |    3 +
 .../clientnegative/orc_change_fileformat_acid.q |    3 +
 .../queries/clientnegative/orc_change_serde.q   |    3 +
 .../clientnegative/orc_change_serde_acid.q      |    3 +
 .../clientnegative/orc_reorder_columns1.q       |    3 +
 .../clientnegative/orc_reorder_columns1_acid.q  |    3 +
 .../clientnegative/orc_reorder_columns2.q       |    3 +
 .../clientnegative/orc_reorder_columns2_acid.q  |    3 +
 .../clientnegative/orc_replace_columns1.q       |    3 +
 .../clientnegative/orc_replace_columns1_acid.q  |    3 +
 .../clientnegative/orc_replace_columns2.q       |    3 +
 .../clientnegative/orc_replace_columns2_acid.q  |    3 +
 .../clientnegative/orc_replace_columns3.q       |    4 +
 .../clientnegative/orc_replace_columns3_acid.q  |    4 +
 .../clientnegative/orc_type_promotion1.q        |    3 +
 .../clientnegative/orc_type_promotion1_acid.q   |    3 +
 .../clientnegative/orc_type_promotion2.q        |   10 +
 .../clientnegative/orc_type_promotion2_acid.q   |   10 +
 .../clientnegative/orc_type_promotion3.q        |    3 +
 .../clientnegative/orc_type_promotion3_acid.q   |    3 +
 .../test/queries/clientpositive/dbtxnmgr_ddl1.q |    1 -
 .../test/queries/clientpositive/load_orc_part.q |   10 -
 .../clientpositive/orc_int_type_promotion.q     |   14 +-
 .../clientpositive/orc_schema_evolution.q       |   39 +
 .../schema_evol_orc_acid_mapwork_part.q         |  173 ++
 .../schema_evol_orc_acid_mapwork_table.q        |  131 ++
 .../schema_evol_orc_acidvec_mapwork_part.q      |  173 ++
 .../schema_evol_orc_acidvec_mapwork_table.q     |  131 ++
 .../schema_evol_orc_nonvec_fetchwork_part.q     |   97 +
 .../schema_evol_orc_nonvec_fetchwork_table.q    |   57 +
 .../schema_evol_orc_nonvec_mapwork_part.q       |   97 +
 .../schema_evol_orc_nonvec_mapwork_table.q      |   57 +
 .../schema_evol_orc_vec_mapwork_part.q          |   97 +
 .../schema_evol_orc_vec_mapwork_table.q         |   57 +
 .../schema_evol_text_fetchwork_table.q          |   57 +
 .../schema_evol_text_mapwork_table.q            |   57 +
 .../schema_evol_text_nonvec_fetchwork_part.q    |   97 +
 .../schema_evol_text_nonvec_fetchwork_table.q   |   67 +
 .../schema_evol_text_nonvec_mapwork_part.q      |   97 +
 .../schema_evol_text_nonvec_mapwork_table.q     |   67 +
 .../clientnegative/orc_change_fileformat.q.out  |   13 +
 .../orc_change_fileformat_acid.q.out            |   13 +
 .../clientnegative/orc_change_serde.q.out       |   13 +
 .../clientnegative/orc_change_serde_acid.q.out  |   13 +
 .../clientnegative/orc_reorder_columns1.q.out   |   13 +
 .../orc_reorder_columns1_acid.q.out             |   13 +
 .../clientnegative/orc_reorder_columns2.q.out   |   13 +
 .../orc_reorder_columns2_acid.q.out             |   13 +
 .../clientnegative/orc_replace_columns1.q.out   |   13 +
 .../orc_replace_columns1_acid.q.out             |   13 +
 .../clientnegative/orc_replace_columns2.q.out   |   13 +
 .../orc_replace_columns2_acid.q.out             |   13 +
 .../clientnegative/orc_replace_columns3.q.out   |   21 +
 .../orc_replace_columns3_acid.q.out             |   21 +
 .../clientnegative/orc_type_promotion1.q.out    |   13 +
 .../orc_type_promotion1_acid.q.out              |   13 +
 .../clientnegative/orc_type_promotion2.q.out    |   69 +
 .../orc_type_promotion2_acid.q.out              |   69 +
 .../clientnegative/orc_type_promotion3.q.out    |   13 +
 .../orc_type_promotion3_acid.q.out              |   13 +
 .../results/clientpositive/dbtxnmgr_ddl1.q.out  |    9 -
 .../results/clientpositive/load_orc_part.q.out  |   52 -
 .../clientpositive/orc_int_type_promotion.q.out |   86 +-
 .../clientpositive/orc_schema_evolution.q.out   |  211 +++
 .../schema_evol_orc_acid_mapwork_part.q.out     | 1037 +++++++++++
 .../schema_evol_orc_acid_mapwork_table.q.out    |  651 +++++++
 .../schema_evol_orc_acidvec_mapwork_part.q.out  | 1037 +++++++++++
 .../schema_evol_orc_acidvec_mapwork_table.q.out |  651 +++++++
 .../schema_evol_orc_nonvec_fetchwork_part.q.out |  642 +++++++
 ...schema_evol_orc_nonvec_fetchwork_table.q.out |  298 +++
 .../schema_evol_orc_nonvec_mapwork_part.q.out   |  642 +++++++
 .../schema_evol_orc_nonvec_mapwork_table.q.out  |  298 +++
 .../schema_evol_orc_vec_mapwork_part.q.out      |  642 +++++++
 .../schema_evol_orc_vec_mapwork_table.q.out     |  298 +++
 .../schema_evol_text_fetchwork_table.q.out      |  298 +++
 .../schema_evol_text_mapwork_table.q.out        |  298 +++
 ...schema_evol_text_nonvec_fetchwork_part.q.out |  642 +++++++
 ...chema_evol_text_nonvec_fetchwork_table.q.out |  297 +++
 .../schema_evol_text_nonvec_mapwork_part.q.out  |  642 +++++++
 .../schema_evol_text_nonvec_mapwork_table.q.out |  297 +++
 .../tez/schema_evol_orc_acid_mapwork_part.q.out | 1037 +++++++++++
 .../schema_evol_orc_acid_mapwork_table.q.out    |  651 +++++++
 .../schema_evol_orc_acidvec_mapwork_part.q.out  | 1037 +++++++++++
 .../schema_evol_orc_acidvec_mapwork_table.q.out |  651 +++++++
 .../schema_evol_orc_nonvec_fetchwork_part.q.out |  642 +++++++
 ...schema_evol_orc_nonvec_fetchwork_table.q.out |  298 +++
 .../schema_evol_orc_nonvec_mapwork_part.q.out   |  642 +++++++
 .../schema_evol_orc_nonvec_mapwork_table.q.out  |  298 +++
 .../tez/schema_evol_orc_vec_mapwork_part.q.out  |  642 +++++++
 .../tez/schema_evol_orc_vec_mapwork_table.q.out |  298 +++
 .../tez/schema_evol_text_fetchwork_table.q.out  |  298 +++
 .../tez/schema_evol_text_mapwork_table.q.out    |  298 +++
 ...schema_evol_text_nonvec_fetchwork_part.q.out |  642 +++++++
 ...chema_evol_text_nonvec_fetchwork_table.q.out |  297 +++
 .../schema_evol_text_nonvec_mapwork_part.q.out  |  642 +++++++
 .../schema_evol_text_nonvec_mapwork_table.q.out |  297 +++
 .../tez/vector_partition_diff_num_cols.q.out    |    1 +
 .../vector_partition_diff_num_cols.q.out        |    1 +
 .../fast/BinarySortableDeserializeRead.java     |   23 +-
 .../fast/BinarySortableSerializeWrite.java      |    5 -
 .../hive/serde2/fast/DeserializeRead.java       |    6 +-
 .../lazy/fast/LazySimpleDeserializeRead.java    |  175 +-
 .../lazy/fast/LazySimpleSerializeWrite.java     |   11 +-
 .../hive/serde2/lazybinary/LazyBinarySerDe.java |   29 +
 .../fast/LazyBinaryDeserializeRead.java         |   29 +-
 .../fast/LazyBinarySerializeWrite.java          |    3 +-
 .../hive/serde2/typeinfo/TypeInfoUtils.java     |   36 +
 174 files changed, 24596 insertions(+), 4554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e793174..787e25e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -910,6 +910,10 @@ public class HiveConf extends Configuration {
         "The threshold for the input file size of the small tables; if the file size is smaller \n" +
         "than this threshold, it will try to convert the common join into map join"),
 
+
+    HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", false,
+        "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."),
+
     HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0,
         "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 5a95467..bc56d77 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -35,6 +36,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,6 +53,8 @@ import java.util.Map;
  */
 public class FosterStorageHandler extends DefaultStorageHandler {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FosterStorageHandler.class);
+
   public Configuration conf;
   /** The directory under which data is initially written for a non partitioned table */
   protected static final String TEMP_DIR_NAME = "_TEMP";
@@ -98,6 +105,36 @@ public class FosterStorageHandler extends DefaultStorageHandler {
   @Override
   public void configureInputJobProperties(TableDesc tableDesc,
                       Map<String, String> jobProperties) {
+
+    try {
+      Map<String, String> tableProperties = tableDesc.getJobProperties();
+
+      String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+      if (jobInfoProperty != null) {
+
+        InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty);
+
+        HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+        HCatSchema dataColumns = tableInfo.getDataColumns();
+        List<HCatFieldSchema> dataFields = dataColumns.getFields();
+        StringBuilder columnNamesSb = new StringBuilder();
+        StringBuilder typeNamesSb = new StringBuilder();
+        for (HCatFieldSchema dataField : dataFields) {
+        if (columnNamesSb.length() > 0) {
+            columnNamesSb.append(",");
+            typeNamesSb.append(":");
+          }
+          columnNamesSb.append(dataField.getName());
+          typeNamesSb.append(dataField.getTypeString());
+        }
+        jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString());
+        jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString());
+
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to set output path", e);
+    }
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 1f23f3f..7ec6ae3 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,9 +182,11 @@ public class InputJobInfo implements Serializable {
     ObjectInputStream partInfoReader =
       new ObjectInputStream(new InflaterInputStream(ois));
     partitions = (List<PartInfo>)partInfoReader.readObject();
-    for (PartInfo partInfo : partitions) {
-      if (partInfo.getTableInfo() == null) {
-        partInfo.setTableInfo(this.tableInfo);
+    if (partitions != null) {
+      for (PartInfo partInfo : partitions) {
+        if (partInfo.getTableInfo() == null) {
+          partInfo.setTableInfo(this.tableInfo);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 3458b65..ff2598f 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -464,6 +464,8 @@ public class TestStreaming {
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionPath.toString());
     job.set("bucket_count", Integer.toString(buckets));
+    job.set("columns", "id,msg");
+    job.set("columns.types", "bigint:string");
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(buckets, splits.length);

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index e2910dd..dabe434 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -128,7 +128,194 @@ public class TestCompactor {
       driver.close();
     }
   }
-  
+
+  /**
+   * Simple schema evolution add columns with partitioning.
+   * @throws Exception
+   */
+  @Test
+  public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception {
+    String tblName = "dpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+
+    // First INSERT round.
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    // ALTER TABLE ... ADD COLUMNS
+    executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+    // Validate there is an added NULL for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+    // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+        "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+        "(5, 'doc', 1902, 'yesterday')",
+        driver);
+
+    // Validate there the new insertions for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(4, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=last_century", names.get(0));
+    Assert.assertEquals("ds=soon", names.get(1));
+    Assert.assertEquals("ds=today", names.get(2));
+    Assert.assertEquals("ds=yesterday", names.get(3));
+
+    // Validate after compaction.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+  }
+
+  @Test
+  public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception {
+    String tblName = "udpct";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " PARTITIONED BY(ds string)" +
+      " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " +
+        "'today'), (2, 'wilma', 'yesterday')", driver);
+
+    executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver);
+
+    // Validate the update.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\tyesterday", valuesReadFromHiveDriver.get(1));
+
+    // ALTER TABLE ... ADD COLUMNS
+    executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver);
+
+    // Validate there is an added NULL for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+
+    // Second INSERT round with new inserts into previously existing partition 'yesterday'.
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values " +
+        "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " +
+        "(5, 'doc', 1902, 'yesterday')",
+        driver);
+
+    // Validate there the new insertions for column c.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4));
+
+    executeStatementOnDriver("update " + tblName + " set c = 2000", driver);
+
+    // Validate the update of new column c, even in old rows.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+
+    Initiator initiator = new Initiator();
+    initiator.setThreadId((int)initiator.getId());
+    // Set to 1 so insert doesn't set it off but update does
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1);
+    initiator.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean();
+    stop.set(true);
+    initiator.init(stop, new AtomicBoolean());
+    initiator.run();
+
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(4, compacts.size());
+    SortedSet<String> partNames = new TreeSet<String>();
+    for (int i = 0; i < compacts.size(); i++) {
+      Assert.assertEquals("default", compacts.get(i).getDbname());
+      Assert.assertEquals(tblName, compacts.get(i).getTablename());
+      Assert.assertEquals("initiated", compacts.get(i).getState());
+      partNames.add(compacts.get(i).getPartitionname());
+    }
+    List<String> names = new ArrayList<String>(partNames);
+    Assert.assertEquals("ds=last_century", names.get(0));
+    Assert.assertEquals("ds=soon", names.get(1));
+    Assert.assertEquals("ds=today", names.get(2));
+    Assert.assertEquals("ds=yesterday", names.get(3));
+
+    // Validate after compaction.
+    executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver);
+    valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(5, valuesReadFromHiveDriver.size());
+    Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0));
+    Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1));
+    Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2));
+    Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3));
+    Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4));
+  }
+
   /**
    * After each major compaction, stats need to be updated on each column of the
    * table/partition which previously had stats.
@@ -255,7 +442,9 @@ public class TestCompactor {
     t.run();
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
+    if (1 != compacts.size()) {
+      Assert.fail("Expecting 1 file and found " + compacts.size() + " files " + compacts.toString());
+    }
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
     stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName,
@@ -409,6 +598,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -452,9 +643,12 @@ public class TestCompactor {
         }
       }
       Arrays.sort(names);
-      Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"});
-      checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L);
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+      if (!Arrays.deepEquals(expected, names)) {
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+      }
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 1L, 4L);
 
     } finally {
       connection.close();
@@ -466,6 +660,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -500,10 +696,12 @@ public class TestCompactor {
       FileSystem fs = FileSystem.get(conf);
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
-      Assert.assertEquals(1, stat.length);
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+      }
       String name = stat[0].getPath().getName();
       Assert.assertEquals(name, "base_0000004");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
       connection.close();
     }
@@ -514,6 +712,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -561,9 +761,12 @@ public class TestCompactor {
         }
       }
       Arrays.sort(names);
-      Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
-          "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"});
-      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L);
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"};
+      if (!Arrays.deepEquals(expected, names)) {
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+      }
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
       connection.close();
     }
@@ -574,6 +777,8 @@ public class TestCompactor {
     String dbName = "default";
     String tblName = "cws";
     List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
     executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
         " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
@@ -613,10 +818,17 @@ public class TestCompactor {
       FileSystem fs = FileSystem.get(conf);
       FileStatus[] stat =
           fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
-      Assert.assertEquals(1, stat.length);
+      if (1 != stat.length) {
+        Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
+      }
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
+      }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+      if (!name.equals("base_0000006")) {
+        Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
+      }
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
     } finally {
       connection.close();
     }
@@ -642,7 +854,8 @@ public class TestCompactor {
     }
   }
 
-  private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max)
+  private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
+      String columnTypesProperty, int bucket, long min, long max)
       throws IOException {
     ValidTxnList txnList = new ValidTxnList() {
       @Override
@@ -678,8 +891,11 @@ public class TestCompactor {
 
     OrcInputFormat aif = new OrcInputFormat();
 
+    Configuration conf = new Configuration();
+    conf.set("columns", columnNamesProperty);
+    conf.set("columns.types", columnTypesProperty);
     AcidInputFormat.RawReader<OrcStruct> reader =
-        aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas);
+        aif.getRawReader(conf, false, bucket, txnList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
     OrcStruct value = reader.createValue();
     long currentTxn = min;

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1c8a80d..290cff2 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -156,6 +156,22 @@ minitez.query.files.shared=acid_globallimit.q,\
   ptf_matchpath.q,\
   ptf_streaming.q,\
   sample1.q,\
+  schema_evol_text_nonvec_mapwork_table.q,\
+  schema_evol_text_nonvec_fetchwork_table.q,\
+  schema_evol_orc_nonvec_fetchwork_part.q,\
+  schema_evol_orc_nonvec_mapwork_part.q,\
+  schema_evol_text_nonvec_fetchwork_part.q,\
+  schema_evol_text_nonvec_mapwork_part.q,\
+  schema_evol_orc_acid_mapwork_part.q,\
+  schema_evol_orc_acid_mapwork_table.q,\
+  schema_evol_orc_acidvec_mapwork_table.q,\
+  schema_evol_orc_acidvec_mapwork_part.q,\
+  schema_evol_orc_vec_mapwork_part.q,\
+  schema_evol_text_fetchwork_table.q,\
+  schema_evol_text_mapwork_table.q,\
+  schema_evol_orc_vec_mapwork_table.q,\
+  schema_evol_orc_nonvec_mapwork_table.q,\
+  schema_evol_orc_nonvec_fetchwork_table.q,\
   selectDistinctStar.q,\
   script_env_var1.q,\
   script_env_var2.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 34461ed..7acca77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -435,6 +435,12 @@ public enum ErrorMsg {
   CTAS_CREATES_VOID_TYPE(10305, "CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: "),
   //{2} should be lockid
   LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms.  {2}", true),
+  CANNOT_CHANGE_SERDE(10309, "Changing SerDe (from {0}) is not supported for table {1}. File format may be incompatible", true),
+  CANNOT_CHANGE_FILEFORMAT(10310, "Changing file format (from {0}) is not supported for table {1}", true),
+  CANNOT_REORDER_COLUMNS(10311, "Reordering columns is not supported for table {0}. SerDe may be incompatible", true),
+  CANNOT_CHANGE_COLUMN_TYPE(10312, "Changing from type {0} to {1} is not supported for column {2}. SerDe may be incompatible", true),
+  REPLACE_CANNOT_DROP_COLUMNS(10313, "Replacing columns cannot drop columns for table {0}. SerDe may be incompatible", true),
+  REPLACE_UNSUPPORTED_TYPE_CONVERSION(10314, "Replacing columns with unsupported type conversion (from {0} to {1}) for column {2}. SerDe may be incompatible", true),
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -497,8 +503,13 @@ public enum ErrorMsg {
       + "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true"),
 
   INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" +
-      " file format of the destination table.")
+      " file format of the destination table."),
 
+  SCHEMA_REQUIRED_TO_READ_ACID_TABLES(30020, "Neither the configuration variables " +
+          "schema.evolution.columns / schema.evolution.columns.types " +
+          "nor the " +
+          "columns / columns.types " +
+          "are set.  Table schema information is required to read ACID tables")
   ;
 
   private int errorCode;

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 2a64da3..56a0a4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -48,6 +48,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.lang.StringEscapeUtils;
@@ -97,9 +98,12 @@ import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
@@ -3291,6 +3295,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     return 0;
   }
 
+  private boolean isSchemaEvolutionEnabled(Table tbl) {
+    boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata());
+    if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+      return true;
+    }
+    return false;
+  }
+
   private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part)
       throws HiveException {
 
@@ -3336,6 +3348,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       String comment = alterTbl.getNewColComment();
       boolean first = alterTbl.getFirst();
       String afterCol = alterTbl.getAfterCol();
+      // if orc table, restrict reordering columns as it will break schema evolution
+      boolean isOrcSchemaEvolution =
+          sd.getInputFormat().equals(OrcInputFormat.class.getName()) &&
+          isSchemaEvolutionEnabled(tbl);
+      if (isOrcSchemaEvolution && (first || (afterCol != null && !afterCol.trim().isEmpty()))) {
+        throw new HiveException(ErrorMsg.CANNOT_REORDER_COLUMNS, alterTbl.getOldName());
+      }
       FieldSchema column = null;
 
       boolean found = false;
@@ -3352,6 +3371,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
             && !oldColName.equalsIgnoreCase(oldName)) {
           throw new HiveException(ErrorMsg.DUPLICATE_COLUMN_NAMES, newName);
         } else if (oldColName.equalsIgnoreCase(oldName)) {
+          // if orc table, restrict changing column types. Only integer type promotion is supported.
+          // smallint -> int -> bigint
+          if (isOrcSchemaEvolution && !isSupportedTypeChange(col.getType(), type)) {
+            throw new HiveException(ErrorMsg.CANNOT_CHANGE_COLUMN_TYPE, col.getType(), type,
+                newName);
+          }
           col.setName(newName);
           if (type != null && !type.trim().equals("")) {
             col.setType(type);
@@ -3403,9 +3428,31 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
           && !serializationLib.equals(LazySimpleSerDe.class.getName())
           && !serializationLib.equals(ColumnarSerDe.class.getName())
           && !serializationLib.equals(DynamicSerDe.class.getName())
-          && !serializationLib.equals(ParquetHiveSerDe.class.getName())) {
+          && !serializationLib.equals(ParquetHiveSerDe.class.getName())
+          && !serializationLib.equals(OrcSerde.class.getName())) {
         throw new HiveException(ErrorMsg.CANNOT_REPLACE_COLUMNS, alterTbl.getOldName());
       }
+      final boolean isOrcSchemaEvolution =
+          serializationLib.equals(OrcSerde.class.getName()) &&
+          isSchemaEvolutionEnabled(tbl);
+      // adding columns and limited integer type promotion is supported for ORC schema evolution
+      if (isOrcSchemaEvolution) {
+        final List<FieldSchema> existingCols = sd.getCols();
+        final List<FieldSchema> replaceCols = alterTbl.getNewCols();
+
+        if (replaceCols.size() < existingCols.size()) {
+          throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS, alterTbl.getOldName());
+        }
+
+        for (int i = 0; i < existingCols.size(); i++) {
+          final String currentColType = existingCols.get(i).getType().toLowerCase().trim();
+          final String newColType = replaceCols.get(i).getType().toLowerCase().trim();
+          if (!isSupportedTypeChange(currentColType, newColType)) {
+            throw new HiveException(ErrorMsg.REPLACE_UNSUPPORTED_TYPE_CONVERSION, currentColType,
+                newColType, replaceCols.get(i).getName());
+          }
+        }
+      }
       sd.setCols(alterTbl.getNewCols());
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) {
       tbl.getTTable().getParameters().putAll(alterTbl.getProps());
@@ -3420,6 +3467,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDE) {
       StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       String serdeName = alterTbl.getSerdeName();
+      String oldSerdeName = sd.getSerdeInfo().getSerializationLib();
+      // if orc table, restrict changing the serde as it can break schema evolution
+      if (isSchemaEvolutionEnabled(tbl) &&
+          oldSerdeName.equalsIgnoreCase(OrcSerde.class.getName()) &&
+          !serdeName.equalsIgnoreCase(OrcSerde.class.getName())) {
+        throw new HiveException(ErrorMsg.CANNOT_CHANGE_SERDE, OrcSerde.class.getSimpleName(),
+            alterTbl.getOldName());
+      }
       sd.getSerdeInfo().setSerializationLib(serdeName);
       if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0)) {
         sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps());
@@ -3434,6 +3489,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDFILEFORMAT) {
       StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
+      // if orc table, restrict changing the file format as it can break schema evolution
+      if (isSchemaEvolutionEnabled(tbl) &&
+          sd.getInputFormat().equals(OrcInputFormat.class.getName())
+          && !alterTbl.getInputFormat().equals(OrcInputFormat.class.getName())) {
+        throw new HiveException(ErrorMsg.CANNOT_CHANGE_FILEFORMAT, "ORC", alterTbl.getOldName());
+      }
       sd.setInputFormat(alterTbl.getInputFormat());
       sd.setOutputFormat(alterTbl.getOutputFormat());
       if (alterTbl.getSerdeName() != null) {
@@ -3554,6 +3615,45 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
     return 0;
   }
+
+  // don't change the order of enums as ordinal values are used to check for valid type promotions
+  enum PromotableTypes {
+    SMALLINT,
+    INT,
+    BIGINT;
+
+    static List<String> types() {
+      return ImmutableList.of(SMALLINT.toString().toLowerCase(),
+          INT.toString().toLowerCase(), BIGINT.toString().toLowerCase());
+    }
+  }
+
+  // for ORC, only supported type promotions are smallint -> int -> bigint. No other
+  // type promotions are supported at this point
+  private boolean isSupportedTypeChange(String currentType, String newType) {
+    if (currentType != null && newType != null) {
+      currentType = currentType.toLowerCase().trim();
+      newType = newType.toLowerCase().trim();
+      // no type change
+      if (currentType.equals(newType)) {
+        return true;
+      }
+      if (PromotableTypes.types().contains(currentType)
+          && PromotableTypes.types().contains(newType)) {
+        PromotableTypes pCurrentType = PromotableTypes.valueOf(currentType.toUpperCase());
+        PromotableTypes pNewType = PromotableTypes.valueOf(newType.toUpperCase());
+        if (pNewType.ordinal() >= pCurrentType.ordinal()) {
+          return true;
+        } else {
+          return false;
+        }
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Drop a given table or some partitions. DropTableDesc is currently used for both.
    *

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 258d28e..b6e5739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -133,6 +133,10 @@ public class FetchOperator implements Serializable {
     this.job = job;
     this.work = work;
     this.operator = operator;
+    if (operator instanceof TableScanOperator) {
+      Utilities.addTableSchemaToConf(job,
+          (TableScanOperator) operator);
+    }
     this.vcCols = vcCols;
     this.hasVC = vcCols != null && !vcCols.isEmpty();
     this.isStatReader = work.getTblDesc() == null;
@@ -598,6 +602,10 @@ public class FetchOperator implements Serializable {
   }
 
   private boolean needConversion(PartitionDesc partitionDesc) {
+    boolean isAcid = AcidUtils.isTablePropertyTransactional(partitionDesc.getTableDesc().getProperties());
+    if (Utilities.isSchemaEvolutionEnabled(job, isAcid) && Utilities.isInputFileFormatSelfDescribing(partitionDesc)) {
+      return false;
+    }
     return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index f8717ae..afc03ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
@@ -200,8 +203,14 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
     opCtx.partName = String.valueOf(partSpec);
     opCtx.deserializer = pd.getDeserializer(hconf);
 
-    StructObjectInspector partRawRowObjectInspector =
-        (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+    StructObjectInspector partRawRowObjectInspector;
+    boolean isAcid = AcidUtils.isTablePropertyTransactional(td.getProperties());
+    if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) {
+      partRawRowObjectInspector = tableRowOI;
+    } else {
+      partRawRowObjectInspector =
+          (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+    }
 
     opCtx.partTblObjectInspectorConverter =
         ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
@@ -302,8 +311,16 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon
         PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
         TableDesc tableDesc = pd.getTableDesc();
         Deserializer partDeserializer = pd.getDeserializer(hconf);
-        StructObjectInspector partRawRowObjectInspector =
-            (StructObjectInspector) partDeserializer.getObjectInspector();
+
+        StructObjectInspector partRawRowObjectInspector;
+        boolean isAcid = AcidUtils.isTablePropertyTransactional(tableDesc.getProperties());
+        if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) {
+          Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
+          partRawRowObjectInspector = (StructObjectInspector) tblDeserializer.getObjectInspector();
+        } else {
+          partRawRowObjectInspector =
+              (StructObjectInspector) partDeserializer.getObjectInspector();
+        }
 
         StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
         if ((tblRawRowObjectInspector == null) ||

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index cbf02e9..d98ea84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -70,6 +70,13 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
 
   private String defaultPartitionName;
 
+  /**
+   * These values are saved during MapWork, FetchWork, etc preparation and later added to the the
+   * JobConf of each task.
+   */
+  private String schemaEvolutionColumns;
+  private String schemaEvolutionColumnsTypes;
+
   public TableDesc getTableDesc() {
     return tableDesc;
   }
@@ -78,6 +85,19 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
     this.tableDesc = tableDesc;
   }
 
+  public void setSchemaEvolution(String schemaEvolutionColumns, String schemaEvolutionColumnsTypes) {
+    this.schemaEvolutionColumns = schemaEvolutionColumns;
+    this.schemaEvolutionColumnsTypes = schemaEvolutionColumnsTypes;
+  }
+
+  public String getSchemaEvolutionColumns() {
+    return schemaEvolutionColumns;
+  }
+
+  public String getSchemaEvolutionColumnsTypes() {
+    return schemaEvolutionColumnsTypes;
+  }
+
   /**
    * Other than gathering statistics for the ANALYZE command, the table scan operator
    * does not do anything special other than just forwarding the row. Since the table

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 3352b49..1d8e3b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -122,15 +123,19 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
@@ -175,6 +180,12 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -478,11 +489,6 @@ public final class Utilities {
     }
   }
 
-  public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) {
-    MapWork mapWork = getMapWork(hiveConf);
-    return mapWork.getVectorScratchColumnTypeMap();
-  }
-
   public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
     try {
       Graph stageGraph = plan.getQueryPlan().getStageGraph();
@@ -3901,6 +3907,27 @@ public final class Utilities {
     return false;
   }
 
+  /**
+   * @param conf
+   * @return the configured VectorizedRowBatchCtx for a MapWork task.
+   */
+  public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) {
+    VectorizedRowBatchCtx result = null;
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+        Utilities.getPlanPath(conf) != null) {
+      MapWork mapWork = Utilities.getMapWork(conf);
+      if (mapWork != null && mapWork.getVectorMode()) {
+        result = mapWork.getVectorizedRowBatchCtx();
+      }
+    }
+    return result;
+  }
+
+  public static boolean isVectorMode(Configuration conf, MapWork mapWork) {
+    return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
+        && mapWork.getVectorMode();
+  }
+
   public static void clearWorkMapForConf(Configuration conf) {
     // Remove cached query plans for the current query only
     Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
@@ -4057,4 +4084,77 @@ public final class Utilities {
       (loggingLevel.equalsIgnoreCase("PERFORMANCE") || loggingLevel.equalsIgnoreCase("VERBOSE"));
   }
 
+  public static boolean isSchemaEvolutionEnabled(Configuration conf, boolean isAcid) {
+    return isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
+  }
+
+  public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) {
+    Class<?> inputFormatClass = pd.getInputFileFormatClass();
+    return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+  }
+
+  public static boolean isInputFileFormatVectorized(PartitionDesc pd) {
+    Class<?> inputFormatClass = pd.getInputFileFormatClass();
+    return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass);
+  }
+
+  public static void addSchemaEvolutionToTableScanOperator(Table table,
+      TableScanOperator tableScanOp) {
+    String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols());
+    String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols());
+    tableScanOp.setSchemaEvolution(colNames, colTypes);
+  }
+
+  public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI,
+      TableScanOperator tableScanOp) {
+    String colNames = ObjectInspectorUtils.getFieldNames(structOI);
+    String colTypes = ObjectInspectorUtils.getFieldTypes(structOI);
+    tableScanOp.setSchemaEvolution(colNames, colTypes);
+  }
+
+  public static void unsetSchemaEvolution(Configuration conf) {
+    conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+    conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+  }
+
+  public static void addTableSchemaToConf(Configuration conf,
+      TableScanOperator tableScanOp) {
+    String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns();
+    if (schemaEvolutionColumns != null) {
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes());
+    } else {
+      LOG.info("schema.evolution.columns and schema.evolution.columns.types not available");
+    }
+  }
+
+  /**
+   * Create row key and value object inspectors for reduce vectorization.
+   * The row object inspector used by ReduceWork needs to be a **standard**
+   * struct object inspector, not just any struct object inspector.
+   * @param keyInspector
+   * @param valueInspector
+   * @return OI
+   * @throws HiveException
+   */
+  public static StandardStructObjectInspector constructVectorizedReduceRowOI(
+      StructObjectInspector keyInspector, StructObjectInspector valueInspector)
+          throws HiveException {
+
+    ArrayList<String> colNames = new ArrayList<String>();
+    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    fields = valueInspector.getAllStructFieldRefs();
+    for (StructField field: fields) {
+      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+      ois.add(field.getFieldObjectInspector());
+    }
+    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
+
+    return rowObjectInspector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index ac5e3ca..f9e10c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -153,10 +154,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           /* vectorization only works with struct object inspectors */
           valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag];
 
-          ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil.
-              constructVectorizedRowBatch(keyStructInspector,
-              valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap());
-          batches[tag] = pair.getFirst();
           final int totalColumns = keysColumnOffset
               + valueStructInspectors[tag].getAllStructFieldRefs().size();
           valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns);
@@ -165,7 +162,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory
               .genVectorStructExpressionWritables(valueStructInspectors[tag])));
 
-          rowObjectInspector[tag] = pair.getSecond();
+          rowObjectInspector[tag] = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+              valueStructInspectors[tag]);
+          batches[tag] = gWork.getVectorizedRowBatchCtx().createVectorizedRowBatch();
+
+
         } else {
           ois.add(keyObjectInspector);
           ois.add(valueObjectInspector[tag]);

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index d649672..5edd587 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -237,7 +237,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
     sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
         valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorScratchColumnTypeMap());
+        redWork.getVectorizedRowBatchCtx());
     ois[tag] = sources[tag].getObjectInspector();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index b634877..b1d2f52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -51,7 +52,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -123,7 +123,7 @@ public class ReduceRecordSource implements RecordSource {
 
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
       TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
-      Map<Integer, String> vectorScratchColumnTypeMap)
+      VectorizedRowBatchCtx batchContext)
       throws Exception {
 
     ObjectInspector keyObjectInspector;
@@ -174,10 +174,9 @@ public class ReduceRecordSource implements RecordSource {
             .asList(VectorExpressionWriterFactory
                 .genVectorStructExpressionWritables(valueStructInspectors)));
 
-        ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair =
-            VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap);
-        rowObjectInspector = pair.getSecond();
-        batch = pair.getFirst();
+        rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector,
+            valueStructInspectors);
+        batch = batchContext.createVectorizedRowBatch();
 
         // Setup vectorized deserialization for the key and value.
         BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer;
@@ -185,7 +184,7 @@ public class ReduceRecordSource implements RecordSource {
         keyBinarySortableDeserializeToRow =
                   new VectorDeserializeRow(
                         new BinarySortableDeserializeRead(
-                                  VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+                                  VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                                       keyStructInspector),
                                   binarySortableSerDe.getSortOrders()));
         keyBinarySortableDeserializeToRow.init(0);
@@ -195,7 +194,7 @@ public class ReduceRecordSource implements RecordSource {
           valueLazyBinaryDeserializeToRow =
                   new VectorDeserializeRow(
                         new LazyBinaryDeserializeRead(
-                                  VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+                            VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                                        valueStructInspectors)));
           valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 8ec7ead..99744cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.util.Arrays;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 /**
  * This class supports string and binary data by value reference -- i.e. each field is
@@ -51,9 +46,6 @@ public class BytesColumnVector extends ColumnVector {
   private byte[] buffer;   // optional buffer to use when actually copying in data
   private int nextFree;    // next free position in buffer
 
-  // Reusable text object
-  private final Text textObject = new Text();
-
   // Estimate that there will be 16 bytes per entry
   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
 
@@ -165,6 +157,19 @@ public class BytesColumnVector extends ColumnVector {
   }
 
   /**
+   * Set a field by actually copying in to a local buffer.
+   * If you must actually copy data in to the array, use this method.
+   * DO NOT USE this method unless it's not practical to set data by reference with setRef().
+   * Setting data by reference tends to run a lot faster than copying data in.
+   *
+   * @param elementNum index within column vector to set
+   * @param sourceBuf container of source data
+   */
+  public void setVal(int elementNum, byte[] sourceBuf) {
+    setVal(elementNum, sourceBuf, 0, sourceBuf.length);
+  }
+
+  /**
    * Set a field to the concatenation of two string values. Result data is copied
    * into the internal buffer.
    *
@@ -215,22 +220,6 @@ public class BytesColumnVector extends ColumnVector {
     buffer = newBuffer;
   }
 
-  @Override
-  public Writable getWritableObject(int index) {
-    if (this.isRepeating) {
-      index = 0;
-    }
-    Writable result = null;
-    if (!isNull[index] && vector[index] != null) {
-      textObject.clear();
-      textObject.append(vector[index], start[index], length[index]);
-      result = textObject;
-    } else {
-      result = NullWritable.get();
-    }
-    return result;
-  }
-
   /** Copy the current object contents into the output. Only copy selected entries,
     * as indicated by selectedInUse and the sel array.
     */
@@ -294,7 +283,7 @@ public class BytesColumnVector extends ColumnVector {
 
       // Only copy data values if entry is not null. The string value
       // at position 0 is undefined if the position 0 value is null.
-      if (noNulls || (!noNulls && !isNull[0])) {
+      if (noNulls || !isNull[0]) {
 
         // loops start at position 1 because position 0 is already set
         if (selectedInUse) {
@@ -320,14 +309,70 @@ public class BytesColumnVector extends ColumnVector {
     setRef(0, value, 0, value.length);
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = null;
+    isNull[0] = true;
+  }
+
   @Override
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
-    BytesColumnVector in = (BytesColumnVector) inputVector;
-    setVal(outElementNum, in.vector[inputElementNum], in.start[inputElementNum], in.length[inputElementNum]);
+    if (inputVector.isRepeating) {
+      inputElementNum = 0;
+    }
+    if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+      isNull[outElementNum] = false;
+      BytesColumnVector in = (BytesColumnVector) inputVector;
+      setVal(outElementNum, in.vector[inputElementNum],
+          in.start[inputElementNum], in.length[inputElementNum]);
+    } else {
+      isNull[outElementNum] = true;
+      noNulls = false;
+    }
   }
 
   @Override
   public void init() {
     initBuffer(0);
   }
+
+  @Override
+  public void stringifyValue(StringBuilder buffer, int row) {
+    if (isRepeating) {
+      row = 0;
+    }
+    if (noNulls || !isNull[row]) {
+      buffer.append('"');
+      buffer.append(new String(this.buffer, start[row], length[row]));
+      buffer.append('"');
+    } else {
+      buffer.append("null");
+    }
+  }
+
+  @Override
+  public void ensureSize(int size, boolean preserveData) {
+    if (size > vector.length) {
+      super.ensureSize(size, preserveData);
+      int[] oldStart = start;
+      start = new int[size];
+      int[] oldLength = length;
+      length = new int[size];
+      byte[][] oldVector = vector;
+      vector = new byte[size][];
+      if (preserveData) {
+        if (isRepeating) {
+          vector[0] = oldVector[0];
+          start[0] = oldStart[0];
+          length[0] = oldLength[0];
+        } else {
+          System.arraycopy(oldVector, 0, vector, 0, oldVector.length);
+          System.arraycopy(oldStart, 0, start, 0 , oldStart.length);
+          System.arraycopy(oldLength, 0, length, 0, oldLength.length);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index 6654166..fcb1ae9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.io.Writable;
-
 /**
  * ColumnVector contains the shared structure for the sub-types,
  * including NULL information, and whether this vector
@@ -38,10 +37,15 @@ public abstract class ColumnVector {
    * The current kinds of column vectors.
    */
   public static enum Type {
+    NONE,    // Useful when the type of column vector has not be determined yet.
     LONG,
     DOUBLE,
     BYTES,
-    DECIMAL
+    DECIMAL,
+    STRUCT,
+    LIST,
+    MAP,
+    UNION
   }
 
   /*
@@ -64,8 +68,6 @@ public abstract class ColumnVector {
   private boolean preFlattenIsRepeating;
   private boolean preFlattenNoNulls;
 
-  public abstract Writable getWritableObject(int index);
-
   /**
    * Constructor for super-class ColumnVector. This is not called directly,
    * but used to initialize inherited fields.
@@ -76,28 +78,42 @@ public abstract class ColumnVector {
     isNull = new boolean[len];
     noNulls = true;
     isRepeating = false;
+    preFlattenNoNulls = true;
+    preFlattenIsRepeating = false;
   }
 
   /**
-     * Resets the column to default state
-     *  - fills the isNull array with false
-     *  - sets noNulls to true
-     *  - sets isRepeating to false
-     */
-    public void reset() {
-      if (false == noNulls) {
-        Arrays.fill(isNull, false);
-      }
-      noNulls = true;
-      isRepeating = false;
+   * Resets the column to default state
+   *  - fills the isNull array with false
+   *  - sets noNulls to true
+   *  - sets isRepeating to false
+   */
+  public void reset() {
+    if (!noNulls) {
+      Arrays.fill(isNull, false);
     }
+    noNulls = true;
+    isRepeating = false;
+    preFlattenNoNulls = true;
+    preFlattenIsRepeating = false;
+  }
+
+  /**
+   * Sets the isRepeating flag. Recurses over structs and unions so that the
+   * flags are set correctly.
+   * @param isRepeating
+   */
+  public void setRepeating(boolean isRepeating) {
+    this.isRepeating = isRepeating;
+  }
 
-    abstract public void flatten(boolean selectedInUse, int[] sel, int size);
+  abstract public void flatten(boolean selectedInUse, int[] sel, int size);
 
     // Simplify vector by brute-force flattening noNulls if isRepeating
     // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
     // with many arguments.
-    public void flattenRepeatingNulls(boolean selectedInUse, int[] sel, int size) {
+    protected void flattenRepeatingNulls(boolean selectedInUse, int[] sel,
+                                         int size) {
 
       boolean nullFillValue;
 
@@ -120,13 +136,13 @@ public abstract class ColumnVector {
       noNulls = false;
     }
 
-    public void flattenNoNulls(boolean selectedInUse, int[] sel, int size) {
+    protected void flattenNoNulls(boolean selectedInUse, int[] sel,
+                                  int size) {
       if (noNulls) {
         noNulls = false;
         if (selectedInUse) {
           for (int j = 0; j < size; j++) {
-            int i = sel[j];
-            isNull[i] = false;
+            isNull[sel[j]] = false;
           }
         } else {
           Arrays.fill(isNull, 0, size, false);
@@ -155,8 +171,10 @@ public abstract class ColumnVector {
 
     /**
      * Set the element in this column vector from the given input vector.
+     * This method can assume that the output does not have isRepeating set.
      */
-    public abstract void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector);
+    public abstract void setElement(int outElementNum, int inputElementNum,
+                                    ColumnVector inputVector);
 
     /**
      * Initialize the column vector. This method can be overridden by specific column vector types.
@@ -166,5 +184,33 @@ public abstract class ColumnVector {
     public void init() {
       // Do nothing by default
     }
-  }
 
+    /**
+     * Ensure the ColumnVector can hold at least size values.
+     * This method is deliberately *not* recursive because the complex types
+     * can easily have more (or less) children than the upper levels.
+     * @param size the new minimum size
+     * @param presesrveData should the old data be preserved?
+     */
+    public void ensureSize(int size, boolean presesrveData) {
+      if (isNull.length < size) {
+        boolean[] oldArray = isNull;
+        isNull = new boolean[size];
+        if (presesrveData && !noNulls) {
+          if (isRepeating) {
+            isNull[0] = oldArray[0];
+          } else {
+            System.arraycopy(oldArray, 0, isNull, 0, oldArray.length);
+          }
+        }
+      }
+    }
+
+    /**
+     * Print the value for this column into the given string builder.
+     * @param buffer the buffer to print into
+     * @param row the id of the row to print
+     */
+    public abstract void stringifyValue(StringBuilder buffer,
+                                        int row);
+  }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index 5009a42..fe8ad85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -17,14 +17,10 @@
  */
 
 package org.apache.hadoop.hive.ql.exec.vector;
-
 import java.math.BigInteger;
 
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
 
 public class DecimalColumnVector extends ColumnVector {
 
@@ -39,8 +35,6 @@ public class DecimalColumnVector extends ColumnVector {
   public short scale;
   public short precision;
 
-  private final HiveDecimalWritable writableObj = new HiveDecimalWritable();
-
   public DecimalColumnVector(int precision, int scale) {
     this(VectorizedRowBatch.DEFAULT_SIZE, precision, scale);
   }
@@ -49,26 +43,31 @@ public class DecimalColumnVector extends ColumnVector {
     super(size);
     this.precision = (short) precision;
     this.scale = (short) scale;
-    final int len = size;
-    vector = new HiveDecimalWritable[len];
-    for (int i = 0; i < len; i++) {
+    vector = new HiveDecimalWritable[size];
+    for (int i = 0; i < size; i++) {
       vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO);
     }
   }
 
-  @Override
-  public Writable getWritableObject(int index) {
-    if (isRepeating) {
-      index = 0;
-    }
-    if (!noNulls && isNull[index]) {
-      return NullWritable.get();
+  // Fill the all the vector entries with provided value
+  public void fill(HiveDecimal value) {
+    noNulls = true;
+    isRepeating = true;
+    if (vector[0] == null) {
+      vector[0] = new HiveDecimalWritable(value);
     } else {
-      writableObj.set(vector[index]);
-      return writableObj;
+      vector[0].set(value);
     }
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = null;
+    isNull[0] = true;
+  }
+
   @Override
   public void flatten(boolean selectedInUse, int[] sel, int size) {
     // TODO Auto-generated method stub
@@ -76,12 +75,35 @@ public class DecimalColumnVector extends ColumnVector {
 
   @Override
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
-    HiveDecimal hiveDec = ((DecimalColumnVector) inputVector).vector[inputElementNum].getHiveDecimal(precision, scale);
-    if (hiveDec == null) {
-      noNulls = false;
+    if (inputVector.isRepeating) {
+      inputElementNum = 0;
+    }
+    if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+      HiveDecimal hiveDec =
+          ((DecimalColumnVector) inputVector).vector[inputElementNum]
+              .getHiveDecimal(precision, scale);
+      if (hiveDec == null) {
+        isNull[outElementNum] = true;
+        noNulls = false;
+      } else {
+        isNull[outElementNum] = false;
+        vector[outElementNum].set(hiveDec);
+      }
+    } else {
       isNull[outElementNum] = true;
+      noNulls = false;
+    }
+  }
+
+  @Override
+  public void stringifyValue(StringBuilder buffer, int row) {
+    if (isRepeating) {
+      row = 0;
+    }
+    if (noNulls || !isNull[row]) {
+      buffer.append(vector[row].toString());
     } else {
-      vector[outElementNum].set(hiveDec);
+      buffer.append("null");
     }
   }
 
@@ -110,4 +132,20 @@ public class DecimalColumnVector extends ColumnVector {
     HiveDecimal minimumNonZeroValue = HiveDecimal.create(BigInteger.ONE, scale);
     vector[elementNum].set(minimumNonZeroValue);
   }
+
+  @Override
+  public void ensureSize(int size, boolean preserveData) {
+    if (size > vector.length) {
+      super.ensureSize(size, preserveData);
+      HiveDecimalWritable[] oldArray = vector;
+      vector = new HiveDecimalWritable[size];
+      if (preserveData) {
+        // we copy all of the values to avoid creating more objects
+        System.arraycopy(oldArray, 0, vector, 0 , oldArray.length);
+        for(int i= oldArray.length; i < vector.length; ++i) {
+          vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO);
+        }
+      }
+    }
+  }
 }