You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/08/12 17:31:49 UTC

[1/3] hive git commit: HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master 333fa8763 -> ecab0d072


http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
new file mode 100644
index 0000000..becb22a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Same as TestTxnCommands2 but tests ACID tables with 'transactional_properties' set to 'default'.
+ * This checks that ACID tables with split-update turned on pass the same set of tests that
+ * are run with split-update turned off. It also adds a few tests that exercise behaviors
+ * specific to ACID tables with split-update turned on.
+ */
+public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
+
+  public TestTxnCommands2WithSplitUpdate() {
+    super();
+  }
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  @Override
+  @Test
+  public void testOrcPPD() throws Exception  {
+    final String defaultUnset = "unset";
+    String oldSplitStrategyValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
+    // TODO: Setting split strategy as 'BI' is workaround for HIVE-14448 until it is resolved.
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+    super.testOrcPPD();
+
+    // Restore the previous value for split strategy, or unset if not previously set.
+    if (oldSplitStrategyValue.equals(defaultUnset)) {
+      hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
+    } else {
+      hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldSplitStrategyValue);
+    }
+  }
+
+  @Override
+  @Test
+  public void testOrcNoPPD() throws Exception {
+    final String defaultUnset = "unset";
+    String oldSplitStrategyValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
+    // TODO: Setting split strategy as 'BI' is workaround for HIVE-14448 until it is resolved.
+    hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+    super.testOrcNoPPD();
+
+    // Restore the previous value for split strategy, or unset if not previously set.
+    if (oldSplitStrategyValue.equals(defaultUnset)) {
+      hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
+    } else {
+      hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldSplitStrategyValue);
+    }
+  }
+
+  @Override
+  @Test
+  public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+    // Test with split-update turned on.
+    testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  @Override
+  @Test
+  public void writeBetweenWorkerAndCleaner() throws Exception {
+    writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  @Override
+  @Test
+  public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
+    testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+  }
+
+  /**
+   * In the current implementation of ACID, altering the value of transactional_properties, or
+   * setting it on an ACID table where it was previously unset, will throw an exception.
+   * @throws Exception
+   */
+  @Test
+  public void testFailureOnAlteringTransactionalProperties() throws Exception {
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
+    runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with split-update
+   * enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split-update enabled
+   * 3. Insert a row to ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion1() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Insert another row to newly-converted ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+    // The delta directory should also have only 1 bucket file (bucket_00001)
+    Assert.assertEquals(3, status.length);
+    boolean sawNewDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(1, buckets.length); // only one bucket file
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_xxxxxxx.
+    // Original bucket files and delta directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(4, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    // Note, here we create a fake directory along with fake files as original directories/files
+    String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
+        "/subdir/000000_0";
+    String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
+        "/subdir/000000_1";
+    fs.create(new Path(fakeFile0));
+    fs.create(new Path(fakeFile1));
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 original directory, 1 base directory and 1 delta directory
+    Assert.assertEquals(5, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_xxxxxxx.
+    // Original bucket files and delta directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with split-update
+   * enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split update enabled.
+   * 3. Update the existing row in ACID table
+   * 4. Perform Major compaction
+   * 5. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion2() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Update the existing row in newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
+    // and one delete_delta directory. When split-update is enabled, an update event is split into
+    // a combination of delete and insert, that generates the delete_delta directory.
+    // The delta directory should have BUCKET_COUNT - 1 bucket files (bucket_00000 or bucket_00001)
+    // and so should the delete_delta directory.
+    Assert.assertEquals(4, status.length);
+    boolean sawNewDelta = false;
+    boolean sawNewDeleteDelta = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        sawNewDeleteDelta = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertTrue(sawNewDelta);
+    Assert.assertTrue(sawNewDeleteDelta);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_0000001.
+    // Original bucket files, delta and delete_delta directories should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(5, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 5 items:
+    // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
+    Assert.assertEquals(5, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000001.
+    // Original bucket files, delta directory and delete_delta should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+    Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+
+  /**
+   * Test the query correctness and directory layout for ACID table conversion with split-update
+   * enabled.
+   * 1. Insert a row to Non-ACID table
+   * 2. Convert Non-ACID to ACID table with split-update enabled
+   * 3. Perform Major compaction
+   * 4. Update the existing row and insert a new row to ACID table
+   * 5. Perform another Major compaction
+   * 6. Clean
+   * @throws Exception
+   */
+  @Test
+  public void testNonAcidToAcidSplitUpdateConversion3() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert a row to Non-ACID table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    int [][] resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    int resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
+    runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+        + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Everything should be same as before
+    Assert.assertEquals(BUCKET_COUNT, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+    }
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 3. Perform a major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new directory: base_-9223372036854775808
+    // Original bucket files should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(3, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 2}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 1;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 4. Update the existing row, and insert another row to newly-converted ACID table
+    runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);  // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
+    // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
+    // plus two new delta directories and one delete_delta directory that would be created due to
+    // the update statement (remember split-update U=D+I)!
+    Assert.assertEquals(6, status.length);
+    int numDelta = 0;
+    int numDeleteDelta = 0;
+    sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("delta_.*")) {
+        numDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDelta == 1) {
+          Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numDelta == 2) {
+          Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+        numDeleteDelta++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numDeleteDelta == 1) {
+          Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      } else if (status[i].getPath().getName().matches("base_.*")) {
+        Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+        Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+      } else {
+        Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+      }
+    }
+    Assert.assertEquals(2, numDelta);
+    Assert.assertEquals(1, numDeleteDelta);
+    Assert.assertTrue(sawNewBase);
+
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 5. Perform another major compaction
+    runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    // There should be 1 new base directory: base_0000002
+    // Original bucket files, delta directories, delete_delta directories and the
+    // previous base directory should stay until Cleaner kicks in.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(status);
+    Assert.assertEquals(7, status.length);
+    int numBase = 0;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        numBase++;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Arrays.sort(buckets);
+        if (numBase == 1) {
+          Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+          Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        } else if (numBase == 2) {
+          // The new base dir has one bucket file (bucket_00001)
+          Assert.assertEquals("base_0000002", status[i].getPath().getName());
+          Assert.assertEquals(1, buckets.length);
+          Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+        }
+      }
+    }
+    Assert.assertEquals(2, numBase);
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+    // 6. Let Cleaner delete obsolete files/dirs
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // Before Cleaner, there should be 7 items:
+    // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+    Assert.assertEquals(7, status.length);
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_0000002.
+    // Original bucket files, delta directories and previous base directory should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertEquals("base_0000002", status[0].getPath().getName());
+    FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+    Arrays.sort(buckets);
+    Assert.assertEquals(1, buckets.length);
+    Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+    rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+    resultData = new int[][] {{1, 3}, {3, 4}};
+    Assert.assertEquals(stringifyValues(resultData), rs);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    resultCount = 2;
+    Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 556df18..a7ff9a3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -17,11 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.io;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
@@ -30,13 +40,6 @@ import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class TestAcidUtils {
 
   @Test
@@ -60,12 +63,23 @@ public class TestAcidUtils {
     options.writingBase(false);
     assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(true);
+    assertEquals("/tmp/delete_delta_0000100_0000200_0000/bucket_00023",
+        AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(false);
     options.statementId(-1);
     assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(true);
+    assertEquals("/tmp/delete_delta_0000100_0000200/bucket_00023",
+        AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(false);
     options.statementId(7);
     assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
+    options.writingDeleteDelta(true);
+    assertEquals("/tmp/delete_delta_0000100_0000200_0007/bucket_00023",
+        AcidUtils.createFilename(p, options).toString());
   }
   @Test
   public void testCreateFilenameLargeIds() throws Exception {
@@ -86,7 +100,6 @@ public class TestAcidUtils {
     assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
   }
-  
 
   @Test
   public void testParsing() throws Exception {
@@ -94,19 +107,34 @@ public class TestAcidUtils {
     Path dir = new Path("/tmp/tbl");
     Configuration conf = new Configuration();
     AcidOutputFormat.Options opts =
-        AcidUtils.parseBaseBucketFilename(new Path(dir, "base_567/bucket_123"),
+        AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"),
             conf);
     assertEquals(false, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
     assertEquals(567, opts.getMaximumTransactionId());
     assertEquals(0, opts.getMinimumTransactionId());
     assertEquals(123, opts.getBucket());
-    opts = AcidUtils.parseBaseBucketFilename(new Path(dir, "000123_0"), conf);
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delta_000005_000006/bucket_00001"),
+        conf);
+    assertEquals(false, opts.getOldStyle());
+    assertEquals(false, opts.isWritingBase());
+    assertEquals(6, opts.getMaximumTransactionId());
+    assertEquals(5, opts.getMinimumTransactionId());
+    assertEquals(1, opts.getBucket());
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delete_delta_000005_000006/bucket_00001"),
+        conf);
+    assertEquals(false, opts.getOldStyle());
+    assertEquals(false, opts.isWritingBase());
+    assertEquals(6, opts.getMaximumTransactionId());
+    assertEquals(5, opts.getMinimumTransactionId());
+    assertEquals(1, opts.getBucket());
+    opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf);
     assertEquals(true, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
     assertEquals(123, opts.getBucket());
     assertEquals(0, opts.getMinimumTransactionId());
     assertEquals(0, opts.getMaximumTransactionId());
+
   }
 
   @Test
@@ -471,5 +499,230 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
   }
 
+  @Test
+  public void testBaseWithDeleteDeltas() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_49/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_025_025/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_029_029/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_029_029/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_025_030/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_025_030/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_050_105/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_050_105/bucket_0", 0, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0]));
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
+    assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(7, obsolete.size());
+    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).getPath().toString());
+    assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    assertEquals(2, deltas.size());
+    assertEquals("mock:/tbl/part1/delete_delta_050_105", deltas.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_050_105", deltas.get(1).getPath().toString());
+    // The delete_delta_110_110 should not be read because it is greater than the high watermark.
+  }
+
+  @Test
+  public void testOverlapingDeltaAndDeleteDelta() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_0000063_63/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_000062_62/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_00061_61/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_00064_64/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_052_55/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
+    assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(3, obsolete.size());
+    assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).getPath().toString());
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(6, delts.size());
+    assertEquals("mock:/tbl/part1/delete_delta_40_60", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_40_60", delts.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_00061_61", delts.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_00064_64", delts.get(5).getPath().toString());
+  }
 
