You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:01 UTC

[19/34] hive git commit: HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
index eaff732..ae4b239 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -320,13 +322,13 @@ public class TestVectorSerDeRow extends TestCase {
 
   void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException {
 
-    Map<Integer, String> emptyScratchMap = new HashMap<Integer, String>();
+    String[] emptyScratchTypeNames = new String[0];
 
     RandomRowObjectSource source = new RandomRowObjectSource();
     source.init(r);
 
     VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(emptyScratchMap, source.rowStructObjectInspector());
+    batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames);
     VectorizedRowBatch batch = batchContext.createVectorizedRowBatch();
 
     VectorAssignRowSameBatch vectorAssignRow = new VectorAssignRowSameBatch();
@@ -552,13 +554,13 @@ public class TestVectorSerDeRow extends TestCase {
 
   void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException {
 
-    Map<Integer, String> emptyScratchMap = new HashMap<Integer, String>();
+    String[] emptyScratchTypeNames = new String[0];
 
     RandomRowObjectSource source = new RandomRowObjectSource();
     source.init(r);
 
     VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(emptyScratchMap, source.rowStructObjectInspector());
+    batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames);
     VectorizedRowBatch batch = batchContext.createVectorizedRowBatch();
 
     int fieldCount = source.typeNames().size();

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
deleted file mode 100644
index 3321823..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.RCFile;
-import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Class that tests the functionality of VectorizedRowBatchCtx.
- */
-public class TestVectorizedRowBatchCtx {
-
-  private Configuration conf;
-  private FileSystem fs;
-  private Path testFilePath;
-  private int colCount;
-  private ColumnarSerDe serDe;
-  private Properties tbl;
-
-  @Before
-  public void openFileSystem() throws Exception {
-    conf = new Configuration();
-    fs = FileSystem.getLocal(conf);
-    Path workDir = new Path(System.getProperty("test.tmp.dir",
-        "target" + File.separator + "test" + File.separator + "tmp"));
-    fs.setWorkingDirectory(workDir);
-    testFilePath = new Path("TestVectorizedRowBatchCtx.testDump.rc");
-    fs.delete(testFilePath, false);
-  }
-
-  private void initSerde() {
-    tbl = new Properties();
-
-    // Set the configuration parameters
-    tbl.setProperty(serdeConstants.SERIALIZATION_FORMAT, "6");
-    tbl.setProperty("columns",
-        "ashort,aint,along,adouble,afloat,astring,abyte,aboolean,atimestamp");
-    tbl.setProperty("columns.types",
-        "smallint:int:bigint:double:float:string:tinyint:boolean:timestamp");
-    colCount = 9;
-    tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
-
-    try {
-      serDe = new ColumnarSerDe();
-      SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
-    } catch (SerDeException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void WriteRCFile(FileSystem fs, Path file, Configuration conf)
-      throws IOException, SerDeException {
-    fs.delete(file, true);
-
-    RCFileOutputFormat.setColumnNumber(conf, colCount);
-    RCFile.Writer writer =
-        new RCFile.Writer(fs, conf, file, null, null,
-            new DefaultCodec());
-
-    for (int i = 0; i < 10; ++i) {
-      BytesRefArrayWritable bytes = new BytesRefArrayWritable(colCount);
-      BytesRefWritable cu;
-
-       if (i % 3 != 0) {
-      //if (i < 100) {
-        cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length);
-        bytes.set(0, cu);
-
-        cu = new BytesRefWritable((i + 100 + "").getBytes("UTF-8"), 0,
-            (i + 100 + "").getBytes("UTF-8").length);
-        bytes.set(1, cu);
-
-        cu = new BytesRefWritable((i + 200 + "").getBytes("UTF-8"), 0,
-            (i + 200 + "").getBytes("UTF-8").length);
-        bytes.set(2, cu);
-
-        cu = new BytesRefWritable((i + 1.23 + "").getBytes("UTF-8"), 0,
-            (i + 1.23 + "").getBytes("UTF-8").length);
-        bytes.set(3, cu);
-
-        cu = new BytesRefWritable((i + 2.23 + "").getBytes("UTF-8"), 0,
-            (i + 2.23 + "").getBytes("UTF-8").length);
-        bytes.set(4, cu);
-
-        cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0,
-            ("Test string").getBytes("UTF-8").length);
-        bytes.set(5, cu);
-
-        cu = new BytesRefWritable((1 + "").getBytes("UTF-8"), 0,
-            (1 + "").getBytes("UTF-8").length);
-        bytes.set(6, cu);
-
-        cu = new BytesRefWritable(("true").getBytes("UTF-8"), 0,
-            ("true").getBytes("UTF-8").length);
-        bytes.set(7, cu);
-
-        Timestamp t = new Timestamp(Calendar.getInstance().getTime().getTime());
-        cu = new BytesRefWritable(t.toString().getBytes("UTF-8"), 0,
-            t.toString().getBytes("UTF-8").length);
-        bytes.set(8, cu);
-
-      } else {
-        cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length);
-        bytes.set(0, cu);
-
-        cu = new BytesRefWritable(new byte[0], 0, 0);
-        bytes.set(1, cu);
-
-        cu = new BytesRefWritable(new byte[0], 0, 0);
-        bytes.set(2, cu);
-
-        cu = new BytesRefWritable(new byte[0], 0, 0);
-        bytes.set(3, cu);
-
-        cu = new BytesRefWritable(new byte[0], 0, 0);
-        bytes.set(4, cu);
-
-        cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0,
-            ("Test string").getBytes("UTF-8").length);
-        bytes.set(5, cu);
-
-        cu = new BytesRefWritable(new byte[0], 0, 0);
-        bytes.set(6, cu);
-
-        cu = new BytesRefWritable(new byte[0], 0, 0);
-        bytes.set(7, cu);
-
-//        cu = new BytesRefWritable(new byte[0], 0, 0);
-//        bytes.set(8, cu);
-        Timestamp t = new Timestamp(Calendar.getInstance().getTime().getTime());
-        cu = new BytesRefWritable(t.toString().getBytes("UTF-8"), 0,
-            t.toString().getBytes("UTF-8").length);
-        bytes.set(8, cu);
-      }
-      writer.append(bytes);
-    }
-    writer.close();
-  }
-
-  private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, IOException {
-
-    RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf);
-    DataOutputBuffer buffer = new DataOutputBuffer();
-
-    // Get object inspector
-    StructObjectInspector oi = (StructObjectInspector) serDe
-        .getObjectInspector();
-    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-
-    Assert.assertEquals("Field size should be 9", colCount, fieldRefs.size());
-
-    // Create the context
-    VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null, null);
-    VectorizedRowBatch batch = ctx.createVectorizedRowBatch();
-    VectorizedBatchUtil.setNoNullFields(batch);
-
-    // Iterate thru the rows and populate the batch
-    LongWritable rowID = new LongWritable();
-    for (int i = 0; i < 10; i++) {
-      reader.next(rowID);
-      BytesRefArrayWritable cols = new BytesRefArrayWritable();
-      reader.getCurrentRow(cols);
-      cols.resetValid(colCount);
-      ctx.addRowToBatch(i, cols, batch, buffer);
-    }
-    reader.close();
-    batch.size = 10;
-    return batch;
-  }
-
-  void ValidateRowBatch(VectorizedRowBatch batch) throws IOException, SerDeException {
-
-    LongWritable rowID = new LongWritable();
-    RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf);
-    for (int i = 0; i < batch.size; i++) {
-      reader.next(rowID);
-      BytesRefArrayWritable cols = new BytesRefArrayWritable();
-      reader.getCurrentRow(cols);
-      cols.resetValid(colCount);
-      Object row = serDe.deserialize(cols);
-
-      StructObjectInspector oi = (StructObjectInspector) serDe
-          .getObjectInspector();
-      List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-
-      for (int j = 0; j < fieldRefs.size(); j++) {
-        Object fieldData = oi.getStructFieldData(row, fieldRefs.get(j));
-        ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
-
-        // Vectorization only supports PRIMITIVE data types. Assert the same
-        Assert.assertEquals(true, foi.getCategory() == Category.PRIMITIVE);
-
-        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-        Object writableCol = poi.getPrimitiveWritableObject(fieldData);
-        if (writableCol != null) {
-          switch (poi.getPrimitiveCategory()) {
-          case BOOLEAN: {
-            LongColumnVector lcv = (LongColumnVector) batch.cols[j];
-            Assert.assertEquals(true, lcv.vector[i] == (((BooleanWritable) writableCol).get() ? 1
-                : 0));
-          }
-            break;
-          case BYTE: {
-            LongColumnVector lcv = (LongColumnVector) batch.cols[j];
-            Assert.assertEquals(true, lcv.vector[i] == (long) ((ByteWritable) writableCol).get());
-          }
-            break;
-          case SHORT: {
-            LongColumnVector lcv = (LongColumnVector) batch.cols[j];
-            Assert.assertEquals(true, lcv.vector[i] == ((ShortWritable) writableCol).get());
-          }
-            break;
-          case INT: {
-            LongColumnVector lcv = (LongColumnVector) batch.cols[j];
-            Assert.assertEquals(true, lcv.vector[i] == ((IntWritable) writableCol).get());
-          }
-            break;
-          case LONG: {
-            LongColumnVector lcv = (LongColumnVector) batch.cols[j];
-            Assert.assertEquals(true, lcv.vector[i] == ((LongWritable) writableCol).get());
-          }
-            break;
-          case FLOAT: {
-            DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j];
-            Assert.assertEquals(true, dcv.vector[i] == ((FloatWritable) writableCol).get());
-          }
-            break;
-          case DOUBLE: {
-            DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j];
-            Assert.assertEquals(true, dcv.vector[i] == ((DoubleWritable) writableCol).get());
-          }
-            break;
-          case BINARY: {
-            BytesColumnVector bcv = (BytesColumnVector) batch.cols[j];
-              BytesWritable colBinary = (BytesWritable) writableCol;
-              BytesWritable batchBinary = new BytesWritable();
-              batchBinary.set(bcv.vector[i], bcv.start[i], bcv.length[i]);
-              byte[] a = colBinary.getBytes();
-              byte[] b = batchBinary.getBytes();
-              Assert.assertEquals(true, Arrays.equals(a, b));
-          }
-            break;
-          case STRING: {
-            BytesColumnVector bcv = (BytesColumnVector) batch.cols[j];
-            Text colText = (Text) writableCol;
-            Text batchText = new Text();
-            batchText.set(bcv.vector[i], bcv.start[i], bcv.length[i]);
-            String a = colText.toString();
-            String b = batchText.toString();
-            Assert.assertEquals(true, a.equals(b));
-          }
-            break;
-          case TIMESTAMP: {
-            LongColumnVector tcv = (LongColumnVector) batch.cols[j];
-            Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
-            long timeInNanoSec = (t.getTime() * 1000000) + (t.getNanos() % 1000000);
-            Assert.assertEquals(true, tcv.vector[i] == timeInNanoSec);
-          }
-            break;
-          default:
-            Assert.assertTrue("Unknown type", false);
-          }
-        } else {
-          Assert.assertEquals(true, batch.cols[j].isNull[i]);
-        }
-      }
-
-      // Check repeating
-      Assert.assertEquals(false, batch.cols[0].isRepeating);
-      Assert.assertEquals(false, batch.cols[1].isRepeating);
-      Assert.assertEquals(false, batch.cols[2].isRepeating);
-      Assert.assertEquals(false, batch.cols[3].isRepeating);
-      Assert.assertEquals(false, batch.cols[4].isRepeating);
-
-      // Check non null
-      Assert.assertEquals(true, batch.cols[0].noNulls);
-      Assert.assertEquals(false, batch.cols[1].noNulls);
-      Assert.assertEquals(false, batch.cols[2].noNulls);
-      Assert.assertEquals(false, batch.cols[3].noNulls);
-      Assert.assertEquals(false, batch.cols[4].noNulls);
-    }
-    reader.close();
-  }
-
-  @Test
-  public void TestCtx() throws Exception {
-    initSerde();
-    WriteRCFile(this.fs, this.testFilePath, this.conf);
-    VectorizedRowBatch batch = GetRowBatch();
-    ValidateRowBatch(batch);
-
-    // Test VectorizedColumnarSerDe
-    VectorizedColumnarSerDe vcs = new VectorizedColumnarSerDe();
-    SerDeUtils.initializeSerDe(vcs, this.conf, tbl, null);
-    Writable w = vcs.serializeVector(batch, (StructObjectInspector) serDe
-        .getObjectInspector());
-    BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) ((ObjectWritable) w).get();
-    vcs.deserializeVector(refArray, 10, batch);
-    ValidateRowBatch(batch);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index ec90481..b8d39d2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -66,10 +67,12 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
+import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger.MyRow;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -359,6 +362,15 @@ public class TestInputOutputFormat {
     public void readFields(DataInput dataInput) throws IOException {
      throw new UnsupportedOperationException("no read");
     }
+
+
+    static String getColumnNamesProperty() {
+      return "x,y";
+    }
+    static String getColumnTypesProperty() {
+      return "int:int";
+    }
+
   }
 
   @Rule
