You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/16 01:14:44 UTC

[2/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 408c089..0e0fca3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -64,16 +65,11 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.constraints.AssertTrue;
-
-/**
- * TODO: this should be merged with TestTxnCommands once that is checked in
- * specifically the tests; the supporting code here is just a clone of TestTxnCommands
- */
 public class TestTxnCommands2 {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
   protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
@@ -88,7 +84,7 @@ public class TestTxnCommands2 {
 
   protected HiveConf hiveConf;
   protected Driver d;
-  protected static enum Table {
+  protected enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart", "p"),
     NONACIDORCTBL("nonAcidOrcTbl"),
@@ -113,6 +109,8 @@ public class TestTxnCommands2 {
       this.partitionColumns = partitionColumns;
     }
   }
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
 
   @Before
   public void setUp() throws Exception {
@@ -357,20 +355,22 @@ public class TestTxnCommands2 {
     for(String s : rs) {
       LOG.warn(s);
     }
+    Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+    Assert.assertEquals(536936448, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
     /*
      * All ROW__IDs are unique on read after conversion to acid 
      * ROW__IDs are exactly the same before and after compaction
-     * Also check the file name after compaction for completeness
+     * Also check the file name (only) after compaction for completeness
      */
     String[][] expected = {
-      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13",  "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t13",  "bucket_00000"},
       {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
       {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5",   "bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6",   "bucket_00001"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":3}\t1\t4",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5",   "bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t6",   "bucket_00001"},
       {"{\"transactionid\":18,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
     };
     Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
@@ -392,9 +392,20 @@ public class TestTxnCommands2 {
     }
     //make sure they are the same before and after compaction
   }
-
   /**
-   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * Verifies that setting 'transactional_properties' through ALTER TABLE on an existing acid table,
+   * even when the property was never set at creation time, fails with a RuntimeException.
+   * @throws Exception any failure raised by runStatementOnDriver (one is expected from the ALTER)
+   */
+  @Test
+  public void testFailureOnAlteringTransactionalProperties() throws Exception {
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
+    runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+  }
+  /**
+   * Test the query correctness and directory layout for ACID table conversion
    * 1. Insert a row to Non-ACID table
    * 2. Convert Non-ACID to ACID table
    * 3. Insert a row to ACID table
@@ -410,7 +421,7 @@ public class TestTxnCommands2 {
     // 1. Insert a row to Non-ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files in the location (000000_0 and 000001_0)
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -426,7 +437,7 @@ public class TestTxnCommands2 {
     // 2. Convert NONACIDORCTBL to ACID table
     runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -442,24 +453,23 @@ public class TestTxnCommands2 {
     // 3. Insert another row to newly-converted ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
-    // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
+    // The delta directory should have only 1 bucket file (bucket_00001)
     Assert.assertEquals(3, status.length);
     boolean sawNewDelta = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("delta_.*")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1, buckets.length); // only one bucket file
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
       } else {
         Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
       }
     }
     Assert.assertTrue(sawNewDelta);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
     resultData = new int[][] {{1, 2}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
     rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
@@ -472,16 +482,15 @@ public class TestTxnCommands2 {
     // There should be 1 new directory: base_xxxxxxx.
     // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(4, status.length);
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
       }
     }
     Assert.assertTrue(sawNewBase);
@@ -495,13 +504,13 @@ public class TestTxnCommands2 {
     // 5. Let Cleaner delete obsolete files/dirs
     // Note, here we create a fake directory along with fake files as original directories/files
     String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_0";
+      "/subdir/000000_0";
     String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_1";
+      "/subdir/000000_1";
     fs.create(new Path(fakeFile0));
     fs.create(new Path(fakeFile1));
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Before Cleaner, there should be 5 items:
     // 2 original files, 1 original directory, 1 base directory and 1 delta directory
     Assert.assertEquals(5, status.length);
@@ -509,13 +518,12 @@ public class TestTxnCommands2 {
     // There should be only 1 directory left: base_xxxxxxx.
     // Original bucket files and delta directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(BUCKET_COUNT, buckets.length);
-    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-    Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 2}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
@@ -525,7 +533,7 @@ public class TestTxnCommands2 {
   }
 
   /**
-   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * Test the query correctness and directory layout for ACID table conversion
    * 1. Insert a row to Non-ACID table
    * 2. Convert Non-ACID to ACID table
    * 3. Update the existing row in ACID table
@@ -541,7 +549,7 @@ public class TestTxnCommands2 {
     // 1. Insert a row to Non-ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files in the location (000000_0 and 000001_0)
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -550,14 +558,14 @@ public class TestTxnCommands2 {
     List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     int [][] resultData = new int[][] {{1, 2}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     int resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
     // 2. Convert NONACIDORCTBL to ACID table
     runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -566,29 +574,39 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 2}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
     // 3. Update the existing row in newly-converted ACID table
     runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
+    // and one delete_delta directory. When split-update is enabled, an update event is split into
+    // a combination of delete and insert, which is what generates the delete_delta directory.
     // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
-    Assert.assertEquals(3, status.length);
+    // and so should the delete_delta directory.
+    Assert.assertEquals(4, status.length);
     boolean sawNewDelta = false;
+    boolean sawNewDeleteDelta = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("delta_.*")) {
         sawNewDelta = true;
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
         Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        sawNewDeleteDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
       } else {
         Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
       }
     }
     Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(sawNewDeleteDelta);
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}};
     Assert.assertEquals(stringifyValues(resultData), rs);
@@ -602,8 +620,8 @@ public class TestTxnCommands2 {
     // There should be 1 new directory: base_0000001.
     // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(4, status.length);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(5, status.length);
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
@@ -623,15 +641,15 @@ public class TestTxnCommands2 {
 
     // 5. Let Cleaner delete obsolete files/dirs
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 4 items:
-    // 2 original files, 1 delta directory and 1 base directory
-    Assert.assertEquals(4, status.length);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
+    Assert.assertEquals(5, status.length);
     runCleaner(hiveConf);
     // There should be only 1 directory left: base_0000001.
-    // Original bucket files and delta directory should have been cleaned up.
+    // Original bucket files, delta directory and delete_delta should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
     Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
@@ -646,7 +664,7 @@ public class TestTxnCommands2 {
   }
 
   /**
-   * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction
+   * Test the query correctness and directory layout for ACID table conversion
    * 1. Insert a row to Non-ACID table
    * 2. Convert Non-ACID to ACID table
    * 3. Perform Major compaction
@@ -663,7 +681,7 @@ public class TestTxnCommands2 {
     // 1. Insert a row to Non-ACID table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // There should be 2 original bucket files in the location (000000_0 and 000001_0)
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -676,10 +694,10 @@ public class TestTxnCommands2 {
     int resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
-    // 2. Convert NONACIDORCTBL to ACID table
+    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
     runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Everything should be same as before
     Assert.assertEquals(BUCKET_COUNT, status.length);
     for (int i = 0; i < status.length; i++) {
@@ -698,7 +716,7 @@ public class TestTxnCommands2 {
     // There should be 1 new directory: base_-9223372036854775808
     // Original bucket files should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(3, status.length);
     boolean sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
@@ -722,12 +740,14 @@ public class TestTxnCommands2 {
     runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
     // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
-    // plus two new delta directories
-    Assert.assertEquals(5, status.length);
+    // plus two new delta directories and one delete_delta directory that would be created due to
+    // the update statement (remember split-update U=D+I)!
+    Assert.assertEquals(6, status.length);
     int numDelta = 0;
+    int numDeleteDelta = 0;
     sawNewBase = false;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("delta_.*")) {
@@ -740,9 +760,17 @@ public class TestTxnCommands2 {
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numDelta == 2) {
           Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT, buckets.length);
-          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        numDeleteDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDeleteDelta == 1) {
+          Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
       } else if (status[i].getPath().getName().matches("base_.*")) {
         Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
@@ -755,11 +783,13 @@ public class TestTxnCommands2 {
       }
     }
     Assert.assertEquals(2, numDelta);
+    Assert.assertEquals(1, numDeleteDelta);
     Assert.assertTrue(sawNewBase);
+
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
@@ -767,11 +797,12 @@ public class TestTxnCommands2 {
     runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
     runWorker(hiveConf);
     // There should be 1 new base directory: base_0000001
-    // Original bucket files, delta directories and the previous base directory should stay until Cleaner kicks in.
+    // Original bucket files, delta directories, delete_delta directories and the
+    // previous base directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(status);
-    Assert.assertEquals(6, status.length);
+    Assert.assertEquals(7, status.length);
     int numBase = 0;
     for (int i = 0; i < status.length; i++) {
       if (status[i].getPath().getName().matches("base_.*")) {
@@ -785,9 +816,8 @@ public class TestTxnCommands2 {
         } else if (numBase == 2) {
           // The new base dir now has two bucket files, since the delta dir has two bucket files
           Assert.assertEquals("base_0000023", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT, buckets.length);
-          Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-          Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         }
       }
     }
@@ -795,28 +825,27 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
     // 6. Let Cleaner delete obsolete files/dirs
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     // Before Cleaner, there should be 6 items:
-    // 2 original files, 2 delta directories and 2 base directories
-    Assert.assertEquals(6, status.length);
+    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+    Assert.assertEquals(7, status.length);
     runCleaner(hiveConf);
     // There should be only 1 directory left: base_0000001.
     // Original bucket files, delta directories and previous base directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+      (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
     Assert.assertEquals("base_0000023", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(buckets);
-    Assert.assertEquals(BUCKET_COUNT, buckets.length);
-    Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
-    Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
@@ -824,9 +853,6 @@ public class TestTxnCommands2 {
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
-
-
-
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
deleted file mode 100644
index 520e958..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ /dev/null
@@ -1,545 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/**
- * Same as TestTxnCommands2 but tests ACID tables with 'transactional_properties' set to 'default'.
- * This tests whether ACID tables with split-update turned on are working correctly or not
- * for the same set of tests when it is turned off. Of course, it also adds a few tests to test
- * specific behaviors of ACID tables with split-update turned on.
- */
-public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
-
-  public TestTxnCommands2WithSplitUpdate() {
-    super();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Override
-  @Before
-  public void setUp() throws Exception {
-    setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  @Override
-  @Test
-  public void testInitiatorWithMultipleFailedCompactions() throws Exception {
-    // Test with split-update turned on.
-    testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  @Override
-  @Test
-  public void writeBetweenWorkerAndCleaner() throws Exception {
-    writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  @Override
-  @Test
-  public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
-    testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
-  }
-
-  /**
-   * In current implementation of ACID, altering the value of transactional_properties or trying to
-   * set a value for previously unset value for an acid table will throw an exception.
-   * @throws Exception
-   */
-  @Test
-  public void testFailureOnAlteringTransactionalProperties() throws Exception {
-    expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
-    runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
-  }
-  /**
-   * Test the query correctness and directory layout for ACID table conversion with split-update
-   * enabled.
-   * 1. Insert a row to Non-ACID table
-   * 2. Convert Non-ACID to ACID table with split-update enabled
-   * 3. Insert a row to ACID table
-   * 4. Perform Major compaction
-   * 5. Clean
-   * @throws Exception
-   */
-  @Test
-  @Override
-  public void testNonAcidToAcidConversion1() throws Exception {
-    FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] status;
-
-    // 1. Insert a row to Non-ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    int [][] resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    int resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
-        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Everything should be same as before
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 3. Insert another row to newly-converted ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
-    // The delta directory should also have only 1 bucket file (bucket_00001)
-    Assert.assertEquals(3, status.length);
-    boolean sawNewDelta = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("delta_.*")) {
-        sawNewDelta = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(1, buckets.length); // only one bucket file
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-      } else {
-        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-      }
-    }
-    Assert.assertTrue(sawNewDelta);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 4. Perform a major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new directory: base_xxxxxxx.
-    // Original bucket files and delta directory should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(4, status.length);
-    boolean sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-      }
-    }
-    Assert.assertTrue(sawNewBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 5. Let Cleaner delete obsolete files/dirs
-    // Note, here we create a fake directory along with fake files as original directories/files
-    String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_0";
-    String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
-        "/subdir/000000_1";
-    fs.create(new Path(fakeFile0));
-    fs.create(new Path(fakeFile1));
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 5 items:
-    // 2 original files, 1 original directory, 1 base directory and 1 delta directory
-    Assert.assertEquals(5, status.length);
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_xxxxxxx.
-    // Original bucket files and delta directory should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
-    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, buckets.length);
-    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-  }
-
-  /**
-   * Test the query correctness and directory layout for ACID table conversion with split-update
-   * enabled.
-   * 1. Insert a row to Non-ACID table
-   * 2. Convert Non-ACID to ACID table with split update enabled.
-   * 3. Update the existing row in ACID table
-   * 4. Perform Major compaction
-   * 5. Clean
-   * @throws Exception
-   */
-  @Test
-  @Override
-  public void testNonAcidToAcidConversion2() throws Exception {
-    FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] status;
-
-    // 1. Insert a row to Non-ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    int [][] resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    int resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 2. Convert NONACIDORCTBL to ACID table
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
-        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Everything should be same as before
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 3. Update the existing row in newly-converted ACID table
-    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
-    // and one delete_delta directory. When split-update is enabled, an update event is split into
-    // a combination of delete and insert, that generates the delete_delta directory.
-    // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001)
-    // and so should the delete_delta directory.
-    Assert.assertEquals(4, status.length);
-    boolean sawNewDelta = false;
-    boolean sawNewDeleteDelta = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("delta_.*")) {
-        sawNewDelta = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
-        sawNewDeleteDelta = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
-      } else {
-        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-      }
-    }
-    Assert.assertTrue(sawNewDelta);
-    Assert.assertTrue(sawNewDeleteDelta);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 4. Perform a major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new directory: base_0000001.
-    // Original bucket files and delta directory should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(5, status.length);
-    boolean sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-      }
-    }
-    Assert.assertTrue(sawNewBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 5. Let Cleaner delete obsolete files/dirs
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 5 items:
-    // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
-    Assert.assertEquals(5, status.length);
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_0000001.
-    // Original bucket files, delta directory and delete_delta should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
-    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-  }
-
-  /**
-   * Test the query correctness and directory layout for ACID table conversion with split-update
-   * enabled.
-   * 1. Insert a row to Non-ACID table
-   * 2. Convert Non-ACID to ACID table with split-update enabled
-   * 3. Perform Major compaction
-   * 4. Insert a new row to ACID table
-   * 5. Perform another Major compaction
-   * 6. Clean
-   * @throws Exception
-   */
-  @Test
-  @Override
-  public void testNonAcidToAcidConversion3() throws Exception {
-    FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] status;
-
-    // 1. Insert a row to Non-ACID table
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    int [][] resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    int resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
-    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
-        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Everything should be same as before
-    Assert.assertEquals(BUCKET_COUNT, status.length);
-    for (int i = 0; i < status.length; i++) {
-      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-    }
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 3. Perform a major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new directory: base_-9223372036854775808
-    // Original bucket files should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(3, status.length);
-    boolean sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-      }
-    }
-    Assert.assertTrue(sawNewBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 2}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 1;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 4. Update the existing row, and insert another row to newly-converted ACID table
-    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
-    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
-    // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
-    // plus two new delta directories and one delete_delta directory that would be created due to
-    // the update statement (remember split-update U=D+I)!
-    Assert.assertEquals(6, status.length);
-    int numDelta = 0;
-    int numDeleteDelta = 0;
-    sawNewBase = false;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("delta_.*")) {
-        numDelta++;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Arrays.sort(buckets);
-        if (numDelta == 1) {
-          Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        } else if (numDelta == 2) {
-          Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
-          Assert.assertEquals(1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        }
-      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
-        numDeleteDelta++;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Arrays.sort(buckets);
-        if (numDeleteDelta == 1) {
-          Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        }
-      } else if (status[i].getPath().getName().matches("base_.*")) {
-        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
-        sawNewBase = true;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-      } else {
-        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
-      }
-    }
-    Assert.assertEquals(2, numDelta);
-    Assert.assertEquals(1, numDeleteDelta);
-    Assert.assertTrue(sawNewBase);
-
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 5. Perform another major compaction
-    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    // There should be 1 new base directory: base_0000001
-    // Original bucket files, delta directories, delete_delta directories and the
-    // previous base directory should stay until Cleaner kicks in.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Arrays.sort(status);
-    Assert.assertEquals(7, status.length);
-    int numBase = 0;
-    for (int i = 0; i < status.length; i++) {
-      if (status[i].getPath().getName().matches("base_.*")) {
-        numBase++;
-        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-        Arrays.sort(buckets);
-        if (numBase == 1) {
-          Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
-          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        } else if (numBase == 2) {
-          // The new base dir now has two bucket files, since the delta dir has two bucket files
-          Assert.assertEquals("base_0000023", status[i].getPath().getName());
-          Assert.assertEquals(1, buckets.length);
-          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-        }
-      }
-    }
-    Assert.assertEquals(2, numBase);
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-
-    // 6. Let Cleaner delete obsolete files/dirs
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // Before Cleaner, there should be 6 items:
-    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
-    Assert.assertEquals(7, status.length);
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_0000001.
-    // Original bucket files, delta directories and previous base directory should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertEquals("base_0000023", status[0].getPath().getName());
-    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
-    Arrays.sort(buckets);
-    Assert.assertEquals(1, buckets.length);
-    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
-    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
-    resultData = new int[][] {{1, 3}, {3, 4}};
-    Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
-    resultCount = 2;
-    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
index 44a9412..c76d654 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java
@@ -23,11 +23,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 /**
- * Same as TestTxnCommands2WithSplitUpdate but tests ACID tables with vectorization turned on by
+ * Same as TestTxnCommands2 but tests ACID tables with vectorization turned on by
  * default, and having 'transactional_properties' set to 'default'. This specifically tests the
  * fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on.
  */
