Posted to commits@hive.apache.org by ek...@apache.org on 2017/12/01 02:40:05 UTC

[1/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/master c03001e98 -> 508d7e6f2


http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
new file mode 100644
index 0000000..b98c74a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests related to support of LOAD DATA with Acid tables
+ * Most tests run in both vectorized and non-vectorized mode since we currently have a vectorized
+ * and a non-vectorized acid reader, and it's critical that ROW__IDs are generated the same way.
+ */
+public class TestTxnLoadData extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnLoadData.class);
+  private static final String TEST_DATA_DIR =
+    new File(System.getProperty("java.io.tmpdir") +
+    File.separator + TestTxnLoadData.class.getCanonicalName()
+    + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @Test
+  public void loadData() throws Exception {
+    loadData(false);
+  }
+  @Test
+  public void loadDataVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    loadData(true);
+  }
+  @Test
+  public void loadDataUpdate() throws Exception {
+    loadDataUpdate(false);
+  }
+  @Test
+  public void loadDataUpdateVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    loadDataUpdate(true);
+  }
+  @Test
+  public void loadDataNonAcid2AcidConversion() throws Exception {
+    loadDataNonAcid2AcidConversion(false);
+  }
+  @Test
+  public void loadDataNonAcid2AcidConversionVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    loadDataNonAcid2AcidConversion(true);
+  }
+  @Test
+  public void testMultiStatement() throws Exception {
+    testMultiStatement(false);
+  }
+  @Test
+  public void testMultiStatementVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    testMultiStatement(true);
+  }
+  private void loadDataUpdate(boolean isVectorized) throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver(
+      "create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+    //Tstage is just a simple way to generate test data
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+    runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+    //this creates an ORC data file with correct schema under table root
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+    //"load data local inpath" doesn't delete source files so clean it here
+    runStatementOnDriver("truncate table Tstage");
+    //and do a Load Data into the same table, which should now land in a delta_x_x.
+    // 'data' is created by the export command.
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+
+    String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+      "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+    String[][] expected = new String[][]{
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"},
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}};
+    checkResult(expected, testQuery, isVectorized, "load data inpath");
+    runStatementOnDriver("update T set b = 17 where a = 1");
+    String[][] expected2 = new String[][]{
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"},
+      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"}
+    };
+    checkResult(expected2, testQuery, isVectorized, "update");
+
+    runStatementOnDriver("insert into T values(2,2)");
+    runStatementOnDriver("delete from T where a = 3");
+    //test minor compaction
+    runStatementOnDriver("alter table T compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected3 = new String[][] {
+      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"},
+      {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"}
+    };
+    checkResult(expected3, testQuery, isVectorized, "delete compact minor");
+
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T");
+    String[][] expected4 = new String[][]{
+      {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"},
+      {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}};
+    checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite");
+
+    //load same data again (additive)
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+    runStatementOnDriver("update T set b = 17 where a = 1");//matches 2 rows
+    runStatementOnDriver("delete from T where a = 3");//matches 2 rows
+    runStatementOnDriver("insert into T values(2,2)");
+    String[][] expected5 = new String[][]{
+      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
+      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
+      {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"}
+    };
+    checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
+
+    //test major compaction
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected6 = new String[][]{
+      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"},
+      {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"},
+      {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"}
+    };
+    checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
+  }
+  private void loadData(boolean isVectorized) throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("insert into T values(0,2),(0,4)");
+    //Tstage is just a simple way to generate test data
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+    runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+    //this creates an ORC data file with correct schema under table root
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
+    //"load data local inpath" doesn't delete source files so clean it here
+    runStatementOnDriver("truncate table Tstage");
+    //and do a Load Data into the same table, which should now land in a delta_x_x.
+    // 'data' is created by the export command.
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+
+    String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+      "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+    String[][] expected = new String[][] {
+      //normal insert
+      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"},
+      //Load Data
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}};
+    checkResult(expected, testQuery, isVectorized, "load data inpath");
+
+    //test minor compaction
+    runStatementOnDriver("alter table T compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected1 = new String[][] {
+      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"}
+    };
+    checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
+
+    //test major compaction
+    runStatementOnDriver("insert into T values(2,2)");
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected2 = new String[][] {
+      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"},
+      {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"}
+    };
+    checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
+
+    //create more staging data and test Load Data Overwrite
+    runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
+    runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
+    String[][] expected3 = new String[][] {
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"},
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}};
+    checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite");
+
+    //one more major compaction
+    runStatementOnDriver("insert into T values(6,6)");
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected4 = new String[][] {
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"},
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"},
+      {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}};
+    checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
+  }
+  /**
+   * Load Data [overwrite] into an (un-)partitioned acid-converted table
+   */
+  private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) stored as orc");
+    //pre-acid write to test nonAcid2acid conversion mixed with load data
+    runStatementOnDriver("insert into T values(0,2),(0,4)");
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+    runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+    //make 2 more inserts so that we have 000000_0_copy_1, 000000_0_copy_2 files in export
+    //export works at file level so if you have copy_N in the table dir, you'll have those in output
+    runStatementOnDriver("insert into Tstage values(2,2),(3,3)");
+    runStatementOnDriver("insert into Tstage values(4,4),(5,5)");
+    //create a file we'll import later
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
+    runStatementOnDriver("truncate table Tstage");//clean the staging table
+
+    //now convert T to acid
+    runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional' = 'true')");
+    //and do a Load Data into the same table, which should now land in a delta/
+    // (with 000000_0, 000000_0_copy_1, 000000_0_copy_2)
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+
+    String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+      "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+
+    String[][] expected = new String[][] {
+      //from pre-acid insert
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
+      //from Load Data into acid converted table
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":2}\t2\t2", "t/delta_0000024_0000024_0000/000000_0_copy_1"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":3}\t3\t3", "t/delta_0000024_0000024_0000/000000_0_copy_1"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":4}\t4\t4", "t/delta_0000024_0000024_0000/000000_0_copy_2"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":5}\t5\t5", "t/delta_0000024_0000024_0000/000000_0_copy_2"},
+    };
+    checkResult(expected, testQuery, isVectorized, "load data inpath");
+
+
+    //create more staging data with copy_N files and do LD+Overwrite
+    runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
+    runStatementOnDriver("insert into Tstage values(8,8)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
+
+    String[][] expected2 = new String[][] {
+      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000030/000000_0"},
+      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000030/000000_0"},
+      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000030/000000_0_copy_1"}
+    };
+    checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite");
+
+    //create 1 more delta_x_x so that the compactor has more than 1 dir/file to compact
+    runStatementOnDriver("insert into T values(9,9)");
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    String[][] expected3 = new String[][] {
+      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/bucket_00000"},
+      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/bucket_00000"},
+      {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000033/bucket_00000"},
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000033/bucket_00000"}
+
+    };
+    checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
+  }
+  /**
+   * Load Data [overwrite] into a partitioned transactional table
+   */
+  @Test
+  public void loadDataPartitioned() throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+
+    runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
+    runStatementOnDriver("truncate table Tstage");//because 'local' inpath doesn't delete source files
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T partition(p=0)");
+
+    runStatementOnDriver("insert into Tstage values(1,2),(1,4)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
+    runStatementOnDriver("truncate table Tstage");
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' into table T partition(p=1)");
+
+    runStatementOnDriver("insert into Tstage values(2,2),(2,4)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/3'");
+    runStatementOnDriver("truncate table Tstage");
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/3/data' into table T partition(p=1)");
+
+    List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
+    String[][] expected = new String[][] {
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"},
+      {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"},
+      {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}};
+    checkExpected(rs, expected, "load data inpath partitioned");
+
+
+    runStatementOnDriver("insert into Tstage values(5,2),(5,4)");
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/4'");
+    runStatementOnDriver("truncate table Tstage");
+    runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)");
+    String[][] expected2 = new String[][] {
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"},
+      {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}};
+    rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
+    checkExpected(rs, expected2, "load data inpath partitioned overwrite");
+  }
+
+  /**
+   * By default you can't load into bucketed tables.  Things will break badly in acid (data loss, etc)
+   * if loaded data is not bucketed properly.  This test is to capture that this is still the default.
+   * If the default is changed, Load Data should probably do more validation to ensure data is
+   * properly distributed into files and files are named correctly.
+   */
+  @Test
+  public void testValidations() throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')");
+    File createdFile = folder.newFile("myfile.txt");
+    FileUtils.writeStringToFile(createdFile, "hello world");
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+    //this creates an ORC data file with correct schema under table root
+    runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+    CommandProcessorResponse cpr = runStatementOnDriverNegative("load data local inpath '" + getWarehouseDir() + "' into table T");
+    Assert.assertTrue(cpr.getErrorMessage().contains("Load into bucketed tables are disabled"));
+  }
+  private void checkExpected(List<String> rs, String[][] expected, String msg) {
+    super.checkExpected(rs, expected, msg, LOG, true);
+  }
+  @Test
+  public void testMMOrcTable() throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='insert_only')");
+    int[][] values = {{1,2},{3,4}};
+    runStatementOnDriver("insert into T " + makeValuesClause(values));
+    List<String> rs = runStatementOnDriver("select a, b from T order by b");
+    Assert.assertEquals("", stringifyValues(values), rs);
+  }
+
+  /**
+   * Make sure Load Data assigns ROW__IDs correctly when there is a statementId suffix on the delta dir.
+   * For example, delta_x_x_0001.
+   */
+  private void testMultiStatement(boolean isVectorized) throws Exception {
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+    //Tstage is just a simple way to generate test data
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+    runStatementOnDriver("insert into Tstage values(5,5),(6,6)");
+    //this creates an ORC data file with correct schema under table root
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+    //and do a Load Data into the same table, which should now land in a delta_x_x.
+    // 'data' is created by the export command.
+    runStatementOnDriver("START TRANSACTION");
+    //statementId = 0
+    runStatementOnDriver("insert into T values(1,2),(3,4)");
+    //statementId = 1
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+    runStatementOnDriver("COMMIT");
+
+    String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+      "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+    String[][] expected = new String[][] {
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000019_0000019_0001/000000_0"},
+      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000019_0000019_0001/000000_0"}
+    };
+    checkResult(expected, testQuery, isVectorized, "load data inpath");
+
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    String[][] expected2 = new String[][] {
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000019/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000019/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000019/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000019/bucket_00000"}
+    };
+    checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
+    //at least for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
+  }
+  @Test
+  public void testAbort() throws Exception {
+    boolean isVectorized = false;
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("drop table if exists Tstage");
+    runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+    //Tstage is just a simple way to generate test data
+    runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+    runStatementOnDriver("insert into Tstage values(5,5),(6,6)");
+    //this creates an ORC data file with correct schema under table root
+    runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+    //and do a Load Data into the same table, which should now land in a delta_x_x.
+    // 'data' is created by the export command.
+    runStatementOnDriver("insert into T values(1,2),(3,4)");
+    runStatementOnDriver("START TRANSACTION");
+    runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+    runStatementOnDriver("ROLLBACK");
+
+    String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+      "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+    String[][] expected = new String[][] {
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"}
+    };
+    checkResult(expected, testQuery, isVectorized, "load data inpath");
+  }
+  /**
+   * We have to use a different query to check results for Vectorized tests because to get the
+   * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME}
+   * which will currently make the query non-vectorizable.  This means we can't check the file name
+   * for the vectorized version of the test.
+   */
+  private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{
+    List<String> rs = runStatementOnDriver(query);
+    checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
+    assertVectorized(isVectorized, query);
+  }
+}
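
The tests above all lean on one staging pattern: EXPORT writes an ORC file named 000000_0 under an export directory, and LOAD DATA then moves that file into the transactional table, where it should land in a new delta_x_x (or a base_x for OVERWRITE). A minimal sketch of that pattern, assuming the runStatementOnDriver() and getWarehouseDir() helpers from TxnCommandsBaseForTests shown later in this commit (the '/stage' directory name here is purely illustrative):

    // Sketch only: distills the staging pattern used by the tests above.
    private void stageAndLoad() throws Exception {
      runStatementOnDriver("create table T (a int, b int) stored as orc "
          + "tblproperties('transactional'='true')");
      runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
      runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
      // EXPORT creates <warehouse>/stage/data/000000_0 with the correct ORC schema
      runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/stage'");
      // LOAD DATA moves that file into a new delta_x_x/ under T's table root
      runStatementOnDriver("load data local inpath '" + getWarehouseDir()
          + "/stage/data' into table T");
    }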

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index 7f5e091..f5f8cc8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -264,18 +264,6 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     };
     checkExpected(rs, expected, "Unexpected row count after ctas");
   }
-  private void checkExpected(List<String> rs, String[][] expected, String msg) {
-    LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
-    for(String s : rs) {
-      LOG.warn(s);
-    }
-    Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
-    //verify data and layout
-    for(int i = 0; i < expected.length; i++) {
-      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
-      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
-    }
-  }
   /**
    * The idea here is to create a non acid table that was written by multiple writers, i.e.
    * unbucketed table that has 000000_0 & 000001_0, for example.
@@ -363,7 +351,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984));
     Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448));
 
+    assertVectorized(true, "update T set b = 88 where b = 80");
     runStatementOnDriver("update T set b = 88 where b = 80");
+    assertVectorized(true, "delete from T where b = 8");
     runStatementOnDriver("delete from T where b = 8");
     String expected3[][] = {
       {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2",  "warehouse/t/000002_0"},
@@ -374,7 +364,7 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/000000_0_copy_1"},
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
-      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000023_0000023_0000/bucket_00000"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000024_0000024_0000/bucket_00000"},
     };
     rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
     checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
@@ -386,15 +376,15 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
 
     /*Compaction preserves location of rows wrt buckets/tranches (for now)*/
     String expected4[][] = {
-      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000024/bucket_00002"},
-      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000024/bucket_00002"},
-      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000024/bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000024/bucket_00001"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000024/bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000024/bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000024/bucket_00000"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000024/bucket_00000"},
-      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000024/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000026/bucket_00002"},
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000026/bucket_00002"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000026/bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000026/bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000026/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000026/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000026/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000026/bucket_00000"},
+      {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000026/bucket_00000"},
     };
     checkExpected(rs, expected4,"after major compact");
   }
@@ -635,15 +625,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     //vectorized because there is INPUT__FILE__NAME
     assertVectorized(false, query);
   }
-  private void assertVectorized(boolean vectorized, String query) throws Exception {
-    List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
-    for(String line : rs) {
-      if(line != null && line.contains("Execution mode: vectorized")) {
-        Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
-        return;
-      }
-    }
-    Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+  private void checkExpected(List<String> rs, String[][] expected, String msg) {
+    super.checkExpected(rs, expected, msg, LOG, true);
   }
   /**
    * HIVE-17900

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 9f31eb1..6a2164f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -74,7 +75,6 @@ public abstract class TxnCommandsBaseForTests {
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf
       .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
@@ -151,6 +151,21 @@ public abstract class TxnCommandsBaseForTests {
     }
     throw new RuntimeException("Didn't get expected failure!");
   }
+
+  /**
+   * Runs Vectorized Explain on the query and checks if the plan is vectorized as expected
+   * @param vectorized {@code true} - assert that it's vectorized
+   */
+  void assertVectorized(boolean vectorized, String query) throws Exception {
+    List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
+    for(String line : rs) {
+      if(line != null && line.contains("Execution mode: vectorized")) {
+        Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
+        return;
+      }
+    }
+    Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+  }
   /**
    * Will assert that actual files match expected.
    * @param expectedFiles - suffixes of expected Paths.  Must be the same length
@@ -176,4 +191,18 @@ public abstract class TxnCommandsBaseForTests {
     }
     Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
   }
+  void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
+    LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      if(checkFileName) {
+        Assert.assertTrue("Actual line(file) " + i + " file: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+      }
+    }
+  }
 }
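
With checkExpected() and assertVectorized() now shared in TxnCommandsBaseForTests, a subclass can verify row contents and plan vectorization together. A minimal sketch under these assumptions: a subclass of TxnCommandsBaseForTests with a per-class LOG (as in TestTxnLoadData), vectorization enabled via hiveConf, and an illustrative table and values:

    // Sketch only: exercising the shared base-class helpers from a subclass test.
    @Test
    public void sketchSharedHelpers() throws Exception {
      hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
      runStatementOnDriver("create table T (a int, b int) stored as orc "
          + "tblproperties('transactional'='true')");
      runStatementOnDriver("insert into T values(1,80)");
      // confirm the plan vectorizes before running the statement for real
      assertVectorized(true, "update T set b = 88 where b = 80");
      runStatementOnDriver("update T set b = 88 where b = 80");
      List<String> rs = runStatementOnDriver("select ROW__ID, a, b from T order by ROW__ID");
      // startsWith check on the data only; file names are not checked here
      checkExpected(rs, new String[][] {{"{\"transactionid\"", ""}}, "after update", LOG, false);
    }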

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index d5ab079..afccf64 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@ public class TestExecDriver extends TestCase {
         db.createTable(src, cols, null, TextInputFormat.class,
             HiveIgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
-           true, false, false, false, null, 0, false);
+           true, false, false, false, null, 0);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index ccd7d8e..5d26524 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -836,16 +836,22 @@ public class TestInputOutputFormat {
   public void testEtlCombinedStrategy() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000");
+    AcidUtils.setTransactionalTableScan(conf, true);
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
     MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/a/1/part-00", 1000, new byte[1]),
         new MockFile("mock:/a/1/part-01", 1000, new byte[1]),
         new MockFile("mock:/a/2/part-00", 1000, new byte[1]),
         new MockFile("mock:/a/2/part-01", 1000, new byte[1]),
-        new MockFile("mock:/a/3/base_0/1", 1000, new byte[1]),
-        new MockFile("mock:/a/4/base_0/1", 1000, new byte[1]),
-        new MockFile("mock:/a/5/base_0/1", 1000, new byte[1]),
-        new MockFile("mock:/a/5/delta_0_25/1", 1000, new byte[1])
+        new MockFile("mock:/a/3/base_0/bucket_00001", 1000, new byte[1]),
+        new MockFile("mock:/a/4/base_0/bucket_00001", 1000, new byte[1]),
+        new MockFile("mock:/a/5/base_0/bucket_00001", 1000, new byte[1]),
+        new MockFile("mock:/a/5/delta_0_25/bucket_00001", 1000, new byte[1]),
+        new MockFile("mock:/a/6/delta_27_29/bucket_00001", 1000, new byte[1]),
+        new MockFile("mock:/a/6/delete_delta_27_29/bucket_00001", 1000, new byte[1])
     );
 
     OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx();
@@ -891,20 +897,27 @@ public class TestInputOutputFormat {
     assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
     assertEquals(2, etlSs.files.size());
     assertEquals(2, etlSs.dirs.size());
-    // The fifth will not be combined because of delta files.
+    // The fifth could be combined again.
     ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx);
+    assertTrue(ss.isEmpty());
+    assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
+    assertEquals(4, etlSs.files.size());
+    assertEquals(3, etlSs.dirs.size());
+
+    // The sixth will not be combined because of delete delta files.  Is that desired? HIVE-18110
+    ss = createOrCombineStrategies(context, fs, "mock:/a/6", combineCtx);
     assertEquals(1, ss.size());
     assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
     assertNotSame(etlSs, ss);
-    assertEquals(2, etlSs.files.size());
-    assertEquals(2, etlSs.dirs.size());
+    assertEquals(4, etlSs.files.size());
+    assertEquals(3, etlSs.dirs.size());
   }
 
   public List<SplitStrategy<?>> createOrCombineStrategies(OrcInputFormat.Context context,
       MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
     return OrcInputFormat.determineSplitStrategies(combineCtx, context,
-        adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
+        adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
         null, null, true);
   }
 
@@ -918,7 +931,7 @@ public class TestInputOutputFormat {
       OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
         null, null, true);
   }
 
@@ -3586,10 +3599,14 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open to read data - split 1 => mock:/mocktable8/0_0
-    // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
-    // call-3: split 2 - read delta_x_y/bucket_00001
-    assertEquals(5, readOpsDelta);
+    // call-1: open(mock:/mocktable7/0_0)
+    // call-2: open(mock:/mocktable7/0_0)
+    // call-3: listLocatedFileStatuses(mock:/mocktable7)
+    // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
+    // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
+    // call-6: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
+    // call-7: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
+    assertEquals(7, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3662,9 +3679,11 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: open to read data - split 1 => mock:/mocktable8/0_0
-    // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
-    // call-3: split 2 - read delta_x_y/bucket_00001
-    assertEquals(3, readOpsDelta);
+    // call-2: listLocatedFileStatus(mock:/mocktable8)
+    // call-3: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
+    // call-4: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
+    // call-5: open(mock:/mocktable8/delta_0000001_0000001_0000/bucket_00001)
+    assertEquals(5, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 9628a40..030f012 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -298,7 +298,7 @@ public class TestOrcRawRecordMerger {
     int BUCKET = 10;
     ReaderKey key = new ReaderKey();
     Configuration conf = new Configuration();
-    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
+    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0);
     Reader reader = createMockOriginalReader();
     RecordIdentifier minKey = new RecordIdentifier(0, bucketProperty, 1);
     RecordIdentifier maxKey = new RecordIdentifier(0, bucketProperty, 3);
@@ -308,7 +308,7 @@ public class TestOrcRawRecordMerger {
     fs.makeQualified(root);
     fs.create(root);
     ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, minKey, maxKey,
-        new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+        new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList(), 0);
     RecordReader recordReader = pair.getRecordReader();
     assertEquals(0, key.getTransactionId());
     assertEquals(bucketProperty, key.getBucketProperty());
@@ -338,13 +338,13 @@ public class TestOrcRawRecordMerger {
     ReaderKey key = new ReaderKey();
     Reader reader = createMockOriginalReader();
     Configuration conf = new Configuration();
-    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
+    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0);
     FileSystem fs = FileSystem.getLocal(conf);
     Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
     fs.makeQualified(root);
     fs.create(root);
     ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, null, null,
-        new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+        new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList(), 0);
     assertEquals("first", value(pair.nextRecord()));
     assertEquals(0, key.getTransactionId());
     assertEquals(bucketProperty, key.getBucketProperty());
@@ -835,6 +835,8 @@ public class TestOrcRawRecordMerger {
     assertEquals(null, merger.getMaxKey());
 
     assertEquals(true, merger.next(id, event));
+    //minor compaction, so we ignore 'base_0000100' files; all Deletes end up first since
+      // they all modify primordial rows
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
       OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
@@ -891,10 +893,10 @@ public class TestOrcRawRecordMerger {
     baseReader = OrcFile.createReader(basePath,
       OrcFile.readerOptions(conf));
     merger =
-      new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+      new OrcRawRecordMerger(conf, true, null, false, BUCKET,
         createMaximalTxnList(), new Reader.Options(),
         AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options()
-        .isCompacting(true).isMajorCompaction(true));
+        .isCompacting(true).isMajorCompaction(true).baseDir(new Path(root, "base_0000100")));
     assertEquals(null, merger.getMinKey());
     assertEquals(null, merger.getMaxKey());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index b2ac687..95e3463 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
+
 /**
  * This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set
  * of delete delta files. The split is on an insert delta and there are multiple delete deltas
@@ -186,7 +186,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
     OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
     OrcInputFormat.AcidDirInfo adi = gen.call();
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
-        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
+        null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
         null, null, true);
     assertEquals(1, splitStrategies.size());
     List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/queries/clientnegative/load_data_into_acid.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/load_data_into_acid.q b/ql/src/test/queries/clientnegative/load_data_into_acid.q
index fba1496..2ac5b56 100644
--- a/ql/src/test/queries/clientnegative/load_data_into_acid.q
+++ b/ql/src/test/queries/clientnegative/load_data_into_acid.q
@@ -1,7 +1,5 @@
-set hive.strict.checks.bucketing=false;
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 
 create table acid_ivot(
     ctinyint TINYINT,
@@ -15,7 +13,7 @@ create table acid_ivot(
     ctimestamp1 TIMESTAMP,
     ctimestamp2 TIMESTAMP,
     cboolean1 BOOLEAN,
-    cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+    cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true');
 
 LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientnegative/load_data_into_acid.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/load_data_into_acid.q.out b/ql/src/test/results/clientnegative/load_data_into_acid.q.out
index cd829ba..46b5cdd 100644
--- a/ql/src/test/results/clientnegative/load_data_into_acid.q.out
+++ b/ql/src/test/results/clientnegative/load_data_into_acid.q.out
@@ -10,7 +10,7 @@ PREHOOK: query: create table acid_ivot(
     ctimestamp1 TIMESTAMP,
     ctimestamp2 TIMESTAMP,
     cboolean1 BOOLEAN,
-    cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+    cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true')
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@acid_ivot
@@ -26,8 +26,8 @@ POSTHOOK: query: create table acid_ivot(
     ctimestamp1 TIMESTAMP,
     ctimestamp2 TIMESTAMP,
     cboolean1 BOOLEAN,
-    cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+    cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true')
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@acid_ivot
-FAILED: SemanticException [Error 10266]: LOAD DATA... statement is not supported on transactional table default@acid_ivot.
+FAILED: SemanticException [Error 30023]: alltypesorc file name is not valid in Load Data into Acid table default.acid_ivot.  Examples of valid names are: 00000_0, 00000_0_copy_1
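
The updated golden file reflects the new validation in the Load Data path: instead of rejecting LOAD DATA on transactional tables outright, Hive now requires the loaded file to carry a bucket-style name (the tests above use EXPORT output such as 000000_0 or 000000_0_copy_1). A sketch of how this surfaces in a JUnit test, assuming the acid_ivot table from load_data_into_acid.q and the runStatementOnDriverNegative() helper from TxnCommandsBaseForTests:

    // Sketch only: loading a file whose name is not bucket-like should fail with error 30023.
    @Test
    public void sketchFileNameValidation() throws Exception {
      CommandProcessorResponse cpr = runStatementOnDriverNegative(
          "LOAD DATA LOCAL INPATH '../../data/files/alltypesorc' into table acid_ivot");
      Assert.assertTrue(cpr.getErrorMessage().contains(
          "file name is not valid in Load Data into Acid table"));
    }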

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/acid_table_stats.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_table_stats.q.out b/ql/src/test/results/clientpositive/acid_table_stats.q.out
index d0fbcac..4c8297e 100644
--- a/ql/src/test/results/clientpositive/acid_table_stats.q.out
+++ b/ql/src/test/results/clientpositive/acid_table_stats.q.out
@@ -38,6 +38,7 @@ Table Parameters:
 	rawDataSize         	0                   
 	totalSize           	0                   
 	transactional       	true                
+	transactional_properties	default             
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
index 2bc1789..b3df04f 100644
--- a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
+++ b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
@@ -29,6 +29,7 @@ Table Parameters:
 	rawDataSize         	0                   
 	totalSize           	0                   
 	transactional       	true                
+	transactional_properties	default             
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 
@@ -198,6 +199,7 @@ Table Parameters:
 	rawDataSize         	0                   
 	totalSize           	1798                
 	transactional       	true                
+	transactional_properties	default             
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 
@@ -241,6 +243,7 @@ Table Parameters:
 	rawDataSize         	0                   
 	totalSize           	2909                
 	transactional       	true                
+	transactional_properties	default             
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/mm_default.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_default.q.out b/ql/src/test/results/clientpositive/mm_default.q.out
index ebbcb9d..1345efd 100644
--- a/ql/src/test/results/clientpositive/mm_default.q.out
+++ b/ql/src/test/results/clientpositive/mm_default.q.out
@@ -324,6 +324,7 @@ Table Parameters:
 	rawDataSize         	0                   
 	totalSize           	0                   
 	transactional       	true                
+	transactional_properties	default             
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 25caf29..da10313 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -128,7 +128,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
       parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
     }
     if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
-      //only need to check conformance if alter table enabled aicd
+      if(!isTransactionalPropertiesPresent) {
+        normalizeTransactionalPropertyDefault(newTable);
+        isTransactionalPropertiesPresent = true;
+        transactionalPropertiesValue = DEFAULT_TRANSACTIONAL_PROPERTY;
+      }
+      //only need to check conformance if alter table enabled acid
       if (!conformToAcid(newTable)) {
         // INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing
         if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) {
@@ -232,6 +237,9 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
 
       // normalize prop name
       parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+      if(transactionalProperties == null) {
+        normalizeTransactionalPropertyDefault(newTable);
+      }
       initializeTransactionalProperties(newTable);
       return;
     }
@@ -241,6 +249,16 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
   }
 
   /**
+   * When a table is marked transactional=true but transactional_properties is not set, then
+   * transactional_properties should take on the default value.  It is easier to make this explicit
+   * in the table definition than to keep checking everywhere whether it is set or not.
+   */
+  private void normalizeTransactionalPropertyDefault(Table table) {
+    table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+        DEFAULT_TRANSACTIONAL_PROPERTY);
+
+  }
+  /**
    * Check that InputFormatClass/OutputFormatClass should implement
    * AcidInputFormat/AcidOutputFormat
    */

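For context on the listener change above, here is a minimal sketch of the normalization
rule; it is not part of the patch, the class and method below are hypothetical, and only
the property names and the "default" value follow the surrounding diff.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained illustration of the default-normalization rule:
// if transactional=true and transactional_properties is absent, pin it to "default"
// so downstream code never has to special-case a missing value.
public class TransactionalDefaultsSketch {
  static final String TABLE_IS_TRANSACTIONAL = "transactional";
  static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
  static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";

  static void normalize(Map<String, String> tblProps) {
    if ("true".equalsIgnoreCase(tblProps.get(TABLE_IS_TRANSACTIONAL))
        && !tblProps.containsKey(TABLE_TRANSACTIONAL_PROPERTIES)) {
      tblProps.put(TABLE_TRANSACTIONAL_PROPERTIES, DEFAULT_TRANSACTIONAL_PROPERTY);
    }
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(TABLE_IS_TRANSACTIONAL, "true");
    normalize(props);
    System.out.println(props); // now also contains transactional_properties=default
  }
}

This is also why the q.out files earlier in this diff now show the extra
transactional_properties / default row in the table parameters output.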

[3/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)

Posted by ek...@apache.org.
HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/508d7e6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/508d7e6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/508d7e6f

Branch: refs/heads/master
Commit: 508d7e6f269398a47147a697aecdbe546425679b
Parents: c03001e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 30 18:39:42 2017 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 30 18:39:42 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hadoop/hive/ql/history/TestHiveHistory.java |   2 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  33 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   3 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   8 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   1 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 267 ++++++++---
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   3 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 128 +++--
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 330 +++++++++++--
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   4 +-
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   3 +
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |  75 +--
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   2 -
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  32 ++
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  78 +++-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   6 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |  22 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  25 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |  18 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  13 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   7 +-
 .../apache/hadoop/hive/ql/TestTxnLoadData.java  | 467 +++++++++++++++++++
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  45 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |  31 +-
 .../hadoop/hive/ql/exec/TestExecDriver.java     |   2 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  51 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  14 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   4 +-
 .../clientnegative/load_data_into_acid.q        |   4 +-
 .../clientnegative/load_data_into_acid.q.out    |   6 +-
 .../clientpositive/acid_table_stats.q.out       |   1 +
 .../clientpositive/autoColumnStats_4.q.out      |   3 +
 .../results/clientpositive/mm_default.q.out     |   1 +
 .../TransactionalValidationListener.java        |  20 +-
 36 files changed, 1392 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ada2318..3be5a8d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1912,7 +1912,7 @@ public class HiveConf extends Configuration {
       "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" +
       "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" +
       "This is intended to be used as an internal property for future versions of ACID. (See\n" +
-        "HIVE-14035 for details.)"),
+        "HIVE-14035 for details.  User sets it tblproperites via transactional_properties.)", true),
 
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
         "current open transactions reach this limit, future open transaction requests will be \n" +

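As a rough illustration of the two encodings named in this config description (an
assumption based on the text above, not code from this commit): the TBLPROPERTIES value
"default" corresponds to the split-update encoding (1) and "insert_only" to the
insert-only encoding (4); the real parsing lives in AcidUtils.getAcidOperationalProperties,
referenced later in this diff.

// Hedged sketch only; class and method names here are hypothetical.
public class TransactionalPropertiesSketch {
  static int encode(String transactionalProperties) {
    if (transactionalProperties == null || "default".equalsIgnoreCase(transactionalProperties)) {
      return 1;  // split-update, i.e. full ACID
    }
    if ("insert_only".equalsIgnoreCase(transactionalProperties)) {
      return 4;  // insert-only ("quarter-acid"); no ORC or bucketing requirement
    }
    throw new IllegalArgumentException("Unexpected transactional_properties: "
        + transactionalProperties);
  }

  public static void main(String[] args) {
    System.out.println(encode("default"));     // 1
    System.out.println(encode("insert_only")); // 4
  }
}
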
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 2f0efce..d73cd64 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -106,7 +106,7 @@ public class TestHiveHistory extends TestCase {
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src,
-          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7103fb9..a1cd9eb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -114,8 +115,6 @@ public class TestCompactor {
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
-    //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
 
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.cleanDb(hiveConf);
@@ -669,7 +668,7 @@ public class TestCompactor {
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L, 1);
 
     } finally {
       connection.close();
@@ -697,7 +696,8 @@ public class TestCompactor {
         writeBatch(connection, writer, false);
       }
 
-      // Start a third batch, but don't close it.
+      // Start a third batch, but don't close it.  This delta will be ignored by compaction since
+      // it has an open txn in it.
       writeBatch(connection, writer, true);
 
       // Now, compact
@@ -722,7 +722,7 @@ public class TestCompactor {
       }
       String name = stat[0].getPath().getName();
       Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
     } finally {
       connection.close();
     }
@@ -788,7 +788,7 @@ public class TestCompactor {
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
     } finally {
       connection.close();
     }
@@ -850,7 +850,7 @@ public class TestCompactor {
       if (!name.equals("base_0000006")) {
         Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
       }
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
     } finally {
       connection.close();
     }
@@ -903,7 +903,7 @@ public class TestCompactor {
       }
       String name = stat[0].getPath().getName();
       Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2);
     } finally {
       connection.close();
     }
@@ -966,7 +966,7 @@ public class TestCompactor {
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -984,7 +984,7 @@ public class TestCompactor {
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1);
   }
 
   @Test
@@ -1043,7 +1043,7 @@ public class TestCompactor {
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -1062,7 +1062,7 @@ public class TestCompactor {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
     // There should be no rows in the delete_delta because there have been no delete events.
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
   }
 
   @Test
@@ -1121,7 +1121,7 @@ public class TestCompactor {
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L, 1);
 
       // Verify that we have got correct set of delete_deltas also
       FileStatus[] deleteDeltaStat =
@@ -1140,7 +1140,7 @@ public class TestCompactor {
         Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
       }
       // There should be no rows in the delete_delta because there have been no delete events.
-      checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+      checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
 
     } finally {
       connection.close();
@@ -1295,7 +1295,7 @@ public class TestCompactor {
   }
 
   private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
-      String columnTypesProperty, int bucket, long min, long max)
+      String columnTypesProperty, int bucket, long min, long max, int numBuckets)
       throws IOException {
     ValidTxnList txnList = new ValidTxnList() {
       @Override
@@ -1351,9 +1351,10 @@ public class TestCompactor {
     Configuration conf = new Configuration();
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty);
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty);
+    conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets));
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
     AcidInputFormat.RawReader<OrcStruct> reader =
-        aif.getRawReader(conf, false, bucket, txnList, base, deltas);
+        aif.getRawReader(conf, true, bucket, txnList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
     OrcStruct value = reader.createValue();
     long currentTxn = min;

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 186d580..2f7284f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -376,7 +376,6 @@ public enum ErrorMsg {
   DBTXNMGR_REQUIRES_CONCURRENCY(10264,
       "To use DbTxnManager you must set hive.support.concurrency=true"),
   TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
-  LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true),
   LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " +
       "may have timed out", true),
   LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " +
@@ -550,6 +549,8 @@ public enum ErrorMsg {
   ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader required to read ACID tables"),
   ACID_TABLES_MUST_BE_READ_WITH_HIVEINPUTFORMAT(30022, "Must use HiveInputFormat to read ACID tables " +
           "(set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat)"),
+  ACID_LOAD_DATA_INVALID_FILE_NAME(30023, "{0} file name is not valid in Load Data into Acid " +
+    "table {1}.  Examples of valid names are: 00000_0, 00000_0_copy_1", true),
 
   CONCATENATE_UNSUPPORTED_FILE_FORMAT(30030, "Concatenate/Merge only supported for RCFile and ORCFile formats"),
   CONCATENATE_UNSUPPORTED_TABLE_BUCKETED(30031, "Concatenate/Merge can not be performed on bucketed tables"),

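A hedged sketch of the file-name check behind the new ACID_LOAD_DATA_INVALID_FILE_NAME
error; it assumes the validation mirrors AcidUtils.ORIGINAL_PATTERN plus a _copy_N
variant, and the class below is purely illustrative (the real check is in
LoadSemanticAnalyzer, whose diff is not reproduced in this part of the message).

import java.util.regex.Pattern;

// Illustrative only: which file names LOAD DATA into an Acid table would accept.
public class LoadDataNameCheckSketch {
  // bucketId_taskId, optionally followed by _copy_N (e.g. 00000_0, 00000_0_copy_1)
  private static final Pattern ORIGINAL = Pattern.compile("[0-9]+_[0-9]+");
  private static final Pattern ORIGINAL_COPY = Pattern.compile("[0-9]+_[0-9]+_copy_[0-9]+");

  static boolean isValidLoadDataFileName(String name) {
    return ORIGINAL.matcher(name).matches() || ORIGINAL_COPY.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidLoadDataFileName("00000_0"));        // true
    System.out.println(isValidLoadDataFileName("00000_0_copy_1")); // true
    System.out.println(isValidLoadDataFileName("data.orc"));       // false
  }
}
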
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4076a9f..9184844 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4488,7 +4488,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       part.getTPartition().getParameters().putAll(alterTbl.getProps());
     } else {
       boolean isFromMmTable = AcidUtils.isInsertOnlyTable(tbl.getParameters());
-      Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(alterTbl.getProps());
+      Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps());
       if (isToMmTable != null) {
         if (!isFromMmTable && isToMmTable) {
           result = generateAddMmTasks(tbl);

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e2f8c1f..6d13773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -215,7 +215,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
 
     Context ctx = driverContext.getCtx();
     if(ctx.getHiveTxnManager().supportsAcid()) {
-      //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+      //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes the logic more explicit
       return;
     }
     HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
@@ -290,7 +290,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         } else {
           Utilities.FILE_OP_LOGGER.debug("MoveTask moving " + sourcePath + " to " + targetPath);
           if(lfd.getWriteType() == AcidUtils.Operation.INSERT) {
-            //'targetPath' is table root of un-partitioned table/partition
+            //'targetPath' is the root of an un-partitioned table, or a partition directory
             //'sourcePath' result of 'select ...' part of CTAS statement
             assert lfd.getIsDfsDir();
             FileSystem srcFs = sourcePath.getFileSystem(conf);
@@ -367,7 +367,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         checkFileFormats(db, tbd, table);
 
         boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
-            && !tbd.isMmTable();
+            && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
 
         // Create a data container
         DataContainer dc = null;
@@ -379,7 +379,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           }
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
               work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
-              tbd.getTxnId(), tbd.getStmtId(), tbd.isMmTable());
+              tbd.getTxnId(), tbd.getStmtId());
           if (work.getOutputs() != null) {
             DDLTask.addIfAbsentByName(new WriteEntity(table,
               getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index bb1f4e5..545b7a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -229,6 +229,7 @@ public class LoadTable {
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
         replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+        //todo: what is the point of this?  If this is for replication, who would have opened a txn?
         SessionState.get().getTxnMgr().getCurrentTxnId()
     );
     MoveWork moveWork =

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c0b71f..9ab028d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -30,6 +30,8 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,18 +41,17 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,13 +123,14 @@ public class AcidUtils {
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern   LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
   /**
-   * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
-   * a "delta" dir written by a real Acid write - cannot have any copies
+   * A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
+   * (unless written via a Load Data statement)
    */
   public static final PathFilter originalBucketFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
-      return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+      return ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
+        ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
     }
   };
 
@@ -137,6 +139,7 @@ public class AcidUtils {
   }
   private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
 
+  public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$");
   public static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
   /**
@@ -156,14 +159,30 @@ public class AcidUtils {
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   /**
-   * Create the bucket filename.
+   * Create the bucket filename in Acid format
    * @param subdir the subdirectory for the bucket.
    * @param bucket the bucket number
    * @return the filename
    */
   public static Path createBucketFile(Path subdir, int bucket) {
-    return new Path(subdir,
+    return createBucketFile(subdir, bucket, true);
+  }
+
+  /**
+   * Create acid or original bucket name
+   * @param subdir the subdirectory for the bucket.
+   * @param bucket the bucket number
+   * @return the filename
+   */
+  private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) {
+    if(isAcidSchema) {
+      return new Path(subdir,
         BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
+    }
+    else {
+      return new Path(subdir,
+        String.format(BUCKET_DIGITS, bucket));
+    }
   }
 
   /**
@@ -244,7 +263,7 @@ public class AcidUtils {
    * @param path the base directory name
    * @return the maximum transaction id that is included
    */
-  static long parseBase(Path path) {
+  public static long parseBase(Path path) {
     String filename = path.getName();
     if (filename.startsWith(BASE_PREFIX)) {
       return Long.parseLong(filename.substring(BASE_PREFIX.length()));
@@ -262,7 +281,7 @@ public class AcidUtils {
    */
   public static AcidOutputFormat.Options
                     parseBaseOrDeltaBucketFilename(Path bucketFile,
-                                                   Configuration conf) {
+                                                   Configuration conf) throws IOException {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
@@ -273,7 +292,7 @@ public class AcidUtils {
           .minimumTransactionId(0)
           .maximumTransactionId(0)
           .bucket(bucket)
-          .writingBase(true);
+          .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
     else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
       //todo: define groups in regex and use parseInt(Matcher.group(2))....
@@ -286,7 +305,7 @@ public class AcidUtils {
         .maximumTransactionId(0)
         .bucket(bucket)
         .copyNumber(copyNumber)
-        .writingBase(true);
+        .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
     else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
@@ -299,14 +318,16 @@ public class AcidUtils {
             .bucket(bucket)
             .writingBase(true);
       } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
-        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX,
+          bucketFile.getFileSystem(conf));
         result
             .setOldStyle(false)
             .minimumTransactionId(parsedDelta.minTransaction)
             .maximumTransactionId(parsedDelta.maxTransaction)
             .bucket(bucket);
       } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
-        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
+          bucketFile.getFileSystem(conf));
         result
             .setOldStyle(false)
             .minimumTransactionId(parsedDelta.minTransaction)
@@ -344,11 +365,17 @@ public class AcidUtils {
         throw new IllegalArgumentException("Unexpected Operation: " + op);
     }
   }
-
   public enum AcidBaseFileType {
-    COMPACTED_BASE, // a regular base file generated through major compaction
-    ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
-    INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+  /**
+   * File w/o Acid meta columns.  This would be the case for files that were added to the table
+   * before it was converted to Acid but not yet major compacted.  It may also be the result of
+   * a Load Data statement on an acid table.
+   */
+  ORIGINAL_BASE,
+  /**
+   * File that has Acid metadata columns embedded in it.  Found in base_x/ or delta_x_y/.
+   */
+  ACID_SCHEMA,
   }
 
   /**
@@ -366,16 +393,12 @@ public class AcidUtils {
       this.acidBaseFileType = acidBaseFileType;
     }
 
-    public boolean isCompactedBase() {
-      return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
-    }
-
     public boolean isOriginal() {
       return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
     }
 
-    public boolean isInsertDelta() {
-      return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+    public boolean isAcidSchema() {
+      return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA;
     }
 
     public HdfsFileStatusWithId getHdfsFileStatusWithId() {
@@ -545,6 +568,7 @@ public class AcidUtils {
      * @return the base directory to read
      */
     Path getBaseDirectory();
+    boolean isBaseInRawFormat();
 
     /**
      * Get the list of original files.  Not {@code null}.  Must be sorted.
@@ -576,7 +600,10 @@ public class AcidUtils {
     List<FileStatus> getAbortedDirectories();
   }
 
-  public static class ParsedDelta implements Comparable<ParsedDelta> {
+  /**
+   * Immutable
+   */
+  public static final class ParsedDelta implements Comparable<ParsedDelta> {
     private final long minTransaction;
     private final long maxTransaction;
     private final FileStatus path;
@@ -584,19 +611,24 @@ public class AcidUtils {
     //had no statement ID
     private final int statementId;
     private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+    private final boolean isRawFormat;
 
     /**
      * for pre 1.3.x delta files
      */
-    ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
-      this(min, max, path, -1, isDeleteDelta);
+    private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
+        boolean isRawFormat) {
+      this(min, max, path, -1, isDeleteDelta, isRawFormat);
     }
-    ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
+    private ParsedDelta(long min, long max, FileStatus path, int statementId,
+        boolean isDeleteDelta, boolean isRawFormat) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
       this.statementId = statementId;
       this.isDeleteDelta = isDeleteDelta;
+      this.isRawFormat = isRawFormat;
+      assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
     }
 
     public long getMinTransaction() {
@@ -618,7 +650,12 @@ public class AcidUtils {
     public boolean isDeleteDelta() {
       return isDeleteDelta;
     }
-
+    /**
+     * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
+     */
+    public boolean isRawFormat() {
+      return isRawFormat;
+    }
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
@@ -698,29 +735,6 @@ public class AcidUtils {
   }
 
   /**
-   * Convert the list of begin/end transaction id pairs to a list of delta
-   * directories.  Note that there may be multiple delta files for the exact same txn range starting
-   * with 1.3.x;
-   * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
-   * @param root the root directory
-   * @param deltas list of begin/end transaction id pairs
-   * @return the list of delta paths
-   */
-  public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
-    List<Path> results = new ArrayList<Path>(deltas.size());
-    for(AcidInputFormat.DeltaMetaData dmd : deltas) {
-      if(dmd.getStmtIds().isEmpty()) {
-        results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
-        continue;
-      }
-      for(Integer stmtId : dmd.getStmtIds()) {
-        results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
-      }
-    }
-    return results.toArray(new Path[results.size()]);
-  }
-
-  /**
    * Convert the list of begin/end transaction id pairs to a list of delete delta
    * directories.  Note that there may be multiple delete_delta files for the exact same txn range starting
    * with 2.2.x;
@@ -743,25 +757,29 @@ public class AcidUtils {
     return results.toArray(new Path[results.size()]);
   }
 
-  public static ParsedDelta parsedDelta(Path deltaDir) {
+  public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
     String deltaDirName = deltaDir.getName();
     if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
-      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs);
     }
-    return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix
+    return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
   }
 
-  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
-    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix, FileSystem fs)
+    throws IOException {
+    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     return new ParsedDelta(p.getMinTransaction(),
-        p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+        p.getMaxTransaction(), path, p.statementId, isDeleteDelta, p.isRawFormat());
   }
 
-  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
+  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
+    throws IOException {
     String filename = deltaDir.getName();
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     if (filename.startsWith(deltaPrefix)) {
+      //small optimization - delete delta can't be in raw format
+      boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
       String rest = filename.substring(deltaPrefix.length());
       int split = rest.indexOf('_');
       int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
@@ -770,10 +788,10 @@ public class AcidUtils {
         Long.parseLong(rest.substring(split + 1)) :
         Long.parseLong(rest.substring(split + 1, split2));
       if(split2 == -1) {
-        return new ParsedDelta(min, max, null, isDeleteDelta);
+        return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
       }
       int statementId = Integer.parseInt(rest.substring(split2 + 1));
-      return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
+      return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
                                        deltaPrefix);
@@ -871,13 +889,13 @@ public class AcidUtils {
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
-            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
         getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
       }
     }
 
@@ -976,12 +994,18 @@ public class AcidUtils {
       //this does "Path.uri.compareTo(that.uri)"
       return o1.getFileStatus().compareTo(o2.getFileStatus());
     });
-    return new Directory(){
+
+    final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
+    return new Directory() {
 
       @Override
       public Path getBaseDirectory() {
         return base;
       }
+      @Override
+      public boolean isBaseInRawFormat() {
+        return isBaseInRawFormat;
+      }
 
       @Override
       public List<HdfsFileStatusWithId> getOriginalFiles() {
@@ -1022,7 +1046,8 @@ public class AcidUtils {
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
+      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
+      FileSystem fs) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -1050,7 +1075,7 @@ public class AcidUtils {
                     && child.isDir()) {
       String deltaPrefix =
               (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
-      ParsedDelta delta = parseDelta(child, deltaPrefix);
+      ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
       if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) &&
           ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
         aborted.add(child);
@@ -1171,8 +1196,11 @@ public class AcidUtils {
     parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable));
   }
 
-  public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
-    HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
+  /**
+   * Means it's a full acid table
+   */
+  public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) {
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable);
   }
   /**
    * @param p - not null
@@ -1185,6 +1213,8 @@ public class AcidUtils {
    * SessionState.get().getTxnMgr().supportsAcid() here
    * @param table table
    * @return true if table is a legit ACID table, false otherwise
+   * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+   * both Acid and MM tables. HIVE-18124
    */
   public static boolean isAcidTable(Table table) {
     if (table == null) {
@@ -1197,6 +1227,10 @@ public class AcidUtils {
 
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
+  /**
+   * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+   * both Acid and MM tables. HIVE-18124
+   */
   public static boolean isAcidTable(CreateTableDesc table) {
     if (table == null || table.getTblProps() == null) {
       return false;
@@ -1208,8 +1242,11 @@ public class AcidUtils {
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
 
+  /**
+   * after isAcidTable() is renamed to isTransactionalTable(), rename this to isAcid() HIVE-18124
+   */
   public static boolean isFullAcidTable(Table table) {
-    return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters());
+    return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table);
   }
 
   /**
@@ -1336,6 +1373,9 @@ public class AcidUtils {
   public static boolean isInsertOnlyTable(Map<String, String> params) {
     return isInsertOnlyTable(params, false);
   }
+  public static boolean isInsertOnlyTable(Table table) {
+    return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly();
+  }
 
   // TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used?
   public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
@@ -1349,13 +1389,21 @@ public class AcidUtils {
     return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
   }
 
-   /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */
-  public static Boolean isToInsertOnlyTable(Map<String, String> props) {
+   /**
+    * The method for altering table props; may set the table to MM, non-MM, or not affect MM.
+    * todo: all such validation logic should live in TransactionalValidationListener
+    * @param tbl the table object image before the alter table command
+    * @param props the property values set by this alter table command
+    */
+  public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
     // Note: Setting these separately is a very hairy issue in certain combinations, since we
     //       cannot decide what type of table this becomes without taking both into account, and
     //       in many cases the conversion might be illegal.
     //       The only thing we allow is tx = true w/o tx-props, for backward compat.
     String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if(transactional == null) {
+      transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    }
     String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (transactional == null && transactionalProp == null) return null; // Not affected.
     boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
@@ -1378,4 +1426,81 @@ public class AcidUtils {
         hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     return hasTxn || hasProps;
   }
+
+  /**
+   * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
+   * into delta_x_x/ (or base_x/ when the Overwrite clause is used).  {@link MetaDataFile} is a
+   * small JSON file in this directory that indicates that these files don't have Acid metadata
+   * columns, so the values for these columns need to be assigned at read time or during compaction.
+   */
+  public static class MetaDataFile {
+    //export command uses _metadata....
+    private static final String METADATA_FILE = "_metadata_acid";
+    private static final String CURRENT_VERSION = "0";
+    //todo: enums? that have both field name and value list
+    private interface Field {
+      String VERSION = "thisFileVersion";
+      String DATA_FORMAT = "dataFormat";
+    }
+    private interface Value {
+      //plain ORC file
+      String RAW = "raw";
+      //result of acid write, i.e. decorated with ROW__ID info
+      String NATIVE = "native";
+    }
+
+    /**
+     * @param baseOrDeltaDir delta or base dir, must exist
+     */
+    public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat)
+      throws IOException {
+      /**
+       * create the _metadata_acid JSON file in baseOrDeltaDir and
+       * write the thisFileVersion and dataFormat fields
+       *
+       * on read, if the file is not there, assume version 0 and dataFormat=native (acid)
+       */
+      Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+      Map<String, String> metaData = new HashMap<>();
+      metaData.put(Field.VERSION, CURRENT_VERSION);
+      metaData.put(Field.DATA_FORMAT, isRawFormat ? Value.RAW : Value.NATIVE);
+      try (FSDataOutputStream strm = fs.create(formatFile, false)) {
+        new ObjectMapper().writeValue(strm, metaData);
+      } catch (IOException ioe) {
+        String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE
+            + ": " + ioe.getMessage();
+        LOG.error(msg, ioe);
+        throw ioe;
+      }
+    }
+    public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+      Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+      if(!fs.exists(formatFile)) {
+        return false;
+      }
+      try (FSDataInputStream strm = fs.open(formatFile)) {
+        Map<String, String> metaData = new ObjectMapper().readValue(strm, Map.class);
+        if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) {
+          throw new IllegalStateException("Unexpected Meta Data version: "
+              + metaData.get(Field.VERSION));
+        }
+        String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null");
+        switch (dataFormat) {
+          case Value.NATIVE:
+            return false;
+          case Value.RAW:
+            return true;
+          default:
+            throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT
+                + ": " + dataFormat);
+        }
+      }
+      catch(IOException e) {
+        String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE
+            + ": " + e.getMessage();
+        LOG.error(msg, e);
+        throw e;
+      }
+    }
+  }
 }

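A short usage sketch for the new AcidUtils.MetaDataFile helper, assuming a local
FileSystem and a hypothetical delta directory path; it exercises only the two methods
added above and is not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

// Hedged example: mark a Load Data delta as raw (no ROW__ID columns) and read the flag back.
public class MetaDataFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);                  // local FS just for illustration
    Path deltaDir = new Path("/tmp/t/delta_0000017_0000017");   // hypothetical Load Data delta
    fs.mkdirs(deltaDir);
    AcidUtils.MetaDataFile.createMetaFile(deltaDir, fs, true);  // true => files are in raw format
    System.out.println(AcidUtils.MetaDataFile.isRawFormat(deltaDir, fs)); // expected: true
  }
}
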
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 6a1dc72..819c2a2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -468,6 +468,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
     try {
       Utilities.copyTablePropertiesToConf(table, conf);
+      if(tableScan != null) {
+        AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable());
+      }
     } catch (HiveException e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index dda9f93..becdc71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException;
 
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 import java.io.IOException;
@@ -409,7 +410,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * @param readerSchema the types for the reader
    * @param conf the configuration
    */
-  public static boolean[] genIncludedColumns(TypeDescription readerSchema,
+  static boolean[] genIncludedColumns(TypeDescription readerSchema,
                                              Configuration conf) {
      if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
       List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
@@ -419,7 +420,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  public static String[] getSargColumnNames(String[] originalColumnNames,
+  private static String[] getSargColumnNames(String[] originalColumnNames,
       List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
     int rootColumn = getRootColumn(isOriginal);
     String[] columnNames = new String[types.size() - rootColumn];
@@ -695,21 +696,29 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    */
   @VisibleForTesting
   static final class AcidDirInfo {
-    public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
+    AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
         List<AcidBaseFileInfo> baseFiles,
-        List<ParsedDelta> parsedDeltas) {
+        List<ParsedDelta> deleteEvents) {
       this.splitPath = splitPath;
       this.acidInfo = acidInfo;
       this.baseFiles = baseFiles;
       this.fs = fs;
-      this.parsedDeltas = parsedDeltas;
+      this.deleteEvents = deleteEvents;
     }
 
     final FileSystem fs;
     final Path splitPath;
     final AcidUtils.Directory acidInfo;
     final List<AcidBaseFileInfo> baseFiles;
-    final List<ParsedDelta> parsedDeltas;
+    final List<ParsedDelta> deleteEvents;
+
+    /**
+     * No (qualifying) data files found in {@link #splitPath}.
+     * @return true if there are no base files to read
+     */
+    boolean isEmpty() {
+      return (baseFiles == null || baseFiles.isEmpty());
+    }
   }
 
   @VisibleForTesting
@@ -884,7 +893,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public CombineResult combineWith(FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> otherFiles, boolean isOriginal) {
       if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT
-        || this.isOriginal != isOriginal) {
+        || this.isOriginal != isOriginal) {//todo: what is this checking????
         return (files.size() > otherFiles.size())
             ? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE;
       }
@@ -1083,6 +1092,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   static final class FileGenerator implements Callable<AcidDirInfo> {
     private final Context context;
     private final FileSystem fs;
+    /**
+     * For plain or acid tables this is the root of the partition (or table if not partitioned).
+     * For an MM table this is a delta/ or base/ dir.  In the MM case the ValidTxnList filtering that
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already
+     * been applied by {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}.
+     */
     private final Path dir;
     private final Ref<Boolean> useFileIds;
     private final UserGroupInformation ugi;
@@ -1119,25 +1134,27 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private AcidDirInfo callInternal() throws IOException {
+      //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
           context.transactionList, useFileIds, true, null);
-      Path base = dirInfo.getBaseDirectory();
       // find the base files (original or new style)
-      List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
-      if (base == null) {
+      List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
+      if (dirInfo.getBaseDirectory() == null) {
+        //for non-acid tables, all data files are in getOriginalFiles() list
         for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
       } else {
-        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
         for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
-          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+          baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
+            AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
         }
       }
 
       // Find the parsed deltas- some of them containing only the insert delta events
       // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
-      List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+      List<ParsedDelta> parsedDeltas = new ArrayList<>();
 
       if (context.acidOperationalProperties != null &&
           context.acidOperationalProperties.isSplitUpdate()) {
@@ -1154,15 +1171,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           if (parsedDelta.isDeleteDelta()) {
             parsedDeltas.add(parsedDelta);
           } else {
+            AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ?
+              AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
+            PathFilter bucketFilter = parsedDelta.isRawFormat() ?
+              AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
+            if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() !=
+              parsedDelta.getMaxTransaction()) {
+              //a delta/ with files in raw format is the result of Load Data (as opposed to compaction
+              //or streaming ingest) and so must have interval length == 1.
+              throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE
+               + " format but txnIds are out of range: " + parsedDelta.getPath());
+            }
             // This is a normal insert delta, which only has insert events and hence all the files
             // in this delta directory can be considered as a base.
             Boolean val = useFileIds.value;
             if (val == null || val) {
               try {
                 List<HdfsFileStatusWithId> insertDeltaFiles =
-                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
                 for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
-                  baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+                  baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
                 }
                 if (val == null) {
                   useFileIds.value = true; // The call succeeded, so presumably the API is there.
@@ -1176,15 +1204,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               }
             }
             // Fall back to regular API and create statuses without ID.
-            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
             for (FileStatus child : children) {
               HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
-              baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+              baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
             }
           }
         }
 
       } else {
+        /*
+        We already handled all delete deltas above and there should not be any other deltas for
+        any table type (this was the acid 1.0 code path).
+         */
+        assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir;
         // When split-update is not enabled, then all the deltas in the current directories
         // should be considered as usual.
         parsedDeltas.addAll(dirInfo.getCurrentDirectories());
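
For context, a minimal standalone sketch (not Hive's AcidUtils) of the naming convention that the
raw-format check above relies on: Load Data writes a single-transaction delta_x_x, while compaction
produces a range delta_x_y with x < y.

  final class DeltaNameSketch {
    final long minTxn;
    final long maxTxn;
    final int statementId;
    DeltaNameSketch(String dirName) {
      // e.g. "delta_0000005_0000005" (single txn, as Load Data writes) or
      // "delta_0000005_0000010" (a compacted range); an optional statement id may follow
      String[] parts = dirName.substring("delta_".length()).split("_");
      this.minTxn = Long.parseLong(parts[0]);
      this.maxTxn = Long.parseLong(parts[1]);
      this.statementId = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
    }
    boolean couldHoldRawLoadDataFiles() {
      return minTxn == maxTxn; // a compacted range (minTxn < maxTxn) fails the check above
    }
  }
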
@@ -1658,7 +1691,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       pathFutures.add(ecs.submit(fileGenerator));
     }
 
-    boolean isTransactionalTableScan =//this never seems to be set correctly
+    boolean isTransactionalTableScan =
         HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
     boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
     TypeDescription readerSchema =
@@ -1700,13 +1733,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
         // We have received a new directory information, make split strategies.
         --resultsLeft;
-
+        if(adi.isEmpty()) {
+          //no files found, for example empty table/partition
+          continue;
+        }
         // The reason why we can get a list of split strategies here is because for ACID split-update
         // case when we have a mix of original base files & insert deltas, we will produce two
         // independent split strategies for them. There is a global flag 'isOriginal' that is set
         // on a per split strategy basis and it has to be same for all the files in that strategy.
         List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
-            adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
+            adi.splitPath, adi.baseFiles, adi.deleteEvents, readerTypes, ugi,
             allowSyntheticFileIds);
 
         for (SplitStrategy<?> splitStrategy : splitStrategies) {
@@ -1790,6 +1826,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds,
       boolean isDefaultFs) {
     if (!deltas.isEmpty() || combinedCtx == null) {
+      //why is this checking for deltas.isEmpty() - HIVE-18110
       return new ETLSplitStrategy(
           context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi,
           allowSyntheticFileIds, isDefaultFs);
@@ -1955,6 +1992,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
     OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
     mergerOptions.rootPath(split.getRootDir());
+    mergerOptions.bucketPath(split.getPath());
     final int bucket;
     if (split.hasBase()) {
       AcidOutputFormat.Options acidIOOptions =
@@ -1968,8 +2006,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     } else {
       bucket = (int) split.getStart();
+      assert false : "We should never have a split w/o base in acid 2.0 for full acid: " + split.getPath();
     }
-
+    //todo: createOptionsForReader() assumes it's !isOriginal.... why?
     final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
     readOptions.range(split.getStart(), split.getLength());
 
@@ -2041,6 +2080,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
     final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
     readerOptions.include(OrcInputFormat.genIncludedColumns(schema, conf));
+    //todo: last param is bogus. why is this hardcoded?
     OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true);
     return readerOptions;
   }
@@ -2144,7 +2184,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       List<AcidBaseFileInfo> baseFiles,
       List<ParsedDelta> parsedDeltas,
       List<OrcProto.Type> readerTypes,
-      UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds) throws IOException {
     List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
     SplitStrategy<?> splitStrategy;
 
@@ -2153,23 +2193,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     boolean isDefaultFs = (!checkDefaultFs) || ((fs instanceof DistributedFileSystem)
             && HdfsUtils.isDefaultFs((DistributedFileSystem) fs));
 
-    // When no baseFiles, we will just generate a single split strategy and return.
-    List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
     if (baseFiles.isEmpty()) {
-      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles,
+      assert false : "acid 2.0 no base?!: " + dir;
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, Collections.emptyList(),
           false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds, isDefaultFs);
       if (splitStrategy != null) {
         splitStrategies.add(splitStrategy);
       }
-      return splitStrategies; // return here
+      return splitStrategies;
     }
 
+    List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<>();
     List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
     // Separate the base files into acid schema and non-acid(original) schema files.
     for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
       if (acidBaseFileInfo.isOriginal()) {
         originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
       } else {
+        assert acidBaseFileInfo.isAcidSchema();
         acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
       }
     }
@@ -2195,14 +2236,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return splitStrategies;
   }
 
-  @VisibleForTesting
-  static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
+  private static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
       FileSystem fs, Path dir,
       List<HdfsFileStatusWithId> baseFiles,
       boolean isOriginal,
       List<ParsedDelta> parsedDeltas,
       List<OrcProto.Type> readerTypes,
-      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs)
+    throws IOException {
     List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
     boolean[] covered = new boolean[context.numBuckets];
 
@@ -2250,6 +2291,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
+  /**
+   *
+   * @param bucket bucket/writer ID for this split of the compaction job
+   */
   @Override
   public RawReader<OrcStruct> getRawReader(Configuration conf,
                                            boolean collapseEvents,
@@ -2258,25 +2303,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                            Path baseDirectory,
                                            Path[] deltaDirectory
                                            ) throws IOException {
-    Reader reader = null;
     boolean isOriginal = false;
+    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true)
+      .isMajorCompaction(collapseEvents);
     if (baseDirectory != null) {//this is NULL for minor compaction
-      Path bucketFile = null;
+      //it may also be null if there is no base - only deltas
+      mergerOptions.baseDir(baseDirectory);
       if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
-        bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
+        isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf));
+        mergerOptions.rootPath(baseDirectory.getParent());
       } else {
-        /**we don't know which file to start reading -
-         * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
         isOriginal = true;
+        mergerOptions.rootPath(baseDirectory);
       }
-      if(bucketFile != null) {
-        reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
-      }
     }
-    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
-      .isCompacting(true)
-      .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
-    return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
+    else {
+      //since we have no base, there must be at least 1 delta which must be a result of an acid write,
+      //so it must be an immediate child of the partition
+      mergerOptions.rootPath(deltaDirectory[0].getParent());
+    }
+    return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
         bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
   }
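
To restate how the rewritten getRawReader() above now points the merger at the partition root, a
hedged sketch (variable names mirror the method; the logic is a condensed reading of the patch, not
code from it):

  Path rootPath;
  if (baseDirectory == null) {
    // no base at all (e.g. minor compaction): every delta is an immediate child of the partition
    rootPath = deltaDirectory[0].getParent();
  } else if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
    // base_x/: the partition (or table) root is its parent
    rootPath = baseDirectory.getParent();
  } else {
    // pre-acid "original" files sit directly under the partition root
    rootPath = baseDirectory;
  }
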
 


[2/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)

Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 95a60dc..73f27e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -88,11 +88,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      */
     private int statementId;//sort on this descending, like currentTransactionId
 
-    public ReaderKey() {
+    ReaderKey() {
       this(-1, -1, -1, -1, 0);
     }
 
-    public ReaderKey(long originalTransaction, int bucket, long rowId,
+    ReaderKey(long originalTransaction, int bucket, long rowId,
                      long currentTransactionId) {
       this(originalTransaction, bucket, rowId, currentTransactionId, 0);
     }
@@ -196,6 +196,34 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     void next(OrcStruct next) throws IOException;
   }
   /**
+   * Used when base_x/bucket_N is missing - makes control flow a bit easier
+   */
+  private class EmptyReaderPair implements ReaderPair {
+    @Override public OrcStruct nextRecord() {
+      return null;
+    }
+    @Override public int getColumns() {
+      return 0;
+    }
+    @Override public RecordReader getRecordReader() {
+      return null;
+    }
+    @Override public Reader getReader() {
+      return null;
+    }
+    @Override public RecordIdentifier getMinKey() {
+      return null;
+    }
+    @Override public RecordIdentifier getMaxKey() {
+      return null;
+    }
+    @Override public ReaderKey getKey() {
+      return null;
+    }
+    @Override public void next(OrcStruct next) throws IOException {
+    }
+  }
+  /**
    * A reader and the next record from that reader. The code reads ahead so that
    * we can return the lowest ReaderKey from each of the readers. Thus, the
    * next available row is nextRecord and only following records are still in
@@ -209,6 +237,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private final ReaderKey key;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
+    @Deprecated//HIVE-18158
     private final int statementId;
 
     /**
@@ -320,12 +349,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private final ReaderKey key;
     final int bucketId;
     final int bucketProperty;
+    /**
+     * TransactionId to use when generating synthetic ROW_IDs
+     */
+    final long transactionId;
 
-    OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
+    OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions,
+      int statementId) throws IOException {
       this.key = key;
       this.bucketId = bucketId;
       assert bucketId >= 0 : "don't support non-bucketed tables yet";
-      this.bucketProperty = encodeBucketId(conf, bucketId);
+      this.bucketProperty = encodeBucketId(conf, bucketId, statementId);
+      transactionId = mergeOptions.getTransactionId();
     }
     @Override public final OrcStruct nextRecord() {
       return nextRecord;
@@ -337,7 +372,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     @Override
     public final ReaderKey getKey() { return key; }
     /**
-     * The cumulative number of row in all files of the logical bucket that precede the file
+     * The cumulative number of rows in all files of the logical bucket that precede the file
      * represented by {@link #getRecordReader()}
      */
     abstract long getRowIdOffset();
@@ -355,9 +390,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
               new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
           nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
           nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
-              new LongWritable(0));
+              new LongWritable(transactionId));
           nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
-              new LongWritable(0));
+              new LongWritable(transactionId));
           nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
               new IntWritable(bucketProperty));
           nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
@@ -369,17 +404,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
               .set(OrcRecordUpdater.INSERT_OPERATION);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
-              .set(0);
+              .set(transactionId);
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
               .set(bucketProperty);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
-              .set(0);
+              .set(transactionId);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
               .set(nextRowId);
           nextRecord().setFieldValue(OrcRecordUpdater.ROW,
               getRecordReader().next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
+        key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0);
         if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -391,9 +426,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return false;//reached EndOfFile
     }
   }
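
To make the new transactionId field concrete: for a raw-format file written by transaction x (a
base_x/ or delta_x_x/ produced by Load Data), the synthetic key built in next() above looks roughly
like the call below (a sketch; key is the ReaderKey, and x, bucketProperty, rowIdOffset and n are
placeholders rather than names from the patch):

  // original txn == current txn == x for these synthetic insert events; the statement id is
  // already encoded into bucketProperty by encodeBucketId(), so the last argument stays 0
  key.setValues(x, bucketProperty, rowIdOffset + n, x, 0);
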
-  static int encodeBucketId(Configuration conf, int bucketId) {
-    return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+  static int encodeBucketId(Configuration conf, int bucketId, int statementId) {
+    return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId)
+        .statementId(statementId));
   }
+  /**
+   * This handles normal read (as opposed to Compaction) of a {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   * file.  Such a file may be a result of Load Data or may have been written to the table
+   * before it was converted to acid.
+   */
   @VisibleForTesting
   final static class OriginalReaderPairToRead extends OriginalReaderPair {
     private final long rowIdOffset;
@@ -401,12 +442,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private final RecordReader recordReader;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
-
     OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
                              final RecordIdentifier minKey, final RecordIdentifier maxKey,
                              Reader.Options options, Options mergerOptions, Configuration conf,
-                             ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId, conf);
+                             ValidTxnList validTxnList, int statementId) throws IOException {
+      super(key, bucketId, conf, mergerOptions, statementId);
       this.reader = reader;
       assert !mergerOptions.isCompacting();
       assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -426,6 +466,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       boolean haveSeenCurrentFile = false;
       long rowIdOffsetTmp = 0;
       {
+        /**
+         * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+         * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
+         * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+         */
         //the split is from something other than the 1st file of the logical bucket - compute offset
         AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
           conf, validTxnList, false, true);
@@ -458,7 +503,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if (rowIdOffset > 0) {
           //rowIdOffset could be 0 if all files before current one are empty
           /**
-           * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+           * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration, Options)}
            * need to fix min/max key since these are used by
            * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
            * the key.  Clear?  */
@@ -469,7 +514,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              *  If this is not the 1st file, set minKey 1 less than the start of current file
              * (Would not need to set minKey if we knew that there are no delta files)
              * {@link #advanceToMinKey()} needs this */
-            newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
+            newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1);
           }
           if (maxKey != null) {
             maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -482,7 +527,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            * of the file so we want to leave it blank to make sure any insert events in delta
            * files are included; Conversely, if it's not the last file, set the maxKey so that
            * events from deltas that don't modify anything in the current split are excluded*/
-        newMaxKey = new RecordIdentifier(0, bucketProperty,
+        newMaxKey = new RecordIdentifier(transactionId, bucketProperty,
           rowIdOffset + reader.getNumberOfRows() - 1);
       }
       this.minKey = newMinKey;
@@ -532,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     OriginalReaderPairToCompact(ReaderKey key, int bucketId,
                        Reader.Options options, Options mergerOptions, Configuration conf,
-                       ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId, conf);
+                       ValidTxnList validTxnList, int statementId) throws IOException {
+      super(key, bucketId, conf, mergerOptions, statementId);
       assert mergerOptions.isCompacting() : "Should only be used for Compaction";
       this.conf = conf;
       this.options = options;
@@ -544,9 +589,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       assert options.getMaxOffset() == Long.MAX_VALUE;
       AcidUtils.Directory directoryState = AcidUtils.getAcidState(
         mergerOptions.getRootPath(), conf, validTxnList, false, true);
+      /**
+       * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+       * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
+       * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+       */
       originalFiles = directoryState.getOriginalFiles();
       assert originalFiles.size() > 0;
-      this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+      //in case of Compaction, this is the 1st file of the current bucket
+      this.reader = advanceToNextFile();
       if (reader == null) {
         //Compactor generated a split for a bucket that has no data?
         throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
@@ -655,7 +706,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    */
   private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
                                          Reader.Options options,
-                                         Configuration conf) throws IOException {
+                                         Configuration conf, Options mergerOptions) throws IOException {
     long rowLength = 0;
     long rowOffset = 0;
     long offset = options.getOffset();//this would usually be at block boundary
@@ -663,7 +714,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isTail = true;
     RecordIdentifier minKey = null;
     RecordIdentifier maxKey = null;
-    int bucketProperty = encodeBucketId(conf, bucket);
+    TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+      mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+    int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId);
    /**
     * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
     * necessarily match stripe boundary.  So we want to come up with minKey to be one before the 1st
@@ -755,13 +808,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
    * This makes the "context" explicit.
    */
-  static class Options {
+  static class Options implements Cloneable {
     private int copyIndex = 0;
     private boolean isCompacting = false;
     private Path bucketPath;
     private Path rootPath;
+    private Path baseDir;
     private boolean isMajorCompaction = false;
     private boolean isDeleteReader = false;
+    private long transactionId = 0;
     Options copyIndex(int copyIndex) {
       assert copyIndex >= 0;
       this.copyIndex = copyIndex;
@@ -790,6 +845,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       assert !isCompacting;
       return this;
     }
+    Options transactionId(long transactionId) {
+      this.transactionId = transactionId;
+      return this;
+    }
+    Options baseDir(Path baseDir) {
+      this.baseDir = baseDir;
+      return this;
+    }
     /**
      * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
      */
@@ -825,13 +888,48 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isDeleteReader() {
       return isDeleteReader;
     }
+    /**
+     * for reading "original" files - i.e. not native acid schema.  Default value of 0 is
+     * appropriate for files that existed in a table before it was made transactional.  0 is the
+     * primordial transaction.  Non-native files resulting from a Load Data command are located
+     * in base_x or delta_x_x, in which case transactionId == x.
+     */
+    long getTransactionId() {
+      return transactionId;
+    }
+
+    /**
+     * In case of isMajorCompaction() this is the base dir from the Compactor, i.e. either a base_x
+     * or {@link #rootPath} if it's the 1st major compaction after non-acid2acid conversion
+     */
+    Path getBaseDir() {
+      return baseDir;
+    }
+    /**
+     * shallow clone
+     */
+    public Options clone() {
+      try {
+        return (Options) super.clone();
+      }
+      catch(CloneNotSupportedException ex) {
+        throw new AssertionError();
+      }
+    }
   }
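
A hedged usage sketch of the new Options fields for a major compaction over a raw-format base (the
paths are hypothetical):

  OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
      .isCompacting(true)
      .isMajorCompaction(true)
      .rootPath(new Path("/warehouse/t/p=1"))              // partition root
      .baseDir(new Path("/warehouse/t/p=1/base_0000007")); // raw-format base, e.g. from Load Data
  // for the raw-format case the constructor then derives a per-reader clone, roughly
  // mergerOptions.clone().transactionId(7).rootPath(mergerOptions.getBaseDir())
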
   /**
-   * Create a reader that merge sorts the ACID events together.
+   * Create a reader that merge sorts the ACID events together.  This handles
+   * 1. 'normal' reads on behalf of a query (non vectorized)
+   * 2. Compaction reads (major/minor)
+   * 3. Delete event reads - to create a sorted view of all delete events for vectorized read
+   *
+   * This makes the logic in the constructor confusing; it needs to be refactored.  Liberal use of
+   * asserts below is primarily for documentation purposes.
+   *
    * @param conf the configuration
    * @param collapseEvents should the events on the same row be collapsed
-   * @param isOriginal is the base file a pre-acid file
-   * @param bucket the bucket we are reading
+   * @param isOriginal if reading files w/o acid schema - {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   * @param bucket the bucket/writer id of the file we are reading
    * @param options the options to read with
    * @param deltaDirectory the list of delta directories to include
    * @throws IOException
@@ -887,11 +985,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+    assert !(mergerOptions.isCompacting() && reader != null) : "don't need a reader for compaction";
 
     // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
+    //suppose it's the first Major compaction so we only have deltas
+    boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction()
+      && mergerOptions.getBaseDir() == null;
     if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
-      mergerOptions.isDeleteReader()) {
+      mergerOptions.isDeleteReader() || isMajorNoBase) {
       //for minor compaction, there is no progress report and we don't filter deltas
       baseReader = null;
       minKey = maxKey = null;
@@ -906,27 +1008,68 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       } else {
         // find the min/max based on the offset and length (and more for 'original')
         if (isOriginal) {
-          keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+          //note that this KeyInterval may be adjusted later due to copy_N files
+          keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf, mergerOptions);
         } else {
           keyInterval = discoverKeyBounds(reader, options);
         }
       }
       LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // use the min/max instead of the byte range
-      ReaderPair pair;
+      ReaderPair pair = null;
       ReaderKey key = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
         if(mergerOptions.isCompacting()) {
-          pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
-            conf, validTxnList);
+          assert mergerOptions.isMajorCompaction();
+          Options readerPairOptions = mergerOptions;
+          if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+            readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+              AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
+          }
+          pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+            conf, validTxnList,
+            0);//0 since base_x doesn't have a statementId suffix (neither does a pre-acid write)
         } else {
+          assert mergerOptions.getBucketPath() != null : " since this is not compaction: "
+            + mergerOptions.getRootPath();
+          //if here it's a non-acid schema file - check if it's from before the table was marked transactional
+          //or in base_x/delta_x_x from Load Data
+          Options readerPairOptions = mergerOptions;
+          TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+            mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+          if(tfp.syntheticTransactionId > 0) {
+            readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+              tfp.syntheticTransactionId, tfp.folder);
+          }
           pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
-            keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+            keyInterval.getMaxKey(), options,  readerPairOptions, conf, validTxnList, tfp.statementId);
         }
       } else {
-        pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-          eventOptions, 0);
+        if(mergerOptions.isCompacting()) {
+          assert mergerOptions.isMajorCompaction() : "expected major compaction: "
+            + mergerOptions.getBaseDir() + ":" + bucket;
+          assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath();
+          //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty
+          FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf);
+          Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket);
+          if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) {
+            //doing major compaction - where a full complement of bucket files is not required
+            //(on Tez), it's possible that base_x/ doesn't have a file for 'bucket'
+            reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
+            pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+              eventOptions, 0);
+          }
+          else {
+            pair = new EmptyReaderPair();
+            LOG.info("No non-empty " + bucketPath + " was found for Major compaction");
+          }
+        }
+        else {
+          assert reader != null : "no reader? " + mergerOptions.getRootPath();
+          pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+            eventOptions, 0);
+        }
       }
       minKey = pair.getMinKey();
       maxKey = pair.getMaxKey();
@@ -937,11 +1080,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
       baseReader = pair.getRecordReader();
     }
-
-    if (deltaDirectory != null) {
-      /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no
-      * user columns
-      * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+    /*now process the delta files.  For normal read these should only be delete deltas.  For
+    * Compaction these may be any delta_x_y/.  The files inside any delta_x_y/ may be in Acid
+    * format (i.e. with Acid metadata columns) or 'original'.*/
+    if (deltaDirectory != null && deltaDirectory.length > 0) {
+      /*For reads, whatever SARG may be applicable to the base is not applicable to delete_delta since it has no
+      * user columns.  For Compaction there is never a SARG.
+      * */
       Reader.Options deltaEventOptions = eventOptions.clone()
         .searchArgument(null, null).range(0, Long.MAX_VALUE);
       for(Path delta: deltaDirectory) {
@@ -950,17 +1095,50 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
         }
         ReaderKey key = new ReaderKey();
-        AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
+        AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf));
+        if(deltaDir.isRawFormat()) {
+          assert !deltaDir.isDeleteDelta() : delta.toString();
+          assert mergerOptions.isCompacting() : "during regular read anything which is not a" +
+            " delete_delta is treated like base: " + delta;
+          Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions,
+            deltaDir.getMinTransaction(), delta);
+          //this will also handle copy_N files if any
+          ReaderPair deltaPair =  new OriginalReaderPairToCompact(key, bucket, options,
+            rawCompactOptions, conf, validTxnList, deltaDir.getStatementId());
+          if (deltaPair.nextRecord() != null) {
+            readers.put(key, deltaPair);
+          }
+          continue;
+        }
         for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
           FileSystem fs = deltaFile.getFileSystem(conf);
           if(!fs.exists(deltaFile)) {
+            /**
+             * it's possible that the file for a specific {@link bucket} doesn't exist in any given
+             * delta since no rows hashed to it (and not configured to create empty buckets)
+             */
             continue;
           }
+          if(deltaDir.isDeleteDelta()) {
+            //if here it may be compaction, a regular read, or the Delete event sorter
+            //in the latter 2 cases we should do:
+            //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta
+            Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf));
+            ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+              deltaEventOptions, deltaDir.getStatementId());
+            if (deltaPair.nextRecord() != null) {
+              readers.put(key, deltaPair);
+            }
+            continue;
+          }
+          //if here then we must be compacting
+          assert mergerOptions.isCompacting() : "not compacting and not delete delta : " + delta;
           /* side files are only created by streaming ingest.  If this is a compaction, we may
           * have an insert delta/ here with side files there because the original writer died.*/
           long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
           assert length >= 0;
           Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
+          //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
           ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
             deltaEventOptions, deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
@@ -988,6 +1166,76 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   }
 
   /**
+   * For use with the Load Data statement, which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   * type files into a base_x/ or delta_x_x.  The data in these are then assigned ROW_IDs at read
+   * time and made permanent at compaction time.  This is identical to how 'original' files (i.e.
+   * those that existed in the table before it was converted to an Acid table) are handled, except
+   * that the transaction ID to use in the ROW_ID should be that of the transaction that ran the
+   * Load Data.
+   */
+  static final class TransactionMetaData {
+    final long syntheticTransactionId;
+    /**
+     * folder which determines the transaction id to use in synthetic ROW_IDs
+     */
+    final Path folder;
+    final int statementId;
+    TransactionMetaData(long syntheticTransactionId, Path folder) {
+      this(syntheticTransactionId, folder, 0);
+    }
+    TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) {
+      this.syntheticTransactionId = syntheticTransactionId;
+      this.folder = folder;
+      this.statementId = statementId;
+    }
+    static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath,
+      Configuration conf) throws IOException {
+      Path parent = splitPath.getParent();
+      if(rootPath.equals(parent)) {
+        //the 'isOriginal' file is at the root of the partition (or table) thus it is
+        //from a pre-acid conversion write and belongs to primordial txnid:0.
+        return new TransactionMetaData(0, parent);
+      }
+      while(parent != null && !rootPath.equals(parent)) {
+        boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX);
+        boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX);
+        if(isBase || isDelta) {
+          if(isBase) {
+            return new TransactionMetaData(AcidUtils.parseBase(parent), parent);
+          }
+          else {
+            AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
+              parent.getFileSystem(conf));
+            assert pd.getMinTransaction() == pd.getMaxTransaction() :
+              "This is a delta with raw non-acid schema; it must be the result of a single write, not compaction: "
+                + splitPath;
+            return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId());
+          }
+        }
+        parent = parent.getParent();
+      }
+      if(parent == null) {
+        //split is marked isOriginal but it's not an immediate child of a partition nor is it in a
+        //base/ or delta/ - this should never happen
+        throw new IllegalStateException("Cannot determine transaction id for original file "
+          + splitPath + " in " + rootPath);
+      }
+      //"warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid
+      // converted table
+      return new TransactionMetaData(0, rootPath);
+    }
+  }
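
A hedged illustration of the mapping this helper performs (the paths are hypothetical and conf is
an assumed Configuration in scope):

  // a split file written by Load Data in txn 12 gets synthetic ROW__IDs with txnid 12
  OrcRawRecordMerger.TransactionMetaData tmd =
      OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(
          new Path("/warehouse/t/delta_0000012_0000012/000000_0"),  // splitPath
          new Path("/warehouse/t"),                                  // rootPath (partition root)
          conf);
  // expected: tmd.syntheticTransactionId == 12 and tmd.folder ends with delta_0000012_0000012;
  // a file directly under /warehouse/t (a pre-acid write) maps to the primordial txnid 0 instead
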
+  /**
+   * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which
+   * happens as a result of Load Data statement.  Setting {@code rootPath} to base_x/ or delta_x_x
+   * happens as a result of a Load Data statement.  Setting {@code rootPath} to base_x/ or delta_x_x
+   * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in the subsequent
+   * {@link OriginalReaderPair} object to return the files in this dir
+   * @return modified clone of {@code baseOptions}
+   */
+  private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) {
+    return baseOptions.clone().transactionId(transactionId).rootPath(rootPath);
+  }
+  /**
    * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
    * For unbucketed tables {@code bucket} can be thought of as a write tranche.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 315cc1d..8af38b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -196,7 +196,9 @@ public class OrcRecordUpdater implements RecordUpdater {
     fields.add(new OrcStruct.Field("row", rowInspector, ROW));
     return new OrcStruct.OrcStructInspector(fields);
   }
-
+  /**
+   * @param path - partition root
+   */
   OrcRecordUpdater(Path path,
                    AcidOutputFormat.Options options) throws IOException {
     this.options = options;

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 58638b5..edffa5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -51,6 +51,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
   private OrcTail orcTail;
   private boolean hasFooter;
+  /**
+   * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   */
   private boolean isOriginal;
   private boolean hasBase;
   //partition root

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index bcde4fc..d571bd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.impl.AcidStats;
@@ -156,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader
     this.vectorizedRowBatchBase = baseReader.createValue();
   }
 
-  private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter,
+  private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
       VectorizedRowBatchCtx rowBatchCtx) throws IOException {
     this.rbCtx = rowBatchCtx;
     final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
@@ -165,12 +164,10 @@ public class VectorizedOrcAcidRowBatchReader
 
     // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
     // enabled for an ACID case and the file format is ORC.
-    boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
-                                   || !(inputSplit instanceof OrcSplit);
+    boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate();
     if (isReadNotAllowed) {
       OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
     }
-    final OrcSplit orcSplit = (OrcSplit) inputSplit;
 
     reporter.setStatus(orcSplit.toString());
     readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
@@ -226,9 +223,11 @@ public class VectorizedOrcAcidRowBatchReader
   private static final class OffsetAndBucketProperty {
     private final long rowIdOffset;
     private final int bucketProperty;
-    private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) {
+    private final long syntheticTxnId;
+    private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) {
       this.rowIdOffset = rowIdOffset;
       this.bucketProperty = bucketProperty;
+      this.syntheticTxnId = syntheticTxnId;
     }
   }
   /**
@@ -240,17 +239,34 @@ public class VectorizedOrcAcidRowBatchReader
    *
    * todo: This logic is executed per split of every "original" file.  The computed result is the
    * same for every split form the same file so this could be optimized by moving it to
-   * before/during splt computation and passing the info in the split.  (HIVE-17917)
+   * before/during split computation and passing the info in the split.  (HIVE-17917)
    */
   private OffsetAndBucketProperty computeOffsetAndBucket(
     OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
-    if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
-      return new OffsetAndBucketProperty(0,0);
+    if(!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+      if(split.isOriginal()) {
+        /**
+         * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that
+         * created the file to see if it's committed.  See more in
+         * {@link #next(NullWritable, VectorizedRowBatch)}.  (In practice getAcidState() should
+         * filter out base/delta files but doing the check here creates fewer dependencies)
+         */
+        OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+          OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+            split.getRootDir(), conf);
+        return new OffsetAndBucketProperty(-1,-1,
+          syntheticTxnInfo.syntheticTransactionId);
+      }
+      return null;
     }
     long rowIdOffset = 0;
+    OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+      OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+        split.getRootDir(), conf);
     int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
-    int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
-    AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf,
+    int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
+      .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf,
       validTxnList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       AcidOutputFormat.Options bucketOptions =
@@ -266,7 +282,8 @@ public class VectorizedOrcAcidRowBatchReader
         OrcFile.readerOptions(conf));
       rowIdOffset += reader.getNumberOfRows();
     }
-    return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+    return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
+      syntheticTxnInfo.syntheticTransactionId);
   }
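
A minimal worked sketch of the offset computed above, with assumed row counts (the real loop reads
them from the ORC readers of the files in the same logical bucket that precede the split's file):

  // the preceding files of the logical bucket hold 100 and 250 rows respectively
  long rowIdOffset = 0;
  for (long rowsInPrecedingFile : new long[] {100L, 250L}) {
    rowIdOffset += rowsInPrecedingFile;
  }
  // synthetic ROW__IDs for the current split then start at rowId 350, all tagged with
  // syntheticTxnInfo.syntheticTransactionId and the bucketProperty computed above
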
   /**
    * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
@@ -284,7 +301,7 @@ public class VectorizedOrcAcidRowBatchReader
     if(rbCtx == null) {
       throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
     }
-    return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx));
+    return !needSyntheticRowIds(split.isOriginal(), hasDeletes, areRowIdsProjected(rbCtx));
   }
 
   /**
@@ -292,8 +309,8 @@ public class VectorizedOrcAcidRowBatchReader
    * Even if ROW__ID is not projected you still need to decorate the rows with them to see if
    * any of the delete events apply.
    */
-  private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) {
-    return split.isOriginal() && (hasDeletes || rowIdProjected);
+  private static boolean needSyntheticRowIds(boolean isOriginal, boolean hasDeletes, boolean rowIdProjected) {
+    return isOriginal && (hasDeletes || rowIdProjected);
   }
   private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) {
     if(rbCtx.getVirtualColumnCount() == 0) {
@@ -316,7 +333,7 @@ public class VectorizedOrcAcidRowBatchReader
       if (orcSplit.isOriginal()) {
         root = orcSplit.getRootDir();
       } else {
-        root = path.getParent().getParent();
+        root = path.getParent().getParent();//todo: why not just use getRootDir()?
         assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() +
           " path.p.p=" + root;
       }
@@ -398,7 +415,9 @@ public class VectorizedOrcAcidRowBatchReader
        * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order
        * to see if any deletes apply
        */
-      if(rowIdProjected || !deleteEventRegistry.isEmpty()) {
+      if(needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+        assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps;
+        assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps;
         if(innerReader == null) {
           throw new IllegalStateException(getClass().getName() + " requires " +
             org.apache.orc.RecordReader.class +
@@ -409,8 +428,7 @@ public class VectorizedOrcAcidRowBatchReader
          */
         recordIdColumnVector.fields[0].noNulls = true;
         recordIdColumnVector.fields[0].isRepeating = true;
-        //all "original" is considered written by txnid:0 which committed
-        ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0;
+        ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId;
         /**
          * This is {@link RecordIdentifier#getBucketProperty()}
          * Also see {@link BucketCodec}
@@ -433,15 +451,21 @@ public class VectorizedOrcAcidRowBatchReader
         innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
         innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
         innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
+        //these are insert events so (original txn == current) txn for all rows
+        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0];
+      }
+      if(syntheticProps.syntheticTxnId > 0) {
+        //"originals" (written before the table was converted to acid) are considered written by
+        // txnid:0 which is always committed, so there is no need to check wrt invalid transactions.
+        //But originals written by Load Data, for example, can be in base_x or delta_x_x so we must
+        //check if 'x' is committed or not even if ROW_ID is not needed in the Operator pipeline.
+        findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector,
+          vectorizedRowBatchBase.size, selectedBitSet);
       }
     }
     else {
       // Case 1- find rows which belong to transactions that are not valid.
       findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
-      /**
-       * All "original" data belongs to txnid:0 and is always valid/committed for every reader
-       * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal
-       */
     }
 
     // Case 2- find rows which have been deleted.
@@ -473,11 +497,6 @@ public class VectorizedOrcAcidRowBatchReader
     }
     else {
       // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
-      // NOTE: We only link up the user columns and not the ACID metadata columns because this
-      // vectorized code path is not being used in cases of update/delete, when the metadata columns
-      // would be expected to be passed up the operator pipeline. This is because
-      // currently the update/delete specifically disable vectorized code paths.
-      // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
       StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
       // Transfer columnVector objects from base batch to outgoing batch.
       System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index f7388a4..736034d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -27,12 +27,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.io.NullWritable;

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 6fb0c43..fdb3603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.api.Query;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -297,6 +298,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         break;
       default:
         if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+          if(allowOperationInATransaction(queryPlan)) {
+            break;
+          }
+          //look at queryPlan.outputs(WriteEntity.t - that's the table)
           //for example, drop table in an explicit txn is not allowed
           //in some cases this requires looking at more than just the operation
           //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
@@ -311,6 +316,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     any non acid and raise an appropriate error
     * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
   }
+
+  /**
+   * This modifies the logic wrt what operations are allowed in a transaction.  Multi-statement
+   * transaction support is incomplete, but it makes some Acid test cases much easier to write.
+   */
+  private boolean allowOperationInATransaction(QueryPlan queryPlan) {
+    //Acid and MM tables support Load Data with transactional semantics.  This will allow Load Data
+    //in a txn assuming we can determine the target is a suitable table type.
+    if(queryPlan.getOperation() == HiveOperation.LOAD && queryPlan.getOutputs() != null && queryPlan.getOutputs().size() == 1) {
+      WriteEntity writeEntity = queryPlan.getOutputs().iterator().next();
+      if(AcidUtils.isFullAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) {
+        switch (writeEntity.getWriteType()) {
+          case INSERT:
+            //allow operation in a txn
+            return true;
+          case INSERT_OVERWRITE:
+            //see HIVE-18154
+            return false;
+          default:
+            //not relevant for LOAD
+            return false;
+        }
+      }
+    }
+    //todo: handle Insert Overwrite as well: HIVE-18154
+    return false;
+  }
   /**
    * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
    * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING

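To illustrate what the allowOperationInATransaction() change above enables, here is a minimal
test-style sketch: a LOAD DATA whose target is a full-acid (or insert-only) table is no longer
rejected inside an explicit transaction.  This is only an illustration, not part of the patch;
it assumes the runStatementOnDriver() helper used by the tests in this change, and "Tacid"
(an unbucketed full-acid table) plus the source path are hypothetical.

  //sketch only: LOAD DATA into a full-acid target inside an explicit txn
  runStatementOnDriver("start transaction");
  runStatementOnDriver("load data local inpath '/tmp/load_src/000000_0' into table Tacid");
  runStatementOnDriver("commit");
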
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 1a37bf7..9f2c6d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -146,6 +146,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1705,18 +1706,20 @@ public class Hive {
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *          If the source directory is LOCAL
-   * @param isAcid
-   *          true if this is an ACID operation
+   * @param isAcidIUDoperation
+   *          true if this is an ACID Insert/Update/Delete operation
    * @param hasFollowingStatsTask
    *          true if there is a following task which updates the stats, so, this method need not update.
    * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId)
+      boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId)
           throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
+    assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+    boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
     try {
       // Get the partition object if it already exists
       Partition oldPart = getPartition(tbl, partSpec, false);
@@ -1768,7 +1771,7 @@ public class Hive {
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
         }
-        assert !isAcid;
+        assert !isAcidIUDoperation;
         if (areEventsForDmlNeeded(tbl, oldPart)) {
           newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
         }
@@ -1792,16 +1795,22 @@ public class Hive {
           filter = (loadFileType == LoadFileType.REPLACE_ALL)
             ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
         }
+        else if(!isAcidIUDoperation && isFullAcidTable) {
+          destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+        }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath);
         }
-        if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) {
+        //todo: why is "&& !isAcidIUDoperation" needed here?
+        if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) {
+          //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
+          // base_x instead.  (This applies to both Insert Overwrite and Load Data Overwrite.)
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
               isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite);
         } else {
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-          copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid,
+          copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
         }
       }
@@ -1891,6 +1900,38 @@ public class Hive {
     }
   }
 
+  /**
+   * Load Data commands for fullAcid tables write to base_x (if there is overwrite clause) or
+   * delta_x_x directory - same as any other Acid write.  This method modifies the destPath to add
+   * this path component.
+   * @param txnId - id of current transaction (in which this operation is running)
+   * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()}
+   * @return appropriately modified path
+   */
+  private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException {
+    switch (loadFileType) {
+      case REPLACE_ALL:
+        destPath = new Path(destPath, AcidUtils.baseDir(txnId));
+        break;
+      case KEEP_EXISTING:
+        destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+        break;
+      case OVERWRITE_EXISTING:
+        //should not happen here - this is for replication
+      default:
+        throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType);
+    }
+    try {
+      FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf());
+      if(!FileUtils.mkdir(fs, destPath, conf)) {
+        LOG.warn(destPath + " already exists?!?!");
+      }
+      AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true);
+    } catch (IOException e) {
+      throw new HiveException("load: error while creating " + destPath + ";loadFileType=" + loadFileType, e);
+    }
+    return destPath;
+  }
 
   private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
     return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
@@ -2125,7 +2166,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param partSpec
    * @param loadFileType
    * @param numDP number of dynamic partitions
-   * @param listBucketingEnabled
    * @param isAcid true if this is an ACID operation
    * @param txnId txnId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
@@ -2273,14 +2313,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          if list bucketing enabled
    * @param hasFollowingStatsTask
    *          if there is any following stats task
-   * @param isAcid true if this is an ACID based write
+   * @param isAcidIUDoperation true if this is an ACID-based Insert [Overwrite]/Update/Delete
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
-      Long txnId, int stmtId, boolean isMmTable) throws HiveException {
-
+      boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+      Long txnId, int stmtId) throws HiveException {
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
+    assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+    boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
+    boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
     HiveConf sessionConf = SessionState.getSessionConf();
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -2298,24 +2340,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
       newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
-      Path tblPath = tbl.getPath(), destPath = tblPath;
+      Path tblPath = tbl.getPath();
+      Path destPath = tblPath;
       PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
       if (isMmTable) {
+        assert !isAcidIUDoperation;
         // We will load into MM directory, and delete from the parent if needed.
         destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
         filter = loadFileType == LoadFileType.REPLACE_ALL
             ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
       }
+      else if(!isAcidIUDoperation && isFullAcidTable) {
+        destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+      }
       Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
           + " (replace = " + loadFileType + ")");
-      if (loadFileType == LoadFileType.REPLACE_ALL) {
+      if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) {
+        //for fullAcid we don't want to delete any files even for OVERWRITE; see HIVE-14988/HIVE-17361
+        //todo: should probably do the same for MM Insert Overwrite
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tblPath, loadPath, destPath, tblPath,
             sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable);
       } else {
         try {
           FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
-          copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid,
+          copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles);
         } catch (IOException e) {
           throw new HiveException("addFiles: filesystem error in check phase", e);
@@ -2358,7 +2407,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
   }
-
   /**
    * Creates a partition.
    *

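For orientation, the sketch below shows the path component that fixFullAcidPathForLoadData()
adds for each LoadFileType, using the same AcidUtils helpers the method calls.  The helper name
loadDataDir and the tableDir/txnId/stmtId values are illustrative, not part of the patch.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.io.AcidUtils;

  //sketch only: the extra path component a LOAD DATA gets on a full-acid table
  static Path loadDataDir(Path tableDir, boolean overwrite, long txnId, int stmtId) {
    return overwrite
        //REPLACE_ALL (LOAD DATA ... OVERWRITE): a base_<txnId> directory
        ? new Path(tableDir, AcidUtils.baseDir(txnId))
        //KEEP_EXISTING (plain LOAD DATA): a delta_<txnId>_<txnId>_<stmtId> directory
        : new Path(tableDir, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
  }
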
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cd75130..a1b6cda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
         Utilities.getTableDesc(table), new TreeMap<>(),
         replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
-    loadTableWork.setTxnId(txnId);
     loadTableWork.setStmtId(stmtId);
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
@@ -400,6 +399,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return loadTableTask;
   }
 
+  /**
+   * todo: this is odd - transactions are opened for all statements, so what is this supposed to check?
+   */
+  @Deprecated
   private static boolean isAcid(Long txnId) {
     return (txnId != null) && (txnId != 0);
   }
@@ -490,7 +493,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           partSpec.getPartSpec(),
           replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
           txnId);
-      loadTableWork.setTxnId(txnId);
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 238fbd6..cc956da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -136,7 +136,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast,
-      boolean isLocal) throws SemanticException {
+      boolean isLocal, Table table) throws SemanticException {
 
     FileStatus[] srcs = null;
 
@@ -159,6 +159,14 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
           throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
               "source contains directory: " + oneSrc.getPath().toString()));
         }
+        if(AcidUtils.isFullAcidTable(table)) {
+          if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) {
+            //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't simply be
+            //copied into a table; only allow non-acid files for now
+            throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME,
+              oneSrc.getPath().getName(), table.getDbName() + "." + table.getTableName());
+          }
+        }
       }
     } catch (IOException e) {
       // Has to use full name to make sure it does not conflict with
@@ -230,11 +238,8 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
-    if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) {
-      throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName());
-    }
     // make sure the arguments make sense
-    List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
+    List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal, ts.tableHandle);
 
     // for managed tables, make sure the file formats match
     if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())
@@ -277,17 +282,16 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     Long txnId = null;
-    int stmtId = 0;
-    Table tbl = ts.tableHandle;
-    if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) {
+    int stmtId = -1;
+    if (AcidUtils.isAcidTable(ts.tableHandle)) {
       txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+      stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement();
     }
 
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
       Utilities.getTableDesc(ts.tableHandle), partSpec,
       isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId);
-    loadTableWork.setTxnId(txnId);
     loadTableWork.setStmtId(stmtId);
     if (preservePartitionSpecs){
       // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but

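The new check in applyConstraintsAndGetFiles() means the source file name itself now matters for
full-acid targets.  A hedged test-style sketch of both sides of the check; "Tacid" and the paths
are hypothetical, and the exact error text of ACID_LOAD_DATA_INVALID_FILE_NAME is not asserted:

  //sketch only: a non-acid file name passes, an acid-style bucket file name is rejected
  runStatementOnDriver(
      "load data local inpath '/tmp/load_src/000000_0' into table Tacid");
  CommandProcessorResponse cpr = runStatementOnDriverNegative(
      "load data local inpath '/tmp/load_src/bucket_0000' into table Tacid");
  //cpr should report ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME for the bucket_0000 file
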
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1fa7b40..4683c9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -47,9 +47,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
 
   public enum LoadFileType {
-    REPLACE_ALL,        // Remove all existing data before copy/move
-    KEEP_EXISTING,      // If any file exist while copy, then just duplicate the file
-    OVERWRITE_EXISTING  // If any file exist while copy, then just overwrite the file
+    /**
+     * This corresponds to INSERT OVERWRITE and to REPL LOAD replaying an INSERT OVERWRITE event.
+     * Remove all existing data before copy/move
+     */
+    REPLACE_ALL,
+    /**
+     * This corresponds to INSERT INTO and LOAD DATA.
+     * If a file with the same name already exists during the copy, then just duplicate the file
+     */
+    KEEP_EXISTING,
+    /**
+     * This corresponds to REPL LOAD: if we re-apply the same event, we need to overwrite
+     * the file instead of making a duplicate copy.
+     * If a file with the same name already exists during the copy, then just overwrite the file
+     */
+    OVERWRITE_EXISTING
   }
   public LoadTableDesc(final LoadTableDesc o) {
     super(o.getSourcePath(), o.getWriteType());
@@ -215,14 +228,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
     return currentTransactionId == null ? 0 : currentTransactionId;
   }
 
-  public void setTxnId(Long txnId) {
-    this.currentTransactionId = txnId;
-  }
-
   public int getStmtId() {
     return stmtId;
   }
-
+  //todo: should this not be passed in the c'tor?
   public void setStmtId(int stmtId) {
     this.stmtId = stmtId;
   }

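Putting the enum javadoc together with the analyzer changes earlier in this patch, the
LoadFileType chosen per statement reduces to roughly the sketch below; isOverWrite comes from
LoadSemanticAnalyzer and the comments summarize, rather than reproduce, the exact code paths.

  //sketch only: how the analyzers in this patch pick a LoadFileType
  //LoadSemanticAnalyzer   (LOAD DATA):        isOverWrite ? REPLACE_ALL : KEEP_EXISTING
  //ImportSemanticAnalyzer (IMPORT/REPL LOAD): replace     ? REPLACE_ALL : OVERWRITE_EXISTING
  LoadFileType loadType = isOverWrite
      ? LoadFileType.REPLACE_ALL      //LOAD DATA ... OVERWRITE / INSERT OVERWRITE
      : LoadFileType.KEEP_EXISTING;   //plain LOAD DATA / INSERT INTO
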
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d4d379..a804527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -577,11 +576,16 @@ public class CompactorMR {
             dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
             dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+          boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+            && AcidUtils.MetaDataFile.isRawFormat(dir, fs); //deletes can't be raw format
 
-          FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+          FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
+            : AcidUtils.bucketFileFilter);
           for(FileStatus f : files) {
             // For each file, figure out which bucket it is.
-            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+            Matcher matcher = isRawFormat ?
+              AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName())
+              : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
             addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
           }
         } else {
@@ -612,8 +616,12 @@ public class CompactorMR {
     private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
                               Map<Integer, BucketTracker> splitToBucketMap) {
       if (!matcher.find()) {
-        LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
-            file.toString() + " Matcher=" + matcher.toString());
+        String msg = "Found a non-bucket file that we thought matched the bucket pattern! " +
+          file.toString() + " Matcher=" + matcher.toString();
+        LOG.error(msg);
+        //following matcher.group() would fail anyway and we don't want to skip files since that
+        //may be a data loss scenario
+        throw new IllegalArgumentException(msg);
       }
       int bucketNum = Integer.parseInt(matcher.group());
       BucketTracker bt = splitToBucketMap.get(bucketNum);

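As a quick illustration of the CompactorMR change: raw-format directories (data copied in by
LOAD DATA) keep their original file names, so the compactor switches bucket-number patterns and
now fails fast instead of silently skipping files it cannot parse.  In this sketch, f and
isRawFormat stand in for the loop variables in the hunk above, and the file names are illustrative.

  //sketch only: pick the pattern matching the directory's file naming convention
  String name = f.getPath().getName();        //e.g. "000000_0" (raw) vs "bucket_00000" (acid)
  Matcher matcher = isRawFormat
      ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(name)
      : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(name);
  if (!matcher.find()) {
    //skipping the file could mean data loss during compaction, so fail instead
    throw new IllegalArgumentException("Found a non-bucket file: " + name);
  }
  int bucketNum = Integer.parseInt(matcher.group());
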
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 52257c4..319e0ee 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -344,7 +344,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     //this should fail because txn aborted due to timeout
     CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
     Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
-    
+
     //now test that we don't timeout locks we should not
     //heartbeater should be running in the background every 1/2 second
     hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
@@ -354,9 +354,9 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     runStatementOnDriver("start transaction");
     runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
     pause(750);
-    
+
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
-    
+
     //since there is txn open, we are heartbeating the txn not individual locks
     GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
     Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
@@ -377,7 +377,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     //these 2 values are equal when TXN entry is made.  Should never be equal after 1st heartbeat, which we
     //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
     Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
-    
+
     ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
     TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
     pause(750);
@@ -525,7 +525,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " +
       "when matched then update set cur=0 " +
       "when not matched then insert values(s.key,s.data,1)";
-
+    //to allow cross join from 'teeCurMatch'
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
     runStatementOnDriver(stmt);
     int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
     List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
@@ -569,7 +570,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
     Assert.assertEquals(stringifyValues(resultVals), r);
   }
-  
+
   @Test
   public void testMergeOnTezEdges() throws Exception {
     String query = "merge into " + Table.ACIDTBL +

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 17d976a..ab5f969 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -77,7 +77,7 @@ public class TestTxnCommands2 {
   ).getPath().replaceAll("\\\\", "/");
   protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  protected static int BUCKET_COUNT = 2;
+  static int BUCKET_COUNT = 2;
   @Rule
   public TestName testName = new TestName();
 
@@ -117,12 +117,11 @@ public class TestTxnCommands2 {
     setUpWithTableProperties("'transactional'='true'");
   }
 
-  protected void setUpWithTableProperties(String tableProperties) throws Exception {
+  void setUpWithTableProperties(String tableProperties) throws Exception {
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
@@ -406,7 +405,7 @@ public class TestTxnCommands2 {
     expectedException.expect(RuntimeException.class);
     expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
     runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')");
   }
   /**
    * Test the query correctness and directory layout for ACID table conversion