You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/16 01:14:43 UTC

[1/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 506168370 -> 34b0e07a3


http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index ba8d675..82cf108 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -18,11 +18,20 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.MemoryManagerImpl;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +67,10 @@ import org.mockito.Mockito;
 import com.google.common.collect.Lists;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -283,28 +295,30 @@ public class TestOrcRawRecordMerger {
 
   @Test
   public void testOriginalReaderPair() throws Exception {
+    int BUCKET = 10;
     ReaderKey key = new ReaderKey();
+    Configuration conf = new Configuration();
+    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
     Reader reader = createMockOriginalReader();
-    RecordIdentifier minKey = new RecordIdentifier(0, 10, 1);
-    RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3);
+    RecordIdentifier minKey = new RecordIdentifier(0, bucketProperty, 1);
+    RecordIdentifier maxKey = new RecordIdentifier(0, bucketProperty, 3);
     boolean[] includes = new boolean[]{true, true};
-    Configuration conf = new Configuration();
     FileSystem fs = FileSystem.getLocal(conf);
     Path root = new Path(tmpDir, "testOriginalReaderPair");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, minKey, maxKey,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, minKey, maxKey,
         new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
     RecordReader recordReader = pair.getRecordReader();
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
     assertEquals("third", value(pair.nextRecord()));
 
     pair.next(pair.nextRecord());
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
     assertEquals("fourth", value(pair.nextRecord()));
@@ -320,46 +334,48 @@ public class TestOrcRawRecordMerger {
 
   @Test
   public void testOriginalReaderPairNoMin() throws Exception {
+    int BUCKET = 10;
     ReaderKey key = new ReaderKey();
     Reader reader = createMockOriginalReader();
     Configuration conf = new Configuration();
+    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
     FileSystem fs = FileSystem.getLocal(conf);
     Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
     fs.makeQualified(root);
     fs.create(root);
-    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, 10, null, null,
+    ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, null, null,
         new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
     assertEquals("first", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(0, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("second", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(1, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("third", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("fourth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord());
     assertEquals("fifth", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketProperty());
+    assertEquals(bucketProperty, key.getBucketProperty());
     assertEquals(4, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
@@ -506,6 +522,53 @@ public class TestOrcRawRecordMerger {
     return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  /** Checks AcidUtils.getLogicalLength() for a closed delta bucket file: first without a side file, then with an empty one.
+   * {@link org.apache.hive.hcatalog.streaming.TestStreaming#testInterleavedTransactionBatchCommits} has more tests
+   */
+  @Test
+  public void testGetLogicalLength() throws Exception {
+    final int BUCKET = 0;
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    OrcOutputFormat of = new OrcOutputFormat();
+    Path root = new Path(tmpDir, "testEmpty").makeQualified(fs);//NOTE(review): directory is not named after this test - presumably copy/paste, confirm
+    fs.delete(root, true);//recursive delete so the test starts from a clean directory
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+        (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+    /*create delta_1_1_0/bucket0 with 1 row and close the file*/
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+      .inspector(inspector).bucket(BUCKET).writingBase(false).minimumTransactionId(1)
+      .maximumTransactionId(1).finalDestination(root);
+    Path delta1_1_0 = new Path(root, AcidUtils.deltaSubdir(
+      options.getMinimumTransactionId(), options.getMaximumTransactionId(), options.getStatementId()));
+    Path bucket0 = AcidUtils.createBucketFile(delta1_1_0, BUCKET);
+    Path bucket0SideFile = OrcAcidUtils.getSideFile(bucket0);
+
+    RecordUpdater ru = of.getRecordUpdater(root, options);
+    ru.insert(options.getMaximumTransactionId(), new MyRow("first"));
+    ru.close(false);
+
+    FileStatus bucket0File = fs.getFileStatus(bucket0);
+    AcidUtils.getLogicalLength(fs, bucket0File);//return value unused here; the same call is asserted on below
+    Assert.assertTrue("no " + bucket0, fs.exists(bucket0));
+    Assert.assertFalse("unexpected " + bucket0SideFile, fs.exists(bucket0SideFile));
+    //without a side file getLogicalLength() is expected to equal the length reported by FileStatus
+    Assert.assertEquals("closed file size mismatch", bucket0File.getLen(),
+      AcidUtils.getLogicalLength(fs, bucket0File));
+
+    //create an empty (invalid) side file - make sure getLogicalLength() throws
+    FSDataOutputStream flushLengths = fs.create(bucket0SideFile, true, 8);
+    flushLengths.close();
+    expectedException.expect(IOException.class);
+    expectedException.expectMessage(bucket0SideFile.getName() + " found but is not readable");
+    AcidUtils.getLogicalLength(fs, bucket0File);//expected to throw; type and message were registered on expectedException above
+  }
   @Test
   public void testEmpty() throws Exception {
     final int BUCKET = 0;
@@ -525,7 +588,16 @@ public class TestOrcRawRecordMerger {
         .inspector(inspector).bucket(BUCKET).writingBase(true)
         .maximumTransactionId(100).finalDestination(root);
     of.getRecordUpdater(root, options).close(false);
-
+    {
+      /*OrcRecordUpdater is inconsistent about when it creates empty files and when it does not.
+      This creates an empty bucket. HIVE-17138*/
+      OrcFile.WriterOptions wo = OrcFile.writerOptions(conf);
+      wo.inspector(inspector);
+      wo.callback(new OrcRecordUpdater.KeyIndexBuilder("testEmpty"));
+      Writer w = OrcFile.createWriter(AcidUtils.createBucketFile(new Path(root,
+        AcidUtils.baseDir(100)), BUCKET), wo);
+      w.close();
+    }
     ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
@@ -600,52 +672,58 @@ public class TestOrcRawRecordMerger {
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
     assertEquals(new Path(root, use130Format ?
-        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
+        AcidUtils.deleteDeltaSubdir(200,200,0) : AcidUtils.deleteDeltaSubdir(200,200)),
         directory.getCurrentDirectories().get(0).getPath());
+    assertEquals(new Path(root, use130Format ?
+        AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)),
+      directory.getCurrentDirectories().get(1).getPath());
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
         BUCKET);
+    Path deltaPath = AcidUtils.createBucketFile(directory.getCurrentDirectories().get(1).getPath(),
+      BUCKET);
+    Path deleteDeltaDir = directory.getCurrentDirectories().get(0).getPath();
 
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(conf,true);
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
 
+    //the first "split" is for base/
     Reader baseReader = OrcFile.createReader(basePath,
         OrcFile.readerOptions(conf));
     OrcRawRecordMerger merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
+            new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
     assertEquals(null, merger.getMinKey());
     assertEquals(null, merger.getMaxKey());
     RecordIdentifier id = merger.createKey();
     OrcStruct event = merger.createValue();
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
-    assertEquals("update 1", getValue(event));
-    assertFalse(merger.isDelete(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
     assertEquals("second", getValue(event));
-    assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
-    assertEquals("update 2", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
-    assertEquals("update 3", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -667,14 +745,13 @@ public class TestOrcRawRecordMerger {
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
-    assertTrue(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
@@ -687,109 +764,205 @@ public class TestOrcRawRecordMerger {
     assertEquals(false, merger.next(id, event));
     merger.close();
 
-    // make a merger that doesn't collapse events
-    merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
-            createMaximalTxnList(), new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
+    //second "split" is delta_200_200
+    baseReader = OrcFile.createReader(deltaPath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+        createMaximalTxnList(), new Reader.Options(),
+        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
     assertEquals("update 1", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+    assertEquals("update 3", getValue(event));
+
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    //now run as if it's a minor Compaction so we don't collapse events
+    //it's not exactly like minor compaction since MC would not have a baseReader
+    //here there is only 1 "split" since we only have data for 1 bucket
+    baseReader = OrcFile.createReader(basePath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+        createMaximalTxnList(), new Reader.Options(),
+        AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(true));
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
+    assertNull(OrcRecordUpdater.getRow(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id);
     assertEquals("first", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
     assertEquals("second", getValue(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
-    assertEquals("update 2", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id);
     assertEquals("third", getValue(event));
 
     assertEquals(true, merger.next(id, event));
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION,
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
-    assertEquals("update 3", getValue(event));
+    assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id);
     assertEquals("fourth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
     assertEquals("fifth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
     assertEquals("sixth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
     assertEquals("seventh", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id);
     assertEquals("eighth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
+
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id);
     assertEquals("ninth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
-        OrcRecordUpdater.getOperation(event));
+      OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
     assertEquals("tenth", getValue(event));
 
+    //data from delta_200_200
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
+    assertEquals("update 1", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
+    assertEquals("update 2", getValue(event));
+
+    assertEquals(true, merger.next(id, event));
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
+      OrcRecordUpdater.getOperation(event));
+    assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
+    assertEquals("update 3", getValue(event));
+
     assertEquals(false, merger.next(id, event));
     merger.close();
 
+
     // try ignoring the 200 transaction and make sure it works still
     ValidTxnList txns = new ValidReadTxnList("2000:200:200");
+    //again 1st split is for base/
+    baseReader = OrcFile.createReader(basePath,
+      OrcFile.readerOptions(conf));
     merger =
-        new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
-            txns, new Reader.Options(),
-            AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options().isCompacting(false));
+      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+        txns, new Reader.Options(),
+        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
+
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
+
     for(int i=0; i < values.length; ++i) {
       assertEquals(true, merger.next(id, event));
       LOG.info("id = " + id + "event = " + event);
@@ -798,7 +971,19 @@ public class TestOrcRawRecordMerger {
       assertEquals(new ReaderKey(0, BUCKET_PROPERTY, i, 0), id);
       assertEquals(values[i], getValue(event));
     }
+    assertEquals(false, merger.next(id, event));
+    merger.close();
+
+    // 2nd split is for delta_200_200 which is filtered out entirely by "txns"
+    baseReader = OrcFile.createReader(deltaPath,
+      OrcFile.readerOptions(conf));
+    merger =
+      new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET,
+        txns, new Reader.Options(),
+        new Path[] {deleteDeltaDir}, new OrcRawRecordMerger.Options().isCompacting(false));
 
+    assertEquals(null, merger.getMinKey());
+    assertEquals(null, merger.getMaxKey());
     assertEquals(false, merger.next(id, event));
     merger.close();
   }
@@ -846,6 +1031,7 @@ public class TestOrcRawRecordMerger {
    * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is
    * a base and a delta.
    * @throws Exception
+   * @see #testRecordReaderNewBaseAndDelta()
    */
   @Test
   public void testRecordReaderOldBaseAndDelta() throws Exception {
@@ -891,63 +1077,90 @@ public class TestOrcRawRecordMerger {
         .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
         .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5)
         .finalDestination(root);
+
+    final int BUCKET_PROPERTY = BucketCodec.V1.encode(options);
+
     RecordUpdater ru = of.getRecordUpdater(root, options);
     values = new String[]{"0.0", null, null, "1.1", null, null, null,
         "ignore.7"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(9, 0, BUCKET));
-    ru.close(false);
+    ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY));
+    ru.close(false);//this doesn't create a key index presumably because writerOptions are not set on 'options'
 
     // write a delta
-    options = options.minimumTransactionId(2).maximumTransactionId(2);
+    options = options.minimumTransactionId(100).maximumTransactionId(100);
     ru = of.getRecordUpdater(root, options);
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
-    for(int i=0; i < values.length; ++i) {
+    for(int i=0; i < values.length - 1; ++i) {
       if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+        ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(8, 0, BUCKET));
+    //do this before next update so that delete_delta is properly sorted
+    ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
+    //because row 8 was updated and thus has a different RecordIdentifier now
+    ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY));
+
     ru.close(false);
 
+    MyResult[] expected = new MyResult[10];
+    int k = 0;
+    expected[k++] = new MyResult(0, "0.0");
+    expected[k++] = new MyResult(1, "0.1");
+    expected[k++] = new MyResult(2, "1.0");
+    expected[k++] = new MyResult(3, "1.1");
+    expected[k++] = new MyResult(4, "2.0");
+    expected[k++] = new MyResult(5, "2.1");
+    expected[k++] = new MyResult(6, "3.0");
+    expected[k] = new MyResult(7, "3.1");
+
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set("mapred.min.split.size", "1");
     job.set("mapred.max.split.size", "2");
     job.set("mapred.input.dir", root.toString());
     InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(5, splits.length);
+    assertEquals(7, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
 
-    // loop through the 5 splits and read each
-    for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i);
-      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+    for(InputSplit split : splits) {
+      rr = inf.getRecordReader(split, job, Reporter.NULL);
       NullWritable key = rr.createKey();
       OrcStruct value = rr.createValue();
-
-      // there should be exactly two rows per a split
-      for(int j=0; j < 2; ++j) {
-        System.out.println("i = " + i + ", j = " + j);
-        assertEquals(true, rr.next(key, value));
-        System.out.println("record = " + value);
-        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      while(rr.next(key, value)) {
+        MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString());
+        int i = 0;
+        for(; i < expected.length; i++) {
+          if(mr.equals(expected[i])) {
+            expected[i] = null;
+            break;
+          }
+        }
+        if(i >= expected.length) {
+          //not found
+          assertTrue("Found unexpected row: " + mr, false );
+        }
       }
-      assertEquals(false, rr.next(key, value));
     }
-    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
-    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+    for(MyResult mr : expected) {
+      assertTrue("Expected " + mr + " not found in any InputSplit", mr == null);
+    }
   }
 
   /**
    * Test the RecordReader when there is a new base and a delta.
+   * This test creates multiple stripes in both base and delta files which affects how many splits
+   * are created on read.  With ORC-228 this could be done in E2E fashion with a query or
+   * streaming ingest writing data.
+   * @see #testRecordReaderOldBaseAndDelta()
    * @throws Exception
    */
   @Test
@@ -1009,20 +1222,33 @@ public class TestOrcRawRecordMerger {
         ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(9, 0, BUCKET_PROPERTY));
+    ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY));
     ru.close(false);
 
     // write a delta
-    options.minimumTransactionId(2).maximumTransactionId(2);
+    options.minimumTransactionId(100).maximumTransactionId(100);
     ru = of.getRecordUpdater(root, options);
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
-    for(int i=0; i < values.length; ++i) {
+    for(int i=0; i < values.length - 1; ++i) {
       if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
+        ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
+    //do this before next update so that delete_delta is properly sorted
     ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
+    //because row 8 was updated and thus has a different RecordIdentifier now
+    ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY));
     ru.close(false);
+    MyResult[] expected = new MyResult[10];
+    int k = 0;
+    expected[k++] = new MyResult(0, "0.0");
+    expected[k++] = new MyResult(1, "0.1");
+    expected[k++] = new MyResult(2, "1.0");
+    expected[k++] = new MyResult(3, "1.1");
+    expected[k++] = new MyResult(4, "2.0");
+    expected[k++] = new MyResult(5, "2.1");
+    expected[k++] = new MyResult(6, "3.0");
+    expected[k] = new MyResult(7, "3.1");
 
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
@@ -1031,31 +1257,56 @@ public class TestOrcRawRecordMerger {
     job.set("mapred.input.dir", root.toString());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(5, splits.length);
+    //base has 10 rows, so 5 splits, 1 delta has 2 rows so 1 split, and 1 delta has 3 so 2 splits
+    assertEquals(8, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
 
-    // loop through the 5 splits and read each
-    for(int i=0; i < 4; ++i) {
-      System.out.println("starting split " + i + " = " + splits[i]);
-      rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
+    for(InputSplit split : splits) {
+      rr = inf.getRecordReader(split, job, Reporter.NULL);
       NullWritable key = rr.createKey();
       OrcStruct value = rr.createValue();
-
-      // there should be exactly two rows per a split
-      for(int j=0; j < 2; ++j) {
-        System.out.println("i = " + i + ", j = " + j);
-        assertEquals(true, rr.next(key, value));
-        System.out.println("record = " + value);
-        assertEquals(i + "." + j, value.getFieldValue(2).toString());
+      while(rr.next(key, value)) {
+        MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString());
+        int i = 0;
+        for(; i < expected.length; i++) {
+          if(mr.equals(expected[i])) {
+            expected[i] = null;
+            break;
+          }
+        }
+        if(i >= expected.length) {
+          //not found
+          assertTrue("Found unexpected row: " + mr, false );
+        }
       }
-      assertEquals(false, rr.next(key, value));
     }
-    rr = inf.getRecordReader(splits[4], job, Reporter.NULL);
-    assertEquals(false, rr.next(rr.createKey(), rr.createValue()));
+    for(MyResult mr : expected) {
+      assertTrue("Expected " + mr + " not found in any InputSplit", mr == null);
+    }
+  }
+  private static final class MyResult {
+    private final int myInt;
+    private final String myText;
+    MyResult(int myInt, String myText) {
+      this.myInt = myInt;
+      this.myText = myText;
+    }
+    @Override
+    public boolean equals(Object t) {
+      if(!(t instanceof MyResult)) {
+        return false;
+      }
+      MyResult that = (MyResult)t;
+      return myInt == that.myInt && myText.equals(that.myText);
+    }
+    @Override
+    public String toString() {
+      return "(" + myInt + "," + myText +")";
+    }
   }
-
   /**
    * Test the RecordReader when there is a new base and a delta.
    * @throws Exception
@@ -1081,18 +1332,17 @@ public class TestOrcRawRecordMerger {
             .writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
           .finalDestination(root);
     RecordUpdater ru = of.getRecordUpdater(root, options);
-    String[] values = new String[]{"a", "b", "c", "d", "e"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(1, new MyRow(values[i]));
+    String[][] values = {new String[]{"a", "b", "c", "d", "e"}, new String[]{"f", "g", "h", "i", "j"}};
+    for(int i=0; i < values[0].length; ++i) {
+      ru.insert(1, new MyRow(values[0][i]));
     }
     ru.close(false);
 
     // write a delta
     options.minimumTransactionId(2).maximumTransactionId(2);
     ru = of.getRecordUpdater(root, options);
-    values = new String[]{"f", "g", "h", "i", "j"};
-    for(int i=0; i < values.length; ++i) {
-      ru.insert(2, new MyRow(values[i]));
+    for(int i=0; i < values[1].length; ++i) {
+      ru.insert(2, new MyRow(values[1][i]));
     }
     ru.close(false);
 
@@ -1104,19 +1354,23 @@ public class TestOrcRawRecordMerger {
     job.set("bucket_count", "1");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     InputSplit[] splits = inf.getSplits(job, 5);
-    assertEquals(1, splits.length);
+    assertEquals(2, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
-    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
-    values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
-    OrcStruct row = rr.createValue();
-    for(int i = 0; i < values.length; ++i) {
-      System.out.println("Checking " + i);
-      assertEquals(true, rr.next(NullWritable.get(), row));
-      assertEquals(values[i], row.getFieldValue(0).toString());
+    for(int j = 0; j < splits.length; j++) {
+      InputSplit split = splits[j];
+      rr = inf.getRecordReader(split, job, Reporter.NULL);
+      OrcStruct row = rr.createValue();
+      for (int i = 0; i < values[j].length; ++i) {
+        System.out.println("Checking " + i);
+        String msg = "split[" + j + "] at i=" + i;
+        assertEquals(msg, true, rr.next(NullWritable.get(), row));
+        assertEquals(msg, values[j][i], row.getFieldValue(0).toString());
+      }
+      assertEquals(false, rr.next(NullWritable.get(), row));
     }
-    assertEquals(false, rr.next(NullWritable.get(), row));
   }
 
   /**
@@ -1174,11 +1428,13 @@ public class TestOrcRawRecordMerger {
     job.set("bucket_count", "2");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
-    HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
 
     // read the keys before the delta is flushed
     InputSplit[] splits = inf.getSplits(job, 1);
-    assertEquals(2, splits.length);
+    //1 split since we only have 1 bucket file in base/.  delta is not flushed (committed) yet, i.e. empty
+    assertEquals(1, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
         inf.getRecordReader(splits[0], job, Reporter.NULL);
     NullWritable key = rr.createKey();
@@ -1201,17 +1457,24 @@ public class TestOrcRawRecordMerger {
 
     splits = inf.getSplits(job, 1);
     assertEquals(2, splits.length);
-    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
     Path sideFile = new Path(root + "/" + (use130Format ? AcidUtils.deltaSubdir(10,19,0) :
       AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
     assertEquals(true, fs.exists(sideFile));
-    assertEquals(24, fs.getFileStatus(sideFile).getLen());
+    assertEquals(32, fs.getFileStatus(sideFile).getLen());
 
-    for(int i=1; i < 11; ++i) {
+    rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
+    for(int i=1; i <= 5; ++i) {
       assertEquals(true, rr.next(key, value));
       assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
     }
     assertEquals(false, rr.next(key, value));
+
+    rr = inf.getRecordReader(splits[1], job, Reporter.NULL);
+    for(int i=6; i < 11; ++i) {
+      assertEquals("i="+ i, true, rr.next(key, value));
+      assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
+    }
+    assertEquals(false, rr.next(key, value));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
index be15517..e5c6458 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
@@ -125,6 +125,7 @@ public class TestOrcRecordUpdater {
     // read the stopping point for the first flush and make sure we only see
     // 3 rows
     long len = side.readLong();
+    len = side.readLong();
     Reader reader = OrcFile.createReader(bucketPath,
         new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len));
     assertEquals(3, reader.getNumberOfRows());
@@ -266,28 +267,51 @@ public class TestOrcRecordUpdater {
 
     Reader reader = OrcFile.createReader(bucketPath,
         new OrcFile.ReaderOptions(conf).filesystem(fs));
-    assertEquals(2, reader.getNumberOfRows());
+    assertEquals(1, reader.getNumberOfRows());
 
     RecordReader rows = reader.rows();
 
     // check the contents of the file
     assertEquals(true, rows.hasNext());
     OrcStruct row = (OrcStruct) rows.next(null);
-    assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
+    assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(row));
     assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
-    assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
-    assertEquals(20, OrcRecordUpdater.getBucket(row));
-    assertEquals(30, OrcRecordUpdater.getRowId(row));
+    assertEquals(100, OrcRecordUpdater.getOriginalTransaction(row));
+    int bucketProperty = OrcRecordUpdater.getBucket(row);
+    assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
+    assertEquals(0, OrcRecordUpdater.getRowId(row));
     assertEquals("update",
         OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
+    rows.close();
+
+    options.writingDeleteDelta(true);
+    bucketPath = AcidUtils.createFilename(root, options);
+    reader = OrcFile.createReader(bucketPath,
+      new OrcFile.ReaderOptions(conf).filesystem(fs));
+    assertEquals(2, reader.getNumberOfRows());
+
+    rows = reader.rows();
     assertEquals(true, rows.hasNext());
     row = (OrcStruct) rows.next(null);
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(row));
+    assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
+    assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row));
+    bucketProperty = OrcRecordUpdater.getBucket(row);
+    assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
+    assertEquals(30, OrcRecordUpdater.getRowId(row));
+    assertNull(OrcRecordUpdater.getRow(row));
+
+    assertEquals(true, rows.hasNext());
+    row = (OrcStruct) rows.next(null);
+    assertEquals(OrcRecordUpdater.DELETE_OPERATION, OrcRecordUpdater.getOperation(row));
     assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row));
     assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
-    assertEquals(20, OrcRecordUpdater.getBucket(row));
+    bucketProperty = OrcRecordUpdater.getBucket(row);
+    assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
     assertEquals(60, OrcRecordUpdater.getRowId(row));
     assertNull(OrcRecordUpdater.getRow(row));
+
     assertEquals(false, rows.hasNext());
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 439ec9b..43e0a4a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -246,10 +246,6 @@ public class TestVectorizedOrcAcidRowBatchReader {
   @Test
   public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception {
     OrcSplit mockSplit = Mockito.mock(OrcSplit.class);
-    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
-        AcidUtils.AcidOperationalProperties.getLegacy().toInt());
-    // Test false when trying to create a vectorized ACID row batch reader for a legacy table.
-    assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit));
 
     conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
         AcidUtils.AcidOperationalProperties.getDefault().toInt());

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index bbed591..7455fa8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -409,6 +409,11 @@ public abstract class CompactorTest {
       return null;
     }
 
+    /**
+     * This is bogus especially with split update acid tables.  This causes compaction to create
+     * delete_delta_x_y where none existed before.  Makes the data layout such as would never be
+     * created by 'real' code path.
+     */
     @Override
     public boolean isDelete(Text value) {
       // Alternate between returning deleted and not.  This is easier than actually
@@ -552,4 +557,7 @@ public abstract class CompactorTest {
   String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
     return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
   }
+  String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
+    return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index efd6ed8..8d01543 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -289,7 +289,7 @@ public class TestWorker extends CompactorTest {
     CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
     txnHandler.compact(rqst);
 
-    startWorker();
+    startWorker();//adds delta and delete_delta
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -299,7 +299,7 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -310,9 +310,19 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
-      } else {
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
     }
@@ -348,14 +358,15 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
   }
 
   @Test
@@ -380,18 +391,19 @@ public class TestWorker extends CompactorTest {
     Assert.assertEquals(1, compacts.size());
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
-    // There should still now be 5 directories in the location
+    // There should still now be 6 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(5, stat.length);
+    Assert.assertEquals(6, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName());
+    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[3].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
   }
 
   @Test
@@ -409,7 +421,7 @@ public class TestWorker extends CompactorTest {
     rqst.setPartitionname("ds=today");
     txnHandler.compact(rqst);
 
-    startWorker();
+    startWorker();//this will create delta_20_24 and delete_delta_20_24. See MockRawReader
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -419,7 +431,7 @@ public class TestWorker extends CompactorTest {
     // There should still be four directories in the location.
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -430,9 +442,18 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
-      } else {
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+          sawNewDelta = true;
+          FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+          Assert.assertEquals(2, buckets.length);
+          Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+          Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+          Assert.assertEquals(104L, buckets[0].getLen());
+          Assert.assertEquals(104L, buckets[1].getLen());
+        } else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
     }
@@ -462,7 +483,7 @@ public class TestWorker extends CompactorTest {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(3, stat.length);
+    Assert.assertEquals(4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -473,8 +494,17 @@ public class TestWorker extends CompactorTest {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
@@ -534,6 +564,17 @@ public class TestWorker extends CompactorTest {
   public void majorNoBaseLotsOfDeltas() throws Exception {
     compactNoBaseLotsOfDeltas(CompactionType.MAJOR);
   }
+
+  /**
+   * These tests are starting to be a hack.  The files written by addDeltaFile() are not proper
+   * Acid files and the {@link CompactorTest.MockRawReader} performs no merging of delta files and
+   * fakes isDelete() as a shortcut.  This makes files created on disk to not be representative of
+   * what they should look like in a real system.
+   * Making {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorTest.MockRawReader} do proper
+   * delete event handling would be duplicating either OrcRawRecordMerger or VectorizedOrcAcidRowBatchReader.
+   * @param type
+   * @throws Exception
+   */
   private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
     conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2);
     Table t = newTable("default", "mapwb", true);
@@ -570,45 +611,55 @@ public class TestWorker extends CompactorTest {
 
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(9, stat.length);
+    /* delete_delta_21_23 and delete_delta_25_33 which are created as a result of compacting*/
+    int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0);
+    Assert.assertEquals(numFilesExpected, stat.length);
 
     // Find the new delta file and make sure it has the right contents
-    BitSet matchesFound = new BitSet(9);
-    for (int i = 0; i < stat.length; i++) {
+    BitSet matchesFound = new BitSet(numFilesExpected);
+    for (int i = 0, j = 0; i < stat.length; i++) {
+      if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21,23))) {
+        matchesFound.set(j++);
+      }
+      if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(25,33))) {
+        matchesFound.set(j++);
+      }
       if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
-        matchesFound.set(0);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
-        matchesFound.set(1);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) {
-        matchesFound.set(2);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) {
-        matchesFound.set(3);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) {
-        matchesFound.set(4);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
-        matchesFound.set(5);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
-        matchesFound.set(6);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
-        matchesFound.set(7);
+        matchesFound.set(j++);
       }
       switch (type) {
-        //yes, both do set(8)
         case MINOR:
           if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
-            matchesFound.set(8);
+            matchesFound.set(j++);
+          }
+          if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 35))) {
+            matchesFound.set(j++);
           }
           break;
         case MAJOR:
           if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
-            matchesFound.set(8);
+            matchesFound.set(j++);
           }
           break;
         default:

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
index 18408e4..80ee0ba 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_3.q
@@ -108,7 +108,7 @@ when matched then update set
   end_date = date '2017-03-15' 
 when not matched then insert values
   (sub.source_pk, upper(substr(sub.name, 0, 3)), sub.name, sub.state, true, null);
-select * from customer order by source_pk;
+select * from customer order by source_pk, is_current;
 
 drop table customer;
 drop table customer_updates;

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
index 78c9070..7f837cc 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_3.q.out
@@ -981,11 +981,11 @@ POSTHOOK: Lineage: customer.sk EXPRESSION [(new_customer_stage)stage.FieldSchema
 POSTHOOK: Lineage: customer.source_pk SIMPLE [(new_customer_stage)stage.FieldSchema(name:source_pk, type:int, comment:null), ]
 POSTHOOK: Lineage: customer.state SIMPLE [(new_customer_stage)stage.FieldSchema(name:state, type:string, comment:null), ]
 POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(customer)customer.FieldSchema(name:ROW__ID, type:struct<transactionId:bigint,bucketId:int,rowId:bigint>, comment:), ]
-PREHOOK: query: select * from customer order by source_pk
+PREHOOK: query: select * from customer order by source_pk, is_current
 PREHOOK: type: QUERY
 PREHOOK: Input: type2_scd_helper@customer
 #### A masked pattern was here ####
-POSTHOOK: query: select * from customer order by source_pk
+POSTHOOK: query: select * from customer order by source_pk, is_current
 POSTHOOK: type: QUERY
 POSTHOOK: Input: type2_scd_helper@customer
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/results/clientpositive/row__id.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/row__id.q.out b/ql/src/test/results/clientpositive/row__id.q.out
index 059ace9..9aab097 100644
--- a/ql/src/test/results/clientpositive/row__id.q.out
+++ b/ql/src/test/results/clientpositive/row__id.q.out
@@ -56,23 +56,23 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: hello_acid
-            Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
             Select Operator
               expressions: ROW__ID.transactionid (type: bigint)
               outputColumnNames: _col0
-              Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+              Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
               Reduce Output Operator
                 key expressions: _col0 (type: bigint)
                 sort order: +
-                Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+                Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: bigint)
           outputColumnNames: _col0
-          Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+          Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
           File Output Operator
             compressed: false
-            Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
             table:
                 input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -117,17 +117,17 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: hello_acid
-            Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 1860 Basic stats: PARTIAL Column stats: NONE
             Filter Operator
               predicate: (ROW__ID.transactionid = 3) (type: boolean)
-              Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: ROW__ID.transactionid (type: bigint)
                 outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 1860 Basic stats: COMPLETE Column stats: NONE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat


[3/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)

Posted by ek...@apache.org.
HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/34b0e07a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/34b0e07a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/34b0e07a

Branch: refs/heads/master
Commit: 34b0e07a3bd5002b197b749e3e5a7992e196c237
Parents: 5061683
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 15 18:13:44 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 15 18:13:44 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   9 +-
 .../hive/hcatalog/streaming/TestStreaming.java  | 147 ++++-
 .../streaming/mutate/StreamingAssert.java       |  45 +-
 .../streaming/mutate/TestMutations.java         |  17 +-
 .../TransactionalValidationListener.java        |   2 -
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   2 -
 .../hadoop/hive/ql/io/AcidInputFormat.java      |   6 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  90 ++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  41 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  |   2 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  41 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  60 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  20 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 186 ++++---
 .../ql/TestTxnCommands2WithSplitUpdate.java     | 545 -------------------
 ...ommands2WithSplitUpdateAndVectorization.java |   4 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  31 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 191 ++++---
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  | 509 ++++++++++++-----
 .../hive/ql/io/orc/TestOrcRecordUpdater.java    |  36 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   4 -
 .../hive/ql/txn/compactor/CompactorTest.java    |   8 +
 .../hive/ql/txn/compactor/TestWorker.java       | 125 +++--
 .../dynamic_semijoin_reduction_3.q              |   2 +-
 .../llap/dynamic_semijoin_reduction_3.q.out     |   4 +-
 .../test/results/clientpositive/row__id.q.out   |  18 +-
 26 files changed, 1086 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b154544..3c158a6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1841,12 +1841,9 @@ public class HiveConf extends Configuration {
         " of the lock manager is dumped to log file.  This is for debugging.  See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
-    HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
-        "Sets the operational properties that control the appropriate behavior for various\n"
-        + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n"
-        + "for ACID, while setting it to one will enable a split-update feature found in the newer\n"
-        + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
-        + "for future versions of ACID. (See HIVE-14035 for details.)"),
+    HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
+        "This is intended to be used as an internal property for future versions of ACID. (See\n" +
+        "HIVE-14035 for details.)"),
 
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
         "current open transactions reach this limit, future open transaction requests will be \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 5e8fe62..f3ef92b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -27,6 +27,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -44,7 +45,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -97,6 +99,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+
 
 public class TestStreaming {
   private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
@@ -448,7 +452,10 @@ public class TestStreaming {
     }
   }
 
-
+  /**
+   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} - there is
+   * little value in using InputFormat directly
+   */
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -473,13 +480,14 @@ public class TestStreaming {
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionPath.toString());
-    job.set("bucket_count", Integer.toString(buckets));
+    job.set(BUCKET_COUNT, Integer.toString(buckets));
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
-    Assert.assertEquals(buckets, splits.length);
+    Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
             inf.getRecordReader(splits[0], job, Reporter.NULL);
 
@@ -491,6 +499,48 @@ public class TestStreaming {
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
+  /**
+   * @param validationQuery query to read from table to compare data against {@code records}
+   * @param records expected data.  each row is a tab-separated list of values
+   */
+  private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+                                String validationQuery, boolean vectorize, String... records) throws Exception {
+    ValidTxnList txns = msClient.getValidTxns();
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
+    Assert.assertEquals(numExpectedFiles, current.size());
+
+    // find the absolute minimum transaction
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction();
+      if (pd.getMinTransaction() < min) min = pd.getMinTransaction();
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+    boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+    if(vectorize) {
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    }
+
+    String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
+    for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
+      //run it with each split strategy - make sure there are no differences
+      conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
+      List<String> actualResult = queryTable(driver, validationQuery);
+      for (int i = 0; i < actualResult.size(); i++) {
+        Assert.assertEquals("diff at [" + i + "].  actual=" + actualResult + " expected=" +
+          Arrays.toString(records), records[i], actualResult.get(i));
+      }
+    }
+    conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
+  }
 
   private void checkNothingWritten(Path partitionPath) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -1016,15 +1066,15 @@ public class TestStreaming {
     txnBatch.beginNextTransaction();
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
-
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+    checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
-            "{2, Welcome to streaming}");
+    checkDataWritten2(partLoc, 15, 24,  1, validationQuery, true, "1\tHello streaming",
+            "2\tWelcome to streaming");
 
     txnBatch.close();
 
@@ -1034,16 +1084,16 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
-            "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, false, "1\tHello streaming",
+            "2\tWelcome to streaming", "3\tHello streaming - once again");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
-            "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
-            "{4, Welcome to streaming - once again}");
+    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, true, "1\tHello streaming",
+            "2\tWelcome to streaming", "3\tHello streaming - once again",
+            "4\tWelcome to streaming - once again");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -1053,7 +1103,6 @@ public class TestStreaming {
     connection.close();
   }
 
-
   @Test
   public void testInterleavedTransactionBatchCommits() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
@@ -1078,32 +1127,69 @@ public class TestStreaming {
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+    checkDataWritten2(partLoc, 24, 33, 1,
+      validationQuery, true, "3\tHello streaming - once again");
 
     txnBatch1.commit();
-
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    /*now both batches have committed (but not closed) so for each primary file we expect a side
+    file to exist and indicate the true length of the primary file*/
+    FileSystem fs = partLoc.getFileSystem(conf);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength == actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.beginNextTransaction();
     txnBatch1.write("2,Welcome to streaming".getBytes());
 
     txnBatch2.beginNextTransaction();
     txnBatch2.write("4,Welcome to streaming - once again".getBytes());
-
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    //here each batch has written data and committed (to bucket0 since table only has 1 bucket)
+    //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
+    //has now received more data(logically - it's buffered) but it is not yet committed.
+    //let's check that side files exist, etc
+    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength <= actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
-        "{2, Welcome to streaming}",
-        "{3, Hello streaming - once again}");
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, false, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again");
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
-        "{2, Welcome to streaming}",
-        "{3, Hello streaming - once again}",
-        "{4, Welcome to streaming - once again}");
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, true, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again",
+        "4\tWelcome to streaming - once again");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch1.getCurrentTransactionState());
@@ -2035,11 +2121,12 @@ public class TestStreaming {
 
   public static ArrayList<String> queryTable(Driver driver, String query)
           throws CommandNeedRetryException, IOException {
-    driver.run(query);
+    CommandProcessorResponse cpr = driver.run(query);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(query + " failed: " + cpr);
+    }
     ArrayList<String> res = new ArrayList<String>();
     driver.getResults(res);
-    if(res.isEmpty())
-      System.err.println(driver.getErrorMsg());
     return res;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index de41d34..d5429fb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -27,12 +27,12 @@ import java.util.List;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -123,35 +123,50 @@ public class StreamingAssert {
   }
 
   List<Record> readRecords() throws Exception {
+    return readRecords(1);
+  }
+
+  /**
+   * TODO: this would be more flexible doing a SQL select statement rather than using InputFormat directly
+   * see {@link org.apache.hive.hcatalog.streaming.TestStreaming#checkDataWritten2(Path, long, long, int, String, boolean, String...)}
+   * @param numSplitsExpected
+   * @return
+   * @throws Exception
+   */
+  List<Record> readRecords(int numSplitsExpected) throws Exception {
     if (currentDeltas.isEmpty()) {
       throw new AssertionError("No data");
     }
     InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionLocation.toString());
-    job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+    job.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(table.getSd().getNumBuckets()));
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
-    assertEquals(1, splits.length);
-
-    final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
-        .getRecordReader(splits[0], job, Reporter.NULL);
+    assertEquals(numSplitsExpected, splits.length);
 
-    NullWritable key = recordReader.createKey();
-    OrcStruct value = recordReader.createValue();
 
     List<Record> records = new ArrayList<>();
-    while (recordReader.next(key, value)) {
-      RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
-      Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+    for(InputSplit is : splits) {
+      final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+        .getRecordReader(is, job, Reporter.NULL);
+
+      NullWritable key = recordReader.createKey();
+      OrcStruct value = recordReader.createValue();
+
+      while (recordReader.next(key, value)) {
+        RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+        Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
           recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
-      System.out.println(record);
-      records.add(record);
+        System.out.println(record);
+        records.add(record);
+      }
+      recordReader.close();
     }
-    recordReader.close();
     return records;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
index ab9f313..5bfa04d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -529,22 +529,23 @@ public class TestMutations {
     StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
     indiaAssertions.assertMinTransactionId(1L);
     indiaAssertions.assertMaxTransactionId(2L);
-    List<Record> indiaRecords = indiaAssertions.readRecords();
+    List<Record> indiaRecords = indiaAssertions.readRecords(2);
     assertThat(indiaRecords.size(), is(3));
     assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
     assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
       encodeBucket(0), 0L)));
     assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
-    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L,
-      encodeBucket(0), 1L)));
+    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(2L,
+      encodeBucket(0), 0L)));//with split update, new version of the row is a new insert
     assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
     assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L,
-      encodeBucket(0), 0L)));
+      encodeBucket(0), 1L)));
 
     StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
     ukAssertions.assertMinTransactionId(1L);
     ukAssertions.assertMaxTransactionId(2L);