-}
+  @Test
+  public void testMinorCompactedDeltaMakesInBetweenDelteDeltaObsolete() throws Exception {
+    // This test checks that if we have a minor compacted delta for the txn range [40,60]
+    // then it will make any delete delta in that range obsolete.
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_50_50/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
+    List<FileStatus> obsolete = dir.getObsolete();
+    assertEquals(1, obsolete.size());
+    assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).getPath().toString());
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
+  }
+
+  @Test
+  public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception {
+    // This test checks that the appropriate deltas and delete_deltas are included when a minor
+    // compaction specifies a valid open txn range.
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_2_2/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]),
+        new MockFile("mock:/tbl/part1/delete_delta_7_7/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + Long.MAX_VALUE + ":"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(2, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_2_2", delts.get(1).getPath().toString());
+  }
+
+  @Test
+  public void deleteDeltasWithOpenTxnInRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+        AcidUtils.AcidOperationalProperties.getDefault().toInt());
+    MockFileSystem fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delete_delta_3_3/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(3, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_2_5", delts.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_2_5", delts.get(2).getPath().toString());
+    // Note that delete_delta_3_3 should not be read, when a minor compacted
+    // [delete_]delta_2_5 is present.
+  }
+
+  @Test
+  public void testDeleteDeltaSubdirPathGeneration() throws Exception {
+    String deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10);
+    assertEquals("delete_delta_0000001_0000010", deleteDeltaSubdirPath);
+    deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10, 5);
+    assertEquals("delete_delta_0000001_0000010_0005", deleteDeltaSubdirPath);
+  }
+
+  @Test
+  public void testDeleteEventDeltaDirPathFilter() throws Exception {
+    Path positivePath = new Path("delete_delta_000001_000010");
+    Path negativePath = new Path("delta_000001_000010");
+    assertEquals(true, AcidUtils.deleteEventDeltaDirFilter.accept(positivePath));
+    assertEquals(false, AcidUtils.deleteEventDeltaDirFilter.accept(negativePath));
+  }
+
+  @Test
+  public void testAcidOperationalProperties() throws Exception {
+    AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy();
+    assertsForAcidOperationalProperties(testObj, "legacy");
+
+    testObj = AcidUtils.AcidOperationalProperties.getDefault();
+    assertsForAcidOperationalProperties(testObj, "default");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseInt(0);
+    assertsForAcidOperationalProperties(testObj, "legacy");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseInt(1);
+    assertsForAcidOperationalProperties(testObj, "split_update");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseString("legacy");
+    assertsForAcidOperationalProperties(testObj, "legacy");
+
+    testObj = AcidUtils.AcidOperationalProperties.parseString("default");
+    assertsForAcidOperationalProperties(testObj, "default");
+
+  }
+
+  private void assertsForAcidOperationalProperties(AcidUtils.AcidOperationalProperties testObj,
+      String type) throws Exception {
+    switch(type) {
+      case "split_update":
+      case "default":
+        assertEquals(true, testObj.isSplitUpdate());
+        assertEquals(false, testObj.isHashBasedMerge());
+        assertEquals(1, testObj.toInt());
+        assertEquals("|split_update", testObj.toString());
+        break;
+      case "legacy":
+        assertEquals(false, testObj.isSplitUpdate());
+        assertEquals(false, testObj.isHashBasedMerge());
+        assertEquals(0, testObj.toInt());
+        assertEquals("", testObj.toString());
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Test
+  public void testAcidOperationalPropertiesSettersAndGetters() throws Exception {
+    AcidUtils.AcidOperationalProperties oprProps = AcidUtils.AcidOperationalProperties.getDefault();
+    Configuration testConf = new Configuration();
+    // Test setter for configuration object.
+    AcidUtils.setAcidOperationalProperties(testConf, oprProps);
+    assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0));
+    // Test getter for configuration object.
+    assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString());
+
+    Map<String, String> parameters = new HashMap<String, String>();
+    // Test setter for map object.
+    AcidUtils.setAcidOperationalProperties(parameters, oprProps);
+    assertEquals(oprProps.toString(),
+        parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname));
+    // Test getter for map object.
+    // Calling a get on the 'parameters' will still return legacy type because the setters/getters
+    // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES
+    // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES.
+    assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+    parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString());
+    // Set the appropriate key in the map and test that we are able to read it back correctly.
+    assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 6648829..2c1bb6f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -520,7 +520,9 @@ public class TestInputOutputFormat {
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
               context, fs, new MockPath(fs, "mock:/a/b"), false, null);