@@ -1220,6 +1232,8 @@ public class TestInputOutputFormat {
 
 
     // read the whole file
+    conf.set("columns", MyRow.getColumnNamesProperty());
+    conf.set("columns.types", MyRow.getColumnTypesProperty());
     org.apache.hadoop.mapred.RecordReader reader =
         in.getRecordReader(splits[0], conf, Reporter.NULL);
     Object key = reader.createKey();
@@ -1339,6 +1353,8 @@ public class TestInputOutputFormat {
     InputSplit[] splits = in.getSplits(conf, 1);
     assertEquals(1, splits.length);
     ColumnProjectionUtils.appendReadColumns(conf, Collections.singletonList(1));
+    conf.set("columns", "z,r");
+    conf.set("columns.types", "int:struct<x:int,y:int>");
     org.apache.hadoop.mapred.RecordReader reader =
         in.getRecordReader(splits[0], conf, Reporter.NULL);
     Object key = reader.createKey();
@@ -1418,6 +1434,14 @@ public class TestInputOutputFormat {
     public void readFields(DataInput dataInput) throws IOException {
       throw new UnsupportedOperationException("no read");
     }
+
+    static String getColumnNamesProperty() {
+      return "str,str2";
+    }
+    static String getColumnTypesProperty() {
+      return "string:string";
+    }
+
   }
 
   @Test
@@ -1453,6 +1477,8 @@ public class TestInputOutputFormat {
     assertEquals(1, splits.length);
 
     // read the whole file
+    conf.set("columns", StringRow.getColumnNamesProperty());
+    conf.set("columns.types", StringRow.getColumnTypesProperty());
     org.apache.hadoop.mapred.RecordReader reader =
         in.getRecordReader(splits[0], conf, Reporter.NULL);
     Object key = reader.createKey();
@@ -1493,6 +1519,7 @@ public class TestInputOutputFormat {
    * @param isVectorized should run vectorized
    * @return a JobConf that contains the necessary information
    * @throws IOException
+   * @throws HiveException
    */
   JobConf createMockExecutionEnvironment(Path workDir,
                                          Path warehouseDir,
@@ -1500,7 +1527,7 @@ public class TestInputOutputFormat {
                                          ObjectInspector objectInspector,
                                          boolean isVectorized,
                                          int partitions
-                                         ) throws IOException {
+                                         ) throws IOException, HiveException {
     JobConf conf = new JobConf();
     Utilities.clearWorkMap(conf);
     conf.set("hive.exec.plan", workDir.toString());
@@ -1555,6 +1582,11 @@ public class TestInputOutputFormat {
 
     MapWork mapWork = new MapWork();
     mapWork.setVectorMode(isVectorized);
+    if (isVectorized) {
+      VectorizedRowBatchCtx vectorizedRowBatchCtx = new VectorizedRowBatchCtx();
+      vectorizedRowBatchCtx.init(structOI, new String[0]);
+      mapWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+    }
     mapWork.setUseBucketizedHiveInputFormat(false);
     LinkedHashMap<String, ArrayList<String>> aliasMap =
         new LinkedHashMap<String, ArrayList<String>>();

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 15ee24c..032ac4b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -27,13 +27,16 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.IntWritable;
@@ -48,6 +51,8 @@ import org.junit.Test;
 import org.mockito.MockSettings;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -354,6 +359,8 @@ public class TestOrcRawRecordMerger {
     Configuration conf = new Configuration();
     conf.set("columns", "col1");
     conf.set("columns.types", "string");
+    conf.set(serdeConstants.LIST_COLUMNS, "col1");
+    conf.set(serdeConstants.LIST_COLUMN_TYPES, "string");
     Reader reader = Mockito.mock(Reader.class, settings);
     RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
 
@@ -362,6 +369,8 @@ public class TestOrcRawRecordMerger {
     typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
         .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
         .addSubtypes(6);
+    typeBuilder.addAllFieldNames(Lists.newArrayList("operation", "originalTransaction", "bucket",
+        "rowId", "currentTransaction", "row"));
     types.add(typeBuilder.build());
     types.add(null);
     types.add(null);
@@ -370,6 +379,10 @@ public class TestOrcRawRecordMerger {
     types.add(null);
     typeBuilder.clearSubtypes();
     typeBuilder.addSubtypes(7);
+    typeBuilder.addAllFieldNames(Lists.newArrayList("col1"));
+    types.add(typeBuilder.build());
+    typeBuilder.clear();
+    typeBuilder.setKind(OrcProto.Type.Kind.STRING);
     types.add(typeBuilder.build());
 
     Mockito.when(reader.getTypes()).thenReturn(types);
@@ -466,6 +479,14 @@ public class TestOrcRawRecordMerger {
       col1 = new Text(val);
       ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
     }
+
+    static String getColumnNamesProperty() {
+      return "col1,ROW__ID";
+    }
+    static String getColumnTypesProperty() {
+      return "string:struct<transactionId:bigint,bucketId:int,rowId:bigint>";
+    }
+
   }
 
   static String getValue(OrcStruct event) {
@@ -499,6 +520,8 @@ public class TestOrcRawRecordMerger {
         BUCKET);
     Reader baseReader = OrcFile.createReader(basePath,
         OrcFile.readerOptions(conf));
+    conf.set("columns", MyRow.getColumnNamesProperty());
+    conf.set("columns.types", MyRow.getColumnTypesProperty());
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
@@ -567,6 +590,10 @@ public class TestOrcRawRecordMerger {
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
         BUCKET);
+
+    conf.set("columns", MyRow.getColumnNamesProperty());
+    conf.set("columns.types", MyRow.getColumnTypesProperty());
+
     Reader baseReader = OrcFile.createReader(basePath,
         OrcFile.readerOptions(conf));
     OrcRawRecordMerger merger =
@@ -790,6 +817,13 @@ public class TestOrcRawRecordMerger {
     BigRow(long rowId, long origTxn, int bucket) {
       ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
     }
+
+    static String getColumnNamesProperty() {
+      return "myint,mylong,mytext,myfloat,mydouble,ROW__ID";
+    }
+    static String getColumnTypesProperty() {
+      return "int:bigint:string:float:double:struct<transactionId:bigint,bucketId:int,rowId:bigint>";
+    }
   }
 
   /**
@@ -865,6 +899,8 @@ public class TestOrcRawRecordMerger {
 
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
+    job.set("columns", BigRow.getColumnNamesProperty());
+    job.set("columns.types", BigRow.getColumnTypesProperty());
     job.set("mapred.min.split.size", "1");
     job.set("mapred.max.split.size", "2");
     job.set("mapred.input.dir", root.toString());
@@ -971,6 +1007,8 @@ public class TestOrcRawRecordMerger {
     job.set("mapred.min.split.size", "1");
     job.set("mapred.max.split.size", "2");
     job.set("mapred.input.dir", root.toString());
+    job.set("columns", BigRow.getColumnNamesProperty());
+    job.set("columns.types", BigRow.getColumnTypesProperty());
     InputSplit[] splits = inf.getSplits(job, 5);
     assertEquals(5, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
@@ -1041,6 +1079,8 @@ public class TestOrcRawRecordMerger {
     job.set("mapred.max.split.size", "2");
     job.set("mapred.input.dir", root.toString());
     job.set("bucket_count", "1");
+    job.set("columns", MyRow.getColumnNamesProperty());
+    job.set("columns.types", MyRow.getColumnTypesProperty());
     InputSplit[] splits = inf.getSplits(job, 5);
     assertEquals(1, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
@@ -1108,6 +1148,8 @@ public class TestOrcRawRecordMerger {
     JobConf job = new JobConf();
     job.set("mapred.input.dir", root.toString());
     job.set("bucket_count", "2");
+    job.set("columns", MyRow.getColumnNamesProperty());
+    job.set("columns.types", MyRow.getColumnTypesProperty());
 
     // read the keys before the delta is flushed
     InputSplit[] splits = inf.getSplits(job, 1);

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
new file mode 100644
index 0000000..15ee24c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig
@@ -0,0 +1,1150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
+import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.mockito.MockSettings;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+public class TestOrcRawRecordMerger {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestOrcRawRecordMerger.class);
+//todo: why is statementId -1?
+  @Test
+  public void testOrdering() throws Exception {
+    ReaderKey left = new ReaderKey(100, 200, 1200, 300);
+    ReaderKey right = new ReaderKey();
+    right.setValues(100, 200, 1000, 200,1);
+    assertTrue(right.compareTo(left) < 0);
+    assertTrue(left.compareTo(right) > 0);
+    assertEquals(false, left.equals(right));
+    left.set(right);
+    assertTrue(right.compareTo(left) == 0);
+    assertEquals(true, right.equals(left));
+    right.setRowId(2000);
+    assertTrue(right.compareTo(left) > 0);
+    left.setValues(1, 2, 3, 4,-1);
+    right.setValues(100, 2, 3, 4,-1);
+    assertTrue(left.compareTo(right) < 0);
+    assertTrue(right.compareTo(left) > 0);
+    left.setValues(1, 2, 3, 4,-1);
+    right.setValues(1, 100, 3, 4,-1);
+    assertTrue(left.compareTo(right) < 0);
+    assertTrue(right.compareTo(left) > 0);
+    left.setValues(1, 2, 3, 100,-1);
+    right.setValues(1, 2, 3, 4,-1);
+    assertTrue(left.compareTo(right) < 0);
+    assertTrue(right.compareTo(left) > 0);
+
+    // ensure that we are consistent when comparing to the base class
+    RecordIdentifier ri = new RecordIdentifier(1, 2, 3);
+    assertEquals(1, ri.compareTo(left));
+    assertEquals(-1, left.compareTo(ri));
+    assertEquals(false, ri.equals(left));
+    assertEquals(false, left.equals(ri));
+  }
+
  /**
   * Fills in an ACID event struct in place.
   * The first five fields are the event metadata (operation code, original
   * transaction, bucket, row id, current transaction); the last field is a
   * nested one-column struct holding {@code value} as its single Text column.
   */
  private static void setRow(OrcStruct event,
                             int operation,
                             long originalTransaction,
                             int bucket,
                             long rowId,
                             long currentTransaction,
                             String value) {
    event.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
    event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
        new LongWritable(originalTransaction));
    event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket));
    event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId));
    event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
        new LongWritable(currentTransaction));
    // The payload row itself: a struct with exactly one string column.
    OrcStruct row = new OrcStruct(1);
    row.setFieldValue(0, new Text(value));
    event.setFieldValue(OrcRecordUpdater.ROW, row);
  }
+
+  private static String value(OrcStruct event) {
+    return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+  }
+
  /**
   * Builds a synthetic list of ORC stripes, one per requested row count.
   * Every stripe gets a fixed layout of 100 index + 800 data + 100 footer
   * bytes, so consecutive stripes start exactly 1000 bytes apart.
   */
  private List<StripeInformation> createStripes(long... rowCounts) {
    long offset = 0;
    List<StripeInformation> result =
        new ArrayList<StripeInformation>(rowCounts.length);
    for(long count: rowCounts) {
      OrcProto.StripeInformation.Builder stripe =
          OrcProto.StripeInformation.newBuilder();
      stripe.setDataLength(800).setIndexLength(100).setFooterLength(100)
          .setNumberOfRows(count).setOffset(offset);
      // 100 + 800 + 100 bytes per stripe => next stripe starts 1000 later.
      offset += 1000;
      result.add(new ReaderImpl.StripeInformationImpl(stripe.build()));
    }
    return result;
  }
+
  // can add .verboseLogging() to cause Mockito to log invocations
  private final MockSettings settings = Mockito.withSettings();
  // Scratch directory for tests that write real ORC files; falls back to
  // target/test/tmp when the test.tmp.dir system property is unset.
  private final Path tmpDir = new Path(System.getProperty("test.tmp.dir",
      "target" + File.separator + "test" + File.separator + "tmp"));
+
  /**
   * Mocks a Reader whose record reader yields five INSERT events:
   * (txn 10, bucket 20, rows 20/30/40) then (txn 40, bucket 50, rows 60/61),
   * with current transactions 100..140 and values "first".."fifth".
   */
  private Reader createMockReader() throws IOException {
    Reader reader = Mockito.mock(Reader.class, settings);
    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
        .thenReturn(recordReader);

    // Six consecutive answers: five rows then end-of-data.
    Mockito.when(recordReader.hasNext()).
        thenReturn(true, true, true, true, true, false);

    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);

    // next(previous) returns the following row; the stubs chain the five rows.
    Mockito.when(recordReader.next(null)).thenReturn(row1);
    Mockito.when(recordReader.next(row1)).thenReturn(row2);
    Mockito.when(recordReader.next(row2)).thenReturn(row3);
    Mockito.when(recordReader.next(row3)).thenReturn(row4);
    Mockito.when(recordReader.next(row4)).thenReturn(row5);

    return reader;
  }
+
  /**
   * A ReaderPair bounded by a min/max key should position itself after the
   * min key and stop once the max key has been returned: here rows (10,20,40)
   * and (40,50,60) out of the five rows in the mock reader.
   */
  @Test
  public void testReaderPair() throws Exception {
    ReaderKey key = new ReaderKey();
    Reader reader = createMockReader();
    RecordIdentifier minKey = new RecordIdentifier(10, 20, 30);
    RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60);
    ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey,
        new Reader.Options(), 0);
    RecordReader recordReader = pair.recordReader;
    // min key is exclusive: the first record is the one after (10,20,30)
    assertEquals(10, key.getTransactionId());
    assertEquals(20, key.getBucketId());
    assertEquals(40, key.getRowId());
    assertEquals(120, key.getCurrentTransactionId());
    assertEquals("third", value(pair.nextRecord));

    pair.next(pair.nextRecord);
    // max key is inclusive: (40,50,60) is still returned
    assertEquals(40, key.getTransactionId());
    assertEquals(50, key.getBucketId());
    assertEquals(60, key.getRowId());
    assertEquals(130, key.getCurrentTransactionId());
    assertEquals("fourth", value(pair.nextRecord));

    pair.next(pair.nextRecord);
    assertEquals(null, pair.nextRecord);
    Mockito.verify(recordReader).close();
  }
+
+  @Test
+  public void testReaderPairNoMin() throws Exception {
+    ReaderKey key = new ReaderKey();
+    Reader reader = createMockReader();
+
+    ReaderPair pair = new ReaderPair(key, reader, 20, null, null,
+        new Reader.Options(), 0);
+    RecordReader recordReader = pair.recordReader;
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(20, key.getRowId());
+    assertEquals(100, key.getCurrentTransactionId());
+    assertEquals("first", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(30, key.getRowId());
+    assertEquals(110, key.getCurrentTransactionId());
+    assertEquals("second", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(10, key.getTransactionId());
+    assertEquals(20, key.getBucketId());
+    assertEquals(40, key.getRowId());
+    assertEquals(120, key.getCurrentTransactionId());
+    assertEquals("third", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(40, key.getTransactionId());
+    assertEquals(50, key.getBucketId());
+    assertEquals(60, key.getRowId());
+    assertEquals(130, key.getCurrentTransactionId());
+    assertEquals("fourth", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(40, key.getTransactionId());
+    assertEquals(50, key.getBucketId());
+    assertEquals(61, key.getRowId());
+    assertEquals(140, key.getCurrentTransactionId());
+    assertEquals("fifth", value(pair.nextRecord));
+
+    pair.next(pair.nextRecord);
+    assertEquals(null, pair.nextRecord);
+    Mockito.verify(recordReader).close();
+  }
+
+  private static OrcStruct createOriginalRow(String value) {
+    OrcStruct result = new OrcStruct(1);
+    result.setFieldValue(0, new Text(value));
+    return result;
+  }
+
  /**
   * Mocks a Reader over a pre-ACID "original" file: five one-column rows
   * ("first".."fifth") with row numbers 0..4 and no ACID event wrapper.
   */
  private Reader createMockOriginalReader() throws IOException {
    Reader reader = Mockito.mock(Reader.class, settings);
    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
    OrcStruct row1 = createOriginalRow("first");
    OrcStruct row2 = createOriginalRow("second");
    OrcStruct row3 = createOriginalRow("third");
    OrcStruct row4 = createOriginalRow("fourth");
    OrcStruct row5 = createOriginalRow("fifth");

    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
        .thenReturn(recordReader);
    Mockito.when(recordReader.hasNext()).
        thenReturn(true, true, true, true, true, false);
    // Row numbers drive the synthetic rowIds assigned to original files.
    Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L);
    Mockito.when(recordReader.next(null)).thenReturn(row1);
    Mockito.when(recordReader.next(row1)).thenReturn(row2);
    Mockito.when(recordReader.next(row2)).thenReturn(row3);
    Mockito.when(recordReader.next(row3)).thenReturn(row4);
    Mockito.when(recordReader.next(row4)).thenReturn(row5);
    return reader;
  }
+
  /**
   * An OriginalReaderPair bounded by min/max keys: rows from an original file
   * get synthetic keys (txn 0, the given bucket, row number).  With min
   * (0,10,1) exclusive and max (0,10,3) inclusive only rows 2 and 3 appear.
   */
  @Test
  public void testOriginalReaderPair() throws Exception {
    ReaderKey key = new ReaderKey();
    Reader reader = createMockOriginalReader();
    RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
    RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
    boolean[] includes = new boolean[]{true, true};
    ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey,
        new Reader.Options().include(includes));
    RecordReader recordReader = pair.recordReader;
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(2, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());
    assertEquals("third", value(pair.nextRecord));

    pair.next(pair.nextRecord);
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(3, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());
    assertEquals("fourth", value(pair.nextRecord));

    pair.next(pair.nextRecord);
    assertEquals(null, pair.nextRecord);
    Mockito.verify(recordReader).close();
  }
+
  /**
   * A transaction list with high-water mark Long.MAX_VALUE and no exceptions,
   * i.e. every transaction is visible.
   */
  private static ValidTxnList createMaximalTxnList() {
    return new ValidReadTxnList(Long.MAX_VALUE + ":");
  }
+
  /**
   * With no min/max key an OriginalReaderPair returns every row of the
   * original file, each with a synthetic key of (txn 0, bucket 10, row N).
   */
  @Test
  public void testOriginalReaderPairNoMin() throws Exception {
    ReaderKey key = new ReaderKey();
    Reader reader = createMockOriginalReader();
    ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null,
        new Reader.Options());
    assertEquals("first", value(pair.nextRecord));
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(0, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());

    pair.next(pair.nextRecord);
    assertEquals("second", value(pair.nextRecord));
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(1, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());

    pair.next(pair.nextRecord);
    assertEquals("third", value(pair.nextRecord));
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(2, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());

    pair.next(pair.nextRecord);
    assertEquals("fourth", value(pair.nextRecord));
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(3, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());

    pair.next(pair.nextRecord);
    assertEquals("fifth", value(pair.nextRecord));
    assertEquals(0, key.getTransactionId());
    assertEquals(10, key.getBucketId());
    assertEquals(4, key.getRowId());
    assertEquals(0, key.getCurrentTransactionId());

    pair.next(pair.nextRecord);
    assertEquals(null, pair.nextRecord);
    Mockito.verify(pair.recordReader).close();
  }
+
  /**
   * Reads a mocked ACID base file through OrcRawRecordMerger with a byte
   * range covering only the middle stripe.  The min/max keys should come from
   * the acid key index metadata, and only the events between them (exclusive
   * min, inclusive max) should be returned.  Also checks the merger's event
   * ObjectInspector layout.
   */
  @Test
  public void testNewBase() throws Exception {
    Configuration conf = new Configuration();
    conf.set("columns", "col1");
    conf.set("columns.types", "string");
    Reader reader = Mockito.mock(Reader.class, settings);
    RecordReader recordReader = Mockito.mock(RecordReader.class, settings);

    // Fake file schema: a 6-field ACID event struct (entries 1-5 left null
    // here) whose last subtype is the one-column ROW struct.
    List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
    OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
    typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
        .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
        .addSubtypes(6);
    types.add(typeBuilder.build());
    types.add(null);
    types.add(null);
    types.add(null);
    types.add(null);
    types.add(null);
    typeBuilder.clearSubtypes();
    typeBuilder.addSubtypes(7);
    types.add(typeBuilder.build());

    Mockito.when(reader.getTypes()).thenReturn(types);
    Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class)))
        .thenReturn(recordReader);

    OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first");
    OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second");
    OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third");
    OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth");
    OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
    setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");

    Mockito.when(recordReader.hasNext()).
        thenReturn(true, true, true, true, true, false);

    Mockito.when(recordReader.getProgress()).thenReturn(1.0f);

    Mockito.when(recordReader.next(null)).thenReturn(row1, row4);
    Mockito.when(recordReader.next(row1)).thenReturn(row2);
    Mockito.when(recordReader.next(row2)).thenReturn(row3);
    Mockito.when(recordReader.next(row3)).thenReturn(row5);

    // The acid key index lists the last (txn,bucket,rowId) of each stripe.
    Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
        .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
            .getBytes("UTF-8")));
    Mockito.when(reader.getStripes())
        .thenReturn(createStripes(2, 2, 1));

    // Range (1000, 1000) selects only the second of the three 1000-byte
    // stripes, which the key index bounds by (10,20,30) and (40,50,60).
    OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
        false, 10, createMaximalTxnList(),
        new Reader.Options().range(1000, 1000), null);
    RecordReader rr = merger.getCurrentReader().recordReader;
    assertEquals(0, merger.getOtherReaders().size());

    assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey());
    assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey());
    RecordIdentifier id = merger.createKey();
    OrcStruct event = merger.createValue();

    assertEquals(true, merger.next(id, event));
    assertEquals(10, id.getTransactionId());
    assertEquals(20, id.getBucketId());
    assertEquals(40, id.getRowId());
    assertEquals("third", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(40, id.getTransactionId());
    assertEquals(50, id.getBucketId());
    assertEquals(60, id.getRowId());
    assertEquals("fourth", getValue(event));

    assertEquals(false, merger.next(id, event));
    assertEquals(1.0, merger.getProgress(), 0.01);
    merger.close();
    Mockito.verify(rr).close();
    Mockito.verify(rr).getProgress();

    // Verify the ACID event ObjectInspector exposes the expected field names
    // in the expected slots.
    StructObjectInspector eventObjectInspector =
        (StructObjectInspector) merger.getObjectInspector();
    List<? extends StructField> fields =
        eventObjectInspector.getAllStructFieldRefs();
    assertEquals(OrcRecordUpdater.FIELDS, fields.size());
    assertEquals("operation",
        fields.get(OrcRecordUpdater.OPERATION).getFieldName());
    assertEquals("currentTransaction",
        fields.get(OrcRecordUpdater.CURRENT_TRANSACTION).getFieldName());
    assertEquals("originalTransaction",
        fields.get(OrcRecordUpdater.ORIGINAL_TRANSACTION).getFieldName());
    assertEquals("bucket",
        fields.get(OrcRecordUpdater.BUCKET).getFieldName());
    assertEquals("rowId",
        fields.get(OrcRecordUpdater.ROW_ID).getFieldName());
    StructObjectInspector rowObjectInspector =
        (StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
            .getFieldObjectInspector();
    assertEquals("col1",
        rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName());
  }
+
  /**
   * Single string column row written through the reflection ObjectInspector.
   * NOTE(review): the field names are significant — they are read by
   * reflection, and ROW__ID presumably maps to the ACID record identifier;
   * do not rename them.
   */
  static class MyRow {
    Text col1;
    RecordIdentifier ROW__ID;

    MyRow(String val) {
      col1 = new Text(val);
    }

    MyRow(String val, long rowId, long origTxn, int bucket) {
      col1 = new Text(val);
      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
    }
  }
+
+  static String getValue(OrcStruct event) {
+    return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+  }
+
  /**
   * Writing an empty base (updater closed without any inserts) and reading it
   * back through the merger must yield no rows.
   */
  @Test
  public void testEmpty() throws Exception {
    final int BUCKET = 0;
    Configuration conf = new Configuration();
    OrcOutputFormat of = new OrcOutputFormat();
    FileSystem fs = FileSystem.getLocal(conf);
    Path root = new Path(tmpDir, "testEmpty").makeQualified(fs);
    fs.delete(root, true);
    ObjectInspector inspector;
    // TestOrcFile.class lock guards reflection inspector creation across tests
    synchronized (TestOrcFile.class) {
      inspector = ObjectInspectorFactory.getReflectionObjectInspector
          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    }

    // write the empty base
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .inspector(inspector).bucket(BUCKET).writingBase(true)
        .maximumTransactionId(100).finalDestination(root);
    of.getRecordUpdater(root, options).close(false);

    ValidTxnList txnList = new ValidReadTxnList("200:");
    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);

    Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
        BUCKET);
    Reader baseReader = OrcFile.createReader(basePath,
        OrcFile.readerOptions(conf));
    OrcRawRecordMerger merger =
        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
            createMaximalTxnList(), new Reader.Options(),
            AcidUtils.getPaths(directory.getCurrentDirectories()));
    RecordIdentifier key = merger.createKey();
    OrcStruct value = merger.createValue();
    assertEquals(false, merger.next(key, value));
  }