-    List<Record> ukRecords = ukAssertions.readRecords();
+    //1 split since mutateTransaction txn just does deletes
+    List<Record> ukRecords = ukAssertions.readRecords(1);
     assertThat(ukRecords.size(), is(1));
     assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
     assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
@@ -553,11 +554,11 @@ public class TestMutations {
     StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
     franceAssertions.assertMinTransactionId(1L);
     franceAssertions.assertMaxTransactionId(2L);
-    List<Record> franceRecords = franceAssertions.readRecords();
+    List<Record> franceRecords = franceAssertions.readRecords(2);
     assertThat(franceRecords.size(), is(1));
     assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
-    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
-      encodeBucket(0), 1L)));
+    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(2L,
+      encodeBucket(0), 0L)));//with split update, new version of the row is a new insert
 
     client.close();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 0f08f43..023d703 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -39,7 +39,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
 
   // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
   public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
-  public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
 
   TransactionalValidationListener(Configuration conf) {
     super(conf);
@@ -276,7 +275,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
     boolean isValid = false;
     switch (transactionalProperties) {
       case DEFAULT_TRANSACTIONAL_PROPERTY:
-      case LEGACY_TRANSACTIONAL_PROPERTY:
         isValid = true;
         break;
       default:

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8999f6f..25ad1e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -148,8 +148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     RecordWriter[] outWriters;
     RecordUpdater[] updaters;
     Stat stat;
-    int acidLastBucket = -1;
-    int acidFileOffset = -1;
 
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 25177ef..9864c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,6 +112,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     private long minTxnId;
     private long maxTxnId;
     private List<Integer> stmtIds;
+    //would be useful to have enum for Type: insert/delete/load data
 
     public DeltaMetaData() {
       this(0,0,new ArrayList<Integer>());
@@ -155,6 +156,11 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
         stmtIds.add(in.readInt());
       }
     }
+    @Override
+    public String toString() {
+      //? is Type - when implemented
+      return "Delta(?," + minTxnId + "," + maxTxnId + "," + stmtIds + ")";
+    }
   }
   /**
    * Options for controlling the record readers.

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 1e33424..feacdd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -39,11 +39,13 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.Ref;
+import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +69,18 @@ public class AcidUtils {
   };
   public static final String DELTA_PREFIX = "delta_";
   public static final String DELETE_DELTA_PREFIX = "delete_delta_";
+  /**
+   * Acid Streaming Ingest writes multiple transactions to the same file.  It also maintains a
+   * {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of
+   * the primary file as of the last commit ({@link OrcRecordUpdater#flush()}).  That is the 'logical length'.
+   * Once the primary is closed, the side file is deleted (logical length = actual length) but if
+   * the writer dies or the primary file is being read while it's still being written to, anything
+   * past the logical length should be ignored.
+   *
+   * @see org.apache.orc.impl.OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX
+   * @see org.apache.orc.impl.OrcAcidUtils#getLastFlushLength(FileSystem, Path)
+   * @see #getLogicalLength(FileSystem, FileStatus)
+   */
   public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
   public static final PathFilter deltaFileFilter = new PathFilter() {
     @Override
@@ -167,7 +181,7 @@ public class AcidUtils {
    * This is format of delete delta dir name prior to Hive 2.2.x
    */
   @VisibleForTesting
-  static String deleteDeltaSubdir(long min, long max) {
+  public static String deleteDeltaSubdir(long min, long max) {
     return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
         String.format(DELTA_DIGITS, max);
   }
@@ -178,7 +192,7 @@ public class AcidUtils {
    * @since 2.2.x
    */
   @VisibleForTesting
-  static String deleteDeltaSubdir(long min, long max, int statementId) {
+  public static String deleteDeltaSubdir(long min, long max, int statementId) {
     return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
   }
 
@@ -371,21 +385,10 @@ public class AcidUtils {
     public static final int HASH_BASED_MERGE_BIT = 0x02;
     public static final String HASH_BASED_MERGE_STRING = "hash_merge";
     public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
-    public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
 
     private AcidOperationalProperties() {
     }
 
-    /**
-     * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
-     * that were created before ACID type system using operational properties was put in place.
-     * @return the acidOperationalProperties object
-     */
-    public static AcidOperationalProperties getLegacy() {
-      AcidOperationalProperties obj = new AcidOperationalProperties();
-      // In legacy mode, none of these properties are turned on.
-      return obj;
-    }
 
     /**
      * Returns an acidOperationalProperties object that represents default ACID behavior for tables
@@ -406,14 +409,11 @@ public class AcidUtils {
      */
     public static AcidOperationalProperties parseString(String propertiesStr) {
       if (propertiesStr == null) {
-        return AcidOperationalProperties.getLegacy();
+        return AcidOperationalProperties.getDefault();
       }
       if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
         return AcidOperationalProperties.getDefault();
       }
-      if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
-        return AcidOperationalProperties.getLegacy();
-      }
       AcidOperationalProperties obj = new AcidOperationalProperties();
       String[] options = propertiesStr.split("\\|");
       for (String option : options) {
@@ -1119,7 +1119,12 @@ public class AcidUtils {
   public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
     HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
   }
-
+  /**
+   * @param p - not null
+   */
+  public static boolean isDeleteDelta(Path p) {
+    return p.getName().startsWith(DELETE_DELTA_PREFIX);
+  }
   /** Checks if a table is a valid ACID table.
    * Note, users are responsible for using the correct TxnManager. We do not look at
    * SessionState.get().getTxnMgr().supportsAcid() here
@@ -1171,8 +1176,8 @@ public class AcidUtils {
     String transactionalProperties = table.getProperty(
             hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (transactionalProperties == null) {
-      // If the table does not define any transactional properties, we return a legacy type.
-      return AcidOperationalProperties.getLegacy();
+      // If the table does not define any transactional properties, we return a default type.
+      return AcidOperationalProperties.getDefault();
     }
     return AcidOperationalProperties.parseString(transactionalProperties);
   }
@@ -1184,7 +1189,7 @@ public class AcidUtils {
    */
   public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
     // If the conf does not define any transactional properties, the parseInt() should receive
-    // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+    // a value of 1, which will set AcidOperationalProperties to a default type and return that.
     return AcidOperationalProperties.parseInt(
             HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
   }
@@ -1197,8 +1202,8 @@ public class AcidUtils {
   public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
     String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (resultStr == null) {
-      // If the properties does not define any transactional properties, we return a legacy type.
-      return AcidOperationalProperties.getLegacy();
+      // If the properties do not define any transactional properties, we return a default type.
+      return AcidOperationalProperties.getDefault();
     }
     return AcidOperationalProperties.parseString(resultStr);
   }
@@ -1212,9 +1217,44 @@ public class AcidUtils {
           Map<String, String> parameters) {
     String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (resultStr == null) {
-      // If the parameters does not define any transactional properties, we return a legacy type.
-      return AcidOperationalProperties.getLegacy();
+      // If the parameters do not define any transactional properties, we return a default type.
+      return AcidOperationalProperties.getDefault();
     }
     return AcidOperationalProperties.parseString(resultStr);
   }
+  /**
+   * See comments at {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX}.
+   *
+   * Returns the logical end of file for an acid data file.
+   *
+   * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
+   * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all.
+   * @param file - data file to read/compute splits on
+   */
+  public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
+    Path lengths = OrcAcidUtils.getSideFile(file.getPath());
+    if(!fs.exists(lengths)) {
+      /**
+       * if here for delta_x_y that means txn y is resolved and all files in this delta are closed so
+       * they should all have a valid ORC footer and info from NameNode about length is good
+       */
+      return file.getLen();
+    }
+    long len = OrcAcidUtils.getLastFlushLength(fs, file.getPath());
+    if(len >= 0) {
+      /**
+       * if here, something is still writing to delta_x_y, so read only as far as the last commit,
+       * i.e. where last footer was written.  The file may contain more data after 'len' position
+       * belonging to an open txn.
+       */
+      return len;
+    }
+    /**
+     * if here, side file is there but we couldn't read it.  We want to avoid a read where
+     * (file.getLen() < 'value from side file' which may happen if file is not closed) because this
+     * means some committed data from 'file' would be skipped.
+     * This should be very unusual.
+     */
+    throw new IOException(lengths + " found but is not readable.  Consider waiting or orcfiledump --recover");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index de49fc8..17f3d02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -641,6 +642,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
       // & therefore we should be able to retrieve them here and determine appropriate behavior.
       // Note that this will be meaningless for non-acid tables & will be set to null.
+      //this is set by Utilities.copyTablePropertiesToConf()
       boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
       String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
       this.acidOperationalProperties = isTableTransactional ?
@@ -972,16 +974,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       List<OrcSplit> splits = Lists.newArrayList();
       for (HdfsFileStatusWithId file : fileStatuses) {
         FileStatus fileStatus = file.getFileStatus();
-        if (fileStatus.getLen() != 0) {
+        long logicalLen = AcidUtils.getLogicalLength(fs, fileStatus);
+        if (logicalLen != 0) {
           Object fileKey = file.getFileId();
           if (fileKey == null && allowSyntheticFileIds) {
             fileKey = new SyntheticFileId(fileStatus);
           }
           TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
           for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
+            if(entry.getKey() + entry.getValue().getLength() > logicalLen) {
+              //don't create splits for anything past logical EOF
+              continue;
+            }
             OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
                 entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-                deltas, -1, fileStatus.getLen());
+                deltas, -1, logicalLen);
             splits.add(orcSplit);
           }
         }
@@ -1002,13 +1009,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * ACID split strategy is used when there is no base directory (when transactions are enabled).
    */
   static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
-    Path dir;
-    List<DeltaMetaData> deltas;
-    boolean[] covered;
-    int numBuckets;
-    AcidOperationalProperties acidOperationalProperties;
+    private Path dir;
+    private List<DeltaMetaData> deltas;
+    private boolean[] covered;
+    private int numBuckets;
+    private AcidOperationalProperties acidOperationalProperties;
 
-    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+    ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
         AcidOperationalProperties acidOperationalProperties) {
       this.dir = dir;
       this.numBuckets = numBuckets;
@@ -1027,18 +1034,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       // with valid user payload data has already been considered as base for the covered buckets.
       // Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
       if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
-        return splits; // return an empty list.
+        return Collections.emptyList();
       }
 
       // Generate a split for any buckets that weren't covered.
       // This happens in the case where a bucket just has deltas and no
       // base.
       if (!deltas.isEmpty()) {
-        for (int b = 0; b < numBuckets; ++b) {
-          if (!covered[b]) {
-            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1));
-          }
-        }
+        //since HIVE-17089 if here, then it's not an acid table so there should never be any deltas
+        throw new IllegalStateException("Found unexpected deltas: " + deltas + " in " + dir);
       }
       return splits;
     }
@@ -1133,7 +1137,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             if (val == null || val) {
               try {
                 List<HdfsFileStatusWithId> insertDeltaFiles =
-                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
                 for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
                   baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
                 }
@@ -1149,7 +1153,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               }
             }
             // Fall back to regular API and create statuses without ID.