-          final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
+          List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+          assertEquals(1, splitStrategies.size());
+          final SplitStrategy splitStrategy = splitStrategies.get(0);
           assertTrue(
               String.format(
                   "Split strategy for %d files x %d size for %d splits", c, s,
@@ -541,7 +543,9 @@ public class TestInputOutputFormat {
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
               context, fs, new MockPath(fs, "mock:/a/b"), false, null);
-          final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
+          List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+          assertEquals(1, splitStrategies.size());
+          final SplitStrategy splitStrategy = splitStrategies.get(0);
           assertTrue(
               String.format(
                   "Split strategy for %d files x %d size for %d splits", c, s,
@@ -565,8 +569,9 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
       new OrcInputFormat.FileGenerator(context, fs,
           new MockPath(fs, "mock:/a/b"), false, null);
-    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
 
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
     context = new OrcInputFormat.Context(conf);
@@ -578,8 +583,9 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
     gen = new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
   }
 
   @Test
@@ -594,9 +600,9 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
         new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a"), false, null);
-    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.ACIDSplitStrategy);
-    List<OrcSplit> splits = splitStrategy.getSplits();
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
     ColumnarSplitSizeEstimator splitSizeEstimator = new ColumnarSplitSizeEstimator();
     for (OrcSplit split: splits) {
       assertEquals(Integer.MAX_VALUE, splitSizeEstimator.getEstimatedSize(split));
@@ -605,6 +611,127 @@ public class TestInputOutputFormat {
   }
 
   @Test
+  public void testACIDSplitStrategyForSplitUpdate() throws Exception {
+    conf.set("bucket_count", "2");
+    conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+
+    // Case 1: Test with just originals => Single split strategy with two splits.
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")));
+    OrcInputFormat.FileGenerator gen =
+        new OrcInputFormat.FileGenerator(context, fs,
+            new MockPath(fs, "mock:/a"), false, null);
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/b/000000_1", splits.get(1).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+    assertTrue(splits.get(1).isOriginal());
+
+    // Case 2: Test with originals and base => Single split strategy with two splits on compacted
+    // base since the presence of a base will make the originals obsolete.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/base_0000001/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/base_0000001/bucket_00001", splits.get(1).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    assertFalse(splits.get(1).isOriginal());
+
+    // Case 3: Test with originals and deltas => Two split strategies with two splits for each.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(2, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/b/000000_1", splits.get(1).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+    assertTrue(splits.get(1).isOriginal());
+    assertEquals(true, splitStrategies.get(1) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(1)).getSplits();
+    assertEquals(2, splits.size());
+    assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00001", splits.get(1).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    assertFalse(splits.get(1).isOriginal());
+
+    // Case 4: Test with originals and deltas but now with only one bucket covered, i.e. we will
+    // have originals & insert_deltas for only one bucket, but the delete_deltas will be for two
+    // buckets => Two strategies with one split for each.
+    // When split-update is enabled, we do not need to account for buckets that aren't covered.
+    // We are able to do so because the valid user data has already been considered
+    // as base for the covered buckets. Hence, the uncovered buckets do not have any relevant
+    // data and we can just ignore them.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(2, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(1, splits.size());
+    assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+    assertTrue(splits.get(0).isOriginal());
+    assertEquals(true, splitStrategies.get(1) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(1)).getSplits();
+    assertEquals(1, splits.size());
+    assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+
+    // Case 5: Test with originals, compacted_base, insert_deltas, delete_deltas (exhaustive test)
+    // This should just generate one strategy with splits for base and insert_deltas.
+    fs = new MockFileSystem(conf,
+        new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+        new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+    splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+    assertEquals(4, splits.size());
+    assertEquals("mock:/a/base_0000001/bucket_00000", splits.get(0).getPath().toUri().toString());
+    assertEquals("mock:/a/base_0000001/bucket_00001", splits.get(1).getPath().toUri().toString());
+    assertEquals("mock:/a/delta_0000002_0000002_0000/bucket_00000", splits.get(2).getPath().toUri().toString());
+    assertEquals("mock:/a/delta_0000002_0000002_0000/bucket_00001", splits.get(3).getPath().toUri().toString());
+    assertFalse(splits.get(0).isOriginal());
+    assertFalse(splits.get(1).isOriginal());
+    assertFalse(splits.get(2).isOriginal());
+    assertFalse(splits.get(3).isOriginal());
+  }
+
+  @Test
   public void testBIStrategySplitBlockBoundary() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
@@ -617,9 +744,10 @@ public class TestInputOutputFormat {
     OrcInputFormat.FileGenerator gen =
         new OrcInputFormat.FileGenerator(context, fs,
             new MockPath(fs, "mock:/a/b"), false, null);
-    OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    List<OrcSplit> splits = splitStrategy.getSplits();
+    List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    List<OrcSplit> splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     int numSplits = splits.size();
     assertEquals(5, numSplits);
 
@@ -632,9 +760,10 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(5, numSplits);
 
@@ -652,9 +781,10 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(10, numSplits);
 
@@ -672,9 +802,10 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(10, numSplits);
 
@@ -692,9 +823,10 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
     gen = new OrcInputFormat.FileGenerator(context, fs,
         new MockPath(fs, "mock:/a/b"), false, null);
-    splitStrategy = createSplitStrategy(context, gen);
-    assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
-    splits = splitStrategy.getSplits();
+    splitStrategies = createSplitStrategies(context, gen);
+    assertEquals(1, splitStrategies.size());
+    assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+    splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
     numSplits = splits.size();
     assertEquals(15, numSplits);
   }
@@ -717,22 +849,23 @@ public class TestInputOutputFormat {
 
     OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx();
     // The first directory becomes the base for combining.
-    SplitStrategy<?> ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
-    assertNull(ss);
+    List<SplitStrategy<?>> ss = createOrCombineStrategies(context, fs, "mock:/a/1", combineCtx);
+    assertTrue(ss.isEmpty());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     OrcInputFormat.ETLSplitStrategy etlSs = combineCtx.combined;
     assertEquals(2, etlSs.files.size());
     assertTrue(etlSs.isOriginal);
     assertEquals(1, etlSs.dirs.size());
     // The second one should be combined into the first.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/2", combineCtx);
-    assertNull(ss);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/2", combineCtx);
+    assertTrue(ss.isEmpty());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     assertEquals(4, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
     // The third one has the base file, so it shouldn't be combined but could be a base.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/3", combineCtx);
-    assertSame(etlSs, ss);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/3", combineCtx);
+    assertEquals(1, ss.size());
+    assertSame(etlSs, ss.get(0));
     assertEquals(4, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
@@ -741,34 +874,37 @@ public class TestInputOutputFormat {
     assertFalse(etlSs.isOriginal);
     assertEquals(1, etlSs.dirs.size());
     // Try the first again, it would not be combined and we'd retain the old base (less files).
-    ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
-    assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
-    assertNotSame(etlSs, ss);
-    OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss;
+    ss = createOrCombineStrategies(context, fs, "mock:/a/1", combineCtx);
+    assertEquals(1, ss.size());
+    assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
+    assertNotSame(etlSs, ss.get(0));
+    OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss.get(0);
     assertEquals(2, rejectedEtlSs.files.size());
     assertEquals(1, rejectedEtlSs.dirs.size());
     assertTrue(rejectedEtlSs.isOriginal);
     assertEquals(1, etlSs.files.size());
     assertEquals(1, etlSs.dirs.size());
     // The fourth could be combined again.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/4", combineCtx);
-    assertNull(ss);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/4", combineCtx);
+    assertTrue(ss.isEmpty());
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     assertEquals(2, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
     // The fifth will not be combined because of delta files.
-    ss = createOrCombineStrategy(context, fs, "mock:/a/5", combineCtx);
-    assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
+    ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx);
+    assertEquals(1, ss.size());
+    assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
     assertNotSame(etlSs, ss);
     assertEquals(2, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
   }
 
-  public SplitStrategy<?> createOrCombineStrategy(OrcInputFormat.Context context,
+  public List<SplitStrategy<?>> createOrCombineStrategies(OrcInputFormat.Context context,
       MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
-    return OrcInputFormat.determineSplitStrategy(combineCtx, context,
-        adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true);
+    return OrcInputFormat.determineSplitStrategies(combineCtx, context,
+        adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, null, true);
   }
 
   public OrcInputFormat.AcidDirInfo createAdi(
@@ -777,11 +913,12 @@ public class TestInputOutputFormat {
         context, fs, new MockPath(fs, path), false, null).call();
   }
 
-  private OrcInputFormat.SplitStrategy createSplitStrategy(
+  private List<OrcInputFormat.SplitStrategy<?>> createSplitStrategies(
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
-    return OrcInputFormat.determineSplitStrategy(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true);
+    return OrcInputFormat.determineSplitStrategies(
+        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+        null, null, true);
   }
 
   public static class MockBlock {


[3/3] hive git commit: HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)

Posted by ek...@apache.org.
HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ecab0d07
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ecab0d07
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ecab0d07

Branch: refs/heads/master
Commit: ecab0d072b50a8d85dca6e850e47425d96c1ac09
Parents: 333fa87
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 12 10:31:39 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 12 10:31:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  57 +-
 .../mapreduce/FosterStorageHandler.java         |   3 +
 .../streaming/AbstractRecordWriter.java         |   7 +
 .../hive/ql/txn/compactor/TestCompactor.java    | 318 +++++++++-
 metastore/if/hive_metastore.thrift              |   1 +
 .../thrift/gen-cpp/hive_metastore_constants.cpp |   2 +
 .../thrift/gen-cpp/hive_metastore_constants.h   |   1 +
 .../metastore/api/hive_metastoreConstants.java  |   2 +
 .../src/gen/thrift/gen-php/metastore/Types.php  |   5 +
 .../thrift/gen-py/hive_metastore/constants.py   |   1 +
 .../thrift/gen-rb/hive_metastore_constants.rb   |   2 +
 .../TransactionalValidationListener.java        | 126 +++-
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   1 +
 .../hadoop/hive/ql/exec/SMBMapJoinOperator.java |   1 +
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |   1 +
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  25 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 456 +++++++++++++--
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   1 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 248 ++++++--
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 152 ++++-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  95 +--
 .../hadoop/hive/ql/plan/TableScanDesc.java      |   9 +
 .../hive/ql/txn/compactor/CompactorMR.java      |  99 +++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 106 +++-
 .../ql/TestTxnCommands2WithSplitUpdate.java     | 584 +++++++++++++++++++
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 275 ++++++++-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 225 +++++--
 27 files changed, 2515 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e9f6ec..0abb788 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -18,7 +18,32 @@
 
 package org.apache.hadoop.hive.conf;
 
-import com.google.common.base.Joiner;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.security.auth.login.LoginException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -42,27 +67,7 @@ import org.apache.hive.common.HiveCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.auth.login.LoginException;
-
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.base.Joiner;
 
 /**
  * Hive Configuration.
@@ -261,6 +266,7 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
       HiveConf.ConfVars.HIVE_TXN_MANAGER,
       HiveConf.ConfVars.HIVE_TXN_TIMEOUT,
+      HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES,
       HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE,
       HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH,
       HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX,
@@ -1762,6 +1768,13 @@ public class HiveConf extends Configuration {
         " of the lock manager is dumped to log file.  This is for debugging.  See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
+    HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
+        "Sets the operational properties that control the appropriate behavior for various\n"
+        + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n"
+        + "for ACID, while setting it to one will enable a split-update feature found in the newer\n"
+        + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
+        + "for future versions of ACID. (See HIVE-14035 for details.)"),
+
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
         "current open transactions reach this limit, future open transaction requests will be \n" +
         "rejected, until this number goes below the limit."),

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 14f7316..b970153 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -133,6 +133,9 @@ public class FosterStorageHandler extends DefaultStorageHandler {
 
         boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties);
         AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable);
+        AcidUtils.AcidOperationalProperties acidOperationalProperties =
+                AcidUtils.getAcidOperationalProperties(tableProperties);
+        AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties);
       }
     } catch (IOException e) {
       throw new IllegalStateException("Failed to set output path", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 974c6b8..b8615cb 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -48,6 +48,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Properties;
 
 
 public abstract class AbstractRecordWriter implements RecordWriter {
@@ -265,10 +266,16 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
           throws IOException, SerializationError {
     try {
+      // Initialize table properties from the table parameters. This is required because the table
+      // may define certain table parameters that may be required while writing. The table parameter
+      // 'transactional_properties' is one such example.
+      Properties tblProperties = new Properties();
+      tblProperties.putAll(tbl.getParameters());
       return  outf.getRecordUpdater(partitionPath,
               new AcidOutputFormat.Options(conf)
                       .inspector(getSerde().getObjectInspector())
                       .bucket(bucketId)
+                      .tableProperties(tblProperties)
                       .minimumTransactionId(minTxnId)
                       .maximumTransactionId(maxTxnID)
                       .statementId(-1)

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 731caa8..f81752f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -17,6 +17,19 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -68,20 +81,6 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.TimeUnit;
-
 /**
  */
 public class TestCompactor {
@@ -857,6 +856,297 @@ public class TestCompactor {
     }
   }
 
+  @Test
+  public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true', "
+        + "'transactional_properties'='default') ", driver); // this turns on split-update U=D+I
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      writeBatch(connection, writer, true);
+
+      // Now, compact
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+      }
+      String name = stat[0].getPath().getName();
+      Assert.assertEquals(name, "base_0000004");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')", driver);
+
+    // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+
+    // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3
+    executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver);
+
+    // Now, compact -> Compaction produces a single range for both delta and delete delta
+    // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3
+    // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3.
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    // Verify that we have got correct set of deltas.
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+    String[] deltas = new String[stat.length];
+    Path minorCompactedDelta = null;
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = stat[i].getPath().getName();
+      if (deltas[i].equals("delta_0000001_0000003")) {
+        minorCompactedDelta = stat[i].getPath();
+      }
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"};
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+    }
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+
+    // Verify that we have got correct set of delete_deltas.
+    FileStatus[] deleteDeltaStat =
+        fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    Path minorCompactedDeleteDelta = null;
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+      }
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+    }
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L);
+  }
+
+  @Test
+  public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')", driver);
+
+    // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+
+    // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    // Now, compact
+    // One important thing to note in this test is that minor compaction always produces
+    // delta_x_y and a counterpart delete_delta_x_y, even when there are no delete_delta events.
+    // Such a choice has been made to simplify processing of AcidUtils.getAcidState().
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(conf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+
+    // Verify that we have got correct set of deltas.
+    FileStatus[] stat =
+        fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+    String[] deltas = new String[stat.length];
+    Path minorCompactedDelta = null;
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = stat[i].getPath().getName();
+      if (deltas[i].equals("delta_0000001_0000002")) {
+        minorCompactedDelta = stat[i].getPath();
+      }
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"};
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+    }
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+
+    // Verify that we have got correct set of delete_deltas.
+    FileStatus[] deleteDeltaStat =
+        fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    Path minorCompactedDeleteDelta = null;
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+      }
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+    }
+    // There should be no rows in the delete_delta because there have been no delete events.
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+  }
+
+  @Test
+  public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    String columnNamesProperty = "a,b";
+    String columnTypesProperty = "int:string";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+        " STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')", driver);
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      writeBatch(connection, writer, true);
+
+      // Now, compact
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+      String[] names = new String[stat.length];
+      Path resultFile = null;
+      for (int i = 0; i < names.length; i++) {
+        names[i] = stat[i].getPath().getName();
+        if (names[i].equals("delta_0000001_0000004")) {
+          resultFile = stat[i].getPath();
+        }
+      }
+      Arrays.sort(names);
+      String[] expected = new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+      if (!Arrays.deepEquals(expected, names)) {
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+      }
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 1L, 4L);
+
+      // Verify that we have got correct set of delete_deltas also
+      FileStatus[] deleteDeltaStat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
+      String[] deleteDeltas = new String[deleteDeltaStat.length];
+      Path minorCompactedDeleteDelta = null;
+      for (int i = 0; i < deleteDeltas.length; i++) {
+        deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+        if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+          minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+        }
+      }
+      Arrays.sort(deleteDeltas);
+      String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
+      if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+        Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+      }
+      // There should be no rows in the delete_delta because there have been no delete events.
+      checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+
+    } finally {
+      connection.close();
+    }
+  }
+
   /**
    * Users have the choice of specifying compaction related tblproperties either in CREATE TABLE
    * statement or in ALTER TABLE .. COMPACT statement. This tests both cases.

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index a2e35b8..872c0f3 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -1475,5 +1475,6 @@ const string FILE_OUTPUT_FORMAT   = "file.outputformat",
 const string META_TABLE_STORAGE   = "storage_handler",
 const string TABLE_IS_TRANSACTIONAL = "transactional",
 const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction",
+const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties",
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
index f982bf2..1cbd176 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
@@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() {
 
   TABLE_NO_AUTO_COMPACT = "no_auto_compaction";
 
+  TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
+
 }
 
 }}} // namespace

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
index ae14bd1..3d068c3 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
@@ -38,6 +38,7 @@ class hive_metastoreConstants {
   std::string META_TABLE_STORAGE;
   std::string TABLE_IS_TRANSACTIONAL;
   std::string TABLE_NO_AUTO_COMPACT;
+  std::string TABLE_TRANSACTIONAL_PROPERTIES;
 };
 
 extern const hive_metastoreConstants g_hive_metastore_constants;

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index 5a666f2..8de8896 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -82,4 +82,6 @@ public class hive_metastoreConstants {
 
   public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction";
 
+  public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index d6f7f49..2f9cc9b 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -18865,6 +18865,7 @@ final class Constant extends \Thrift\Type\TConstant {
   static protected $META_TABLE_STORAGE;
   static protected $TABLE_IS_TRANSACTIONAL;
   static protected $TABLE_NO_AUTO_COMPACT;
+  static protected $TABLE_TRANSACTIONAL_PROPERTIES;
 
   static protected function init_DDL_TIME() {
     return "transient_lastDdlTime";
@@ -18957,6 +18958,10 @@ final class Constant extends \Thrift\Type\TConstant {
   static protected function init_TABLE_NO_AUTO_COMPACT() {
     return "no_auto_compaction";
   }
+
+  static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() {
+    return "transactional_properties";
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
index d1c07a5..5100236 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
@@ -32,3 +32,4 @@ FILE_OUTPUT_FORMAT = "file.outputformat"
 META_TABLE_STORAGE = "storage_handler"
 TABLE_IS_TRANSACTIONAL = "transactional"
 TABLE_NO_AUTO_COMPACT = "no_auto_compaction"
+TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index eeccc84..6aa7143 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional"
 
 TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction"
 
+TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties"
+

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 3e74675..0f08f43 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -17,25 +17,35 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
 import org.apache.hadoop.hive.metastore.events.PreEventContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-final class TransactionalValidationListener extends MetaStorePreEventListener {
+public final class TransactionalValidationListener extends MetaStorePreEventListener {
   public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class);
 
+  // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
+  public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
+  public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
+
   TransactionalValidationListener(Configuration conf) {
     super(conf);
   }
 
+  @Override
   public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException,
       InvalidOperationException {
     switch (context.getEventType()) {
@@ -60,6 +70,8 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
 
   /**
    * once a table is marked transactional, you cannot go back.  Enforce this.
+   * Also in current version, 'transactional_properties' of the table cannot be altered after
+   * the table is created. Any attempt to alter it will throw a MetaException.
    */
   private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throws MetaException {
     Table newTable = context.getNewTable();
@@ -70,12 +82,22 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
     Set<String> keys = new HashSet<>(parameters.keySet());
     String transactionalValue = null;
     boolean transactionalValuePresent = false;
+    boolean isTransactionalPropertiesPresent = false;
+    String transactionalPropertiesValue = null;
+    boolean hasValidTransactionalValue = false;
+
     for (String key : keys) {
       if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
         transactionalValuePresent = true;
         transactionalValue = parameters.get(key);
         parameters.remove(key);
       }
+      if(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+        isTransactionalPropertiesPresent = true;
+        transactionalPropertiesValue = parameters.get(key);
+        // Do not remove the parameter yet, because we have separate initialization routine
+        // that will use it down below.
+      }
     }
     if (transactionalValuePresent) {
       //normalize prop name
@@ -91,24 +113,52 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
         throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() +
             " cannot be declared transactional because it's an external table");
       }
-
-      return;
+      hasValidTransactionalValue = true;
     }
+
     Table oldTable = context.getOldTable();
     String oldTransactionalValue = null;
+    String oldTransactionalPropertiesValue = null;
     for (String key : oldTable.getParameters().keySet()) {
       if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
         oldTransactionalValue = oldTable.getParameters().get(key);
       }
+      if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+        oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+      }
     }