-public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate {
+public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2 {
 
   public TestTxnCommands2WithSplitUpdateAndVectorization() {
     super();

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 44ff65c..06e4f98 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
+import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.junit.Assert;
 import org.junit.Test;
@@ -669,21 +670,12 @@ public class TestAcidUtils {
 
   @Test
   public void testAcidOperationalProperties() throws Exception {
-    AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy();
-    assertsForAcidOperationalProperties(testObj, "legacy");
-
-    testObj = AcidUtils.AcidOperationalProperties.getDefault();
+    AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getDefault();
     assertsForAcidOperationalProperties(testObj, "default");
 
-    testObj = AcidUtils.AcidOperationalProperties.parseInt(0);
-    assertsForAcidOperationalProperties(testObj, "legacy");
-
     testObj = AcidUtils.AcidOperationalProperties.parseInt(1);
     assertsForAcidOperationalProperties(testObj, "split_update");
 
-    testObj = AcidUtils.AcidOperationalProperties.parseString("legacy");
-    assertsForAcidOperationalProperties(testObj, "legacy");
-
     testObj = AcidUtils.AcidOperationalProperties.parseString("default");
     assertsForAcidOperationalProperties(testObj, "default");
 
@@ -699,12 +691,6 @@ public class TestAcidUtils {
         assertEquals(1, testObj.toInt());
         assertEquals("|split_update", testObj.toString());
         break;
-      case "legacy":
-        assertEquals(false, testObj.isSplitUpdate());
-        assertEquals(false, testObj.isHashBasedMerge());
-        assertEquals(0, testObj.toInt());
-        assertEquals("", testObj.toString());
-        break;
       default:
         break;
     }
@@ -716,7 +702,7 @@ public class TestAcidUtils {
     Configuration testConf = new Configuration();
     // Test setter for configuration object.
     AcidUtils.setAcidOperationalProperties(testConf, oprProps);
-    assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0));
+    assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1));
     // Test getter for configuration object.
     assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString());
 
@@ -726,12 +712,15 @@ public class TestAcidUtils {
     assertEquals(oprProps.toString(),
         parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname));
     // Test getter for map object.
-    // Calling a get on the 'parameters' will still return legacy type because the setters/getters
-    // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES
-    // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES.
-    assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+    assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
     parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString());
     // Set the appropriate key in the map and test that we are able to read it back correctly.
     assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
   }