-            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
             for (FileStatus child : children) {
               HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
               baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
@@ -1402,7 +1406,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         populateAndCacheStripeDetails();
         boolean[] includeStripe = null;
         // We can't eliminate stripes if there are deltas because the
-        // deltas may change the rows making them match the predicate.
+        // deltas may change the rows making them match the predicate. todo: See HIVE-14516.
         if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
           String[] colNames =
               extractNeededColNames((readerTypes == null ? fileTypes : readerTypes),
@@ -1516,7 +1520,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         Reader orcReader = OrcFile.createReader(file.getPath(),
             OrcFile.readerOptions(context.conf)
                 .filesystem(fs)
-                .maxLength(file.getLen()));
+                .maxLength(AcidUtils.getLogicalLength(fs, file)));
         orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(),
             file.getModificationTime());
         if (context.cacheStripeDetails) {
@@ -2210,7 +2214,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         totalFileSize += child.getFileStatus().getLen();
         AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
             (child.getFileStatus().getPath(), context.conf);
-        opts.writingBase(true);
         int b = opts.getBucketId();
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index a179300..214f22a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -296,7 +296,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
           .rowIndexStride(0);
     }
     final OrcRecordUpdater.KeyIndexBuilder watcher =
-        new OrcRecordUpdater.KeyIndexBuilder();
+        new OrcRecordUpdater.KeyIndexBuilder("compactor");
     opts.inspector(options.getInspector())
         .callback(watcher);
     final Writer writer = OrcFile.createWriter(filename, opts);

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 814782a..97c4e3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
@@ -310,11 +311,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     OrcStruct nextRecord;
     private final ReaderKey key;
     final int bucketId;