+
+
     if (oldTransactionalValue == null ? transactionalValue == null
                                      : oldTransactionalValue.equalsIgnoreCase(transactionalValue)) {
       //this covers backward compat cases where this prop may have been set already
-      return;
+      hasValidTransactionalValue = true;
+    }
+
+    if (!hasValidTransactionalValue) {
+      // if here, there is attempt to set transactional to something other than 'true'
+      // and NOT the same value it was before
+      throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
+    }
+
+    if (isTransactionalPropertiesPresent) {
+      // Now validate transactional_properties for the table.
+      if (oldTransactionalValue == null) {
+        // If this is the first time the table is being initialized to 'transactional=true',
+        // any valid value can be set for the 'transactional_properties'.
+        initializeTransactionalProperties(newTable);
+      } else {
+        // If the table was already marked as 'transactional=true', then the new value of
+        // 'transactional_properties' must match the old value. Any attempt to alter the previous
+        // value will throw an error. An exception will still be thrown if the previous value was
+        // null and an attempt is made to set it. This behaviour can be changed in the future.
+        if (oldTransactionalPropertiesValue == null
+            || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) {
+          throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be "
+              + "altered after the table is created");
+        }
+      }
     }
-    // if here, there is attempt to set transactional to something other than 'true'
-    // and NOT the same value it was before
-    throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
   }
 
   /**
@@ -157,6 +207,7 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
 
       // normalize prop name
       parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+      initializeTransactionalProperties(newTable);
       return;
     }
 
@@ -187,4 +238,53 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
 
     return true;
   }
-}
\ No newline at end of file
+
+  private void initializeTransactionalProperties(Table table) throws MetaException {
+    // All new versions of Acid tables created after the introduction of Acid version/type system
+    // can have TRANSACTIONAL_PROPERTIES property defined. This parameter can be used to change
+    // the operational behavior of ACID. However, if this parameter is not defined, the new Acid
+    // tables will still behave as the old ones. This is done to preserve the behavior
+    // in case of rolling downgrade.
+
+    // Stays null unless the table explicitly specifies transactional_properties (any key case).
+    String tableTransactionalProperties = null;
+
+    Map<String, String> parameters = table.getParameters();
+    if (parameters != null) {
+      Set<String> keys = parameters.keySet();
+      for (String key : keys) {
+        if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+          tableTransactionalProperties = parameters.get(key).toLowerCase();
+          parameters.remove(key);
+          String validationError = validateTransactionalProperties(tableTransactionalProperties);
+          if (validationError != null) {
+            throw new MetaException("Invalid transactional properties specified for the "
+                + "table with the error " + validationError);
+          }
+          break; // stop iterating: remove() above modified the key set being traversed
+        }
+      }
+    }
+
+    if (tableTransactionalProperties != null) { // re-add under the canonical key, lower-cased
+      parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+              tableTransactionalProperties);
+    }
+  }
+
+  private String validateTransactionalProperties(String transactionalProperties) {
+    boolean isValid = false;
+    switch (transactionalProperties) { // exact match; the caller lower-cases the value first
+      case DEFAULT_TRANSACTIONAL_PROPERTY:
+      case LEGACY_TRANSACTIONAL_PROPERTY:
+        isValid = true;
+        break;
+      default:
+        isValid = false;
+    }
+    if (!isValid) {
+      return "unknown value " + transactionalProperties +  " for transactional_properties";
+    }
+    return null; // null tells the caller there is no validation error to report
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index db6848a..8c7d99d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -81,6 +81,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
         HiveInputFormat.pushFilters(job, ts);
 
         AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
       sink = work.getSink();
       fetch = new FetchOperator(work, job, source, getVirtualColumns(source));

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 57b6c67..584eff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -212,6 +212,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp
       HiveInputFormat.pushFilters(jobClone, ts);
 
       AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+      AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());
 
       ts.passExecContext(getExecContext());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 26e6443..ac922ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -472,6 +472,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
       HiveInputFormat.pushFilters(jobClone, ts);
 
       AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+      AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());
 
       // create a fetch operator
       FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index dd90a95..b85b827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,10 +30,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Properties;
-
 /**
  * An extension for OutputFormats that want to implement ACID transactions.
  * @param <V> the row type of the file
@@ -44,6 +44,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private FileSystem fs;
     private ObjectInspector inspector;
     private boolean writingBase = false;
+    private boolean writingDeleteDelta = false;
     private boolean isCompressed = false;
     private Properties properties;
     private Reporter reporter;
@@ -98,6 +99,16 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     }
 
     /**
+     * Is this writing a delete delta directory?
+     * @param val is this a delete delta file?
+     * @return this
+     */
+    public Options writingDeleteDelta(boolean val) {
+      this.writingDeleteDelta = val;
+      return this;
+    }
+
+    /**
      * Provide a file system to the writer. Otherwise, the filesystem for the
      * path will be used.
      * @param fs the file system that corresponds to the the path
@@ -223,7 +234,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       this.finalDestination = p;
       return this;
     }
-    
+
     public Configuration getConfiguration() {
       return configuration;
     }
@@ -260,6 +271,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return writingBase;
     }
 
+    public boolean isWritingDeleteDelta() {
+      return writingDeleteDelta;
+    }
+
     public int getBucket() {
       return bucket;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 449d889..cda5f39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,22 +34,19 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
  * are used by the compactor and cleaner and thus must be format agnostic.
@@ -61,6 +62,7 @@ public class AcidUtils {
     }
   };
   public static final String DELTA_PREFIX = "delta_";
+  public static final String DELETE_DELTA_PREFIX = "delete_delta_";
   public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
   public static final PathFilter deltaFileFilter = new PathFilter() {
     @Override
@@ -68,6 +70,12 @@ public class AcidUtils {
       return path.getName().startsWith(DELTA_PREFIX);
     }
   };
+  public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(DELETE_DELTA_PREFIX);
+    }
+  };
   public static final String BUCKET_PREFIX = "bucket_";
   public static final PathFilter bucketFileFilter = new PathFilter() {
     @Override
@@ -142,6 +150,25 @@ public class AcidUtils {
     return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
   }
 
+  /**
+   * Name of a delete delta dir without a statement id suffix (the form used when statementId is -1).
+   */
+  @VisibleForTesting
+  static String deleteDeltaSubdir(long min, long max) {
+    return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
+        String.format(DELTA_DIGITS, max);
+  }
+
+  /**
+   * Each write statement in a transaction creates its own delete delta dir,
+   * when split-update acid operational property is turned on.
+   * @since 2.2.x
+   */
+  @VisibleForTesting
+  static String deleteDeltaSubdir(long min, long max, int statementId) {
+    return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
+  }
+
   public static String baseDir(long txnId) {
     return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
   }
@@ -163,12 +190,19 @@ public class AcidUtils {
     } else if(options.getStatementId() == -1) {
       //when minor compaction runs, we collapse per statement delta files inside a single
       //transaction so we no longer need a statementId in the file name
-      subdir = deltaSubdir(options.getMinimumTransactionId(),
-        options.getMaximumTransactionId());
+      subdir = options.isWritingDeleteDelta() ?
+          deleteDeltaSubdir(options.getMinimumTransactionId(),
+                            options.getMaximumTransactionId())
+          : deltaSubdir(options.getMinimumTransactionId(),
+                        options.getMaximumTransactionId());
     } else {
-      subdir = deltaSubdir(options.getMinimumTransactionId(),
-        options.getMaximumTransactionId(),
-        options.getStatementId());
+      subdir = options.isWritingDeleteDelta() ?
+          deleteDeltaSubdir(options.getMinimumTransactionId(),
+                            options.getMaximumTransactionId(),
+                            options.getStatementId())
+          : deltaSubdir(options.getMinimumTransactionId(),
+                        options.getMaximumTransactionId(),
+                        options.getStatementId());
     }
     return createBucketFile(new Path(directory, subdir), options.getBucket());
   }
@@ -195,11 +229,10 @@ public class AcidUtils {
    * @return the options used to create that filename
    */
   public static AcidOutputFormat.Options
-                    parseBaseBucketFilename(Path bucketFile,
-                                            Configuration conf) {
+                    parseBaseOrDeltaBucketFilename(Path bucketFile,
+                                                   Configuration conf) {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
-    result.writingBase(true);
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
       int bucket =
           Integer.parseInt(filename.substring(0, filename.indexOf('_')));
@@ -207,15 +240,33 @@ public class AcidUtils {
           .setOldStyle(true)
           .minimumTransactionId(0)
           .maximumTransactionId(0)
-          .bucket(bucket);
+          .bucket(bucket)
+          .writingBase(true);
     } else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
           Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
-      result
-          .setOldStyle(false)
-          .minimumTransactionId(0)
-          .maximumTransactionId(parseBase(bucketFile.getParent()))
-          .bucket(bucket);
+      if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
+        result
+            .setOldStyle(false)
+            .minimumTransactionId(0)
+            .maximumTransactionId(parseBase(bucketFile.getParent()))
+            .bucket(bucket)
+            .writingBase(true);
+      } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+        result
+            .setOldStyle(false)
+            .minimumTransactionId(parsedDelta.minTransaction)
+            .maximumTransactionId(parsedDelta.maxTransaction)
+            .bucket(bucket);
+      } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+        result
+            .setOldStyle(false)
+            .minimumTransactionId(parsedDelta.minTransaction)
+            .maximumTransactionId(parsedDelta.maxTransaction)
+            .bucket(bucket);
+      }
     } else {
       result.setOldStyle(true).bucket(-1).minimumTransactionId(0)
           .maximumTransactionId(0);
@@ -248,6 +299,179 @@ public class AcidUtils {
     }
   }
 
