Posted to commits@hive.apache.org by ek...@apache.org on 2017/12/01 02:40:07 UTC

[3/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)

HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/508d7e6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/508d7e6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/508d7e6f

Branch: refs/heads/master
Commit: 508d7e6f269398a47147a697aecdbe546425679b
Parents: c03001e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 30 18:39:42 2017 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 30 18:39:42 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hadoop/hive/ql/history/TestHiveHistory.java |   2 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  33 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   3 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   8 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   1 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 267 ++++++++---
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   3 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 128 +++--
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 330 +++++++++++--
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   4 +-
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   3 +
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |  75 +--
 .../ql/io/orc/VectorizedOrcInputFormat.java     |   2 -
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  32 ++
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  78 +++-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   6 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |  22 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  25 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |  18 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  13 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   7 +-
 .../apache/hadoop/hive/ql/TestTxnLoadData.java  | 467 +++++++++++++++++++
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  45 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |  31 +-
 .../hadoop/hive/ql/exec/TestExecDriver.java     |   2 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  51 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  14 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   4 +-
 .../clientnegative/load_data_into_acid.q        |   4 +-
 .../clientnegative/load_data_into_acid.q.out    |   6 +-
 .../clientpositive/acid_table_stats.q.out       |   1 +
 .../clientpositive/autoColumnStats_4.q.out      |   3 +
 .../results/clientpositive/mm_default.q.out     |   1 +
 .../TransactionalValidationListener.java        |  20 +-
 36 files changed, 1392 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ada2318..3be5a8d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1912,7 +1912,7 @@ public class HiveConf extends Configuration {
       "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" +
       "4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" +
       "This is intended to be used as an internal property for future versions of ACID. (See\n" +
-        "HIVE-14035 for details.)"),
+        "HIVE-14035 for details.  User sets it tblproperites via transactional_properties.)", true),
 
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
         "current open transactions reach this limit, future open transaction requests will be \n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 2f0efce..d73cd64 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -106,7 +106,7 @@ public class TestHiveHistory extends TestCase {
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
         db.loadTable(hadoopDataFile[i], src,
-          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+          LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7103fb9..a1cd9eb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -114,8 +115,6 @@ public class TestCompactor {
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
-    //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
 
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.cleanDb(hiveConf);
@@ -669,7 +668,7 @@ public class TestCompactor {
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L, 1);
 
     } finally {
       connection.close();
@@ -697,7 +696,8 @@ public class TestCompactor {
         writeBatch(connection, writer, false);
       }
 
-      // Start a third batch, but don't close it.
+      // Start a third batch, but don't close it.  This delta will be ignored by compaction since
+      // it has an open txn in it.
       writeBatch(connection, writer, true);
 
       // Now, compact
@@ -722,7 +722,7 @@ public class TestCompactor {
       }
       String name = stat[0].getPath().getName();
       Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
     } finally {
       connection.close();
     }
@@ -788,7 +788,7 @@ public class TestCompactor {
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
     } finally {
       connection.close();
     }
@@ -850,7 +850,7 @@ public class TestCompactor {
       if (!name.equals("base_0000006")) {
         Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
       }
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
     } finally {
       connection.close();
     }
@@ -903,7 +903,7 @@ public class TestCompactor {
       }
       String name = stat[0].getPath().getName();
       Assert.assertEquals(name, "base_0000006");
-      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+      checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2);
     } finally {
       connection.close();
     }
@@ -966,7 +966,7 @@ public class TestCompactor {
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -984,7 +984,7 @@ public class TestCompactor {
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1);
   }
 
   @Test
@@ -1043,7 +1043,7 @@ public class TestCompactor {
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
     }
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
 
     // Verify that we have got correct set of delete_deltas.
     FileStatus[] deleteDeltaStat =
@@ -1062,7 +1062,7 @@ public class TestCompactor {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
     // There should be no rows in the delete_delta because there have been no delete events.
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
   }
 
   @Test
@@ -1121,7 +1121,7 @@ public class TestCompactor {
       if (!Arrays.deepEquals(expected, names)) {
         Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
       }
-      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L);
+      checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty,  0, 3L, 6L, 1);
 
       // Verify that we have got correct set of delete_deltas also
       FileStatus[] deleteDeltaStat =
@@ -1140,7 +1140,7 @@ public class TestCompactor {
         Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
       }
       // There should be no rows in the delete_delta because there have been no delete events.
-      checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+      checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
 
     } finally {
       connection.close();
@@ -1295,7 +1295,7 @@ public class TestCompactor {
   }
 
   private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
-      String columnTypesProperty, int bucket, long min, long max)
+      String columnTypesProperty, int bucket, long min, long max, int numBuckets)
       throws IOException {
     ValidTxnList txnList = new ValidTxnList() {
       @Override
@@ -1351,9 +1351,10 @@ public class TestCompactor {
     Configuration conf = new Configuration();
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty);
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty);
+    conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets));
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
     AcidInputFormat.RawReader<OrcStruct> reader =
-        aif.getRawReader(conf, false, bucket, txnList, base, deltas);
+        aif.getRawReader(conf, true, bucket, txnList, base, deltas);
     RecordIdentifier identifier = reader.createKey();
     OrcStruct value = reader.createValue();
     long currentTxn = min;
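
For readers unfamiliar with the directory names asserted above (base_0000006, delta_x_y_stmtId), a minimal sketch of how they encode transaction ids; the prefix constants and parsing below are assumptions that only loosely mirror AcidUtils.parseBase()/parsedDelta() from this patch, not the actual implementation:

public class AcidDirNameExample {
  static final String BASE_PREFIX = "base_";     // assumed prefix values
  static final String DELTA_PREFIX = "delta_";

  // loosely mirrors AcidUtils.parseBase() further down in this patch
  static long parseBase(String dirName) {
    if (dirName.startsWith(BASE_PREFIX)) {
      return Long.parseLong(dirName.substring(BASE_PREFIX.length()));
    }
    throw new IllegalArgumentException(dirName + " does not start with " + BASE_PREFIX);
  }

  // delta_<minTxn>_<maxTxn>[_<stmtId>], loosely following AcidUtils.parsedDelta()
  static long[] parseDelta(String dirName) {
    String[] parts = dirName.substring(DELTA_PREFIX.length()).split("_");
    long min = Long.parseLong(parts[0]);
    long max = Long.parseLong(parts[1]);
    long stmtId = parts.length > 2 ? Long.parseLong(parts[2]) : -1;
    return new long[] {min, max, stmtId};
  }

  public static void main(String[] args) {
    System.out.println(parseBase("base_0000006"));                                        // 6
    System.out.println(java.util.Arrays.toString(parseDelta("delta_0000003_0000004_0001")));
  }
}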

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 186d580..2f7284f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -376,7 +376,6 @@ public enum ErrorMsg {
   DBTXNMGR_REQUIRES_CONCURRENCY(10264,
       "To use DbTxnManager you must set hive.support.concurrency=true"),
   TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
-  LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true),
   LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " +
       "may have timed out", true),
   LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " +
@@ -550,6 +549,8 @@ public enum ErrorMsg {
   ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader required to read ACID tables"),
   ACID_TABLES_MUST_BE_READ_WITH_HIVEINPUTFORMAT(30022, "Must use HiveInputFormat to read ACID tables " +
           "(set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat)"),
+  ACID_LOAD_DATA_INVALID_FILE_NAME(30023, "{0} file name is not valid in Load Data into Acid " +
+    "table {1}.  Examples of valid names are: 00000_0, 00000_0_copy_1", true),
 
   CONCATENATE_UNSUPPORTED_FILE_FORMAT(30030, "Concatenate/Merge only supported for RCFile and ORCFile formats"),
   CONCATENATE_UNSUPPORTED_TABLE_BUCKETED(30031, "Concatenate/Merge can not be performed on bucketed tables"),
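
For illustration, a hypothetical stand-alone version of the file-name check implied by ACID_LOAD_DATA_INVALID_FILE_NAME above; ORIGINAL_PATTERN matches the regex shown later in this patch, while the _copy_ variant regex is an assumption:

import java.util.regex.Pattern;

public class LoadDataFileNameCheck {
  static final Pattern ORIGINAL_PATTERN = Pattern.compile("[0-9]+_[0-9]+");
  // assumed shape of ORIGINAL_PATTERN_COPY, which is referenced but not shown in this diff
  static final Pattern ORIGINAL_PATTERN_COPY = Pattern.compile("[0-9]+_[0-9]+_copy_[0-9]+");

  static boolean isValidLoadDataFileName(String name) {
    return ORIGINAL_PATTERN.matcher(name).matches()
        || ORIGINAL_PATTERN_COPY.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidLoadDataFileName("00000_0"));        // true
    System.out.println(isValidLoadDataFileName("00000_0_copy_1")); // true
    System.out.println(isValidLoadDataFileName("data.orc"));       // false
  }
}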

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4076a9f..9184844 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4488,7 +4488,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       part.getTPartition().getParameters().putAll(alterTbl.getProps());
     } else {
       boolean isFromMmTable = AcidUtils.isInsertOnlyTable(tbl.getParameters());
-      Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(alterTbl.getProps());
+      Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps());
       if (isToMmTable != null) {
         if (!isFromMmTable && isToMmTable) {
           result = generateAddMmTasks(tbl);

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e2f8c1f..6d13773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -215,7 +215,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
 
     Context ctx = driverContext.getCtx();
     if(ctx.getHiveTxnManager().supportsAcid()) {
-      //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+      //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes logic more explicit
       return;
     }
     HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
@@ -290,7 +290,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         } else {
           Utilities.FILE_OP_LOGGER.debug("MoveTask moving " + sourcePath + " to " + targetPath);
           if(lfd.getWriteType() == AcidUtils.Operation.INSERT) {
-            //'targetPath' is table root of un-partitioned table/partition
+            //'targetPath' is table root of un-partitioned table or partition
             //'sourcePath' result of 'select ...' part of CTAS statement
             assert lfd.getIsDfsDir();
             FileSystem srcFs = sourcePath.getFileSystem(conf);
@@ -367,7 +367,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         checkFileFormats(db, tbd, table);
 
         boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
-            && !tbd.isMmTable();
+            && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
 
         // Create a data container
         DataContainer dc = null;
@@ -379,7 +379,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           }
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
               work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
-              tbd.getTxnId(), tbd.getStmtId(), tbd.isMmTable());
+              tbd.getTxnId(), tbd.getStmtId());
           if (work.getOutputs() != null) {
             DDLTask.addIfAbsentByName(new WriteEntity(table,
               getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index bb1f4e5..545b7a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -229,6 +229,7 @@ public class LoadTable {
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
         replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+        //todo: what is the point of this?  If this is for replication, who would have opened a txn?
         SessionState.get().getTxnMgr().getCurrentTxnId()
     );
     MoveWork moveWork =

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c0b71f..9ab028d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -30,6 +30,8 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,18 +41,17 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,13 +123,14 @@ public class AcidUtils {
   public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
   public static final Pattern   LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
   /**
-   * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
-   * a "delta" dir written by a real Acid write - cannot have any copies
+   * A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
+   * (unless via a Load Data statement).
    */
   public static final PathFilter originalBucketFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
-      return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+      return ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
+        ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
     }
   };
 
@@ -137,6 +139,7 @@ public class AcidUtils {
   }
   private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
 
+  public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$");
   public static final Pattern ORIGINAL_PATTERN =
       Pattern.compile("[0-9]+_[0-9]+");
   /**
@@ -156,14 +159,30 @@ public class AcidUtils {
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
   /**
-   * Create the bucket filename.
+   * Create the bucket filename in Acid format
    * @param subdir the subdirectory for the bucket.
    * @param bucket the bucket number
    * @return the filename
    */
   public static Path createBucketFile(Path subdir, int bucket) {
-    return new Path(subdir,
+    return createBucketFile(subdir, bucket, true);
+  }
+
+  /**
+   * Create acid or original bucket name
+   * @param subdir the subdirectory for the bucket.
+   * @param bucket the bucket number
+   * @return the filename
+   */
+  private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) {
+    if(isAcidSchema) {
+      return new Path(subdir,
         BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
+    }
+    else {
+      return new Path(subdir,
+        String.format(BUCKET_DIGITS, bucket));
+    }
   }
 
   /**
@@ -244,7 +263,7 @@ public class AcidUtils {
    * @param path the base directory name
    * @return the maximum transaction id that is included
    */
-  static long parseBase(Path path) {
+  public static long parseBase(Path path) {
     String filename = path.getName();
     if (filename.startsWith(BASE_PREFIX)) {
       return Long.parseLong(filename.substring(BASE_PREFIX.length()));
@@ -262,7 +281,7 @@ public class AcidUtils {
    */
   public static AcidOutputFormat.Options
                     parseBaseOrDeltaBucketFilename(Path bucketFile,
-                                                   Configuration conf) {
+                                                   Configuration conf) throws IOException {
     AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
     String filename = bucketFile.getName();
     if (ORIGINAL_PATTERN.matcher(filename).matches()) {
@@ -273,7 +292,7 @@ public class AcidUtils {
           .minimumTransactionId(0)
           .maximumTransactionId(0)
           .bucket(bucket)
-          .writingBase(true);
+          .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
     else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
       //todo: define groups in regex and use parseInt(Matcher.group(2))....
@@ -286,7 +305,7 @@ public class AcidUtils {
         .maximumTransactionId(0)
         .bucket(bucket)
         .copyNumber(copyNumber)
-        .writingBase(true);
+        .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
     }
     else if (filename.startsWith(BUCKET_PREFIX)) {
       int bucket =
@@ -299,14 +318,16 @@ public class AcidUtils {
             .bucket(bucket)
             .writingBase(true);
       } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
-        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX,
+          bucketFile.getFileSystem(conf));
         result
             .setOldStyle(false)
             .minimumTransactionId(parsedDelta.minTransaction)
             .maximumTransactionId(parsedDelta.maxTransaction)
             .bucket(bucket);
       } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
-        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+        ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
+          bucketFile.getFileSystem(conf));
         result
             .setOldStyle(false)
             .minimumTransactionId(parsedDelta.minTransaction)
@@ -344,11 +365,17 @@ public class AcidUtils {
         throw new IllegalArgumentException("Unexpected Operation: " + op);
     }
   }
-
   public enum AcidBaseFileType {
-    COMPACTED_BASE, // a regular base file generated through major compaction
-    ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
-    INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+  /**
+   * File w/o Acid meta columns.  This would be the case for files that were added to the table
+   * before it was converted to Acid but not yet major compacted.  May also be the result of a
+   * Load Data statement on an acid table.
+   */
+  ORIGINAL_BASE,
+  /**
+   * File that has Acid metadata columns embedded in it.  Found in base_x/ or delta_x_y/.
+   */
+  ACID_SCHEMA,
   }
 
   /**
@@ -366,16 +393,12 @@ public class AcidUtils {
       this.acidBaseFileType = acidBaseFileType;
     }
 
-    public boolean isCompactedBase() {
-      return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
-    }
-
     public boolean isOriginal() {
       return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
     }
 
-    public boolean isInsertDelta() {
-      return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+    public boolean isAcidSchema() {
+      return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA;
     }
 
     public HdfsFileStatusWithId getHdfsFileStatusWithId() {
@@ -545,6 +568,7 @@ public class AcidUtils {
      * @return the base directory to read
      */
     Path getBaseDirectory();
+    boolean isBaseInRawFormat();
 
     /**
      * Get the list of original files.  Not {@code null}.  Must be sorted.
@@ -576,7 +600,10 @@ public class AcidUtils {
     List<FileStatus> getAbortedDirectories();
   }
 
-  public static class ParsedDelta implements Comparable<ParsedDelta> {
+  /**
+   * Immutable
+   */
+  public static final class ParsedDelta implements Comparable<ParsedDelta> {
     private final long minTransaction;
     private final long maxTransaction;
     private final FileStatus path;
@@ -584,19 +611,24 @@ public class AcidUtils {
     //had no statement ID
     private final int statementId;
     private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+    private final boolean isRawFormat;
 
     /**
      * for pre 1.3.x delta files
      */
-    ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
-      this(min, max, path, -1, isDeleteDelta);
+    private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
+        boolean isRawFormat) {
+      this(min, max, path, -1, isDeleteDelta, isRawFormat);
     }
-    ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
+    private ParsedDelta(long min, long max, FileStatus path, int statementId,
+        boolean isDeleteDelta, boolean isRawFormat) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
       this.statementId = statementId;
       this.isDeleteDelta = isDeleteDelta;
+      this.isRawFormat = isRawFormat;
+      assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
     }
 
     public long getMinTransaction() {
@@ -618,7 +650,12 @@ public class AcidUtils {
     public boolean isDeleteDelta() {
       return isDeleteDelta;
     }
-
+    /**
+     * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
+     */
+    public boolean isRawFormat() {
+      return isRawFormat;
+    }
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
@@ -698,29 +735,6 @@ public class AcidUtils {
   }
 
   /**
-   * Convert the list of begin/end transaction id pairs to a list of delta
-   * directories.  Note that there may be multiple delta files for the exact same txn range starting
-   * with 1.3.x;
-   * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
-   * @param root the root directory
-   * @param deltas list of begin/end transaction id pairs
-   * @return the list of delta paths
-   */
-  public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
-    List<Path> results = new ArrayList<Path>(deltas.size());
-    for(AcidInputFormat.DeltaMetaData dmd : deltas) {
-      if(dmd.getStmtIds().isEmpty()) {
-        results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
-        continue;
-      }
-      for(Integer stmtId : dmd.getStmtIds()) {
-        results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
-      }
-    }
-    return results.toArray(new Path[results.size()]);
-  }
-
-  /**
    * Convert the list of begin/end transaction id pairs to a list of delete delta
    * directories.  Note that there may be multiple delete_delta files for the exact same txn range starting
    * with 2.2.x;
@@ -743,25 +757,29 @@ public class AcidUtils {
     return results.toArray(new Path[results.size()]);
   }
 
-  public static ParsedDelta parsedDelta(Path deltaDir) {
+  public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
     String deltaDirName = deltaDir.getName();
     if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
-      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs);
     }
-    return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix
+    return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
   }
 
-  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
-    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix, FileSystem fs)
+    throws IOException {
+    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     return new ParsedDelta(p.getMinTransaction(),
-        p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+        p.getMaxTransaction(), path, p.statementId, isDeleteDelta, p.isRawFormat());
   }
 
-  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
+  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
+    throws IOException {
     String filename = deltaDir.getName();
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     if (filename.startsWith(deltaPrefix)) {
+      //small optimization - delete delta can't be in raw format
+      boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
       String rest = filename.substring(deltaPrefix.length());
       int split = rest.indexOf('_');
       int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
@@ -770,10 +788,10 @@ public class AcidUtils {
         Long.parseLong(rest.substring(split + 1)) :
         Long.parseLong(rest.substring(split + 1, split2));
       if(split2 == -1) {
-        return new ParsedDelta(min, max, null, isDeleteDelta);
+        return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
       }
       int statementId = Integer.parseInt(rest.substring(split2 + 1));
-      return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
+      return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
                                        deltaPrefix);
@@ -871,13 +889,13 @@ public class AcidUtils {
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
-            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
         getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
       }
     }
 
@@ -976,12 +994,18 @@ public class AcidUtils {
       //this does "Path.uri.compareTo(that.uri)"
       return o1.getFileStatus().compareTo(o2.getFileStatus());
     });
-    return new Directory(){
+
+    final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
+    return new Directory() {
 
       @Override
       public Path getBaseDirectory() {
         return base;
       }
+      @Override
+      public boolean isBaseInRawFormat() {
+        return isBaseInRawFormat;
+      }
 
       @Override
       public List<HdfsFileStatusWithId> getOriginalFiles() {
@@ -1022,7 +1046,8 @@ public class AcidUtils {
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
+      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
+      FileSystem fs) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -1050,7 +1075,7 @@ public class AcidUtils {
                     && child.isDir()) {
       String deltaPrefix =
               (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
-      ParsedDelta delta = parseDelta(child, deltaPrefix);
+      ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
       if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) &&
           ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
         aborted.add(child);
@@ -1171,8 +1196,11 @@ public class AcidUtils {
     parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable));
   }
 
-  public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
-    HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
+  /**
+   * Means it's a full acid table
+   */
+  public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) {
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable);
   }
   /**
    * @param p - not null
@@ -1185,6 +1213,8 @@ public class AcidUtils {
    * SessionState.get().getTxnMgr().supportsAcid() here
    * @param table table
    * @return true if table is a legit ACID table, false otherwise
+   * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+   * both Acid and MM tables. HIVE-18124
    */
   public static boolean isAcidTable(Table table) {
     if (table == null) {
@@ -1197,6 +1227,10 @@ public class AcidUtils {
 
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
+  /**
+   * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+   * both Acid and MM tables. HIVE-18124
+   */
   public static boolean isAcidTable(CreateTableDesc table) {
     if (table == null || table.getTblProps() == null) {
       return false;
@@ -1208,8 +1242,11 @@ public class AcidUtils {
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
 
+  /**
+   * after isTransactionalTable() then make this isAcid() HIVE-18124
+   */
   public static boolean isFullAcidTable(Table table) {
-    return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters());
+    return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table);
   }
 
   /**
@@ -1336,6 +1373,9 @@ public class AcidUtils {
   public static boolean isInsertOnlyTable(Map<String, String> params) {
     return isInsertOnlyTable(params, false);
   }
+  public static boolean isInsertOnlyTable(Table table) {
+    return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly();
+  }
 
   // TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used?
   public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
@@ -1349,13 +1389,21 @@ public class AcidUtils {
     return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
   }
 
-   /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */
-  public static Boolean isToInsertOnlyTable(Map<String, String> props) {
+   /**
+    * The method for altering table props; may set the table to MM, non-MM, or not affect MM.
+    * todo: All such validation logic should be TransactionValidationListener
+    * @param tbl object image before alter table command
+    * @param props prop values set in this alter table command
+    */
+  public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
     // Note: Setting these separately is a very hairy issue in certain combinations, since we
     //       cannot decide what type of table this becomes without taking both into account, and
     //       in many cases the conversion might be illegal.
     //       The only thing we allow is tx = true w/o tx-props, for backward compat.
     String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if(transactional == null) {
+      transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    }
     String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (transactional == null && transactionalProp == null) return null; // Not affected.
     boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
@@ -1378,4 +1426,81 @@ public class AcidUtils {
         hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     return hasTxn || hasProps;
   }
+
+  /**
+   * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
+   * into delta_x_x/ (or base_x in case there is Overwrite clause).  {@link MetaDataFile} is a
+   * small JSON file in this directory that indicates that these files don't have Acid metadata
+   * columns and so the values for these columns need to be assigned at read time/compaction.
+   */
+  public static class MetaDataFile {
+    //export command uses _metadata....
+    private static final String METADATA_FILE = "_metadata_acid";
+    private static final String CURRENT_VERSION = "0";
+    //todo: enums? that have both field name and value list
+    private interface Field {
+      String VERSION = "thisFileVersion";
+      String DATA_FORMAT = "dataFormat";
+    }
+    private interface Value {
+      //plain ORC file
+      String RAW = "raw";
+      //result of acid write, i.e. decorated with ROW__ID info
+      String NATIVE = "native";
+    }
+
+    /**
+     * @param baseOrDeltaDir delta or base dir, must exist
+     */
+    public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat)
+      throws IOException {
+      /**
+       * create _meta_data json file in baseOrDeltaDir
+       * write thisFileVersion, dataFormat
+       *
+       * on read if the file is not there, assume version 0 and dataFormat=acid
+       */
+      Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+      Map<String, String> metaData = new HashMap<>();
+      metaData.put(Field.VERSION, CURRENT_VERSION);
+      metaData.put(Field.DATA_FORMAT, isRawFormat ? Value.RAW : Value.NATIVE);
+      try (FSDataOutputStream strm = fs.create(formatFile, false)) {
+        new ObjectMapper().writeValue(strm, metaData);
+      } catch (IOException ioe) {
+        String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE
+            + ": " + ioe.getMessage();
+        LOG.error(msg, ioe);
+        throw ioe;
+      }
+    }
+    public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+      Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+      if(!fs.exists(formatFile)) {
+        return false;
+      }
+      try (FSDataInputStream strm = fs.open(formatFile)) {
+        Map<String, String> metaData = new ObjectMapper().readValue(strm, Map.class);
+        if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) {
+          throw new IllegalStateException("Unexpected Meta Data version: "
+              + metaData.get(Field.VERSION));
+        }
+        String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null");
+        switch (dataFormat) {
+          case Value.NATIVE:
+            return false;
+          case Value.RAW:
+            return true;
+          default:
+            throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT
+                + ": " + dataFormat);
+        }
+      }
+      catch(IOException e) {
+        String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE
+            + ": " + e.getMessage();
+        LOG.error(msg, e);
+        throw e;
+      }
+    }
+  }
 }
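
For context, a minimal stand-alone sketch (not part of this commit) of the _metadata_acid marker that MetaDataFile above writes, using the local filesystem and the same Jackson ObjectMapper calls instead of org.apache.hadoop.fs.FileSystem; the directory name is made up:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.HashMap;
import java.util.Map;

import org.codehaus.jackson.map.ObjectMapper;

public class MetaDataFileExample {
  public static void main(String[] args) throws Exception {
    File deltaDir = new File("delta_0000007_0000007");   // hypothetical Load Data delta dir
    deltaDir.mkdirs();
    File marker = new File(deltaDir, "_metadata_acid");

    // write {"thisFileVersion":"0","dataFormat":"raw"} into the delta dir
    Map<String, String> metaData = new HashMap<>();
    metaData.put("thisFileVersion", "0");
    metaData.put("dataFormat", "raw");                    // files lack Acid ROW__ID columns
    try (FileOutputStream out = new FileOutputStream(marker)) {
      new ObjectMapper().writeValue(out, metaData);
    }

    // read it back the way isRawFormat() does: missing file or "native" means acid schema
    try (FileInputStream in = new FileInputStream(marker)) {
      @SuppressWarnings("unchecked")
      Map<String, String> read = new ObjectMapper().readValue(in, Map.class);
      System.out.println("raw format? " + "raw".equals(read.get("dataFormat")));  // true
    }
  }
}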

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 6a1dc72..819c2a2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -468,6 +468,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
     try {
       Utilities.copyTablePropertiesToConf(table, conf);
+      if(tableScan != null) {
+        AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable());
+      }
     } catch (HiveException e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index dda9f93..becdc71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException;
 
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 import java.io.IOException;
@@ -409,7 +410,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * @param readerSchema the types for the reader
    * @param conf the configuration
    */
-  public static boolean[] genIncludedColumns(TypeDescription readerSchema,
+  static boolean[] genIncludedColumns(TypeDescription readerSchema,
                                              Configuration conf) {
      if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
       List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
@@ -419,7 +420,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  public static String[] getSargColumnNames(String[] originalColumnNames,
+  private static String[] getSargColumnNames(String[] originalColumnNames,
       List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
     int rootColumn = getRootColumn(isOriginal);
     String[] columnNames = new String[types.size() - rootColumn];
@@ -695,21 +696,29 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    */
   @VisibleForTesting
   static final class AcidDirInfo {
-    public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
+    AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
         List<AcidBaseFileInfo> baseFiles,
-        List<ParsedDelta> parsedDeltas) {
+        List<ParsedDelta> deleteEvents) {
       this.splitPath = splitPath;
       this.acidInfo = acidInfo;
       this.baseFiles = baseFiles;
       this.fs = fs;
-      this.parsedDeltas = parsedDeltas;
+      this.deleteEvents = deleteEvents;
     }
 
     final FileSystem fs;
     final Path splitPath;
     final AcidUtils.Directory acidInfo;
     final List<AcidBaseFileInfo> baseFiles;
-    final List<ParsedDelta> parsedDeltas;
+    final List<ParsedDelta> deleteEvents;
+
+    /**
+     * No (qualifying) data files found in {@link #splitPath}
+     * @return
+     */
+    boolean isEmpty() {
+      return (baseFiles == null || baseFiles.isEmpty());
+    }
   }
 
   @VisibleForTesting
@@ -884,7 +893,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public CombineResult combineWith(FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> otherFiles, boolean isOriginal) {
       if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT
-        || this.isOriginal != isOriginal) {
+        || this.isOriginal != isOriginal) {//todo: what is this checking????
         return (files.size() > otherFiles.size())
             ? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE;
       }
@@ -1083,6 +1092,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   static final class FileGenerator implements Callable<AcidDirInfo> {
     private final Context context;
     private final FileSystem fs;
+    /**
+     * For plain or acid tables this is the root of the partition (or table if not partitioned).
+     * For MM table this is delta/ or base/ dir.  In MM case applying of the ValidTxnList that
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already
+     * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}.
+     */
     private final Path dir;
     private final Ref<Boolean> useFileIds;
     private final UserGroupInformation ugi;
@@ -1119,25 +1134,27 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private AcidDirInfo callInternal() throws IOException {
+      //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
           context.transactionList, useFileIds, true, null);
-      Path base = dirInfo.getBaseDirectory();
       // find the base files (original or new style)
-      List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
-      if (base == null) {
+      List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
+      if (dirInfo.getBaseDirectory() == null) {
+        //for non-acid tables, all data files are in getOriginalFiles() list
         for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
       } else {
-        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
         for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
-          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+          baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
+            AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
         }
       }
 
       // Find the parsed deltas- some of them containing only the insert delta events
       // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
-      List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+      List<ParsedDelta> parsedDeltas = new ArrayList<>();
 
       if (context.acidOperationalProperties != null &&
           context.acidOperationalProperties.isSplitUpdate()) {
@@ -1154,15 +1171,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           if (parsedDelta.isDeleteDelta()) {
             parsedDeltas.add(parsedDelta);
           } else {
+            AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ?
+              AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
+            PathFilter bucketFilter = parsedDelta.isRawFormat() ?
+              AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
+            if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() !=
+              parsedDelta.getMaxTransaction()) {
+              //delta/ with files in raw format is a result of Load Data (as opposed to compaction
+              //or streaming ingest) so it must have interval length == 1.
+              throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE
+               + " format but txnIds are out of range: " + parsedDelta.getPath());
+            }
             // This is a normal insert delta, which only has insert events and hence all the files
             // in this delta directory can be considered as a base.
             Boolean val = useFileIds.value;
             if (val == null || val) {
               try {
                 List<HdfsFileStatusWithId> insertDeltaFiles =
-                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
                 for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
-                  baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+                  baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
                 }
                 if (val == null) {
                   useFileIds.value = true; // The call succeeded, so presumably the API is there.
@@ -1176,15 +1204,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               }
             }
             // Fall back to regular API and create statuses without ID.
-            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
             for (FileStatus child : children) {
               HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
-              baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+              baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
             }
           }
         }
 
       } else {
+        /*
+        We already handled all delete deltas above and there should not be any other deltas for
+        any table type.  (this was acid 1.0 code path).
+         */
+        assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir;
         // When split-update is not enabled, then all the deltas in the current directories
         // should be considered as usual.
         parsedDeltas.addAll(dirInfo.getCurrentDirectories());
@@ -1658,7 +1691,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       pathFutures.add(ecs.submit(fileGenerator));
     }
 
-    boolean isTransactionalTableScan =//this never seems to be set correctly
+    boolean isTransactionalTableScan =
         HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
     boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
     TypeDescription readerSchema =
@@ -1700,13 +1733,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
         // We have received a new directory information, make split strategies.
         --resultsLeft;
-
+        if(adi.isEmpty()) {
+          //no files found, for example empty table/partition
+          continue;
+        }
         // The reason why we can get a list of split strategies here is because for ACID split-update
         // case when we have a mix of original base files & insert deltas, we will produce two
         // independent split strategies for them. There is a global flag 'isOriginal' that is set
         // on a per split strategy basis and it has to be same for all the files in that strategy.
         List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
-            adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
+            adi.splitPath, adi.baseFiles, adi.deleteEvents, readerTypes, ugi,
             allowSyntheticFileIds);
 
         for (SplitStrategy<?> splitStrategy : splitStrategies) {
@@ -1790,6 +1826,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds,
       boolean isDefaultFs) {
     if (!deltas.isEmpty() || combinedCtx == null) {
+      //why does this check deltas.isEmpty()? see HIVE-18110
       return new ETLSplitStrategy(
           context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi,
           allowSyntheticFileIds, isDefaultFs);
@@ -1955,6 +1992,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
     OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
     mergerOptions.rootPath(split.getRootDir());
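+    // also record the actual file this split reads; with LOAD DATA the merger presumably can no
+    // longer always derive the file name from rootPath and the bucket id alone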
+    mergerOptions.bucketPath(split.getPath());
     final int bucket;
     if (split.hasBase()) {
       AcidOutputFormat.Options acidIOOptions =
@@ -1968,8 +2006,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     } else {
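+      // acid 1.x encoded the bucket id in the start offset for delta-only splits; this is not
+      // expected for full acid 2.0 tables, hence the assert below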
       bucket = (int) split.getStart();
+      assert false : "We should never have a split w/o base in acid 2.0 for full acid: " + split.getPath();
     }
-
+    //todo: createOptionsForReader() assumes it's !isOriginal... why?
     final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
     readOptions.range(split.getStart(), split.getLength());
 
@@ -2041,6 +2080,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
     final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
     readerOptions.include(OrcInputFormat.genIncludedColumns(schema, conf));
+    //todo: last param is bogus. why is this hardcoded?
     OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true);
     return readerOptions;
   }
@@ -2144,7 +2184,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       List<AcidBaseFileInfo> baseFiles,
       List<ParsedDelta> parsedDeltas,
       List<OrcProto.Type> readerTypes,
-      UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds) throws IOException {
     List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
     SplitStrategy<?> splitStrategy;
 
@@ -2153,23 +2193,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     boolean isDefaultFs = (!checkDefaultFs) || ((fs instanceof DistributedFileSystem)
             && HdfsUtils.isDefaultFs((DistributedFileSystem) fs));
 
-    // When no baseFiles, we will just generate a single split strategy and return.
-    List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
     if (baseFiles.isEmpty()) {
-      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles,
+      assert false : "acid 2.0 no base?!: " + dir;
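+      // insert deltas are added to baseFiles above, so an empty list should not happen for
+      // acid 2.0; the code below is kept as a defensive fall-back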
+      splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, Collections.emptyList(),
           false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds, isDefaultFs);
       if (splitStrategy != null) {
         splitStrategies.add(splitStrategy);
       }
-      return splitStrategies; // return here
+      return splitStrategies;
     }
 
+    List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<>();
     List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
     // Separate the base files into acid schema and non-acid(original) schema files.
     for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
       if (acidBaseFileInfo.isOriginal()) {
         originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
       } else {
+        assert acidBaseFileInfo.isAcidSchema();
         acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
       }
     }
@@ -2195,14 +2236,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return splitStrategies;
   }
 
-  @VisibleForTesting
-  static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
+  private static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
       FileSystem fs, Path dir,
       List<HdfsFileStatusWithId> baseFiles,
       boolean isOriginal,
       List<ParsedDelta> parsedDeltas,
       List<OrcProto.Type> readerTypes,
-      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs)
+    throws IOException {
     List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
     boolean[] covered = new boolean[context.numBuckets];
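+    // 'covered' appears to track which bucket ids are covered by a base/original file so that
+    // buckets present only in deltas still get a split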
 
@@ -2250,6 +2291,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
+  /**
+   * Used by the compactor to read raw acid events.
+   * @param bucket bucket/writer ID for this split of the compaction job
+   */
   @Override
   public RawReader<OrcStruct> getRawReader(Configuration conf,
                                            boolean collapseEvents,
@@ -2258,25 +2303,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                            Path baseDirectory,
                                            Path[] deltaDirectory
                                            ) throws IOException {
-    Reader reader = null;
     boolean isOriginal = false;
+    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true)
+      .isMajorCompaction(collapseEvents);
     if (baseDirectory != null) {//this is NULL for minor compaction
-      Path bucketFile = null;
+      //it may also be null if there is no base - only deltas
+      mergerOptions.baseDir(baseDirectory);
       if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
-        bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
+        isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf));
+        mergerOptions.rootPath(baseDirectory.getParent());
       } else {
-        /**we don't know which file to start reading -
-         * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
         isOriginal = true;
+        mergerOptions.rootPath(baseDirectory);
       }
-      if(bucketFile != null) {
-        reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
-      }
     }
-    OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
-      .isCompacting(true)
-      .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
-    return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
+    else {
+      //since we have no base, there must be at least 1 delta which must be the result of an acid
+      //write, so it must be an immediate child of the partition
+      mergerOptions.rootPath(deltaDirectory[0].getParent());
+    }
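+    // at this point rootPath is the table/partition directory and isOriginal is true when the
+    // base files are not in acid format: either a raw-format base_x (presumably written by
+    // LOAD DATA ... OVERWRITE) or pre-acid original files sitting directly in the partition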
+    return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
         bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
   }