+    final int bucketProperty;
 
-    OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+    OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
       this.key = key;
       this.bucketId = bucketId;
       assert bucketId >= 0 : "don't support non-bucketed tables yet";
+      this.bucketProperty = encodeBucketId(conf, bucketId);
     }
     @Override public final OrcStruct nextRecord() {
       return nextRecord;
@@ -348,7 +351,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
               new LongWritable(0));
           nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
-              new IntWritable(bucketId));
+              new IntWritable(bucketProperty));
           nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
               new LongWritable(nextRowId));
           nextRecord().setFieldValue(OrcRecordUpdater.ROW,
@@ -360,7 +363,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
               .set(0);
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
-              .set(bucketId);
+              .set(bucketProperty);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
               .set(0);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
@@ -368,7 +371,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord().setFieldValue(OrcRecordUpdater.ROW,
               getRecordReader().next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(0L, bucketId, nextRowId, 0L, 0);
+        key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
         if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -380,6 +383,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return false;//reached EndOfFile
     }
   }
+  static int encodeBucketId(Configuration conf, int bucketId) {
+    return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+  }
   @VisibleForTesting
   final static class OriginalReaderPairToRead extends OriginalReaderPair {
     private final long rowIdOffset;
@@ -392,7 +398,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                              final RecordIdentifier minKey, final RecordIdentifier maxKey,
                              Reader.Options options, Options mergerOptions, Configuration conf,
                              ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId);