+  public enum AcidBaseFileType {
+    COMPACTED_BASE, // a regular base file generated through major compaction
+    ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
+    INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+  }
+
+  /**
+   * A simple wrapper class that stores the information about a base file and its type.
+   * Orc splits can be generated on three kinds of base files: an original file (non-acid converted
+   * files), a regular base file (created by major compaction) or an insert delta (which can be
+   * treated as a base when split-update is enabled for acid).
+   */
+  public static class AcidBaseFileInfo {
+    final private HdfsFileStatusWithId fileId;
+    final private AcidBaseFileType acidBaseFileType;
+
+    public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFileType) {
+      this.fileId = fileId;
+      this.acidBaseFileType = acidBaseFileType;
+    }
+
+    public boolean isCompactedBase() {
+      return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
+    }
+
+    public boolean isOriginal() {
+      return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
+    }
+
+    public boolean isInsertDelta() {
+      return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+    }
+
+    public HdfsFileStatusWithId getHdfsFileStatusWithId() {
+      return this.fileId;
+    }
+  }
+
+  public static class AcidOperationalProperties {
+    private int description = 0x00;
+    public static final int SPLIT_UPDATE_BIT = 0x01;
+    public static final String SPLIT_UPDATE_STRING = "split_update";
+    public static final int HASH_BASED_MERGE_BIT = 0x02;
+    public static final String HASH_BASED_MERGE_STRING = "hash_merge";
+    public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
+    public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
+
+    private AcidOperationalProperties() {
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
+     * that were created before ACID type system using operational properties was put in place.
+     * @return the acidOperationalProperties object
+     */
+    public static AcidOperationalProperties getLegacy() {
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      // In legacy mode, none of these properties are turned on.
+      return obj;
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that represents default ACID behavior for tables
+     * that do not explicitly specify/override the default behavior.
+     * @return the acidOperationalProperties object.
+     */
+    public static AcidOperationalProperties getDefault() {
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      obj.setSplitUpdate(true);
+      obj.setHashBasedMerge(false);
+      return obj;
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that is represented by an encoded string.
+     * @param propertiesStr an encoded string representing the acidOperationalProperties.
+     * @return the acidOperationalProperties object.
+     */
+    public static AcidOperationalProperties parseString(String propertiesStr) {
+      if (propertiesStr == null) {
+        return AcidOperationalProperties.getLegacy();
+      }
+      if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
+        return AcidOperationalProperties.getDefault();
+      }
+      if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
+        return AcidOperationalProperties.getLegacy();
+      }
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      String[] options = propertiesStr.split("\\|");
+      for (String option : options) {
+        if (option.trim().length() == 0) continue; // skip blanks such as toString()'s leading one
+        switch (option) { // NOTE: matched untrimmed and case-sensitively
+          case SPLIT_UPDATE_STRING:
+            obj.setSplitUpdate(true);
+            break;
+          case HASH_BASED_MERGE_STRING:
+            obj.setHashBasedMerge(true);
+            break;
+          default:
+            throw new IllegalArgumentException(
+                "Unexpected value " + option + " for ACID operational properties!");
+        }
+      }
+      return obj;
+    }
+
+    /**
+     * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer.
+     * @param properties an encoded 32-bit integer representing the acidOperationalProperties.
+     * @return the acidOperationalProperties object.
+     */
+    public static AcidOperationalProperties parseInt(int properties) {
+      AcidOperationalProperties obj = new AcidOperationalProperties();
+      if ((properties & SPLIT_UPDATE_BIT)  > 0) {
+        obj.setSplitUpdate(true);
+      }
+      if ((properties & HASH_BASED_MERGE_BIT)  > 0) {
+        obj.setHashBasedMerge(true);
+      }
+      return obj;
+    }
+
+    /**
+     * Sets the split update property for ACID operations based on the boolean argument.
+     * When split update is turned on, an update ACID event is interpreted as a combination of
+     * a delete event followed by an insert event.
+     * @param isSplitUpdate a boolean property that turns on split update when true.
+     * @return the acidOperationalProperties object.
+     */
+    public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) {
+      description = (isSplitUpdate
+              ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT));
+      return this;
+    }
+
+    /**
+     * Sets the hash-based merge property for ACID operations that combines delta files using
+     * GRACE hash join based approach, when turned on. (Currently unimplemented!)
+     * @param isHashBasedMerge a boolean property that turns on hash-based merge when true.
+     * @return the acidOperationalProperties object.
+     */
+    public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) {
+      description = (isHashBasedMerge
+              ? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT));
+      return this;
+    }
+
+    public boolean isSplitUpdate() {
+      return (description & SPLIT_UPDATE_BIT) > 0;
+    }
+
+    public boolean isHashBasedMerge() {
+      return (description & HASH_BASED_MERGE_BIT) > 0;
+    }
+
+    public int toInt() {
+      return description;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder str = new StringBuilder(); // each enabled flag is appended with a leading '|'
+      if (isSplitUpdate()) {
+        str.append("|" + SPLIT_UPDATE_STRING);
+      }
+      if (isHashBasedMerge()) {
+        str.append("|" + HASH_BASED_MERGE_STRING);
+      }
+      return str.toString();
+    }
+  }
+
   public static interface Directory {
 
     /**
@@ -287,18 +511,20 @@ public class AcidUtils {
     //-1 is for internal (getAcidState()) purposes and means the delta dir
     //had no statement ID
     private final int statementId;
+    private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
 
     /**
      * for pre 1.3.x delta files
      */
-    ParsedDelta(long min, long max, FileStatus path) {
-      this(min, max, path, -1);
+    ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
+      this(min, max, path, -1, isDeleteDelta);
     }
-    ParsedDelta(long min, long max, FileStatus path, int statementId) {
+    ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
       this.statementId = statementId;
+      this.isDeleteDelta = isDeleteDelta;
     }
 
     public long getMinTransaction() {
@@ -317,6 +543,10 @@ public class AcidUtils {
       return statementId == -1 ? 0 : statementId;
     }
 
+    public boolean isDeleteDelta() {
+      return isDeleteDelta;
+    }
+
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
@@ -418,15 +648,49 @@ public class AcidUtils {
     return results.toArray(new Path[results.size()]);
   }
 
-  private static ParsedDelta parseDelta(FileStatus path) {
-    ParsedDelta p = parsedDelta(path.getPath());
-    return new ParsedDelta(p.getMinTransaction(),
-      p.getMaxTransaction(), path, p.statementId);
+  /**
+   * Convert the list of begin/end transaction id pairs to a list of delete delta
+   * directories.  Note that there may be multiple delete_delta files for the exact same txn range starting
+   * with 2.2.x;
+   * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
+   * @param root the root directory
+   * @param deleteDeltas list of begin/end transaction id pairs
+   * @return the list of delete delta paths
+   */
+  public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) throws IOException {
+    List<Path> results = new ArrayList<Path>(deleteDeltas.size());
+    for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
+      if(dmd.getStmtIds().isEmpty()) {
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+        continue;
+      }
+      for(Integer stmtId : dmd.getStmtIds()) {
+        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+      }
+    }
+    return results.toArray(new Path[results.size()]);
   }
+
   public static ParsedDelta parsedDelta(Path deltaDir) {
+    String deltaDirName = deltaDir.getName();
+    if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
+      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+    }
+    return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix
+  }
+
+  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
+    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+    boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    return new ParsedDelta(p.getMinTransaction(),
+        p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+  }
+
+  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
     String filename = deltaDir.getName();
-    if (filename.startsWith(DELTA_PREFIX)) {
-      String rest = filename.substring(DELTA_PREFIX.length());
+    boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    if (filename.startsWith(deltaPrefix)) {
+      String rest = filename.substring(deltaPrefix.length());
       int split = rest.indexOf('_');
       int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
       long min = Long.parseLong(rest.substring(0, split));
@@ -434,13 +698,13 @@ public class AcidUtils {
         Long.parseLong(rest.substring(split + 1)) :
         Long.parseLong(rest.substring(split + 1, split2));
       if(split2 == -1) {
-        return new ParsedDelta(min, max, null);
+        return new ParsedDelta(min, max, null, isDeleteDelta);
       }
       int statementId = Integer.parseInt(rest.substring(split2 + 1));
-      return new ParsedDelta(min, max, null, statementId);
+      return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
-                                       DELTA_PREFIX);
+                                       deltaPrefix);
   }
 
   /**
@@ -456,7 +720,8 @@ public class AcidUtils {
     for(FileStatus file: fs.listStatus(directory)) {
       String filename = file.getPath().getName();
       if (filename.startsWith(BASE_PREFIX) ||
-          filename.startsWith(DELTA_PREFIX)) {
+          filename.startsWith(DELTA_PREFIX) ||
+          filename.startsWith(DELETE_DELTA_PREFIX)) {
         if (file.isDir()) {
           return true;
         }
@@ -499,6 +764,7 @@ public class AcidUtils {
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
+    // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
@@ -553,6 +819,7 @@ public class AcidUtils {
     //subject to list of 'exceptions' in 'txnList' (not show in above example).
     long current = bestBase.txn;
     int lastStmtId = -1;
+    ParsedDelta prev = null;
     for(ParsedDelta next: working) {
       if (next.maxTransaction > current) {
         // are any of the new transactions ones that we care about?
@@ -561,6 +828,7 @@ public class AcidUtils {
           deltas.add(next);
           current = next.maxTransaction;
           lastStmtId = next.statementId;
+          prev = next;
         }
       }
       else if(next.maxTransaction == current && lastStmtId >= 0) {
@@ -568,6 +836,24 @@ public class AcidUtils {
         //generate multiple delta files with the same txnId range
         //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
         deltas.add(next);
+        prev = next;
+      }
+      else if (prev != null && next.maxTransaction == prev.maxTransaction
+                  && next.minTransaction == prev.minTransaction
+                  && next.statementId == prev.statementId) {
+        // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
+        // the path. This may happen when we have split update and we have two types of delta
+        // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range.
+
+        // Also note that any delete_deltas in between a given delta_x_y range would be made
+        // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
+        // This is valid because minor compaction always compacts the normal deltas and the delete
+        // deltas for the same range. That is, if we had 3 directories, delta_30_30,
+        // delete_delta_40_40 and delta_50_50, then running minor compaction would produce
+        // delta_30_50 and delete_delta_30_50.
+
+        deltas.add(next);
+        prev = next;
       }
       else {
         obsolete.add(next.path);
@@ -638,7 +924,8 @@ public class AcidUtils {
   }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
+      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
+      boolean ignoreEmptyFiles) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -662,8 +949,11 @@ public class AcidUtils {
       } else {
         obsolete.add(child);
       }
-    } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
-      ParsedDelta delta = parseDelta(child);
+    } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX))
+                    && child.isDir()) {
+      String deltaPrefix =
+              (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+      ParsedDelta delta = parseDelta(child, deltaPrefix);
       if (txnList.isTxnRangeValid(delta.minTransaction,
           delta.maxTransaction) !=
           ValidTxnList.RangeResponse.NONE) {
@@ -791,4 +1081,84 @@ public class AcidUtils {
 
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
+
+  /**
+   * Sets the acidOperationalProperties in the configuration object argument.
+   * @param conf Mutable configuration object
+   * @param properties An acidOperationalProperties object to initialize from.
+   */
+  public static void setAcidOperationalProperties(Configuration conf,
+          AcidOperationalProperties properties) {
+    if (properties != null) {
+      HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt());
+    }
+  }
+
+  /**
+   * Sets the acidOperationalProperties in the map object argument.
+   * @param parameters Mutable map object
+   * @param properties An acidOperationalProperties object to initialize from.
+   */
+  public static void setAcidOperationalProperties(
+          Map<String, String> parameters, AcidOperationalProperties properties) {
+    if (properties != null) {
+      parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString());
+    }
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given table.
+   * @param table A table object
+   * @return the acidOperationalProperties object for the corresponding table.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Table table) {
+    String transactionalProperties = table.getProperty(
+            hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (transactionalProperties == null) {
+      // If the table does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(transactionalProperties);
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given configuration.
+   * @param conf A configuration object
+   * @return the acidOperationalProperties object for the corresponding configuration.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
+    // If the conf does not define any transactional properties, the parseInt() should receive
+    // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+    return AcidOperationalProperties.parseInt(
+            HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given set of properties.
+   * @param props A properties object
+   * @return the acidOperationalProperties object for the corresponding properties.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
+    String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (resultStr == null) {
+      // If the properties does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(resultStr);
+  }
+
+  /**
+   * Returns the acidOperationalProperties for a given map.
+   * @param parameters A parameters object
+   * @return the acidOperationalProperties object for the corresponding map.
+   */
+  public static AcidOperationalProperties getAcidOperationalProperties(
+          Map<String, String> parameters) {
+    String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+    if (resultStr == null) {
+      // If the parameters does not define any transactional properties, we return a legacy type.
+      return AcidOperationalProperties.getLegacy();
+    }
+    return AcidOperationalProperties.parseString(resultStr);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 945b828..c4b9940 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -637,6 +637,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         pushFilters(jobConf, ts);
 
         AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+        AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
       }
     }
   }


[2/3] hive git commit: HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)

Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 334cb31..6261a14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -80,7 +80,10 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 import org.apache.hadoop.hive.ql.io.BatchToRowReader;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -525,6 +528,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final ValidTxnList transactionList;
     private SplitStrategyKind splitStrategyKind;
     private final SearchArgument sarg;
+    private final AcidOperationalProperties acidOperationalProperties;
 
     Context(Configuration conf) throws IOException {
       this(conf, 1, null);
@@ -606,6 +610,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
       String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
       transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
+
+      // Determine the transactional_properties of the table from the job conf stored in context.
+      // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
+      // & therefore we should be able to retrieve them here and determine appropriate behavior.
+      // Note that this will be meaningless for non-acid tables & will be set to null.
+      boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+      String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+      this.acidOperationalProperties = isTableTransactional ?
+          AcidOperationalProperties.parseString(transactionalProperties) : null;
     }
 
     @VisibleForTesting
@@ -639,17 +652,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   @VisibleForTesting
   static final class AcidDirInfo {
     public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
-        List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+        List<AcidBaseFileInfo> baseFiles,
+        List<ParsedDelta> parsedDeltas) {
       this.splitPath = splitPath;
       this.acidInfo = acidInfo;
-      this.baseOrOriginalFiles = baseOrOriginalFiles;
+      this.baseFiles = baseFiles;
       this.fs = fs;
+      this.parsedDeltas = parsedDeltas;
     }
 
     final FileSystem fs;
     final Path splitPath;
     final AcidUtils.Directory acidInfo;