+
  /**
   * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
   * a base and a delta.  Runs the same scenario against both the pre-1.3.0
   * delta directory naming (no statement id) and the 1.3.0 format.
   * @throws Exception
   */
  @Test
  public void testNewBaseAndDelta() throws Exception {
    testNewBaseAndDelta(false);
    testNewBaseAndDelta(true);
  }
  /**
   * Writes a 10-row base (txn 100) plus a delta (txn 200) containing three
   * updates and two deletes, then reads the merge back three ways:
   * collapsed, non-collapsed, and with txn 200 excluded from the valid list.
   *
   * @param use130Format whether deltas use the 1.3.0 naming with statement id
   */
  private void testNewBaseAndDelta(boolean use130Format) throws Exception {
    final int BUCKET = 10;
    String[] values = new String[]{"first", "second", "third", "fourth",
                                   "fifth", "sixth", "seventh", "eighth",
                                   "ninth", "tenth"};
    Configuration conf = new Configuration();
    OrcOutputFormat of = new OrcOutputFormat();
    FileSystem fs = FileSystem.getLocal(conf);
    Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs);
    fs.delete(root, true);
    ObjectInspector inspector;
    synchronized (TestOrcFile.class) {
      inspector = ObjectInspectorFactory.getReflectionObjectInspector
          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    }

    // write the base
    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .inspector(inspector).bucket(BUCKET).finalDestination(root);
    if(!use130Format) {
      options.statementId(-1);
    }
    RecordUpdater ru = of.getRecordUpdater(root,
        options.writingBase(true).maximumTransactionId(100));
    for(String v: values) {
      ru.insert(0, new MyRow(v));
    }
    ru.close(false);

    // write a delta: update rows 0, 2, 3 and delete rows 7, 8
    ru = of.getRecordUpdater(root, options.writingBase(false)
        .minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1));
    ru.update(200, new MyRow("update 1", 0, 0, BUCKET));
    ru.update(200, new MyRow("update 2", 2, 0, BUCKET));
    ru.update(200, new MyRow("update 3", 3, 0, BUCKET));
    ru.delete(200, new MyRow("", 7, 0, BUCKET));
    ru.delete(200, new MyRow("", 8, 0, BUCKET));
    ru.close(false);

    ValidTxnList txnList = new ValidReadTxnList("200:");
    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);

    // verify the directory layout matches the chosen delta naming scheme
    assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
    assertEquals(new Path(root, use130Format ?
        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
        directory.getCurrentDirectories().get(0).getPath());

    Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
        BUCKET);
    Reader baseReader = OrcFile.createReader(basePath,
        OrcFile.readerOptions(conf));
    // collapsing merger: for each key only the latest event survives
    OrcRawRecordMerger merger =
        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
            createMaximalTxnList(), new Reader.Options(),
            AcidUtils.getPaths(directory.getCurrentDirectories()));
    assertEquals(null, merger.getMinKey());
    assertEquals(null, merger.getMaxKey());
    RecordIdentifier id = merger.createKey();
    OrcStruct event = merger.createValue();

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
    assertEquals("update 1", getValue(event));
    assertFalse(merger.isDelete(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
    assertEquals("second", getValue(event));
    assertFalse(merger.isDelete(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
    assertEquals("update 2", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
    assertEquals("update 3", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
    assertEquals("fifth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
    assertEquals("sixth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
    assertEquals("seventh", getValue(event));

    // deleted rows surface as DELETE events with a null row payload
    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
    assertNull(OrcRecordUpdater.getRow(event));
    assertTrue(merger.isDelete(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
    assertNull(OrcRecordUpdater.getRow(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
    assertEquals("tenth", getValue(event));

    assertEquals(false, merger.next(id, event));
    merger.close();

    // make a merger that doesn't collapse events
    merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
            createMaximalTxnList(), new Reader.Options(),
            AcidUtils.getPaths(directory.getCurrentDirectories()));

    // without collapsing, both the delta event and the original insert are
    // returned for each updated/deleted row, delta event first
    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
    assertEquals("update 1", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 0, 0), id);
    assertEquals("first", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
    assertEquals("second", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
    assertEquals("update 2", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 2, 0), id);
    assertEquals("third", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
    assertEquals("update 3", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 3, 0), id);
    assertEquals("fourth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
    assertEquals("fifth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
    assertEquals("sixth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
    assertEquals("seventh", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
    assertNull(OrcRecordUpdater.getRow(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 7, 0), id);
    assertEquals("eighth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
    assertNull(OrcRecordUpdater.getRow(event));
    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 8, 0), id);
    assertEquals("ninth", getValue(event));

    assertEquals(true, merger.next(id, event));
    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
        OrcRecordUpdater.getOperation(event));
    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
    assertEquals("tenth", getValue(event));

    assertEquals(false, merger.next(id, event));
    merger.close();

    // try ignoring the 200 transaction and make sure it works still
    ValidTxnList txns = new ValidReadTxnList("2000:200");
    merger =
        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
            txns, new Reader.Options(),
            AcidUtils.getPaths(directory.getCurrentDirectories()));
    for(int i=0; i < values.length; ++i) {
      assertEquals(true, merger.next(id, event));
      LOG.info("id = " + id + "event = " + event);
      assertEquals(OrcRecordUpdater.INSERT_OPERATION,
          OrcRecordUpdater.getOperation(event));
      assertEquals(new ReaderKey(0, BUCKET, i, 0), id);
      assertEquals(values[i], getValue(event));
    }

    assertEquals(false, merger.next(id, event));
    merger.close();
  }
+
+  /**
+   * Reflection-backed row type with five data columns (int, long, text,
+   * float, double) plus a ROW__ID field holding the ACID record identifier.
+   * Used by the base/delta record-reader tests below.
+   */
+  static class BigRow {
+    int myint;
+    long mylong;
+    Text mytext;
+    float myfloat;
+    double mydouble;
+    // ACID record identifier; null for plain inserts, set for update/delete.
+    RecordIdentifier ROW__ID;
+
+    // Insert-style constructor: data columns only, no record identifier.
+    BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) {
+      this.myint = myint;
+      this.mylong = mylong;
+      this.mytext = new Text(mytext);
+      this.myfloat = myfloat;
+      this.mydouble = mydouble;
+      ROW__ID = null;
+    }
+
+    // Update-style constructor: data columns plus the identifier
+    // (original transaction, bucket, row id) of the record being updated.
+    BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble,
+                    long rowId, long origTxn, int bucket) {
+      this.myint = myint;
+      this.mylong = mylong;
+      this.mytext = new Text(mytext);
+      this.myfloat = myfloat;
+      this.mydouble = mydouble;
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+
+    // Delete-style constructor: only the identifier of the record to delete.
+    BigRow(long rowId, long origTxn, int bucket) {
+      ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+  }
+
+  /**
+   * Test the OrcInputFormat record reader when there is a pre-ACID ("old"
+   * style, plain ORC) base file plus two ACID delta directories, verifying
+   * that updates and deletes from the deltas mask the original base rows.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderOldBaseAndDelta() throws Exception {
+    final int BUCKET = 10;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testOldBaseAndDelta").makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write the base
+    // Memory manager that notifies writers every 2 rows so each pair of rows
+    // ends up in its own stripe (small stripes give us multiple splits).
+    MemoryManager mgr = new MemoryManager(conf){
+      int rowsAddedSinceCheck = 0;
+
+      @Override
+      synchronized void addedRow(int rows) throws IOException {
+        rowsAddedSinceCheck += rows;
+        if (rowsAddedSinceCheck >= 2) {
+          notifyWriters();
+          rowsAddedSinceCheck = 0;
+        }
+      }
+    };
+    // make 5 stripes with 2 rows each
+    Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
+        OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
+        .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
+        .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
+    // "ignore.*" rows are expected to be replaced or deleted by the deltas;
+    // surviving rows are named "<split>.<row>" for easy verification below.
+    String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
+       "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
+    for(int i=0; i < values.length; ++i) {
+      writer.addRow(new BigRow(i, i, values[i], i, i));
+    }
+    writer.close();
+
+    // write a delta
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+        .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    // null entries mean "no update for this row" in transaction 1
+    values = new String[]{"0.0", null, null, "1.1", null, null, null,
+        "ignore.7"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+      }
+    }
+    // delete base row 9 in transaction 100
+    ru.delete(100, new BigRow(9, 0, BUCKET));
+    ru.close(false);
+
+    // write a delta
+    options = options.minimumTransactionId(2).maximumTransactionId(2);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+      }
+    }
+    // delete base row 8 in transaction 100
+    ru.delete(100, new BigRow(8, 0, BUCKET));
+    ru.close(false);
+
+    // tiny split sizes force one split per stripe pair
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.min.split.size", "1");
+    job.set("mapred.max.split.size", "2");
+    job.set("mapred.input.dir", root.toString());
+    InputSplit[] splits = inf.getSplits(job, 5);
+    assertEquals(5, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+
+    // loop through the 5 splits and read each
+    // (only the first 4 splits contain surviving rows; the 5th is empty)
+    for(int i=0; i < 4; ++i) {
+      System.out.println("starting split " + i);
+      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+      NullWritable key = rr.createKey();
+      OrcStruct value = rr.createValue();
+
+      // there should be exactly two rows per a split
+      for(int j=0; j < 2; ++j) {
+        System.out.println("i = " + i + ", j = " + j);
+        assertEquals(true, rr.next(key, value));
+        System.out.println("record = " + value);
+        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      }
+      assertEquals(false, rr.next(key, value));
+    }
+    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
+    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+  }
+
+  /**
+   * Test the RecordReader when there is a new (ACID-format, written via
+   * OrcRecordUpdater) base and a delta. Mirrors the old-base test above but
+   * with the base produced through the record updater instead of a raw writer.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderNewBaseAndDelta() throws Exception {
+    final int BUCKET = 11;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testRecordReaderNewBaseAndDelta").makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write the base
+    // Memory manager that notifies writers every 2 rows, producing many
+    // small stripes so split generation yields multiple splits.
+    MemoryManager mgr = new MemoryManager(conf){
+      int rowsAddedSinceCheck = 0;
+
+      @Override
+      synchronized void addedRow(int rows) throws IOException {
+        rowsAddedSinceCheck += rows;
+        if (rowsAddedSinceCheck >= 2) {
+          notifyWriters();
+          rowsAddedSinceCheck = 0;
+        }
+      }
+    };
+
+    // make 5 stripes with 2 rows each
+    OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions)
+        new OrcRecordUpdater.OrcOptions(conf)
+        .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
+        .bucket(BUCKET).inspector(inspector).filesystem(fs);
+    options.orcOptions(OrcFile.writerOptions(conf)
+      .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
+      .memory(mgr));
+    options.finalDestination(root);
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    // "ignore.*" rows should be replaced or deleted by the deltas below;
+    // survivors are named "<split>.<row>" for verification.
+    String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
+        "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(0, new BigRow(i, i, values[i], i, i));
+    }
+    ru.close(false);
+
+    // write a delta
+    options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5);
+    ru = of.getRecordUpdater(root, options);
+    // null entries mean "no update for this row" in transaction 1
+    values = new String[]{"0.0", null, null, "1.1", null, null, null,
+        "ignore.7"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+      }
+    }
+    // delete base row 9 in transaction 100
+    ru.delete(100, new BigRow(9, 0, BUCKET));
+    ru.close(false);
+
+    // write a delta
+    options.minimumTransactionId(2).maximumTransactionId(2);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
+    for(int i=0; i < values.length; ++i) {
+      if (values[i] != null) {
+        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+      }
+    }
+    // delete base row 8 in transaction 100
+    ru.delete(100, new BigRow(8, 0, BUCKET));
+    ru.close(false);
+
+    // tiny split sizes force one split per stripe pair
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.min.split.size", "1");
+    job.set("mapred.max.split.size", "2");
+    job.set("mapred.input.dir", root.toString());
+    InputSplit[] splits = inf.getSplits(job, 5);
+    assertEquals(5, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+
+    // loop through the 5 splits and read each
+    // (only the first 4 splits contain surviving rows; the 5th is empty)
+    for(int i=0; i < 4; ++i) {
+      System.out.println("starting split " + i);
+      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+      NullWritable key = rr.createKey();
+      OrcStruct value = rr.createValue();
+
+      // there should be exactly two rows per a split
+      for(int j=0; j < 2; ++j) {
+        System.out.println("i = " + i + ", j = " + j);
+        assertEquals(true, rr.next(key, value));
+        System.out.println("record = " + value);
+        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      }
+      assertEquals(false, rr.next(key, value));
+    }
+    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
+    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+  }
+
+  /**
+   * Test the RecordReader when there are only delta directories and no base:
+   * two insert-only deltas (transactions 1 and 2) are written and the reader
+   * must return all rows from both in order.
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderDelta() throws Exception {
+    final int BUCKET = 0;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path root = new Path(tmpDir, "testRecordReaderDelta").makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write a delta
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf)
+            .bucket(BUCKET).inspector(inspector).filesystem(fs)
+            .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+          .finalDestination(root);
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    String[] values = new String[]{"a", "b", "c", "d", "e"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(1, new MyRow(values[i]));
+    }
+    ru.close(false);
+
+    // write a delta
+    options.minimumTransactionId(2).maximumTransactionId(2);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{"f", "g", "h", "i", "j"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(2, new MyRow(values[i]));
+    }
+    ru.close(false);
+
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.min.split.size", "1");
+    job.set("mapred.max.split.size", "2");
+    job.set("mapred.input.dir", root.toString());
+    job.set("bucket_count", "1");
+    // with a single bucket all deltas collapse into one split
+    InputSplit[] splits = inf.getSplits(job, 5);
+    assertEquals(1, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
+    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+    // expect rows from both deltas, transaction 1 first then transaction 2
+    values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
+    OrcStruct row = rr.createValue();
+    for(int i = 0; i < values.length; ++i) {
+      System.out.println("Checking " + i);
+      assertEquals(true, rr.next(NullWritable.get(), row));
+      assertEquals(values[i], row.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(NullWritable.get(), row));
+  }
+
+  /**
+   * Test the RecordReader when the delta has been flushed, but not closed.
+   * Runs the scenario under both delta directory naming formats (pre-1.3.0
+   * and 1.3.0+).
+   * @throws Exception
+   */
+  @Test
+  public void testRecordReaderIncompleteDelta() throws Exception {
+    testRecordReaderIncompleteDelta(false);
+    testRecordReaderIncompleteDelta(true);
+  }
+  /**
+   * Writes a base plus a delta that is flushed but never closed, and checks
+   * that readers see only the flushed rows (via the side "flush_length" file).
+   * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001
+   */
+  private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception {
+    final int BUCKET = 1;
+    Configuration conf = new Configuration();
+    OrcOutputFormat of = new OrcOutputFormat();
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    Path root = new Path(tmpDir, "testRecordReaderIncompleteDelta").makeQualified(fs);
+    fs.delete(root, true);
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    // write a base
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf)
+            .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
+            .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root);
+    // a negative statement id selects the pre-1.3.0 delta naming scheme
+    if(!use130Format) {
+      options.statementId(-1);
+    }
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    String[] values= new String[]{"1", "2", "3", "4", "5"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(0, new MyRow(values[i]));
+    }
+    ru.close(false);
+
+    // write a delta
+    options.writingBase(false).minimumTransactionId(10)
+        .maximumTransactionId(19);
+    ru = of.getRecordUpdater(root, options);
+    values = new String[]{"6", "7", "8"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(1, new MyRow(values[i]));
+    }
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", root.toString());
+    job.set("bucket_count", "2");
+
+    // read the keys before the delta is flushed
+    InputSplit[] splits = inf.getSplits(job, 1);
+    assertEquals(2, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+        inf.getRecordReader(splits[0], job, Reporter.NULL);
+    NullWritable key = rr.createKey();
+    OrcStruct value = rr.createValue();
+    System.out.println("Looking at split " + splits[0]);
+    // only the 5 base rows should be visible; the delta has not flushed yet
+    for(int i=1; i < 6; ++i) {
+      System.out.println("Checking row " + i);
+      assertEquals(true, rr.next(key, value));
+      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(key, value));
+
+    ru.flush();
+    ru.flush();
+    values = new String[]{"9", "10"};
+    for(int i=0; i < values.length; ++i) {
+      ru.insert(3, new MyRow(values[i]));
+    }
+    ru.flush();
+
+    splits = inf.getSplits(job, 1);
+    assertEquals(2, splits.length);
+    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+    Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
+      AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
+    assertEquals(true, fs.exists(sideFile));
+    // 24 bytes — presumably three 8-byte flushed-length entries, one per
+    // completed flush; TODO confirm against OrcRecordUpdater's side-file format
+    assertEquals(24, fs.getFileStatus(sideFile).getLen());
+
+    // now all 10 rows (base + flushed delta) should be readable
+    for(int i=1; i < 11; ++i) {
+      assertEquals(true, rr.next(key, value));
+      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(key, value));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_part.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_part.q b/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_part.q
new file mode 100644
index 0000000..681a4ac
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_part.q
@@ -0,0 +1,171 @@
+set hive.cli.print.header=true;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.enforce.bucketing=true;
+SET hive.vectorized.execution.enabled=false;
+set hive.fetch.task.conversion=none;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+
+-- SORT_QUERY_RESULTS
+--
+-- FILE VARIATION: ORC, ACID Non-Vectorized, MapWork, Partitioned
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... STATIC INSERT
+---
+CREATE TABLE partitioned1(a INT, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned1 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table partitioned1 add columns(c int, d string);
+
+insert into table partitioned1 partition(part=2) values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table partitioned1 partition(part=1) values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+-- SELECT permutation columns to make sure NULL defaulting works right
+select part,a,b from partitioned1;
+select part,a,b,c from partitioned1;
+select part,a,b,c,d from partitioned1;
+select part,a,c,d from partitioned1;
+select part,a,d from partitioned1;
+select part,c from partitioned1;
+select part,d from partitioned1;
+
+--
+-- SECTION VARIATION: ALTER TABLE CHANGE COLUMN ... STATIC INSERT
+-- smallint = (2-byte signed integer, from -32,768 to 32,767)
+--
+CREATE TABLE partitioned2(a smallint, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned2 partition(part=1) values(1000, 'original'),(6737, 'original'), ('3', 'original'),('4', 'original');
+
+-- Table-Non-Cascade CHANGE COLUMNS ...
+alter table partitioned2 change column a a int;
+
+insert into table partitioned2 partition(part=2) values(72909, 'new'),(200, 'new'), (32768, 'new'),(40000, 'new');
+
+insert into table partitioned2 partition(part=1) values(5000, 'new'),(90000, 'new');
+
+select part,a,b from partitioned2;
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... DYNAMIC INSERT
+---
+CREATE TABLE partitioned3(a INT, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned3 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table partitioned3 add columns(c int, d string);
+
+insert into table partitioned3 partition(part) values(1, 'new', 10, 'ten', 2),(2, 'new', 20, 'twenty', 2), (3, 'new', 30, 'thirty', 2),(4, 'new', 40, 'forty', 2),
+    (5, 'new', 100, 'hundred', 1),(6, 'new', 200, 'two hundred', 1);
+
+-- SELECT permutation columns to make sure NULL defaulting works right
+-- (fixed: these previously selected partitioned1, leaving the dynamic
+-- insert into partitioned3 unverified)
+select part,a,b from partitioned3;
+select part,a,b,c from partitioned3;
+select part,a,b,c,d from partitioned3;
+select part,a,c,d from partitioned3;
+select part,a,d from partitioned3;
+select part,c from partitioned3;
+select part,d from partitioned3;
+
+
+--
+-- SECTION VARIATION: ALTER TABLE CHANGE COLUMN ... DYNAMIC INSERT
+-- smallint = (2-byte signed integer, from -32,768 to 32,767)
+--
+CREATE TABLE partitioned4(a smallint, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned4 partition(part=1) values(1000, 'original'),(6737, 'original'), ('3', 'original'),('4', 'original');
+
+-- Table-Non-Cascade CHANGE COLUMNS ...
+alter table partitioned4 change column a a int;
+
+insert into table partitioned4 partition(part) values(72909, 'new', 2),(200, 'new', 2), (32768, 'new', 2),(40000, 'new', 2),
+    (5000, 'new', 1),(90000, 'new', 1);
+
+select part,a,b from partitioned4;
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... UPDATE New Columns
+---
+CREATE TABLE partitioned5(a INT, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned5 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table partitioned5 add columns(c int, d string);
+
+insert into table partitioned5 partition(part=2) values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table partitioned5 partition(part=1) values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+select part,a,b,c,d from partitioned5;
+
+-- UPDATE New Columns
+update partitioned5 set c=99;
+
+select part,a,b,c,d from partitioned5;
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... DELETE where old column
+---
+CREATE TABLE partitioned6(a INT, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned6 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table partitioned6 add columns(c int, d string);
+
+insert into table partitioned6 partition(part=2) values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table partitioned6 partition(part=1) values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+select part,a,b,c,d from partitioned6;
+
+-- DELETE where old column
+delete from partitioned6 where a = 2 or a = 4 or a = 6;
+
+select part,a,b,c,d from partitioned6;
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... DELETE where new column
+---
+CREATE TABLE partitioned7(a INT, b STRING) PARTITIONED BY(part INT) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table partitioned7 partition(part=1) values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table partitioned7 add columns(c int, d string);
+
+insert into table partitioned7 partition(part=2) values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table partitioned7 partition(part=1) values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+select part,a,b,c,d from partitioned7;
+
+-- DELETE where new column
+delete from partitioned7 where a = 1 or c = 30 or c == 100;
+
+select part,a,b,c,d from partitioned7;
+
+
+DROP TABLE partitioned1;
+DROP TABLE partitioned2;
+DROP TABLE partitioned3;
+DROP TABLE partitioned4;
+DROP TABLE partitioned5;
+DROP TABLE partitioned6;
+DROP TABLE partitioned7;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_table.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_table.q b/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_table.q
new file mode 100644
index 0000000..bde5d50
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/schema_evol_orc_acid_mapwork_table.q
@@ -0,0 +1,129 @@
+set hive.cli.print.header=true;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.enforce.bucketing=true;
+SET hive.vectorized.execution.enabled=false;
+set hive.fetch.task.conversion=none;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+
+-- SORT_QUERY_RESULTS
+--
+-- FILE VARIATION: ORC, ACID Non-Vectorized, MapWork, Table
+--
+-- NOTE(review): table3/table4 are absent — presumably because the
+-- DYNAMIC INSERT section variations only apply to partitioned tables;
+-- confirm this numbering gap is intentional.
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... STATIC INSERT
+---
+CREATE TABLE table1(a INT, b STRING) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table table1 values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table table1 add columns(c int, d string);
+
+insert into table table1 values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table table1 values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+-- SELECT permutation columns to make sure NULL defaulting works right
+select a,b from table1;
+select a,b,c from table1;
+select a,b,c,d from table1;
+select a,c,d from table1;
+select a,d from table1;
+select c from table1;
+select d from table1;
+
+--
+-- SECTION VARIATION: ALTER TABLE CHANGE COLUMN ... STATIC INSERT
+-- smallint = (2-byte signed integer, from -32,768 to 32,767)
+--
+CREATE TABLE table2(a smallint, b STRING) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table table2 values(1000, 'original'),(6737, 'original'), ('3', 'original'),('4', 'original');
+
+-- Table-Non-Cascade CHANGE COLUMNS ...
+alter table table2 change column a a int;
+
+insert into table table2 values(72909, 'new'),(200, 'new'), (32768, 'new'),(40000, 'new');
+
+insert into table table2 values(5000, 'new'),(90000, 'new');
+
+select a,b from table2;
+
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... UPDATE New Columns
+---
+CREATE TABLE table5(a INT, b STRING) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table table5 values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table table5 add columns(c int, d string);
+
+insert into table table5 values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table table5 values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+select a,b,c,d from table5;
+
+-- UPDATE New Columns
+update table5 set c=99;
+
+select a,b,c,d from table5;
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... DELETE where old column
+---
+CREATE TABLE table6(a INT, b STRING) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table table6 values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table table6 add columns(c int, d string);
+
+insert into table table6 values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table table6 values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+select a,b,c,d from table6;
+
+-- DELETE where old column
+delete from table6 where a = 2 or a = 4 or a = 6;
+
+select a,b,c,d from table6;
+
+
+--
+--
+-- SECTION VARIATION: ALTER TABLE ADD COLUMNS ... DELETE where new column
+---
+CREATE TABLE table7(a INT, b STRING) clustered by (a) into 2 buckets STORED AS ORC TBLPROPERTIES ('transactional'='true');
+
+insert into table table7 values(1, 'original'),(2, 'original'), (3, 'original'),(4, 'original');
+
+-- Table-Non-Cascade ADD COLUMNS ...
+alter table table7 add columns(c int, d string);
+
+insert into table table7 values(1, 'new', 10, 'ten'),(2, 'new', 20, 'twenty'), (3, 'new', 30, 'thirty'),(4, 'new', 40, 'forty');
+
+insert into table table7 values(5, 'new', 100, 'hundred'),(6, 'new', 200, 'two hundred');
+
+select a,b,c,d from table7;
+
+-- DELETE where new column
+delete from table7 where a = 1 or c = 30 or c == 100;
+
+select a,b,c,d from table7;
+
+
+DROP TABLE table1;
+DROP TABLE table2;
+DROP TABLE table5;
+DROP TABLE table6;
+DROP TABLE table7;
\ No newline at end of file