+      super(key, bucketId, conf);
       this.reader = reader;
       assert !mergerOptions.isCompacting();
       assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -444,8 +450,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if (rowIdOffset > 0) {
           //rowIdOffset could be 0 if all files before current one are empty
           /**
-           * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
-           * int, Reader.Options)} need to fix min/max key since these are used by
+           * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+           * need to fix min/max key since these are used by
            * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
            * the key.  Clear?  */
           if (minKey != null) {
@@ -455,7 +461,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              *  If this is not the 1st file, set minKey 1 less than the start of current file
              * (Would not need to set minKey if we knew that there are no delta files)
              * {@link #advanceToMinKey()} needs this */
-            newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+            newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
           }
           if (maxKey != null) {
             maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -485,7 +491,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            * of the file so we want to leave it blank to make sure any insert events in delta
            * files are included; Conversely, if it's not the last file, set the maxKey so that
            * events from deltas that don't modify anything in the current split are excluded*/
-        newMaxKey = new RecordIdentifier(0, bucketId,
+        newMaxKey = new RecordIdentifier(0, bucketProperty,
           rowIdOffset + reader.getNumberOfRows() - 1);
       }
       this.minKey = newMinKey;
@@ -536,7 +542,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     OriginalReaderPairToCompact(ReaderKey key, int bucketId,
                        Reader.Options options, Options mergerOptions, Configuration conf,
                        ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId);
+      super(key, bucketId, conf);
       assert mergerOptions.isCompacting() : "Should only be used for Compaction";
       this.conf = conf;
       this.options = options;
@@ -651,8 +657,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * @throws IOException
    */
   private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
-                                         Reader.Options options
-                                         ) throws IOException {
+                                         Reader.Options options,
+                                         Configuration conf) throws IOException {
     long rowLength = 0;
     long rowOffset = 0;
     long offset = options.getOffset();//this would usually be at block boundary
@@ -660,6 +666,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isTail = true;
     RecordIdentifier minKey = null;
     RecordIdentifier maxKey = null;
+    int bucketProperty = encodeBucketId(conf, bucket);
    /**
     * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
     * necessarily match stripe boundary.  So we want to come up with minKey to be one before the 1st
@@ -679,10 +686,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
     }
     if (rowOffset > 0) {
-      minKey = new RecordIdentifier(0, bucket, rowOffset - 1);
+      minKey = new RecordIdentifier(0, bucketProperty, rowOffset - 1);
     }
     if (!isTail) {
-      maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
+      maxKey = new RecordIdentifier(0, bucketProperty, rowOffset + rowLength - 1);
     }
     return new KeyInterval(minKey, maxKey);
   }
@@ -829,7 +836,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       KeyInterval keyInterval;
       // find the min/max based on the offset and length (and more for 'original')
       if (isOriginal) {
-        keyInterval = discoverOriginalKeyBounds(reader, bucket, options);
+        keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
       } else {
         keyInterval = discoverKeyBounds(reader, options);
       }
@@ -865,6 +872,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     eventOptions.range(0, Long.MAX_VALUE);
     if (deltaDirectory != null) {
       for(Path delta: deltaDirectory) {
+        if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
+          //all inserts should be in baseReader for normal read so this should always be delete delta if not compacting
+          throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
+        }
         ReaderKey key = new ReaderKey();
         Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
         AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index c30e8fe..429960b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -90,6 +90,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   private Path deleteEventPath;
   private final FileSystem fs;
   private OrcFile.WriterOptions writerOptions;
+  private OrcFile.WriterOptions deleteWriterOptions;
   private Writer writer = null;
   private boolean writerClosed = false;
   private Writer deleteEventWriter = null;
@@ -104,7 +105,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   // This records how many rows have been inserted or deleted.  It is separate from insertedRows
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
-  private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+  private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder("insert");
   private KeyIndexBuilder deleteEventIndexBuilder;
   private StructField recIdField = null; // field to look for the record identifier in
   private StructField rowIdField = null; // field inside recId to look for row id in
@@ -148,20 +149,23 @@ public class OrcRecordUpdater implements RecordUpdater {
   /**
    * An extension to AcidOutputFormat that allows users to add additional
    * options.
+   *
+   * todo: since this is only used for testing could we not control the writer some other way?
+   * to simplify {@link #OrcRecordUpdater(Path, AcidOutputFormat.Options)}
    */
-  public static class OrcOptions extends AcidOutputFormat.Options {
+   final static class OrcOptions extends AcidOutputFormat.Options {
     OrcFile.WriterOptions orcOptions = null;
 
-    public OrcOptions(Configuration conf) {
+    OrcOptions(Configuration conf) {
       super(conf);
     }
 
-    public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
+    OrcOptions orcOptions(OrcFile.WriterOptions opts) {
       this.orcOptions = opts;
       return this;
     }
 
-    public OrcFile.WriterOptions getOrcOptions() {
+    OrcFile.WriterOptions getOrcOptions() {
       return orcOptions;
     }
   }
@@ -205,6 +209,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       this.acidOperationalProperties =
           AcidUtils.getAcidOperationalProperties(options.getConfiguration());
     }
+    assert this.acidOperationalProperties.isSplitUpdate() : "HIVE-17089?!";
     BucketCodec bucketCodec = BucketCodec.V1;
     if(options.getConfiguration() != null) {
       //so that we can test "old" files
@@ -240,6 +245,8 @@ public class OrcRecordUpdater implements RecordUpdater {
         && !options.isWritingBase()){
       flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
           options.getReporter());
+      flushLengths.writeLong(0);
+      OrcInputFormat.SHIMS.hflush(flushLengths);
     } else {
       flushLengths = null;
     }
@@ -265,12 +272,26 @@ public class OrcRecordUpdater implements RecordUpdater {
             optionsCloneForDelta.getConfiguration());
       }
       if (this.acidOperationalProperties.isSplitUpdate()) {
+        AcidOutputFormat.Options deleteOptions = options.clone().writingDeleteDelta(true);
         // If this is a split-update, we initialize a delete delta file path in anticipation that
         // they would write update/delete events to that separate file.
         // This writes to a file in directory which starts with "delete_delta_..."
-        // The actual initialization of a writer only happens if any delete events are written.
-        this.deleteEventPath = AcidUtils.createFilename(path,
-            optionsCloneForDelta.writingDeleteDelta(true));
+        // The actual initialization of a writer only happens if any delete events are written
+        //to avoid empty files.
+        this.deleteEventPath = AcidUtils.createFilename(path, deleteOptions);
+        /**
+         * HIVE-14514 is not done so we can't clone writerOptions().  So here we create a new
+         * options object to make sure insert and delete writers don't share them (like the
+         * callback object, for example)
+         * In any case insert writer and delete writer would most likely have very different
+         * characteristics - delete writer only writes a tiny amount of data.  Once we do early
+         * update split, each {@link OrcRecordUpdater} will have only 1 writer. (except for Mutate API)
+         * Then it would perhaps make sense to take writerOptions as input - how?
+         */
+        this.deleteWriterOptions = OrcFile.writerOptions(optionsCloneForDelta.getTableProperties(),
+          optionsCloneForDelta.getConfiguration());
+        this.deleteWriterOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+          options.getRecordIdColumn())));
       }
 
       // get buffer size and stripe size for base writer
@@ -377,19 +398,10 @@ public class OrcRecordUpdater implements RecordUpdater {
         recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
       // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
       if (deleteEventWriter == null) {
-        // Initialize an indexBuilder for deleteEvents.
-        deleteEventIndexBuilder = new KeyIndexBuilder();
-        // Change the indexBuilder callback too for the deleteEvent file, the remaining writer
-        // options remain the same.
-
-        // TODO: When we change the callback, we are essentially mutating the writerOptions.
-        // This works but perhaps is not a good thing. The proper way to do this would be
-        // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions
-        // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
-        // JIRA to fix this.
-
+        // Initialize an indexBuilder for deleteEvents. (HIVE-17284)
+        deleteEventIndexBuilder = new KeyIndexBuilder("delete");
         this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
-                                                      writerOptions.callback(deleteEventIndexBuilder));
+                                                      deleteWriterOptions.callback(deleteEventIndexBuilder));
       }
 
       // A delete/update generates a delete event for the original row.