-    final List<HdfsFileStatusWithId> baseOrOriginalFiles;
+    final List<AcidBaseFileInfo> baseFiles;
+    final List<ParsedDelta> parsedDeltas;
   }
 
   @VisibleForTesting
@@ -672,7 +688,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail,
         List<OrcProto.Type> readerTypes, boolean isOriginal, List<DeltaMetaData> deltas,
         boolean hasBase, Path dir, boolean[] covered, ByteBuffer ppdResult) throws IOException {
-      super(dir, context.numBuckets, deltas, covered);
+      super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties);
       this.context = context;
       this.fs = fs;
       this.fileWithId = fileWithId;
@@ -916,7 +932,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public BISplitStrategy(Context context, FileSystem fs,
         Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
         List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds) {
-      super(dir, context.numBuckets, deltas, covered);
+      super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties);
       this.fileStatuses = fileStatuses;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
@@ -964,20 +980,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<DeltaMetaData> deltas;
     boolean[] covered;
     int numBuckets;
+    AcidOperationalProperties acidOperationalProperties;
 
-    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
+    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+        AcidOperationalProperties acidOperationalProperties) {
       this.dir = dir;
       this.numBuckets = numBuckets;
       this.deltas = deltas;
       this.covered = covered;
+      this.acidOperationalProperties = acidOperationalProperties;
     }
 
     @Override
     public List<OrcSplit> getSplits() throws IOException {
+      List<OrcSplit> splits = Lists.newArrayList();
+
+      // When split-update is enabled, we do not need to account for buckets that aren't covered.
+      // This is a huge performance benefit of split-update. And the reason why we are able to
+      // do so is because the 'deltas' here are actually only the delete_deltas. All the insert_deltas
+      // with valid user payload data have already been considered as base for the covered buckets.
+      // Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
+      if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
+        return splits; // return an empty list.
+      }
+
       // Generate a split for any buckets that weren't covered.
       // This happens in the case where a bucket just has deltas and no
       // base.
-      List<OrcSplit> splits = Lists.newArrayList();
       if (!deltas.isEmpty()) {
         for (int b = 0; b < numBuckets; ++b) {
           if (!covered[b]) {
@@ -1032,13 +1061,70 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private AcidDirInfo callInternal() throws IOException {
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
-          context.conf, context.transactionList, useFileIds, true);
+      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
+          context.transactionList, useFileIds, true);
       Path base = dirInfo.getBaseDirectory();
       // find the base files (original or new style)
-      List<HdfsFileStatusWithId> children = (base == null)
-          ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds);
-      return new AcidDirInfo(fs, dir, dirInfo, children);
+      List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
+      if (base == null) {
+        for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
+          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
+        }
+      } else {
+        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+        for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
+          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+        }
+      }
+
+      // Find the parsed deltas- some of them containing only the insert delta events
+      // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
+      List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+
+      if (context.acidOperationalProperties != null &&
+          context.acidOperationalProperties.isSplitUpdate()) {
+        // If we have split-update turned on for this table, then the delta events have already been
+        // split into two directories- delta_x_y/ and delete_delta_x_y/.
+        // When you have split-update turned on, the insert events go to delta_x_y/ directory and all
+        // the delete events go to delete_delta_x_y/. An update event will generate two events-
+        // a delete event for the old record that is put into delete_delta_x_y/,
+        // followed by an insert event for the updated record put into the usual delta_x_y/.
+        // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/
+        // can be treated like base files. Hence, each of these is added to the baseFiles list.
+
+        for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) {
+          if (parsedDelta.isDeleteDelta()) {
+            parsedDeltas.add(parsedDelta);
+          } else {
+            // This is a normal insert delta, which only has insert events and hence all the files
+            // in this delta directory can be considered as a base.
+            if (useFileIds) {
+              try {
+                List<HdfsFileStatusWithId> insertDeltaFiles =
+                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+                for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
+                  baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+                }
+                continue; // move on to process the next parsedDelta.
+              } catch (Throwable t) {
+                LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+              }
+            }
+            // Fall back to regular API and create statuses without ID.
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+            for (FileStatus child : children) {
+              HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
+              baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+            }
+          }
+        }
+
+      } else {
+        // When split-update is not enabled, then all the deltas in the current directories
+        // should be considered as usual.
+        parsedDeltas.addAll(dirInfo.getCurrentDirectories());
+      }
+      return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
     }
 
     private List<HdfsFileStatusWithId> findBaseFiles(
@@ -1526,26 +1612,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           continue;
         }
 
-        // We have received a new directory information, make a split strategy.
+        // We have received a new directory information, make split strategies.
         --resultsLeft;
-        SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs,
-            adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, readerTypes, ugi,
+
+        // The reason why we can get a list of split strategies here is because for ACID split-update
+        // case when we have a mix of original base files & insert deltas, we will produce two
+        // independent split strategies for them. There is a global flag 'isOriginal' that is set
+        // on a per split strategy basis and it has to be same for all the files in that strategy.
+        List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
+            adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
             allowSyntheticFileIds);
-        if (splitStrategy == null) continue; // Combined.
 
-        if (isDebugEnabled) {
-          LOG.debug("Split strategy: {}", splitStrategy);
-        }
+        for (SplitStrategy<?> splitStrategy : splitStrategies) {
+          if (isDebugEnabled) {
+            LOG.debug("Split strategy: {}", splitStrategy);
+          }
 
-        // Hack note - different split strategies return differently typed lists, yay Java.
-        // This works purely by magic, because we know which strategy produces which type.
-        if (splitStrategy instanceof ETLSplitStrategy) {
-          scheduleSplits((ETLSplitStrategy)splitStrategy,
-              context, splitFutures, strategyFutures, splits);
-        } else {
-          @SuppressWarnings("unchecked")
-          List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
-          splits.addAll(readySplits);
+          // Hack note - different split strategies return differently typed lists, yay Java.
+          // This works purely by magic, because we know which strategy produces which type.
+          if (splitStrategy instanceof ETLSplitStrategy) {
+            scheduleSplits((ETLSplitStrategy)splitStrategy,
+                context, splitFutures, strategyFutures, splits);
+          } else {
+            @SuppressWarnings("unchecked")
+            List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
+            splits.addAll(readySplits);
+          }
         }
       }
 
@@ -1763,6 +1855,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
+
     Path root;
     if (split.hasBase()) {
       if (split.isOriginal()) {
@@ -1773,7 +1866,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     } else {
       root = path;
     }
-    final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
+
+    // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
+    AcidUtils.AcidOperationalProperties acidOperationalProperties
+            = AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+
+    // The deltas are decided based on whether split-update has been turned on for the table or not.
+    // When split-update is turned off, everything in the delta_x_y/ directory should be treated
+    // as delta. However if split-update is turned on, only the files in delete_delta_x_y/ directory
+    // need to be considered as delta, because files in delta_x_y/ will be processed as base files
+    // since they only have insert events in them.
+    final Path[] deltas =
+        acidOperationalProperties.isSplitUpdate() ?
+            AcidUtils.deserializeDeleteDeltas(root, split.getDeltas())
+            : AcidUtils.deserializeDeltas(root, split.getDeltas());
     final Configuration conf = options.getConfiguration();
 
 
@@ -1793,7 +1899,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
 
     if (split.hasBase()) {
-      bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
+      bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf)
           .getBucket();
       OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
           .maxLength(split.getFileLength());
@@ -1948,23 +2054,76 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   @VisibleForTesting
+  static List<SplitStrategy<?>> determineSplitStrategies(CombinedCtx combinedCtx, Context context,
+      FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+      List<AcidBaseFileInfo> baseFiles,
+      List<ParsedDelta> parsedDeltas,
+      List<OrcProto.Type> readerTypes,
+      UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+    List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
+    SplitStrategy<?> splitStrategy;
+
+    // When no baseFiles, we will just generate a single split strategy and return.
+    List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
+    if (baseFiles.isEmpty()) {
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+          acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+      if (splitStrategy != null) {
+        splitStrategies.add(splitStrategy);
+      }
+      return splitStrategies; // return here
+    }
+
+    List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
+    // Separate the base files into acid schema and non-acid(original) schema files.
+    for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
+      if (acidBaseFileInfo.isOriginal()) {
+        originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
+      } else {
+        acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
+      }
+    }
+
+    // Generate split strategy for non-acid schema original files, if any.
+    if (!originalSchemaFiles.isEmpty()) {
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+          originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+      if (splitStrategy != null) {
+        splitStrategies.add(splitStrategy);
+      }
+    }
+
+    // Generate split strategy for acid schema files, if any.
+    if (!acidSchemaFiles.isEmpty()) {
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+          acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+      if (splitStrategy != null) {
+        splitStrategies.add(splitStrategy);
+      }
+    }
+
+    return splitStrategies;
+  }
+
+  @VisibleForTesting
   static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
       FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