+
+  /**
+   * Empty placeholder (no {@code @Test} annotation, no body); see {@link TestOrcRawRecordMerger#testGetLogicalLength()}
+   */
+  public void testGetLogicalLength() throws Exception {
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b004cf5..53bd08c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -75,12 +75,14 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -597,12 +599,13 @@ public class TestInputOutputFormat {
   @Test
   public void testACIDSplitStrategy() throws Exception {
     conf.set("bucket_count", "2");
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
     MockFileSystem fs = new MockFileSystem(conf,
-        new MockFile("mock:/a/delta_000_001/part-00", 1000, new byte[1], new MockBlock("host1")),
-        new MockFile("mock:/a/delta_000_001/part-01", 1000, new byte[1], new MockBlock("host1")),
-        new MockFile("mock:/a/delta_001_002/part-02", 1000, new byte[1], new MockBlock("host1")),
-        new MockFile("mock:/a/delta_001_002/part-03", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/delta_000_001/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_000_001/bucket_000001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_001_002/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_001_002/bucket_000001", 1000, new byte[1], new MockBlock("host1")));
     OrcInputFormat.FileGenerator gen =
         new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a"), false, null);
@@ -611,9 +614,9 @@ public class TestInputOutputFormat {
     List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
     ColumnarSplitSizeEstimator splitSizeEstimator = new ColumnarSplitSizeEstimator();
     for (OrcSplit split: splits) {
-      assertEquals(Integer.MAX_VALUE, splitSizeEstimator.getEstimatedSize(split));
+      assertEquals(1, splitSizeEstimator.getEstimatedSize(split));
     }
-    assertEquals(2, splits.size());
+    assertEquals(4, splits.size());
   }
 
   @Test
@@ -1105,6 +1108,9 @@ public class TestInputOutputFormat {
     }
   }
 
+  /**
+   * WARNING: the delete(Path...) methods don't actually delete
+   */
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     final Map<MockFile, FileStatus> fileStatusMap = new HashMap<>();
@@ -1230,14 +1236,32 @@ public class TestInputOutputFormat {
     public boolean delete(Path path) throws IOException {
       statistics.incrementWriteOps(1);
       checkAccess();
-      return false;
+      int removed = 0;//number of lists (files, globalFiles) from which an entry was removed
+      for(int i = 0; i < files.size(); i++) {//remove the first entry of 'files' whose path equals the argument
+        MockFile mf = files.get(i);
+        if(path.equals(mf.path)) {
+          files.remove(i);
+          removed++;
+          break;//at most one entry per list is removed
+        }
+      }
+      for(int i = 0; i < globalFiles.size(); i++) {//same scan over 'globalFiles'
+        MockFile mf = globalFiles.get(i);//must index the list being scanned; files.get(i) could be out of range or a different file
+        if(path.equals(mf.path)) {
+          globalFiles.remove(i);
+          removed++;
+          break;
+        }
+      }
+      return removed > 0;//true iff the path was found in at least one of the two lists
     }
 
     @Override
     public boolean delete(Path path, boolean b) throws IOException {
-      statistics.incrementWriteOps(1);
-      checkAccess();
-      return false;
+      if(b) {//b is presumably the 'recursive' flag of FileSystem#delete(Path, boolean); that mode is not supported by this mock
+        throw new UnsupportedOperationException();
+      }
+      return delete(path);//otherwise behaves exactly like the single-argument delete(Path) overload
     }
 
     @Override
@@ -2690,9 +2714,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: open - mock:/mocktable/0_0
-    // call-3: open - mock:/mocktable/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktable/0_0
+    // call-3: open - mock:/mocktable/0_0
+    // call-4: check existence of side file for mock:/mocktable/0_1
+    // call-5: open - mock:/mocktable/0_1
+    assertEquals(5, readOpsDelta);
 
     assertEquals(2, splits.length);
     // revert back to local fs
@@ -2748,9 +2774,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: open - mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_0
+    // call-4: check existence of side file for  mock:/mocktbl/0_1
+    // call-5: open - mock:/mocktbl/0_1
+    assertEquals(5, readOpsDelta);
 
     // force BI to avoid reading footers
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
@@ -2768,7 +2796,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    assertEquals(1, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktbl/0_0
+    // call-3: check existence of side file for  mock:/mocktbl/0_1
+    assertEquals(3, readOpsDelta);
 
     // enable cache and use default strategy
     conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb");
@@ -2787,9 +2817,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: open - mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check existence of side file for mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_0
+    // call-4: check existence of side file for mock:/mocktbl/0_1
+    // call-5: open - mock:/mocktbl/0_1
+    assertEquals(5, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2859,9 +2891,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: open - mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check side file for mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_0
+    // call-4: check side file for  mock:/mocktbl1/0_1
+    // call-5: open - mock:/mocktbl1/0_1
+    assertEquals(5, readOpsDelta);
 
     // change file length and look for cache misses
 
@@ -2898,9 +2932,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: open - mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check side file for mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_0
+    // call-4: check side file for  mock:/mocktbl1/0_1
+    // call-5: open - mock:/mocktbl1/0_1
+    assertEquals(5, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2971,9 +3007,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: open - mock:/mocktbl2/0_0
-    // call-3: open - mock:/mocktbl2/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: check side file for mock:/mocktbl2/0_0
+    // call-3: open - mock:/mocktbl2/0_0
+    // call-4: check side file for  mock:/mocktbl2/0_1
+    // call-5: open - mock:/mocktbl2/0_1
+    assertEquals(5, readOpsDelta);
 
     // change file modification time and look for cache misses
     FileSystem fs1 = FileSystem.get(conf);
@@ -2993,8 +3031,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: open - mock:/mocktbl2/0_1
-    assertEquals(2, readOpsDelta);
+    // call-2: check side file for  mock:/mocktbl2/0_1
+    // call-3: open - mock:/mocktbl2/0_1
+    assertEquals(3, readOpsDelta);
 
     // touch the next file
     fs1 = FileSystem.get(conf);
@@ -3014,8 +3053,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: open - mock:/mocktbl2/0_0
-    assertEquals(2, readOpsDelta);
+    // call-2: check side file for  mock:/mocktbl2/0_0
+    // call-3: open - mock:/mocktbl2/0_0
+    assertEquals(3, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3332,6 +3372,7 @@ public class TestInputOutputFormat {
     MockFileSystem fs = new MockFileSystem(conf);
     MockPath mockPath = new MockPath(fs, "mock:///mocktable5");
     conf.set("hive.transactional.table.scan", "true");
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "false");
@@ -3409,6 +3450,7 @@ public class TestInputOutputFormat {
     MockFileSystem fs = new MockFileSystem(conf);
     MockPath mockPath = new MockPath(fs, "mock:///mocktable6");
     conf.set("hive.transactional.table.scan", "true");
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "true");
@@ -3481,15 +3523,14 @@ public class TestInputOutputFormat {
 
   @Test
   public void testACIDReaderNoFooterSerializeWithDeltas() throws Exception {
-    MockFileSystem fs = new MockFileSystem(conf);
+    conf.set("fs.defaultFS", "mock:///");
+    conf.set("fs.mock.impl", MockFileSystem.class.getName());
+    FileSystem fs = FileSystem.get(conf);
     MockPath mockPath = new MockPath(fs, "mock:///mocktable7");
-    conf.set("hive.transactional.table.scan", "true");
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "false");
     conf.set("mapred.input.dir", mockPath.toString());
-    conf.set("fs.defaultFS", "mock:///");
-    conf.set("fs.mock.impl", MockFileSystem.class.getName());
     StructObjectInspector inspector;
     synchronized (TestOrcFile.class) {
       inspector = (StructObjectInspector)
@@ -3505,17 +3546,22 @@ public class TestInputOutputFormat {
     }
     writer.close();
 
-    writer = OrcFile.createWriter(new Path(new Path(mockPath + "/delta_001_002") + "/0_1"),
-        OrcFile.writerOptions(conf).blockPadding(false)
-            .bufferSize(1024).inspector(inspector));
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(1).minimumTransactionId(1)
+      .maximumTransactionId(1).inspector(inspector).finalDestination(mockPath);
+    OrcOutputFormat of = new OrcOutputFormat();
+    RecordUpdater ru = of.getRecordUpdater(mockPath, options);
     for (int i = 0; i < 10; ++i) {
-      writer.addRow(new MyRow(i, 2 * i));
+      ru.insert(options.getMinimumTransactionId(), new MyRow(i, 2 * i));
     }
-    writer.close();
+    ru.close(false);//this deletes the side file
+
+    //set up props for read
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    AcidUtils.setTransactionalTableScan(conf, true);
 
     OrcInputFormat orcInputFormat = new OrcInputFormat();
     InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
-    assertEquals(1, splits.length);
+    assertEquals(2, splits.length);
     int readOpsBefore = -1;
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3530,14 +3576,8 @@ public class TestInputOutputFormat {
       assertTrue(split.toString().contains("start=3"));
       assertTrue(split.toString().contains("hasFooter=false"));
       assertTrue(split.toString().contains("hasBase=true"));
-      // NOTE: don't be surprised if deltas value is different
-      // in older release deltas=2 as min and max transaction are added separately to delta list.
-      // in newer release since both of them are put together deltas=1
-      assertTrue(split.toString().contains("deltas=1"));
-      if (split instanceof OrcSplit) {
-        assertFalse("No footer serialize test for ACID reader, hasFooter is not expected in" +
-            " orc splits.", ((OrcSplit) split).hasFooter());
-      }
+      assertFalse("No footer serialize test for ACID reader, hasFooter is not expected in" +
+        " orc splits.", ((OrcSplit) split).hasFooter());
       orcInputFormat.getRecordReader(split, conf, Reporter.NULL);
     }
 
@@ -3547,11 +3587,9 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open to read footer - split 1 => mock:/mocktable7/0_0
-    // call-2: open to read data - split 1 => mock:/mocktable7/0_0
-    // call-3: open side file (flush length) of delta directory
-    // call-4: fs.exists() check for delta_xxx_xxx/bucket_00000 file
-    // call-5: AcidUtils.getAcidState - getLen() mock:/mocktable7/0_0
+    // call-1: open to read data - split 1 => mock:/mocktable7/0_0
+    // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
+    // call-3: split 2 - read delta_x_y/bucket_00001 (NOTE: 5 read ops are asserted below; the other 2 are not listed here)
     assertEquals(5, readOpsDelta);
 
     // revert back to local fs
@@ -3560,15 +3598,14 @@ public class TestInputOutputFormat {
 
   @Test
   public void testACIDReaderFooterSerializeWithDeltas() throws Exception {
-    MockFileSystem fs = new MockFileSystem(conf);
+    conf.set("fs.defaultFS", "mock:///");
+    conf.set("fs.mock.impl", MockFileSystem.class.getName());
+    FileSystem fs = FileSystem.get(conf);//ensures that FS object is cached so that everyone uses the same instance
     MockPath mockPath = new MockPath(fs, "mock:///mocktable8");
-    conf.set("hive.transactional.table.scan", "true");
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty());
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty());
     conf.set("hive.orc.splits.include.file.footer", "true");
     conf.set("mapred.input.dir", mockPath.toString());
-    conf.set("fs.defaultFS", "mock:///");
-    conf.set("fs.mock.impl", MockFileSystem.class.getName());
     StructObjectInspector inspector;
     synchronized (TestOrcFile.class) {
       inspector = (StructObjectInspector)
@@ -3584,17 +3621,22 @@ public class TestInputOutputFormat {
     }
     writer.close();
 
-    writer = OrcFile.createWriter(new Path(new Path(mockPath + "/delta_001_002") + "/0_1"),
-        OrcFile.writerOptions(conf).blockPadding(false)
-            .bufferSize(1024).inspector(inspector));
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).bucket(1).minimumTransactionId(1)
+      .maximumTransactionId(1).inspector(inspector).finalDestination(mockPath);
+    OrcOutputFormat of = new OrcOutputFormat();
+    RecordUpdater ru = of.getRecordUpdater(mockPath, options);
     for (int i = 0; i < 10; ++i) {
-      writer.addRow(new MyRow(i, 2 * i));
+      ru.insert(options.getMinimumTransactionId(), new MyRow(i, 2 * i));
     }
-    writer.close();
+    ru.close(false);//this deletes the side file
+
+    //set up props for read
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    AcidUtils.setTransactionalTableScan(conf, true);
 
     OrcInputFormat orcInputFormat = new OrcInputFormat();
     InputSplit[] splits = orcInputFormat.getSplits(conf, 2);
-    assertEquals(1, splits.length);
+    assertEquals(2, splits.length);
     int readOpsBefore = -1;
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3609,14 +3651,8 @@ public class TestInputOutputFormat {
       assertTrue(split.toString().contains("start=3"));
       assertTrue(split.toString().contains("hasFooter=true"));
       assertTrue(split.toString().contains("hasBase=true"));
-      // NOTE: don't be surprised if deltas value is different
-      // in older release deltas=2 as min and max transaction are added separately to delta list.
-      // in newer release since both of them are put together deltas=1
-      assertTrue(split.toString().contains("deltas=1"));
-      if (split instanceof OrcSplit) {
-        assertTrue("Footer serialize test for ACID reader, hasFooter is not expected in" +
-            " orc splits.", ((OrcSplit) split).hasFooter());
-      }
+      assertTrue("Footer serialize test for ACID reader, hasFooter is not expected in" +
+        " orc splits.", ((OrcSplit) split).hasFooter());
       orcInputFormat.getRecordReader(split, conf, Reporter.NULL);
     }
 
@@ -3627,10 +3663,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: open to read data - split 1 => mock:/mocktable8/0_0
-    // call-2: open side file (flush length) of delta directory
-    // call-3: fs.exists() check for delta_xxx_xxx/bucket_00000 file
-    // call-4: AcidUtils.getAcidState - getLen() mock:/mocktable8/0_0
-    assertEquals(4, readOpsDelta);
+    // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
+    // call-3: split 2 - read delta_x_y/bucket_00001
+    assertEquals(3, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");