@@ -461,6 +473,8 @@ public class OrcRecordUpdater implements RecordUpdater {
     long len = writer.writeIntermediateFooter();
     flushLengths.writeLong(len);
     OrcInputFormat.SHIMS.hflush(flushLengths);
+    //multiple transactions only happen for streaming ingest which only allows inserts
+    assert deleteEventWriter == null : "unexpected delete writer for " + path;
   }
 
   @Override
@@ -539,12 +553,16 @@ public class OrcRecordUpdater implements RecordUpdater {
   }
 
   static class KeyIndexBuilder implements OrcFile.WriterCallback {
-    StringBuilder lastKey = new StringBuilder();
+    private final String builderName;
+    StringBuilder lastKey = new StringBuilder();//list of last keys for each stripe
     long lastTransaction;
     int lastBucket;
     long lastRowId;
     AcidStats acidStats = new AcidStats();
 
+    KeyIndexBuilder(String name) {
+      this.builderName = name;
+    }
     @Override
     public void preStripeWrite(OrcFile.WriterContext context
     ) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c50c1a8..bff9884 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -859,15 +861,19 @@ public class TestTxnCommands {
     for(String s : rs) {
       LOG.warn(s);
     }
+    Assert.assertEquals(536870912,
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+    Assert.assertEquals(536936448,
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
     Assert.assertEquals("", 4, rs.size());
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001"));
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
@@ -877,11 +883,11 @@ public class TestTxnCommands {
       LOG.warn(s);
     }
     Assert.assertEquals("", 4, rs.size());
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));


[2/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)

Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 408c089..0e0fca3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -64,16 +65,11 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.constraints.AssertTrue;
-
-/**
- * TODO: this should be merged with TestTxnCommands once that is checked in
- * specifically the tests; the supporting code here is just a clone of TestTxnCommands
- */
 public class TestTxnCommands2 {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
   protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
@@ -88,7 +84,7 @@ public class TestTxnCommands2 {
 
   protected HiveConf hiveConf;
   protected Driver d;
-  protected static enum Table {
+  protected enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart", "p"),
     NONACIDORCTBL("nonAcidOrcTbl"),
@@ -113,6 +109,8 @@ public class TestTxnCommands2 {
       this.partitionColumns = partitionColumns;
     }
   }
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
 
   @Before
   public void setUp() throws Exception {
@@ -357,20 +355,22 @@ public class TestTxnCommands2 {
     for(String s : rs) {
       LOG.warn(s);
     }
+    Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+    Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
     /*
      * All ROW__IDs are unique on read after conversion to acid 
      * ROW__IDs are exactly the same before and after compaction
-     * Also check the file name after compaction for completeness
+     * Also check the file name (only) after compaction for completeness
      */
     String[][] expected = {
-      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13",  "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13",  "bucket_00000"},
       {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
       {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6",   "bucket_00001"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6",   "bucket_00001"},
       {"{\"transactionid\":18,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
     };
     Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
@@ -392,9 +392,20 @@ public class TestTxnCommands2 {
     }
     //make sure they are the same before and after compaction
   }
-
   /**
-   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * In the current implementation of ACID, altering the value of transactional_properties, or
+   * trying to set it when it was previously unset, on an acid table will throw an exception.
+   * @throws Exception
+   */
+  @Test
+  public void testFailureOnAlteringTransactionalProperties() throws Exception {
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
+    runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+  }
+  /**
+   * Test the query correctness and directory layout for ACID table conversion
    * 1. Insert a row to Non-ACID table
    * 2. Convert Non-ACID to ACID table
    * 3. Insert a row to ACID table
@@ -410,7 +421,7 @@ public class TestTxnCommands2 {
     // 1. Insert a row to Non-ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files in the location (000000_0 and 000001_0)
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -426,7 +437,7 @@ public class TestTxnCommands2 {
     // 2. Convert NONACIDORCTBL to ACID table
     runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -442,24 +453,23 @@ public class TestTxnCommands2 {
     // 3. Insert another row to newly-converted ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
-    // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
+    // The delta directory should have only 1 bucket file (bucket_00001)
     Assert.assertEquals(3, status.length);
     boolean sawNewDelta = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("delta_.*")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1, buckets.length); // only one bucket file
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
       } else {
         Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
       }
     }
     Assert.assertTrue(sawNewDelta);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
     resultData = new int[][] {{1, 2}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
     rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
@@ -472,16 +482,15 @@ public class TestTxnCommands2 {
     // There should be 1 new directory: base_xxxxxxx.
     // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(4, status.length);
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
       }
     }
     Assert.assertTrue(sawNewBase);
@@ -495,13 +504,13 @@ public class TestTxnCommands2 {
     // 5. Let Cleaner delete obsolete files/dirs
     // Note, here we create a fake directory along with fake files as original directories/files
     String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_0";
+      "/subdir/000000_0";
     String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_1";
+      "/subdir/000000_1";
     fs.create(new Path(fakeFile0));
     fs.create(new Path(fakeFile1));
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Before Cleaner, there should be 5 items:
     // 2 original files, 1 original directory, 1 base directory and 1 delta directory
     Assert.assertEquals(5, status.length);
@@ -509,13 +518,12 @@ public class TestTxnCommands2 {
     // There should be only 1 directory left: base_xxxxxxx.
     // Original bucket files and delta directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(BUCKET_COUNT, buckets.length);
-    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-    Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 2}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
@@ -525,7 +533,7 @@ public class TestTxnCommands2 {
   }
 
   /**
-   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * Test the query correctness and directory layout for ACID table conversion
    * 1. Insert a row to Non-ACID table
    * 2. Convert Non-ACID to ACID table
    * 3. Update the existing row in ACID table
@@ -541,7 +549,7 @@ public class TestTxnCommands2 {
     // 1. Insert a row to Non-ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files in the location (000000_0 and 000001_0)
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -550,14 +558,14 @@ public class TestTxnCommands2 {
     List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     int [][] resultData = new int[][] {{1, 2}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     int resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
     // 2. Convert NONACIDORCTBL to ACID table
     runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -566,29 +574,39 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 2}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
     // 3. Update the existing row in newly-converted ACID table
     runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
+    // and one delete_delta directory. When split-update is enabled, an update event is split into
+    // a combination of delete and insert, that generates the delete_delta directory.
     // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
-    Assert.assertEquals(3, status.length);
+    // and so should the delete_delta directory.
+    Assert.assertEquals(4, status.length);
     boolean sawNewDelta = false;
+    boolean sawNewDeleteDelta = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("delta_.*")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
         Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        sawNewDeleteDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
       } else {
         Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
       }
     }
     Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(sawNewDeleteDelta);
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}};
     Assert.assertEquals(stringifyValues(resultData), rs);
@@ -602,8 +620,8 @@ public class TestTxnCommands2 {
     // There should be 1 new directory: base_0000001.
     // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(4, status.length);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(5, status.length);
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
@@ -623,15 +641,15 @@ public class TestTxnCommands2 {
 
     // 5. Let Cleaner delete obsolete files/dirs
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 4 items:
-    // 2 original files, 1 delta directory and 1 base directory
-    Assert.assertEquals(4, status.length);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
+    Assert.assertEquals(5, status.length);
     runCleaner(hiveConf);
     // There should be only 1 directory left: base_0000001.
-    // Original bucket files and delta directory should have been cleaned up.
+    // Original bucket files, delta directory and delete_delta should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
@@ -646,7 +664,7 @@ public class TestTxnCommands2 {
   }
 
   /**
-   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * Test the query correctness and directory layout for ACID table conversion
    * 1. Insert a row to Non-ACID table
    * 2. Convert Non-ACID to ACID table
    * 3. Perform Major compaction
@@ -663,7 +681,7 @@ public class TestTxnCommands2 {
     // 1. Insert a row to Non-ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files in the location (000000_0 and 000001_0)
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -676,10 +694,10 @@ public class TestTxnCommands2 {
     int resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
-    // 2. Convert NONACIDORCTBL to ACID table
+    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
     runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -698,7 +716,7 @@ public class TestTxnCommands2 {
     // There should be 1 new directory: base_-9223372036854775808
     // Original bucket files should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(3, status.length);
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
@@ -722,12 +740,14 @@ public class TestTxnCommands2 {
     runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
     // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
-    // plus two new delta directories
-    Assert.assertEquals(5, status.length);
+    // plus two new delta directories and one delete_delta directory that would be created due to
+    // the update statement (remember split-update U=D+I)!
+    Assert.assertEquals(6, status.length);
     int numDelta = 0;
+    int numDeleteDelta = 0;
     sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("delta_.*")) {
@@ -740,9 +760,17 @@ public class TestTxnCommands2 {
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numDelta == 2) {
           Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT, buckets.length);
-          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        numDeleteDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDeleteDelta == 1) {
+          Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
       } else if (status[i].getPath().getName().matches("base_.*")) {
         Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
@@ -755,11 +783,13 @@ public class TestTxnCommands2 {
       }
     }
     Assert.assertEquals(2, numDelta);
+    Assert.assertEquals(1, numDeleteDelta);
     Assert.assertTrue(sawNewBase);
+
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
@@ -767,11 +797,12 @@ public class TestTxnCommands2 {
     runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
     runWorker(hiveConf);
     // There should be 1 new base directory: base_0000001
-    // Original bucket files, delta directories and the previous base directory should stay until Cleaner kicks in.
+    // Original bucket files, delta directories, delete_delta directories and the
+    // previous base directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(status);
-    Assert.assertEquals(6, status.length);
+    Assert.assertEquals(7, status.length);
     int numBase = 0;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
@@ -785,9 +816,8 @@ public class TestTxnCommands2 {
         } else if (numBase == 2) {
           // The new base dir now has two bucket files, since the delta dir has two bucket files
           Assert.assertEquals("base_0000023", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT, buckets.length);
-          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
       }
     }
@@ -795,28 +825,27 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
     // 6. Let Cleaner delete obsolete files/dirs
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Before Cleaner, there should be 6 items:
-    // 2 original files, 2 delta directories and 2 base directories
-    Assert.assertEquals(6, status.length);
+    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+    Assert.assertEquals(7, status.length);
     runCleaner(hiveConf);
     // There should be only 1 directory left: base_0000001.
     // Original bucket files, delta directories and previous base directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
     Assert.assertEquals("base_0000023", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(buckets);
-    Assert.assertEquals(BUCKET_COUNT, buckets.length);
-    Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-    Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
@@ -824,9 +853,6 @@ public class TestTxnCommands2 {
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
-
-
-
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
deleted file mode 100644
index 520e958..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/**
- * Same as TestTxnCommands2 but tests ACID tables with 'transactional_properties' set to 'default'.
- * This tests whether ACID tables with split-update turned on are working correctly or not
- * for the same set of tests when it is turned off. Of course, it also adds a few tests to test
- * specific behaviors of ACID tables with split-update turned on.
- */
-public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
-
-  public TestTxnCommands2WithSplitUpdate() {
-    super();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Override
-  @Before
-  public void setUp() throws Exception {
-    setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  @Override
-  @Test
-  public void testInitiatorWithMultipleFailedCompactions() throws Exception {
-    // Test with split-update turned on.
-    testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  @Override
-  @Test
-  public void writeBetweenWorkerAndCleaner() throws Exception {
-    writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  @Override
-  @Test
-  public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
-    testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  /**
-   * In current implementation of ACID, altering the value of transactional_properties or trying to
-   * set a value for previously unset value for an acid table will throw an exception.
-   * @throws Exception
-   */
-  @Test
-  public void testFailureOnAlteringTransactionalProperties() throws Exception {
-    expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
-    runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
-  }
-  /**
-   * Test the query correctness and directory layout for ACID table conversion with split-update
-   * enabled.
-   * 1. Insert a row to Non-ACID table
-   * 2. Convert Non-ACID to ACID table with split-update enabled
-   * 3. Insert a row to ACID table
-   * 4. Perform Major compaction
-   * 5. Clean
-   * @throws Exception
-   */
-  @Test
-  @Override
-  public void testNonAcidToAcidConversion1() throws Exception {
-    FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] status;
-
-    // 1. Insert a row to Non-ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    int [][] resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    int resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
-        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Everything should be same as before
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 3. Insert another row to newly-converted ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
-    // The delta directory should also have only 1 bucket file (bucket_00001)
-    Assert.assertEquals(3, status.length);
-    boolean sawNewDelta = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("delta_.*")) {
-        sawNewDelta = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(1, buckets.length); // only one bucket file
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-      } else {
-        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-      }
-    }
-    Assert.assertTrue(sawNewDelta);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 4. Perform a major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new directory: base_xxxxxxx.
-    // Original bucket files and delta directory should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(4, status.length);
-    boolean sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-      }
-    }
-    Assert.assertTrue(sawNewBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 5. Let Cleaner delete obsolete files/dirs
-    // Note, here we create a fake directory along with fake files as original directories/files
-    String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_0";
-    String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_1";
-    fs.create(new Path(fakeFile0));
-    fs.create(new Path(fakeFile1));
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 5 items:
-    // 2 original files, 1 original directory, 1 base directory and 1 delta directory
-    Assert.assertEquals(5, status.length);
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_xxxxxxx.
-    // Original bucket files and delta directory should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
-    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, buckets.length);
-    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-  }
-
-  /**
-   * Test the query correctness and directory layout for ACID table conversion with split-update
-   * enabled.
-   * 1. Insert a row to Non-ACID table
-   * 2. Convert Non-ACID to ACID table with split update enabled.
-   * 3. Update the existing row in ACID table
-   * 4. Perform Major compaction
-   * 5. Clean
-   * @throws Exception
-   */
-  @Test
-  @Override
-  public void testNonAcidToAcidConversion2() throws Exception {
-    FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] status;
-
-    // 1. Insert a row to Non-ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    int [][] resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    int resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
-        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Everything should be same as before
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 3. Update the existing row in newly-converted ACID table
-    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
-    // and one delete_delta directory. When split-update is enabled, an update event is split into
-    // a combination of delete and insert, that generates the delete_delta directory.
-    // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
-    // and so should the delete_delta directory.
-    Assert.assertEquals(4, status.length);
-    boolean sawNewDelta = false;
-    boolean sawNewDeleteDelta = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("delta_.*")) {
-        sawNewDelta = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
-        sawNewDeleteDelta = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-      } else {
-        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-      }
-    }
-    Assert.assertTrue(sawNewDelta);
-    Assert.assertTrue(sawNewDeleteDelta);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 4. Perform a major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new directory: base_0000001.
-    // Original bucket files and delta directory should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(5, status.length);
-    boolean sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-      }
-    }
-    Assert.assertTrue(sawNewBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 5. Let Cleaner delete obsolete files/dirs
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 5 items:
-    // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
-    Assert.assertEquals(5, status.length);
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_0000001.
-    // Original bucket files, delta directory and delete_delta should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
-    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-  }
-
-  /**
-   * Test the query correctness and directory layout for ACID table conversion with split-update
-   * enabled.
-   * 1. Insert a row to Non-ACID table
-   * 2. Convert Non-ACID to ACID table with split-update enabled
-   * 3. Perform Major compaction
-   * 4. Insert a new row to ACID table
-   * 5. Perform another Major compaction
-   * 6. Clean
-   * @throws Exception
-   */
-  @Test
-  @Override
-  public void testNonAcidToAcidConversion3() throws Exception {
-    FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] status;
-
-    // 1. Insert a row to Non-ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    int [][] resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    int resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
-        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Everything should be same as before
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 3. Perform a major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new directory: base_-9223372036854775808
-    // Original bucket files should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(3, status.length);
-    boolean sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-      }
-    }
-    Assert.assertTrue(sawNewBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 4. Update the existing row, and insert another row to newly-converted ACID table
-    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
-    // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
-    // plus two new delta directories and one delete_delta directory that would be created due to
-    // the update statement (remember split-update U=D+I)!
-    Assert.assertEquals(6, status.length);
-    int numDelta = 0;
-    int numDeleteDelta = 0;
-    sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("delta_.*")) {
-        numDelta++;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Arrays.sort(buckets);
-        if (numDelta == 1) {
-          Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        } else if (numDelta == 2) {
-          Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
-          Assert.assertEquals(1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        }
-      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
-        numDeleteDelta++;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Arrays.sort(buckets);
-        if (numDeleteDelta == 1) {
-          Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        }
-      } else if (status[i].getPath().getName().matches("base_.*")) {
-        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-      } else {
-        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-      }
-    }
-    Assert.assertEquals(2, numDelta);
-    Assert.assertEquals(1, numDeleteDelta);
-    Assert.assertTrue(sawNewBase);
-
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 5. Perform another major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new base directory: base_0000001
-    // Original bucket files, delta directories, delete_delta directories and the
-    // previous base directory should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Arrays.sort(status);
-    Assert.assertEquals(7, status.length);
-    int numBase = 0;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        numBase++;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Arrays.sort(buckets);
-        if (numBase == 1) {
-          Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        } else if (numBase == 2) {
-          // The new base dir now has two bucket files, since the delta dir has two bucket files
-          Assert.assertEquals("base_0000023", status[i].getPath().getName());
-          Assert.assertEquals(1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        }
-      }
-    }
-    Assert.assertEquals(2, numBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 6. Let Cleaner delete obsolete files/dirs
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 6 items:
-    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
-    Assert.assertEquals(7, status.length);
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_0000001.
-    // Original bucket files, delta directories and previous base directory should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertEquals("base_0000023", status[0].getPath().getName());
-    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Arrays.sort(buckets);
-    Assert.assertEquals(1, buckets.length);
-    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
index 44a9412..c76d654 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
@@ -23,11 +23,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Same as TestTxnCommands2WithSplitUpdate but tests ACID tables with vectorization turned on by
+ * Same as TestTxnCommands2 but tests ACID tables with vectorization turned on by
  * default, and having 'transactional_properties' set to 'default'. This specifically tests the
  * fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on.
  */
-public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate {
+public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2 {
 
   public TestTxnCommands2WithSplitUpdateAndVectorization() {
     super();

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 44ff65c..06e4f98 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
+import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.junit.Assert;
 import org.junit.Test;
@@ -669,21 +670,12 @@ public class TestAcidUtils {
 
   @Test
   public void testAcidOperationalProperties() throws Exception {
-    AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy();
-    assertsForAcidOperationalProperties(testObj, "legacy");
-
-    testObj = AcidUtils.AcidOperationalProperties.getDefault();
+    AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getDefault();
     assertsForAcidOperationalProperties(testObj, "default");
 
-    testObj = AcidUtils.AcidOperationalProperties.parseInt(0);
-    assertsForAcidOperationalProperties(testObj, "legacy");
-
     testObj = AcidUtils.AcidOperationalProperties.parseInt(1);
     assertsForAcidOperationalProperties(testObj, "split_update");
 
-    testObj = AcidUtils.AcidOperationalProperties.parseString("legacy");
-    assertsForAcidOperationalProperties(testObj, "legacy");
-
     testObj = AcidUtils.AcidOperationalProperties.parseString("default");
     assertsForAcidOperationalProperties(testObj, "default");
 
@@ -699,12 +691,6 @@ public class TestAcidUtils {
         assertEquals(1, testObj.toInt());
         assertEquals("|split_update", testObj.toString());
         break;
-      case "legacy":
-        assertEquals(false, testObj.isSplitUpdate());
-        assertEquals(false, testObj.isHashBasedMerge());
-        assertEquals(0, testObj.toInt());
-        assertEquals("", testObj.toString());
-        break;
       default:
         break;
     }
@@ -716,7 +702,7 @@ public class TestAcidUtils {
     Configuration testConf = new Configuration();
     // Test setter for configuration object.
     AcidUtils.setAcidOperationalProperties(testConf, oprProps);
-    assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0));
+    assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1));
     // Test getter for configuration object.
     assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString());
 
@@ -726,12 +712,15 @@ public class TestAcidUtils {
     assertEquals(oprProps.toString(),
         parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname));
     // Test getter for map object.
-    // Calling a get on the 'parameters' will still return legacy type because the setters/getters
-    // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES
-    // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES.
-    assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+    assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
     parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString());
     // Set the appropriate key in the map and test that we are able to read it back correctly.
     assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
   }