-      List<HdfsFileStatusWithId> baseOrOriginalFiles, List<OrcProto.Type> readerTypes,
+      List<HdfsFileStatusWithId> baseFiles,
+      boolean isOriginal,
+      List<ParsedDelta> parsedDeltas,
+      List<OrcProto.Type> readerTypes,
       UserGroupInformation ugi, boolean allowSyntheticFileIds) {
-    Path base = dirInfo.getBaseDirectory();
-    List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
-    List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+    List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
     boolean[] covered = new boolean[context.numBuckets];
-    boolean isOriginal = base == null;
 
     // if we have a base to work from
-    if (base != null || !original.isEmpty()) {
+    if (!baseFiles.isEmpty()) {
       long totalFileSize = 0;
-      for (HdfsFileStatusWithId child : baseOrOriginalFiles) {
+      for (HdfsFileStatusWithId child : baseFiles) {
         totalFileSize += child.getFileStatus().getLen();
-        AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+        AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
             (child.getFileStatus().getPath(), context.conf);
+        opts.writingBase(true);
         int b = opts.getBucket();
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.
@@ -1973,31 +2132,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         }
       }
 
-      int numFiles = baseOrOriginalFiles.size();
+      int numFiles = baseFiles.size();
       long avgFileSize = totalFileSize / numFiles;
       int totalFiles = context.numFilesCounter.addAndGet(numFiles);
       switch(context.splitStrategyKind) {
         case BI:
           // BI strategy requested through config
-          return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+          return new BISplitStrategy(context, fs, dir, baseFiles,
               isOriginal, deltas, covered, allowSyntheticFileIds);
         case ETL:
           // ETL strategy requested through config
-          return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+          return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles,
               deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds);
         default:
           // HYBRID strategy
           if (avgFileSize > context.maxSize || totalFiles <= context.etlFileThreshold) {
-            return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+            return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles,
                 deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds);
           } else {
-            return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+            return new BISplitStrategy(context, fs, dir, baseFiles,
                 isOriginal, deltas, covered, allowSyntheticFileIds);
           }
       }
     } else {
       // no base, only deltas
-      return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
+      return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered,
+          context.acidOperationalProperties);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1a1af28..492c64c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -25,11 +25,6 @@ import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.orc.impl.AcidStats;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,10 +37,16 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -79,9 +80,13 @@ public class OrcRecordUpdater implements RecordUpdater {
   private static final Charset UTF8 = Charset.forName("UTF-8");
 
   private final AcidOutputFormat.Options options;
+  private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
   private final Path path;
+  private Path deleteEventPath;
   private final FileSystem fs;
+  private OrcFile.WriterOptions writerOptions;
   private Writer writer;
+  private Writer deleteEventWriter = null;
   private final FSDataOutputStream flushLengths;
   private final OrcStruct item;
   private final IntWritable operation = new IntWritable();
@@ -95,9 +100,11 @@ public class OrcRecordUpdater implements RecordUpdater {
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
   private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+  private KeyIndexBuilder deleteEventIndexBuilder;
   private StructField recIdField = null; // field to look for the record identifier in
   private StructField rowIdField = null; // field inside recId to look for row id in
   private StructField originalTxnField = null;  // field inside recId to look for original txn in
+  private StructField bucketField = null; // field inside recId to look for bucket in
   private StructObjectInspector rowInspector; // OI for the original row
   private StructObjectInspector recIdInspector; // OI for the record identifier struct
   private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
@@ -180,8 +187,22 @@ public class OrcRecordUpdater implements RecordUpdater {
   OrcRecordUpdater(Path path,
                    AcidOutputFormat.Options options) throws IOException {
     this.options = options;
+    // Initialize acidOperationalProperties based on table properties, and
+    // if they are not available, see if we can find it in the job configuration.
+    // We have to look at these two places instead of just the conf, because Streaming Ingest
+    // uses table properties, while normal Hive SQL inserts/updates/deletes will place this
+    // value in the configuration object.
+    if (options.getTableProperties() != null) {
+      this.acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(options.getTableProperties());
+    } else {
+      this.acidOperationalProperties =
+          AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+    }
     this.bucket.set(options.getBucket());
     this.path = AcidUtils.createFilename(path, options);
+    this.deleteEventWriter = null;
+    this.deleteEventPath = null;
     FileSystem fs = options.getFilesystem();
     if (fs == null) {
       fs = path.getFileSystem(options.getConfiguration());
@@ -205,7 +226,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     } else {
       flushLengths = null;
     }
-    OrcFile.WriterOptions writerOptions = null;
+    this.writerOptions = null;
     // If writing delta dirs, we need to make a clone of original options, to avoid polluting it for
     // the base writer
     if (options.isWritingBase()) {
@@ -242,6 +263,13 @@ public class OrcRecordUpdater implements RecordUpdater {
     writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
     this.writer = OrcFile.createWriter(this.path, writerOptions);
+    if (this.acidOperationalProperties.isSplitUpdate()) {
+      // If this is a split-update, we initialize a delete delta file path in anticipation of
+      // update/delete events being written to that separate file.
+      // This file lives in a directory whose name starts with "delete_delta_...".
+      // The actual initialization of a writer only happens if any delete events are written.
+      this.deleteEventPath = AcidUtils.createFilename(path, options.writingDeleteDelta(true));
+    }
     item = new OrcStruct(FIELDS);
     item.setFieldValue(OPERATION, operation);
     item.setFieldValue(CURRENT_TRANSACTION, currentTransaction);
@@ -250,6 +278,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     item.setFieldValue(ROW_ID, rowId);
   }
 
+  @Override
   public String toString() {
     return getClass().getName() + "[" + path +"]";
   }
@@ -264,14 +293,16 @@ public class OrcRecordUpdater implements RecordUpdater {
     * 1. need to know bucket we are writing to
     * 2. need to know which delta dir it's in
     * Then,
-    * 1. find the same bucket file in previous delta dir for this txn
+    * 1. find the same bucket file in previous (insert) delta dir for this txn
+    *    (Note: in case of split_update, we can ignore the delete_delta dirs)
     * 2. read the footer and get AcidStats which has insert count
-     * 2.1 if AcidStats.inserts>0 done
+     * 2.1 if AcidStats.inserts>0 add to the insert count.
      *  else go to previous delta file
      *  For example, consider insert/update/insert case...*/
     if(options.getStatementId() <= 0) {
       return 0;//there is only 1 statement in this transaction (so far)
     }
+    long totalInserts = 0;
     for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
       Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
       if(!fs.exists(matchingBucket)) {
@@ -281,12 +312,10 @@ public class OrcRecordUpdater implements RecordUpdater {
       //no close() on Reader?!
       AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader);
       if(acidStats.inserts > 0) {
-        return acidStats.inserts;
+        totalInserts += acidStats.inserts;
       }
     }
-    //if we got here, we looked at all delta files in this txn, prior to current statement and didn't 
-    //find any inserts...
-    return 0;
+    return totalInserts;
   }
   // Find the record identifier column (if there) and return a possibly new ObjectInspector that
   // will strain out the record id for the underlying writer.
@@ -307,6 +336,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       // in RecordIdentifier is transactionId, bucketId, rowId
       originalTxnField = fields.get(0);
       origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+      bucketField = fields.get(1);
       rowIdField = fields.get(2);
       rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();
 
@@ -316,7 +346,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
   }
 
-  private void addEvent(int operation, long currentTransaction, long rowId, Object row)
+  private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
     this.currentTransaction.set(currentTransaction);
@@ -334,11 +364,60 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
+    item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
     item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
     indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
     writer.addRow(item);
   }
 
+  private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
+      throws IOException {
+    if (operation == INSERT_OPERATION) {
+      // Just insert the record in the usual way, i.e., default to the simple behavior.
+      addSimpleEvent(operation, currentTransaction, rowId, row);
+      return;
+    }
+    this.operation.set(operation);
+    this.currentTransaction.set(currentTransaction);
+    Object rowValue = rowInspector.getStructFieldData(row, recIdField);
+    long originalTransaction = origTxnInspector.get(
+            recIdInspector.getStructFieldData(rowValue, originalTxnField));
+    rowId = rowIdInspector.get(
+            recIdInspector.getStructFieldData(rowValue, rowIdField));
+
+    if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
+      if (deleteEventWriter == null) {
+        // Initialize an indexBuilder for deleteEvents.
+        deleteEventIndexBuilder = new KeyIndexBuilder();
+        // Change the indexBuilder callback too for the deleteEvent file, the remaining writer
+        // options remain the same.
+
+        // TODO: When we change the callback, we are essentially mutating the writerOptions.
+        // This works but perhaps is not a good thing. The proper way to do this would be
+        // to clone the writerOptions, however it requires that the parent OrcFile.WriterOptions
+        // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
+        // JIRA to fix this.
+
+        this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
+                                                      writerOptions.callback(deleteEventIndexBuilder));
+      }
+
+      // A delete/update generates a delete event for the original row.
+      this.rowId.set(rowId);
+      this.originalTransaction.set(originalTransaction);
+      item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION));
+      item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events.
+      deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId);
+      deleteEventWriter.addRow(item);
+    }
+
+    if (operation == UPDATE_OPERATION) {
+      // A new row is also inserted in the usual delta file for an update event.
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
+  }
+
   @Override
   public void insert(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
@@ -347,7 +426,11 @@ public class OrcRecordUpdater implements RecordUpdater {
       //always true in that case
       rowIdOffset = findRowIdOffsetForInsert();
     }
-    addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    } else {
+      addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+    }
     rowCountDelta++;
   }
 
@@ -355,8 +438,13 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void update(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
+      rowIdOffset = findRowIdOffsetForInsert();
+    }
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
+    } else {
+      addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
     }
-    addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
   }
 
   @Override
@@ -364,9 +452,12 @@ public class OrcRecordUpdater implements RecordUpdater {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
     }
-    addEvent(DELETE_OPERATION, currentTransaction, -1, row);
+    if (acidOperationalProperties.isSplitUpdate()) {
+      addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+    } else {
+      addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+    }
     rowCountDelta--;
-
   }
 
   @Override
@@ -390,13 +481,38 @@ public class OrcRecordUpdater implements RecordUpdater {
         fs.delete(path, false);
       }
     } else {
-      if (writer != null) writer.close();
+      if (writer != null) {
+        if (acidOperationalProperties.isSplitUpdate()) {
+          // When split-update is enabled, we can choose not to write
+          // any delta files when there are no inserts. In such cases only the delete_deltas
+          // would be written & they are closed separately below.
+          if (indexBuilder.acidStats.inserts > 0) {
+            writer.close(); // normal close, when there are inserts.
+          } else {
+            // Just remove insert delta paths, when there are no insert events.
+            fs.delete(path, false);
+          }
+        } else {
+          writer.close(); // normal close.
+        }
+      }
+      if (deleteEventWriter != null) {
+        if (deleteEventIndexBuilder.acidStats.deletes > 0) {
+          // Only need to write out & close the delete_delta if there have been any.
+          deleteEventWriter.close();
+        } else {
+          // Just remove delete_delta, if there have been no delete events.
+          fs.delete(deleteEventPath, false);
+        }
+      }
+
     }
     if (flushLengths != null) {
       flushLengths.close();
       fs.delete(OrcAcidUtils.getSideFile(path), false);
     }
     writer = null;
+    deleteEventWriter = null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8cb5e8a..5f53aef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -3167,10 +3168,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
   private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
                                     List<Path> newFiles) throws HiveException {
-    // The layout for ACID files is table|partname/base|delta/bucket
+    // The layout for ACID files is table|partname/base|delta|delete_delta/bucket
     // We will always only be writing delta files.  In the buckets created by FileSinkOperator
-    // it will look like bucket/delta/bucket.  So we need to move that into the above structure.
-    // For the first mover there will be no delta directory, so we can move the whole directory.
+    // it will look like bucket/delta|delete_delta/bucket.  So we need to move that into
+    // the above structure. For the first mover there will be no delta directory,
+    // so we can move the whole directory.
     // For everyone else we will need to just move the buckets under the existing delta
     // directory.
 
@@ -3193,49 +3195,58 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       for (FileStatus origBucketStat : origBucketStats) {
         Path origBucketPath = origBucketStat.getPath();
-        LOG.debug("Acid move looking for delta files in bucket " + origBucketPath);
+        moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter,
+                fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+        moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter,
+                fs, dst,origBucketPath, createdDeltaDirs, newFiles);
+      }
+    }
+  }
 
-        FileStatus[] deltaStats = null;
-        try {
-          deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
-        } catch (IOException e) {
-          throw new HiveException("Unable to look for delta files in original bucket " +
-              origBucketPath.toUri().toString(), e);
-        }
-        LOG.debug("Acid move found " + deltaStats.length + " delta files");
-
-        for (FileStatus deltaStat : deltaStats) {
-          Path deltaPath = deltaStat.getPath();
-          // Create the delta directory.  Don't worry if it already exists,
-          // as that likely means another task got to it first.  Then move each of the buckets.
-          // it would be more efficient to try to move the delta with it's buckets but that is
-          // harder to make race condition proof.
-          Path deltaDest = new Path(dst, deltaPath.getName());
+  private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs,
+                                         Path dst, Path origBucketPath, Set<Path> createdDeltaDirs,
+                                         List<Path> newFiles) throws HiveException {
+    LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+
+    FileStatus[] deltaStats = null;
+    try {
+      deltaStats = fs.listStatus(origBucketPath, pathFilter);
+    } catch (IOException e) {
+      throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
+          origBucketPath.toUri().toString(), e);
+    }
+    LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+
+    for (FileStatus deltaStat : deltaStats) {
+      Path deltaPath = deltaStat.getPath();
+      // Create the delta directory.  Don't worry if it already exists,
+      // as that likely means another task got to it first.  Then move each of the buckets.
+      // It would be more efficient to try to move the delta with its buckets, but that is
+      // harder to make race-condition proof.
+      Path deltaDest = new Path(dst, deltaPath.getName());
+      try {
+        if (!createdDeltaDirs.contains(deltaDest)) {
           try {
-            if (!createdDeltaDirs.contains(deltaDest)) {
-              try {
-                fs.mkdirs(deltaDest);
-                createdDeltaDirs.add(deltaDest);
-              } catch (IOException swallowIt) {
-                // Don't worry about this, as it likely just means it's already been created.
-                LOG.info("Unable to create delta directory " + deltaDest +
-                    ", assuming it already exists: " + swallowIt.getMessage());
-              }
-            }
-            FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
-            LOG.debug("Acid move found " + bucketStats.length + " bucket files");
-            for (FileStatus bucketStat : bucketStats) {
-              Path bucketSrc = bucketStat.getPath();
-              Path bucketDest = new Path(deltaDest, bucketSrc.getName());
-              LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
-                  bucketDest.toUri().toString());
-              fs.rename(bucketSrc, bucketDest);
-              if (newFiles != null) newFiles.add(bucketDest);
-            }
-          } catch (IOException e) {
-            throw new HiveException("Error moving acid files " + e.getMessage(), e);
+            fs.mkdirs(deltaDest);
+            createdDeltaDirs.add(deltaDest);
+          } catch (IOException swallowIt) {
+            // Don't worry about this, as it likely just means it's already been created.
+            LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+                ", assuming it already exists: " + swallowIt.getMessage());
           }
         }
+        FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+        LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+        for (FileStatus bucketStat : bucketStats) {
+          Path bucketSrc = bucketStat.getPath();
+          Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+          LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+              bucketDest.toUri().toString());
+          fs.rename(bucketSrc, bucketDest);
+          if (newFiles != null) newFiles.add(bucketDest);
+        }
+      } catch (IOException e) {
+        throw new HiveException("Error moving acid files " + e.getMessage(), e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 8cf261d..47c65bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -101,6 +101,8 @@ public class TableScanDesc extends AbstractOperatorDesc {
 
   private boolean isAcidTable;
 
+  private AcidUtils.AcidOperationalProperties acidOperationalProperties = null;
+
   private transient TableSample tableSample;
 
   private transient Table tableMetadata;
@@ -127,6 +129,9 @@ public class TableScanDesc extends AbstractOperatorDesc {
     this.virtualCols = vcs;
     this.tableMetadata = tblMetadata;
     isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
+    if (isAcidTable) {
+      acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata);
+    }
   }
 
   @Override
@@ -159,6 +164,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
     return isAcidTable;
   }
 
+  public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() {
+    return acidOperationalProperties;
+  }
+
   @Explain(displayName = "Output", explainLevels = { Level.USER })
   public List<String> getOutputColumnNames() {
     return this.neededColumns;

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 6caca98..c3e3982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringableMap;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -61,12 +69,8 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.StringUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to do compactions via an MR job.  This has to be in the ql package rather than metastore
@@ -129,7 +133,7 @@ public class CompactorMR {
     job.setInt(NUM_BUCKETS, sd.getNumBuckets());
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
-    if (ci.properties != null) { // override MR properties and general tblproperties if applicable
+    if (ci.properties != null) {
       overrideTblProps(job, t.getParameters(), ci.properties);
     }
     setColumnTypes(job, sd.getCols());
@@ -137,6 +141,11 @@ public class CompactorMR {
     //to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
     //to do the final move
     job.setBoolean("mapreduce.map.speculative", false);
+
+    // Set appropriate Acid readers/writers based on the table properties.
+    AcidUtils.setAcidOperationalProperties(job,
+            AcidUtils.getAcidOperationalProperties(t.getParameters()));
+
     return job;
   }
 
@@ -501,12 +510,18 @@ public class CompactorMR {
       Map<Integer, BucketTracker> splitToBucketMap = new HashMap<Integer, BucketTracker>();
       for (Path dir : dirsToSearch) {
         FileSystem fs = dir.getFileSystem(entries);
+        // When split-update is enabled, there are two kinds of delta directories:
+        // the delta_x_y/ directory, which has only insert events, and
+        // the delete_delta_x_y/ directory, which has only the delete events.
+        // The clever thing about this kind of splitting is that everything in the delta_x_y/
+        // directory can be processed as base files. However, this is currently left out
+        // as an improvement for the future.
 
-        // If this is a base or delta directory, then we need to be looking for the bucket files.
-        // But if it's a legacy file then we need to add it directly.
         if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
-            dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+            dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
+            dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+
           FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
           for(FileStatus f : files) {
             // For each file, figure out which bucket it is.
@@ -519,6 +534,8 @@ public class CompactorMR {
           addFileToMap(matcher, dir, true, splitToBucketMap);
         }
       }
+
+
       List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size());
       for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
         BucketTracker bt = e.getValue();
@@ -613,7 +630,8 @@ public class CompactorMR {
       implements Mapper<WritableComparable, CompactorInputSplit,  NullWritable,  NullWritable> {
 
     JobConf jobConf;
-    RecordWriter writer;
+    RecordWriter writer = null;
+    RecordWriter deleteEventWriter = null;
 
     @Override
     public void map(WritableComparable key, CompactorInputSplit split,
@@ -636,10 +654,30 @@ public class CompactorMR {
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
+
+      AcidUtils.AcidOperationalProperties acidOperationalProperties
+          = AcidUtils.getAcidOperationalProperties(jobConf);
+
+      if (!isMajor && acidOperationalProperties.isSplitUpdate()) {
+        // When split-update is enabled for ACID, we initialize a separate deleteEventWriter
+        // that is used to write all the delete events (in case of minor compaction only). For major
+        // compaction, history is not required to be maintained hence the delete events are processed
+        // but not re-written separately.
+        getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
+      }
+
       while (reader.next(identifier, value)) {
-        if (isMajor && reader.isDelete(value)) continue;
-        writer.write(value);
-        reporter.progress();
+        boolean sawDeleteRecord = reader.isDelete(value);
+        if (isMajor && sawDeleteRecord) continue;
+        if (sawDeleteRecord && deleteEventWriter != null) {
+          // When minor compacting, write delete events to a separate file when split-update is
+          // turned on.
+          deleteEventWriter.write(value);
+          reporter.progress();
+        } else {
+          writer.write(value);
+          reporter.progress();
+        }
       }
     }
 
@@ -653,6 +691,9 @@ public class CompactorMR {
       if (writer != null) {
         writer.close(false);
       }
+      if (deleteEventWriter != null) {
+        deleteEventWriter.close(false);
+      }
     }
 
     private void getWriter(Reporter reporter, ObjectInspector inspector,
@@ -679,6 +720,30 @@ public class CompactorMR {
       }
     }
 
+    private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
+        int bucket) throws IOException {
+      if (deleteEventWriter == null) {
+        AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+        options.inspector(inspector)
+          .writingBase(false)
+          .writingDeleteDelta(true)   // this is the option which will make it a delete writer
+          .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+          .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
+          .reporter(reporter)
+          .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+          .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+          .bucket(bucket)
+          .statementId(-1);//setting statementId == -1 makes compacted delta files use
+        //delta_xxxx_yyyy format
+
+        // Instantiate the underlying output format
+        @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
+        AcidOutputFormat<WritableComparable, V> aof =
+          instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+
+        deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+      }
+    }
   }
 
   static class StringableList extends ArrayList<Path> {

http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index af192fb..08ca9d5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -70,18 +70,19 @@ import org.junit.rules.TestName;
  * specifically the tests; the supporting code here is just a clone of TestTxnCommands
  */
 public class TestTxnCommands2 {
-  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+  protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnCommands2.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
   ).getPath().replaceAll("\\\\", "/");
-  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 2;
+  protected static int BUCKET_COUNT = 2;
   @Rule
   public TestName testName = new TestName();
-  private HiveConf hiveConf;
-  private Driver d;
-  private static enum Table {
+
+  protected HiveConf hiveConf;
+  protected Driver d;
+  protected static enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
     NONACIDORCTBL("nonAcidOrcTbl"),
@@ -99,6 +100,10 @@ public class TestTxnCommands2 {
 
   @Before
   public void setUp() throws Exception {
+    setUpWithTableProperties("'transactional'='true'");
+  }
+
+  protected void setUpWithTableProperties(String tableProperties) throws Exception {
     tearDown();
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
@@ -122,12 +127,13 @@ public class TestTxnCommands2 {
     SessionState.start(new SessionState(hiveConf));
     d = new Driver(hiveConf);
     dropTables();
-    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
+    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
     runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
   }
-  private void dropTables() throws Exception {
+
+  protected void dropTables() throws Exception {
     for(Table t : Table.values()) {
       runStatementOnDriver("drop table if exists " + t);
     }
@@ -731,6 +737,8 @@ public class TestTxnCommands2 {
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
 
+
+
   @Test
   public void testValidTxnsBookkeeping() throws Exception {
     // 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
@@ -859,11 +867,15 @@ public class TestTxnCommands2 {
    */
   @Test
   public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+    testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'");
+  }
+
+  void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception {
     String tblName = "hive12353";
     runStatementOnDriver("drop table if exists " + tblName);
     runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')");
+      " STORED AS ORC  TBLPROPERTIES ( " + tblProperties + " )");
     hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
     for(int i = 0; i < 5; i++) {
       //generate enough delta files so that Initiator can trigger auto compaction
@@ -1074,11 +1086,15 @@ public class TestTxnCommands2 {
    */
   @Test
   public void writeBetweenWorkerAndCleaner() throws Exception {
+    writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'");
+  }
+
+  protected void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception {
     String tblName = "hive12352";
     runStatementOnDriver("drop table if exists " + tblName);
     runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')");
+      " STORED AS ORC  TBLPROPERTIES ( " + tblProperties + " )");
 
     //create some data
     runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
@@ -1125,7 +1141,6 @@ public class TestTxnCommands2 {
     Assert.assertEquals("", expected,
       runStatementOnDriver("select a,b from " + tblName + " order by a"));
   }
-
   /**
    * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found.
    * When a heartbeat fails, the query should be failed too.
@@ -1215,17 +1230,78 @@ public class TestTxnCommands2 {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-    
+
     runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
     runWorker(hiveConf);
     runCleaner(hiveConf);
     runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
   }
+
+  @Test
+  public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
+    testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true'");
+  }
+
+  protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProperties) throws Exception {
+    String tblName = "acidWithSchemaEvol";
+    int numBuckets = 1;
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " CLUSTERED BY(a) INTO " + numBuckets +" BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC  TBLPROPERTIES ( " + tblProperties + " )");
+
+    // create some data
+    runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
+    runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
+
+    // apply schema evolution by adding some columns
+    runStatementOnDriver("alter table " + tblName + " add columns(c int, d string)");
+
+    // insert some data in new schema
+    runStatementOnDriver("insert into " + tblName + " values(4, 'acid', 100, 'orc'),"
+        + "(5, 'llap', 200, 'tez')");
+
+    // update old data with values for the new schema columns
+    runStatementOnDriver("update " + tblName + " set d = 'hive' where a <= 3");
+    runStatementOnDriver("update " + tblName + " set c = 999 where a <= 3");
+
+    // read the entire data back and see if we did everything right
+    List<String> rs = runStatementOnDriver("select * from " + tblName + " order by a");
+    String[] expectedResult = { "1\tfoo\t999\thive", "2\tbar\t999\thive", "3\tblah\t999\thive", "4\tacid\t100\torc", "5\tllap\t200\ttez" };
+    Assert.assertEquals(Arrays.asList(expectedResult), rs);
+
+    // now compact and see if compaction still preserves the data correctness
+    runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf); // Cleaner would remove the obsolete files.
+
+    // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have been cleaned.
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toString().toLowerCase()),
+        FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    boolean sawNewBase = false;
+    for (int i = 0; i < status.length; i++) {
+      if (status[i].getPath().getName().matches("base_.*")) {
+        sawNewBase = true;
+        FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+        Assert.assertEquals(numBuckets, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+      }
+    }
+    Assert.assertTrue(sawNewBase);
+
+    rs = runStatementOnDriver("select * from " + tblName + " order by a");
+    Assert.assertEquals(Arrays.asList(expectedResult), rs);
+  }
+
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */
-  private List<String> stringifyValues(int[][] rowsIn) {
+  protected List<String> stringifyValues(int[][] rowsIn) {
     assert rowsIn.length > 0;
     int[][] rows = rowsIn.clone();
     Arrays.sort(rows, new RowComp());
@@ -1275,7 +1351,7 @@ public class TestTxnCommands2 {
     return sb.toString();
   }
 
-  private List<String> runStatementOnDriver(String stmt) throws Exception {
+  protected List<String> runStatementOnDriver(String stmt) throws Exception {
     CommandProcessorResponse cpr = d.run(stmt);
     if(cpr.getResponseCode() != 0) {
       throw new RuntimeException(stmt + " failed: " + cpr);