+
+  /**
+   * Empty, un-annotated placeholder. See {@link TestOrcRawRecordMerger#testGetLogicalLength()}
+   */
+  public void testGetLogicalLength() throws Exception {
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b004cf5..53bd08c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -75,12 +75,14 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -597,12 +599,13 @@ public class TestInputOutputFormat {
   @Test
   public void testACIDSplitStrategy() throws Exception {
     conf.set("bucket_count", "2");
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
     MockFileSystem fs = new MockFileSystem(conf,
-        new MockFile("mock:/a/delta_000_001/part-00", 1000, new byte[1], new MockBlock("host1")),
-        new MockFile("mock:/a/delta_000_001/part-01", 1000, new byte[1], new MockBlock("host1")),
-        new MockFile("mock:/a/delta_001_002/part-02", 1000, new byte[1], new MockBlock("host1")),
-        new MockFile("mock:/a/delta_001_002/part-03", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/delta_000_001/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_000_001/bucket_000001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_001_002/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_001_002/bucket_000001", 1000, new byte[1], new MockBlock("host1")));
     OrcInputFormat.FileGenerator gen =
         new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a"), false, null);
@@ -611,9 +614,9 @@ public class TestInputOutputFormat {
     List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
     ColumnarSplitSizeEstimator splitSizeEstimator = new ColumnarSplitSizeEstimator();
     for (OrcSplit split: splits) {
-      assertEquals(Integer.MAX_VALUE, splitSizeEstimator.getEstimatedSize(split));
+      assertEquals(1, splitSizeEstimator.getEstimatedSize(split));
     }
-    assertEquals(2, splits.size());
+    assertEquals(4, splits.size());
   }
 
   @Test
@@ -1105,6 +1108,9 @@ public class TestInputOutputFormat {
     }
   }
 
+  /**
+   * WARNING: delete(Path...) only removes an exactly matching file; recursive delete is unsupported
+   */
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     final Map<MockFile, FileStatus> fileStatusMap = new HashMap<>();
@@ -1230,14 +1236,32 @@ public class TestInputOutputFormat {
     public boolean delete(Path path) throws IOException {
       statistics.incrementWriteOps(1);
       checkAccess();
-      return false;
+      int removed = 0; // counts the lists (files, globalFiles) an entry was removed from
+      for(int i = 0; i < files.size(); i++) { // remove the first entry of files whose path equals 'path'
+        MockFile mf = files.get(i);
+        if(path.equals(mf.path)) {
+          files.remove(i);
+          removed++;
+          break;
+        }
+      }
+      for(int i = 0; i < globalFiles.size(); i++) { // same for globalFiles
+        MockFile mf = globalFiles.get(i); // read from the list being scanned (was files.get(i))
+        if(path.equals(mf.path)) {
+          globalFiles.remove(i);
+          removed++;
+          break;
+        }
+      }
+      return removed > 0; // true if at least one entry was removed
     }
 
     @Override
     public boolean delete(Path path, boolean b) throws IOException {
-      statistics.incrementWriteOps(1);
-      checkAccess();
-      return false;
+      if(b) { // b == true (recursive delete in the FileSystem API) is not implemented by this mock
+        throw new UnsupportedOperationException();
+      }
+      return delete(path); // otherwise behaves exactly like the single-argument delete
     }
 
     @Override
@@ -2690,9 +2714,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: open - mock:/mocktable/0_0
-    // call-3: open - mock:/mocktable/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktable/0_0
+    // call-3: open - mock:/mocktable/0_0
+    // call-4: check existence of side file for mock:/mocktable/0_1
+    // call-5: open - mock:/mocktable/0_1
+    assertEquals(5, readOpsDelta);
 
     assertEquals(2, splits.length);
     // revert back to local fs
@@ -2748,9 +2774,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: open - mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_0
+    // call-4: check existence of side file for  mock:/mocktbl/0_1
+    // call-5: open - mock:/mocktbl/0_1
+    assertEquals(5, readOpsDelta);
 
     // force BI to avoid reading footers
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
@@ -2768,7 +2796,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    assertEquals(1, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktbl/0_0
+    // call-3: check existence of side file for  mock:/mocktbl/0_1
+    assertEquals(3, readOpsDelta);
 
     // enable cache and use default strategy
     conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb");
@@ -2787,9 +2817,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: open - mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_0
+    // call-4: check existence of side file for mock:/mocktbl/0_1
+    // call-5: open - mock:/mocktbl/0_1
+    assertEquals(5, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2859,9 +2891,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: open - mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check side file for mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_0
+    // call-4: check side file for  mock:/mocktbl1/0_1
+    // call-5: open - mock:/mocktbl1/0_1
+    assertEquals(5, readOpsDelta);
 
     // change file length and look for cache misses
 
@@ -2898,9 +2932,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: open - mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check side file for mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_0
+    // call-4: check side file for  mock:/mocktbl1/0_1
+    // call-5: open - mock:/mocktbl1/0_1
+    assertEquals(5, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2971,9 +3007,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: open - mock:/mocktbl2/0_0
-    // call-3: open - mock:/mocktbl2/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check side file for mock:/mocktbl2/0_0
+    // call-3: open - mock:/mocktbl2/0_0
+    // call-4: check side file for  mock:/mocktbl2/0_1
+    // call-5: open - mock:/mocktbl2/0_1
+    assertEquals(5, readOpsDelta);
 
     // change file modification time and look for cache misses
     FileSystem fs1 = FileSystem.get(conf);
@@ -2993,8 +3031,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: open - mock:/mocktbl2/0_1
-    assertEquals(2, readOpsDelta);
+    // call-2: check side file for  mock:/mocktbl2/0_1
+    // call-3: open - mock:/mocktbl2/0_1
+    assertEquals(3, readOpsDelta);
 
     // touch the next file
     fs1 = FileSystem.get(conf);
@@ -3014,8 +3053,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: open - mock:/mocktbl2/0_0
-    assertEquals(2, readOpsDelta);
+    // call-2: check side file for  mock:/mocktbl2/0_0
+    // call-3: open - mock:/mocktbl2/0_0
+    assertEquals(3, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3332,6 +3372,7 @@ public class TestInputOutputFormat {
     MockFileSystem fs = new MockFileSystem(conf);
     MockPath mockPath = new MockPath(fs, "mock:///mocktable5");
     conf.set("hive.transactional.table.scan", "true");
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "false");
@@ -3409,6 +3450,7 @@ public class TestInputOutputFormat {
     MockFileSystem fs = new MockFileSystem(conf);
     MockPath mockPath = new MockPath(fs, "mock:///mocktable6");
     conf.set("hive.transactional.table.scan", "true");
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "true");
@@ -3481,15 +3523,14 @@ public class TestInputOutputFormat {
 
   @Test
   public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
-    MockFileSystem fs = new MockFileSystem(conf);
+    conf.set("fs.defaultFS", "mock:///");
+    conf.set("fs.mock.impl", MockFileSystem.class.getName());
+    FileSystem fs = FileSystem.get(conf);
     MockPath mockPath = new MockPath(fs, "mock:///mocktable7");
-    conf.set("hive.transactional.table.scan", "true");
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "false");
     conf.set("mapred.input.dir", mockPath.toString());
-    conf.set("fs.defaultFS", "mock:///");
-    conf.set("fs.mock.impl", MockFileSystem.class.getName());
     StructObjectInspector inspector;
     synchronized (TestOrcFile.class) {
       inspector = (StructObjectInspector)
@@ -3505,17 +3546,22 @@ public class TestInputOutputFormat {
     }
     writer.close();
 
-    writer = OrcFile.createWriter(new Path(new Path(mockPath + "/delta_001_002") + "/0_1"),
-        OrcFile.writerOptions(conf).blockPadding(false)
-            .bufferSize(1024).inspector(inspector));
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(1).minimumTransactionId(1)
+      .maximumTransactionId(1).inspector(inspector).finalDestination(mockPath);
+    OrcOutputFormat of = new OrcOutputFormat();
+    RecordUpdater ru = of.getRecordUpdater(mockPath, options);
     for (int i = 0; i < 10; ++i) {
-      writer.addRow(new MyRow(i, 2 * i));
+      ru.insert(options.getMinimumTransactionId(), new MyRow(i, 2 * i));
     }
-    writer.close();
+    ru.close(false);//this deletes the side file
+
+    //set up props for read
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    AcidUtils.setTransactionalTableScan(conf, true);
 
     OrcInputFormat orcInputFormat = new OrcInputFormat();
     InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
-    assertEquals(1, splits.length);
+    assertEquals(2, splits.length);
     int readOpsBefore = -1;
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3530,14 +3576,8 @@ public class TestInputOutputFormat {
       assertTrue(split.toString().contains("start=3"));
       assertTrue(split.toString().contains("hasFooter=false"));
       assertTrue(split.toString().contains("hasBase=true"));
-      // NOTE: don't be surprised if deltas value is different
-      // in older release deltas=2 as min and max transaction are added separately to delta list.
-      // in newer release since both of them are put together deltas=1
-      assertTrue(split.toString().contains("deltas=1"));
-      if (split instanceof OrcSplit) {
-        assertFalse("No footer serialize test for ACID reader, hasFooter is not expected in" +
-            " orc splits.", ((OrcSplit) split).hasFooter());
-      }
+      assertFalse("No footer serialize test for ACID reader, hasFooter is not expected in" +
+        " orc splits.", ((OrcSplit) split).hasFooter());
       orcInputFormat.getRecordReader(split, conf, Reporter.NULL);
     }
 
@@ -3547,11 +3587,9 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open to read footer - split 1 => mock:/mocktable7/0_0
-    // call-2: open to read data - split 1 => mock:/mocktable7/0_0
-    // call-3: open side file (flush length) of delta directory
-    // call-4: fs.exists() check for delta_xxx_xxx/bucket_00000 file
-    // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable7/0_0
+    // call-1: open to read data - split 1 => mock:/mocktable7/0_0
+    // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
+    // call-3: split 2 - read delta_x_y/bucket_00001 (NOTE: only 3 of the 5 asserted calls are listed)
     assertEquals(5, readOpsDelta);
 
     // revert back to local fs
@@ -3560,15 +3598,14 @@ public class TestInputOutputFormat {
 
   @Test
   public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
-    MockFileSystem fs = new MockFileSystem(conf);
+    conf.set("fs.defaultFS", "mock:///");
+    conf.set("fs.mock.impl", MockFileSystem.class.getName());
+    FileSystem fs = FileSystem.get(conf);//ensures that FS object is cached so that everyone uses the same instance
     MockPath mockPath = new MockPath(fs, "mock:///mocktable8");
-    conf.set("hive.transactional.table.scan", "true");
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "true");
     conf.set("mapred.input.dir", mockPath.toString());
-    conf.set("fs.defaultFS", "mock:///");
-    conf.set("fs.mock.impl", MockFileSystem.class.getName());
     StructObjectInspector inspector;
     synchronized (TestOrcFile.class) {
       inspector = (StructObjectInspector)
@@ -3584,17 +3621,22 @@ public class TestInputOutputFormat {
     }
     writer.close();
 
-    writer = OrcFile.createWriter(new Path(new Path(mockPath + "/delta_001_002") + "/0_1"),
-        OrcFile.writerOptions(conf).blockPadding(false)
-            .bufferSize(1024).inspector(inspector));
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(1).minimumTransactionId(1)
+      .maximumTransactionId(1).inspector(inspector).finalDestination(mockPath);
+    OrcOutputFormat of = new OrcOutputFormat();
+    RecordUpdater ru = of.getRecordUpdater(mockPath, options);
     for (int i = 0; i < 10; ++i) {
-      writer.addRow(new MyRow(i, 2 * i));
+      ru.insert(options.getMinimumTransactionId(), new MyRow(i, 2 * i));
     }
-    writer.close();
+    ru.close(false);//this deletes the side file
+
+    //set up props for read
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    AcidUtils.setTransactionalTableScan(conf, true);
 
     OrcInputFormat orcInputFormat = new OrcInputFormat();
     InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
-    assertEquals(1, splits.length);
+    assertEquals(2, splits.length);
     int readOpsBefore = -1;
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3609,14 +3651,8 @@ public class TestInputOutputFormat {
       assertTrue(split.toString().contains("start=3"));
       assertTrue(split.toString().contains("hasFooter=true"));
       assertTrue(split.toString().contains("hasBase=true"));
-      // NOTE: don't be surprised if deltas value is different
-      // in older release deltas=2 as min and max transaction are added separately to delta list.
-      // in newer release since both of them are put together deltas=1
-      assertTrue(split.toString().contains("deltas=1"));
-      if (split instanceof OrcSplit) {
-        assertTrue("Footer serialize test for ACID reader, hasFooter is not expected in" +
-            " orc splits.", ((OrcSplit) split).hasFooter());
-      }
+      assertTrue("Footer serialize test for ACID reader, hasFooter is not expected in" +
+        " orc splits.", ((OrcSplit) split).hasFooter());
       orcInputFormat.getRecordReader(split, conf, Reporter.NULL);
     }
 
@@ -3627,10 +3663,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: open to read data - split 1 => mock:/mocktable8/0_0
-    // call-2: open side file (flush length) of delta directory
-    // call-3: fs.exists() check for delta_xxx_xxx/bucket_00000 file
-    // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable8/0_0
-    assertEquals(4, readOpsDelta);
+    // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
+    // call-3: split 2 - read delta_x_y/bucket_00001
+    assertEquals(3, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");