Posted to commits@hive.apache.org by ek...@apache.org on 2017/12/01 02:40:06 UTC

[2/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 95a60dc..73f27e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -88,11 +88,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      */
     private int statementId;//sort on this descending, like currentTransactionId
 
-    public ReaderKey() {
+    ReaderKey() {
       this(-1, -1, -1, -1, 0);
     }
 
-    public ReaderKey(long originalTransaction, int bucket, long rowId,
+    ReaderKey(long originalTransaction, int bucket, long rowId,
                      long currentTransactionId) {
       this(originalTransaction, bucket, rowId, currentTransactionId, 0);
     }
@@ -196,6 +196,34 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     void next(OrcStruct next) throws IOException;
   }
   /**
+   * Used when base_x/bucket_N is missing - makes control flow a bit easier
+   */
+  private class EmptyReaderPair implements ReaderPair {
+    @Override public OrcStruct nextRecord() {
+      return null;
+    }
+    @Override public int getColumns() {
+      return 0;
+    }
+    @Override public RecordReader getRecordReader() {
+      return null;
+    }
+    @Override public Reader getReader() {
+      return null;
+    }
+    @Override public RecordIdentifier getMinKey() {
+      return null;
+    }
+    @Override public RecordIdentifier getMaxKey() {
+      return null;
+    }
+    @Override public ReaderKey getKey() {
+      return null;
+    }
+    @Override public void next(OrcStruct next) throws IOException {
+    }
+  }
+  /**
    * A reader and the next record from that reader. The code reads ahead so that
    * we can return the lowest ReaderKey from each of the readers. Thus, the
    * next available row is nextRecord and only following records are still in
@@ -209,6 +237,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private final ReaderKey key;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
+    @Deprecated//HIVE-18158
     private final int statementId;
 
     /**
@@ -320,12 +349,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private final ReaderKey key;
     final int bucketId;
     final int bucketProperty;
+    /**
+     * TransactionId to use when generating synthetic ROW_IDs
+     */
+    final long transactionId;
 
-    OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
+    OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions,
+      int statementId) throws IOException {
       this.key = key;
       this.bucketId = bucketId;
       assert bucketId >= 0 : "don't support non-bucketed tables yet";
-      this.bucketProperty = encodeBucketId(conf, bucketId);
+      this.bucketProperty = encodeBucketId(conf, bucketId, statementId);
+      transactionId = mergeOptions.getTransactionId();
     }
     @Override public final OrcStruct nextRecord() {
       return nextRecord;
@@ -337,7 +372,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     @Override
     public final ReaderKey getKey() { return key; }
     /**
-     * The cumulative number of row in all files of the logical bucket that precede the file
+     * The cumulative number of rows in all files of the logical bucket that precede the file
      * represented by {@link #getRecordReader()}
      */
     abstract long getRowIdOffset();
@@ -355,9 +390,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
               new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
           nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
           nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
-              new LongWritable(0));
+              new LongWritable(transactionId));
           nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
-              new LongWritable(0));
+              new LongWritable(transactionId));
           nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
               new IntWritable(bucketProperty));
           nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
@@ -369,17 +404,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
               .set(OrcRecordUpdater.INSERT_OPERATION);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
-              .set(0);
+              .set(transactionId);
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
               .set(bucketProperty);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
-              .set(0);
+              .set(transactionId);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
               .set(nextRowId);
           nextRecord().setFieldValue(OrcRecordUpdater.ROW,
               getRecordReader().next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
+        key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0);
         if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -391,9 +426,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return false;//reached EndOfFile
     }
   }
-  static int encodeBucketId(Configuration conf, int bucketId) {
-    return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+  static int encodeBucketId(Configuration conf, int bucketId, int statementId) {
+    return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId)
+        .statementId(statementId));
   }
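
To illustrate the encodeBucketId() change above: the statementId is now folded into the
bucketProperty so that synthetic ROW_IDs generated for "original" files carry the statement
that wrote them.  A minimal sketch, assuming only the builder calls visible in this diff
(class name and values below are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.BucketCodec;

    // Pack (bucketId, statementId) into the single int stored in the bucket field
    // of a ROW__ID, mirroring OrcRawRecordMerger.encodeBucketId() above.
    public class BucketPropertySketch {
      static int bucketProperty(Configuration conf, int bucketId, int statementId) {
        return BucketCodec.V1.encode(
            new AcidOutputFormat.Options(conf).bucket(bucketId).statementId(statementId));
      }

      public static void main(String[] args) {
        // hypothetical: bucket 0 written by the 3rd statement of a multi-statement transaction
        System.out.println(bucketProperty(new Configuration(), 0, 2));
      }
    }
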
+  /**
+   * This handles normal read (as opposed to Compaction) of a {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   * file.  Such a file may be the result of Load Data or may have been written to the table
+   * before it was converted to acid.
+   */
   @VisibleForTesting
   final static class OriginalReaderPairToRead extends OriginalReaderPair {
     private final long rowIdOffset;
@@ -401,12 +442,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private final RecordReader recordReader;
     private final RecordIdentifier minKey;
     private final RecordIdentifier maxKey;
-
     OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
                              final RecordIdentifier minKey, final RecordIdentifier maxKey,
                              Reader.Options options, Options mergerOptions, Configuration conf,
-                             ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId, conf);
+                             ValidTxnList validTxnList, int statementId) throws IOException {
+      super(key, bucketId, conf, mergerOptions, statementId);
       this.reader = reader;
       assert !mergerOptions.isCompacting();
       assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -426,6 +466,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       boolean haveSeenCurrentFile = false;
       long rowIdOffsetTmp = 0;
       {
+        /**
+         * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+         * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
+         * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+         */
         //the split is from something other than the 1st file of the logical bucket - compute offset
         AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
           conf, validTxnList, false, true);
@@ -458,7 +503,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if (rowIdOffset > 0) {
           //rowIdOffset could be 0 if all files before current one are empty
           /**
-           * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+           * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration, Options)}
            * need to fix min/max key since these are used by
            * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
            * the key.  Clear?  */
@@ -469,7 +514,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              *  If this is not the 1st file, set minKey 1 less than the start of current file
              * (Would not need to set minKey if we knew that there are no delta files)
              * {@link #advanceToMinKey()} needs this */
-            newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
+            newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1);
           }
           if (maxKey != null) {
             maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -482,7 +527,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            * of the file so we want to leave it blank to make sure any insert events in delta
            * files are included; Conversely, if it's not the last file, set the maxKey so that
            * events from deltas that don't modify anything in the current split are excluded*/
-        newMaxKey = new RecordIdentifier(0, bucketProperty,
+        newMaxKey = new RecordIdentifier(transactionId, bucketProperty,
           rowIdOffset + reader.getNumberOfRows() - 1);
       }
       this.minKey = newMinKey;
@@ -532,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     OriginalReaderPairToCompact(ReaderKey key, int bucketId,
                        Reader.Options options, Options mergerOptions, Configuration conf,
-                       ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId, conf);
+                       ValidTxnList validTxnList, int statementId) throws IOException {
+      super(key, bucketId, conf, mergerOptions, statementId);
       assert mergerOptions.isCompacting() : "Should only be used for Compaction";
       this.conf = conf;
       this.options = options;
@@ -544,9 +589,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       assert options.getMaxOffset() == Long.MAX_VALUE;
       AcidUtils.Directory directoryState = AcidUtils.getAcidState(
         mergerOptions.getRootPath(), conf, validTxnList, false, true);
+      /**
+       * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+       * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
+       * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+       */
       originalFiles = directoryState.getOriginalFiles();
       assert originalFiles.size() > 0;
-      this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+      //in case of Compaction, this is the 1st file of the current bucket
+      this.reader = advanceToNextFile();
       if (reader == null) {
         //Compactor generated a split for a bucket that has no data?
         throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
@@ -655,7 +706,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    */
   private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
                                          Reader.Options options,
-                                         Configuration conf) throws IOException {
+                                         Configuration conf, Options mergerOptions) throws IOException {
     long rowLength = 0;
     long rowOffset = 0;
     long offset = options.getOffset();//this would usually be at block boundary
@@ -663,7 +714,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isTail = true;
     RecordIdentifier minKey = null;
     RecordIdentifier maxKey = null;
-    int bucketProperty = encodeBucketId(conf, bucket);
+    TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+      mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+    int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId);
    /**
     * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
     * necessarily match stripe boundary.  So we want to come up with minKey to be one before the 1st
@@ -755,13 +808,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
    * This makes the "context" explicit.
    */
-  static class Options {
+  static class Options implements Cloneable {
     private int copyIndex = 0;
     private boolean isCompacting = false;
     private Path bucketPath;
     private Path rootPath;
+    private Path baseDir;
     private boolean isMajorCompaction = false;
     private boolean isDeleteReader = false;
+    private long transactionId = 0;
     Options copyIndex(int copyIndex) {
       assert copyIndex >= 0;
       this.copyIndex = copyIndex;
@@ -790,6 +845,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       assert !isCompacting;
       return this;
     }
+    Options transactionId(long transactionId) {
+      this.transactionId = transactionId;
+      return this;
+    }
+    Options baseDir(Path baseDir) {
+      this.baseDir = baseDir;
+      return this;
+    }
     /**
      * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
      */
@@ -825,13 +888,48 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isDeleteReader() {
       return isDeleteReader;
     }
+    /**
+     * for reading "original" files - i.e. not native acid schema.  Default value of 0 is
+     * appropriate for files that existed in a table before it was made transactional.  0 is the
+     * primordial transaction.  Non-native files resulting from the Load Data command are
+     * located in base_x or delta_x_x, in which case transactionId == x.
+     */
+    long getTransactionId() {
+      return transactionId;
+    }
+
+    /**
+     * In case of isMajorCompaction() this is the base dir from the Compactor, i.e. either a base_x
+     * or {@link #rootPath} if it's the 1st major compaction after non-acid2acid conversion
+     */
+    Path getBaseDir() {
+      return baseDir;
+    }
+    /**
+     * shallow clone
+     */
+    public Options clone() {
+      try {
+        return (Options) super.clone();
+      }
+      catch(CloneNotSupportedException ex) {
+        throw new AssertionError();
+      }
+    }
   }
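
The Options class above gains transactionId and baseDir plus a shallow clone(); the clone is
what modifyForNonAcidSchemaRead() (later in this file) uses to hand each reader pair a
per-directory view without mutating the caller's options.  A standalone sketch of the same
clone-then-override pattern (hypothetical class, not Hive code):

    // A fluent, Cloneable settings object that can be copied and then overridden,
    // analogous to Options.clone() + transactionId()/rootPath() above.
    public class Settings implements Cloneable {
      private long transactionId = 0;
      private String rootPath;

      Settings transactionId(long txnId) { this.transactionId = txnId; return this; }
      Settings rootPath(String p) { this.rootPath = p; return this; }

      @Override
      public Settings clone() {
        try {
          return (Settings) super.clone();  // shallow copy is enough: fields are scalars/immutable
        } catch (CloneNotSupportedException ex) {
          throw new AssertionError(ex);     // cannot happen: we implement Cloneable
        }
      }

      public static void main(String[] args) {
        Settings base = new Settings().rootPath("/warehouse/t");
        // per-directory view, e.g. for a hypothetical delta_5_5/ produced by Load Data
        Settings forDelta = base.clone().transactionId(5).rootPath("/warehouse/t/delta_5_5");
        System.out.println(base.rootPath + " vs " + forDelta.rootPath);
      }
    }
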
   /**
-   * Create a reader that merge sorts the ACID events together.
+   * Create a reader that merge sorts the ACID events together.  This handles
+   * 1. 'normal' reads on behalf of a query (non vectorized)
+   * 2. Compaction reads (major/minor)
+   * 3. Delete event reads - to create a sorted view of all delete events for vectorized read
+   *
+   * This makes the logic in the constructor confusing and needs to be refactored.  Liberal use of
+   * asserts below is primarily for documentation purposes.
+   *
    * @param conf the configuration
    * @param collapseEvents should the events on the same row be collapsed
-   * @param isOriginal is the base file a pre-acid file
-   * @param bucket the bucket we are reading
+   * @param isOriginal if reading files w/o acid schema - {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   * @param bucket the bucket/writer id of the file we are reading
    * @param options the options to read with
    * @param deltaDirectory the list of delta directories to include
    * @throws IOException
@@ -887,11 +985,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     objectInspector = OrcRecordUpdater.createEventSchema
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+    assert !(mergerOptions.isCompacting() && reader != null) : "don't need a reader for compaction";
 
     // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
+    //suppose it's the first Major compaction so we only have deltas
+    boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction()
+      && mergerOptions.getBaseDir() == null;
     if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
-      mergerOptions.isDeleteReader()) {
+      mergerOptions.isDeleteReader() || isMajorNoBase) {
       //for minor compaction, there is no progress report and we don't filter deltas
       baseReader = null;
       minKey = maxKey = null;
@@ -906,27 +1008,68 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       } else {
         // find the min/max based on the offset and length (and more for 'original')
         if (isOriginal) {
-          keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+          //note that this KeyInterval may be adjusted later due to copy_N files
+          keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf, mergerOptions);
         } else {
           keyInterval = discoverKeyBounds(reader, options);
         }
       }
       LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // use the min/max instead of the byte range
-      ReaderPair pair;
+      ReaderPair pair = null;
       ReaderKey key = new ReaderKey();
       if (isOriginal) {
         options = options.clone();
         if(mergerOptions.isCompacting()) {
-          pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
-            conf, validTxnList);
+          assert mergerOptions.isMajorCompaction();
+          Options readerPairOptions = mergerOptions;
+          if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+            readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+              AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
+          }
+          pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+            conf, validTxnList,
+            0);//0 since base_x doesn't have a suffix (neither does pre acid write)
         } else {
+          assert mergerOptions.getBucketPath() != null : " since this is not compaction: "
+            + mergerOptions.getRootPath();
+          //if here it's a non-acid schema file - check if from before table was marked transactional
+          //or in base_x/delta_x_x from Load Data
+          Options readerPairOptions = mergerOptions;
+          TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+            mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+          if(tfp.syntheticTransactionId > 0) {
+            readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+              tfp.syntheticTransactionId, tfp.folder);
+          }
           pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
-            keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+            keyInterval.getMaxKey(), options,  readerPairOptions, conf, validTxnList, tfp.statementId);
         }
       } else {
-        pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
-          eventOptions, 0);
+        if(mergerOptions.isCompacting()) {
+          assert mergerOptions.isMajorCompaction() : "expected major compaction: "
+            + mergerOptions.getBaseDir() + ":" + bucket;
+          assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath();
+          //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty
+          FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf);
+          Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket);
+          if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) {
+            //doing major compaction - it's possible, where the full complement of bucket files is
+            //not required (on Tez), that base_x/ doesn't have a file for 'bucket'
+            reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
+            pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+              eventOptions, 0);
+          }
+          else {
+            pair = new EmptyReaderPair();
+            LOG.info("No non-empty " + bucketPath + " was found for Major compaction");
+          }
+        }
+        else {
+          assert reader != null : "no reader? " + mergerOptions.getRootPath();
+          pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+            eventOptions, 0);
+        }
       }
       minKey = pair.getMinKey();
       maxKey = pair.getMaxKey();
@@ -937,11 +1080,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
       baseReader = pair.getRecordReader();
     }
-
-    if (deltaDirectory != null) {
-      /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no
-      * user columns
-      * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+    /*now process the delta files.  For normal read these should only be delete deltas.  For
+    * Compaction these may be any delta_x_y/.  The files inside any delta_x_y/ may be in Acid
+    * format (i.e. with Acid metadata columns) or 'original'.*/
+    if (deltaDirectory != null && deltaDirectory.length > 0) {
+    /*For reads, whatever SARG may be applicable to the base is not applicable to delete_delta since it has no
+      * user columns.  For Compaction there is never a SARG.
+      * */
       Reader.Options deltaEventOptions = eventOptions.clone()
         .searchArgument(null, null).range(0, Long.MAX_VALUE);
       for(Path delta: deltaDirectory) {
@@ -950,17 +1095,50 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
         }
         ReaderKey key = new ReaderKey();
-        AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
+        AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf));
+        if(deltaDir.isRawFormat()) {
+          assert !deltaDir.isDeleteDelta() : delta.toString();
+          assert mergerOptions.isCompacting() : "during regular read anything which is not a" +
+            " delete_delta is treated like base: " + delta;
+          Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions,
+            deltaDir.getMinTransaction(), delta);
+          //this will also handle copy_N files if any
+          ReaderPair deltaPair =  new OriginalReaderPairToCompact(key, bucket, options,
+            rawCompactOptions, conf, validTxnList, deltaDir.getStatementId());
+          if (deltaPair.nextRecord() != null) {
+            readers.put(key, deltaPair);
+          }
+          continue;
+        }
         for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
           FileSystem fs = deltaFile.getFileSystem(conf);
           if(!fs.exists(deltaFile)) {
+            /**
+             * it's possible that the file for a specific {@link bucket} doesn't exist in any given
+             * delta since no rows hashed to it (and not configured to create empty buckets)
+             */
             continue;
           }
+          if(deltaDir.isDeleteDelta()) {
+            //if here it may be compaction, a regular read, or the Delete event sorter
+            //in the latter 2 cases we should do:
+            //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta
+            Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf));
+            ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+              deltaEventOptions, deltaDir.getStatementId());
+            if (deltaPair.nextRecord() != null) {
+              readers.put(key, deltaPair);
+            }
+            continue;
+          }
+          //if here then we must be compacting
+          assert mergerOptions.isCompacting() : "not compacting and not delete delta : " + delta;
           /* side files are only created by streaming ingest.  If this is a compaction, we may
           * have an insert delta/ here with side files there because the original writer died.*/
           long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
           assert length >= 0;
           Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
+          //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
           ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
             deltaEventOptions, deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
@@ -988,6 +1166,76 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   }
 
   /**
+   * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   * type files into a base_x/ or delta_x_x.  The data in these are then assigned ROW_IDs at read
+   * time and made permanent at compaction time.  This is identical to how 'original' files (i.e.
+   * those that existed in the table before it was converted to an Acid table) are handled, except
+   * that the transaction ID to use in the ROW_ID should be that of the transaction that ran the Load Data.
+   */
+  static final class TransactionMetaData {
+    final long syntheticTransactionId;
+    /**
+     * folder which determines the transaction id to use in synthetic ROW_IDs
+     */
+    final Path folder;
+    final int statementId;
+    TransactionMetaData(long syntheticTransactionId, Path folder) {
+      this(syntheticTransactionId, folder, 0);
+    }
+    TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) {
+      this.syntheticTransactionId = syntheticTransactionId;
+      this.folder = folder;
+      this.statementId = statementId;
+    }
+    static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath,
+      Configuration conf) throws IOException {
+      Path parent = splitPath.getParent();
+      if(rootPath.equals(parent)) {
+        //the 'isOriginal' file is at the root of the partition (or table) thus it is
+        //from a pre-acid conversion write and belongs to primordial txnid:0.
+        return new TransactionMetaData(0, parent);
+      }
+      while(parent != null && !rootPath.equals(parent)) {
+        boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX);
+        boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX);
+        if(isBase || isDelta) {
+          if(isBase) {
+            return new TransactionMetaData(AcidUtils.parseBase(parent), parent);
+          }
+          else {
+            AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
+              parent.getFileSystem(conf));
+            assert pd.getMinTransaction() == pd.getMaxTransaction() :
+              "This a delta with raw non acid schema, must be result of single write, no compaction: "
+                + splitPath;
+            return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId());
+          }
+        }
+        parent = parent.getParent();
+      }
+      if(parent == null) {
+        //split is marked isOriginal but it's not an immediate child of a partition nor is it in a
+        //base/ or delta/ - this should never happen
+        throw new IllegalStateException("Cannot determine transaction id for original file "
+          + splitPath + " in " + rootPath);
+      }
+      //"warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid
+      // converted table
+      return new TransactionMetaData(0, rootPath);
+    }
+  }
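
findTransactionIDForSynthetcRowIDs() above walks up from the split towards the partition/table
root looking for an enclosing base_x/ or delta_x_x/ directory; that x becomes the transaction id
used in synthetic ROW_IDs, and 0 (the primordial transaction) is used for files that predate the
acid conversion.  A simplified, self-contained sketch of the directory-name convention (the real
parsing lives in AcidUtils.parseBase()/parsedDelta() and also extracts a statement id; names
below are hypothetical):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class SyntheticTxnIdSketch {
      static long syntheticTxnId(Path split, Path root) {
        for (Path dir = split.getParent(); dir != null && !dir.equals(root); dir = dir.getParent()) {
          String name = dir.getFileName().toString();
          if (name.startsWith("base_")) {
            return Long.parseLong(name.substring("base_".length()));
          }
          if (name.startsWith("delta_")) {
            // delta_x_x[_stmtId]: for raw (non-acid schema) data, min txn == max txn
            return Long.parseLong(name.split("_")[1]);
          }
        }
        return 0;  // directly under the partition root => pre-conversion file, txnid:0
      }

      public static void main(String[] args) {
        Path root = Paths.get("/warehouse/t");
        System.out.println(syntheticTxnId(Paths.get("/warehouse/t/000000_0"), root));           // 0
        System.out.println(syntheticTxnId(Paths.get("/warehouse/t/delta_7_7/000000_0"), root)); // 7
        System.out.println(syntheticTxnId(Paths.get("/warehouse/t/base_9/000001_0"), root));    // 9
      }
    }
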
+  /**
+   * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/, which
+   * happens as a result of the Load Data statement.  Setting {@code rootPath} to base_x/ or delta_x_x
+   * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in subsequent
+   * {@link OriginalReaderPair} object to return the files in this dir
+   * in {@link AcidUtils.Directory#getOriginalFiles()}
+   * @return modified clone of {@code baseOptions}
+   */
+  private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) {
+    return baseOptions.clone().transactionId(transactionId).rootPath(rootPath);
+  }
+  /**
    * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
    * For unbucketed tables {@code bucket} can be thought of as a write tranche.
    */
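
Putting the pieces of this file together: for an "original" file the merger fabricates each
ROW__ID as (originalTransaction = the x of the enclosing base_x/delta_x_x, or 0 for pre-acid
files; bucketProperty = encodeBucketId(conf, bucket, statementId); rowId = rowIdOffset + the
row's position in the file), and for these synthetic insert events currentTransaction equals
originalTransaction.  A compact sketch of that assembly (standalone; values in main() are
hypothetical):

    // How a synthetic ROW__ID for a row of an "original" file is assembled;
    // mirrors OriginalReaderPair.next() above.
    final class SyntheticRowId {
      final long originalTransaction;  // txn of base_x/delta_x_x, or 0 for pre-acid files
      final int bucketProperty;        // BucketCodec-encoded bucket + statement id
      final long rowId;                // rows in preceding files + row index in this file

      SyntheticRowId(long txnId, int bucketProperty, long rowIdOffset, long rowIndexInFile) {
        this.originalTransaction = txnId;
        this.bucketProperty = bucketProperty;
        this.rowId = rowIdOffset + rowIndexInFile;
      }

      @Override public String toString() {
        return "(" + originalTransaction + "," + bucketProperty + "," + rowId + ")";
      }

      public static void main(String[] args) {
        // e.g. the 3rd row of a file under delta_5_5/ whose preceding bucket file had 1000 rows
        System.out.println(new SyntheticRowId(5, 536870912, 1000, 2));  // (5,536870912,1002)
      }
    }
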

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 315cc1d..8af38b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -196,7 +196,9 @@ public class OrcRecordUpdater implements RecordUpdater {
     fields.add(new OrcStruct.Field("row", rowInspector, ROW));
     return new OrcStruct.OrcStructInspector(fields);
   }
-
+  /**
+   * @param path - partition root
+   */
   OrcRecordUpdater(Path path,
                    AcidOutputFormat.Options options) throws IOException {
     this.options = options;

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 58638b5..edffa5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -51,6 +51,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
   private OrcTail orcTail;
   private boolean hasFooter;
+  /**
+   * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   */
   private boolean isOriginal;
   private boolean hasBase;
   //partition root

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index bcde4fc..d571bd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.impl.AcidStats;
@@ -156,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader
     this.vectorizedRowBatchBase = baseReader.createValue();
   }
 
-  private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter,
+  private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
       VectorizedRowBatchCtx rowBatchCtx) throws IOException {
     this.rbCtx = rowBatchCtx;
     final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
@@ -165,12 +164,10 @@ public class VectorizedOrcAcidRowBatchReader
 
     // This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
     // enabled for an ACID case and the file format is ORC.
-    boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
-                                   || !(inputSplit instanceof OrcSplit);
+    boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate();
     if (isReadNotAllowed) {
       OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
     }
-    final OrcSplit orcSplit = (OrcSplit) inputSplit;
 
     reporter.setStatus(orcSplit.toString());
     readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
@@ -226,9 +223,11 @@ public class VectorizedOrcAcidRowBatchReader
   private static final class OffsetAndBucketProperty {
     private final long rowIdOffset;
     private final int bucketProperty;
-    private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) {
+    private final long syntheticTxnId;
+    private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) {
       this.rowIdOffset = rowIdOffset;
       this.bucketProperty = bucketProperty;
+      this.syntheticTxnId = syntheticTxnId;
     }
   }
   /**
@@ -240,17 +239,34 @@ public class VectorizedOrcAcidRowBatchReader
    *
    * todo: This logic is executed per split of every "original" file.  The computed result is the
   * same for every split from the same file so this could be optimized by moving it to
-   * before/during splt computation and passing the info in the split.  (HIVE-17917)
+   * before/during split computation and passing the info in the split.  (HIVE-17917)
    */
   private OffsetAndBucketProperty computeOffsetAndBucket(
     OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
-    if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
-      return new OffsetAndBucketProperty(0,0);
+    if(!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+      if(split.isOriginal()) {
+        /**
+         * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that
+         * created the file to see if it's committed.  See more in
+         * {@link #next(NullWritable, VectorizedRowBatch)}.  (In practice getAcidState() should
+         * filter out base/delta files but this makes fewer dependencies)
+         */
+        OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+          OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+            split.getRootDir(), conf);
+        return new OffsetAndBucketProperty(-1,-1,
+          syntheticTxnInfo.syntheticTransactionId);
+      }
+      return null;
     }
     long rowIdOffset = 0;
+    OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+      OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+        split.getRootDir(), conf);
     int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
-    int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
-    AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf,
+    int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
+      .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf,
       validTxnList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       AcidOutputFormat.Options bucketOptions =
@@ -266,7 +282,8 @@ public class VectorizedOrcAcidRowBatchReader
         OrcFile.readerOptions(conf));
       rowIdOffset += reader.getNumberOfRows();
     }
-    return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+    return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
+      syntheticTxnInfo.syntheticTransactionId);
   }
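
computeOffsetAndBucket() above derives rowIdOffset by walking the directory's "original" files in
order and summing the row counts of those that belong to the same logical bucket and sort before
the split's file.  A simplified, standalone sketch of that accumulation (no ORC/HDFS calls; file
names and row counts are hypothetical):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Given the ordered "original" files of one logical bucket and their row counts,
    // a file's rowIdOffset is the total number of rows in the files that precede it.
    public class RowIdOffsetSketch {
      static long rowIdOffset(Map<String, Long> filesInOrder, String currentFile) {
        long offset = 0;
        for (Map.Entry<String, Long> e : filesInOrder.entrySet()) {
          if (e.getKey().equals(currentFile)) {
            return offset;               // rows of all preceding files
          }
          offset += e.getValue();
        }
        throw new IllegalArgumentException("unknown file " + currentFile);
      }

      public static void main(String[] args) {
        Map<String, Long> bucket0 = new LinkedHashMap<>();
        bucket0.put("000000_0", 1000L);
        bucket0.put("000000_0_copy_1", 250L);
        bucket0.put("000000_0_copy_2", 42L);
        System.out.println(rowIdOffset(bucket0, "000000_0_copy_2"));  // 1250
      }
    }
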
   /**
    * {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
@@ -284,7 +301,7 @@ public class VectorizedOrcAcidRowBatchReader
     if(rbCtx == null) {
       throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
     }
-    return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx));
+    return !needSyntheticRowIds(split.isOriginal(), hasDeletes, areRowIdsProjected(rbCtx));
   }
 
   /**
@@ -292,8 +309,8 @@ public class VectorizedOrcAcidRowBatchReader
    * Even if ROW__ID is not projected you still need to decorate the rows with them to see if
    * any of the delete events apply.
    */
-  private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) {
-    return split.isOriginal() && (hasDeletes || rowIdProjected);
+  private static boolean needSyntheticRowIds(boolean isOriginal, boolean hasDeletes, boolean rowIdProjected) {
+    return isOriginal && (hasDeletes || rowIdProjected);
   }
   private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) {
     if(rbCtx.getVirtualColumnCount() == 0) {
@@ -316,7 +333,7 @@ public class VectorizedOrcAcidRowBatchReader
       if (orcSplit.isOriginal()) {
         root = orcSplit.getRootDir();
       } else {
-        root = path.getParent().getParent();
+        root = path.getParent().getParent();//todo: why not just use getRootDir()?
         assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() +
           " path.p.p=" + root;
       }
@@ -398,7 +415,9 @@ public class VectorizedOrcAcidRowBatchReader
        * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order
        * to see if any deletes apply
        */
-      if(rowIdProjected || !deleteEventRegistry.isEmpty()) {
+      if(needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+        assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps;
+        assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps;
         if(innerReader == null) {
           throw new IllegalStateException(getClass().getName() + " requires " +
             org.apache.orc.RecordReader.class +
@@ -409,8 +428,7 @@ public class VectorizedOrcAcidRowBatchReader
          */
         recordIdColumnVector.fields[0].noNulls = true;
         recordIdColumnVector.fields[0].isRepeating = true;
-        //all "original" is considered written by txnid:0 which committed
-        ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0;
+        ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId;
         /**
          * This is {@link RecordIdentifier#getBucketProperty()}
          * Also see {@link BucketCodec}
@@ -433,15 +451,21 @@ public class VectorizedOrcAcidRowBatchReader
         innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
         innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
         innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
+        //these are insert events so (original txn == current txn) for all rows
+        innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0];
+      }
+      if(syntheticProps.syntheticTxnId > 0) {
+        //"originals" (written before table was converted to acid) is considered written by
+        // txnid:0 which is always committed so there is no need to check wrt invalid transactions
+        //But originals written by Load Data for example can be in base_x or delta_x_x so we must
+        //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline.
+        findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector,
+          vectorizedRowBatchBase.size, selectedBitSet);
       }
     }
     else {
       // Case 1- find rows which belong to transactions that are not valid.
       findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
-      /**
-       * All "original" data belongs to txnid:0 and is always valid/committed for every reader
-       * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal
-       */
     }
 
     // Case 2- find rows which have been deleted.
@@ -473,11 +497,6 @@ public class VectorizedOrcAcidRowBatchReader
     }
     else {
       // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
-      // NOTE: We only link up the user columns and not the ACID metadata columns because this
-      // vectorized code path is not being used in cases of update/delete, when the metadata columns
-      // would be expected to be passed up the operator pipeline. This is because
-      // currently the update/delete specifically disable vectorized code paths.
-      // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
       StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
       // Transfer columnVector objects from base batch to outgoing batch.
       System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
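
One consequence of the changes in this file: an "original" split created by Load Data carries a
synthetic transaction id x > 0, so its rows must still be checked against the valid-transaction
list even when ROW__ID is not projected, while pre-conversion data (txnid:0) never needs the
check.  A simplified, standalone sketch of that per-batch decision (ValidTxnList reduced to a
boolean predicate; names are hypothetical):

    import java.util.BitSet;
    import java.util.function.LongPredicate;

    // All rows of an "original" split share one synthetic transaction id, so
    // committedness is decided once; if the txn is not committed the batch is deselected.
    public class OriginalTxnFilterSketch {
      static void applyTxnFilter(long syntheticTxnId, int batchSize,
                                 BitSet selected, LongPredicate isCommitted) {
        if (syntheticTxnId == 0) {
          return;                          // pre-conversion data: txnid:0 is always committed
        }
        if (!isCommitted.test(syntheticTxnId)) {
          selected.clear(0, batchSize);    // e.g. the Load Data txn aborted or is still open
        }
      }

      public static void main(String[] args) {
        BitSet selected = new BitSet();
        selected.set(0, 1024);
        applyTxnFilter(42L, 1024, selected, txn -> txn != 42L);  // pretend txn 42 is not committed
        System.out.println(selected.cardinality());              // 0
      }
    }
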

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index f7388a4..736034d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -27,12 +27,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.io.NullWritable;

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 6fb0c43..fdb3603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.api.Query;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -297,6 +298,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
         break;
       default:
         if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+          if(allowOperationInATransaction(queryPlan)) {
+            break;
+          }
+          //look at queryPlan.outputs(WriteEntity.t - that's the table)
           //for example, drop table in an explicit txn is not allowed
           //in some cases this requires looking at more than just the operation
           //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
@@ -311,6 +316,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     any non acid and raise an appropriate error
     * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
   }
+
+  /**
+   * This modifies the logic wrt what operations are allowed in a transaction.  Multi-statement
+   * transaction support is incomplete but it makes some Acid test cases much easier to write.
+   */
+  private boolean allowOperationInATransaction(QueryPlan queryPlan) {
+    //Acid and MM tables support Load Data with transactional semantics.  This will allow Load Data
+    //in a txn assuming we can determine the target is a suitable table type.
+    if(queryPlan.getOperation() == HiveOperation.LOAD && queryPlan.getOutputs() != null && queryPlan.getOutputs().size() == 1) {
+      WriteEntity writeEntity = queryPlan.getOutputs().iterator().next();
+      if(AcidUtils.isFullAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) {
+        switch (writeEntity.getWriteType()) {
+          case INSERT:
+            //allow operation in a txn
+            return true;
+          case INSERT_OVERWRITE:
+            //see HIVE-18154
+            return false;
+          default:
+            //not relevant for LOAD
+            return false;
+        }
+      }
+    }
+    //todo: handle Insert Overwrite as well: HIVE-18154
+    return false;
+  }
   /**
    * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
    * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 1a37bf7..9f2c6d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -146,6 +146,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1705,18 +1706,20 @@ public class Hive {
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *          If the source directory is LOCAL
-   * @param isAcid
-   *          true if this is an ACID operation
+   * @param isAcidIUDoperation
+   *          true if this is an ACID Insert/Update/Delete operation
    * @param hasFollowingStatsTask
    *          true if there is a following task which updates the stats, so, this method need not update.
    * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId)
+      boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId)
           throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
+    assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+    boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
     try {
       // Get the partition object if it already exists
       Partition oldPart = getPartition(tbl, partSpec, false);
@@ -1768,7 +1771,7 @@ public class Hive {
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
         }
-        assert !isAcid;
+        assert !isAcidIUDoperation;
         if (areEventsForDmlNeeded(tbl, oldPart)) {
           newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
         }
@@ -1792,16 +1795,22 @@ public class Hive {
           filter = (loadFileType == LoadFileType.REPLACE_ALL)
             ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
         }
+        else if(!isAcidIUDoperation && isFullAcidTable) {
+          destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+        }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath);
         }
-        if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) {
+        //todo: why is "&& !isAcidIUDoperation" needed here?
+        if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) {
+          //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
+          // base_x.  (there is Insert Overwrite and Load Data Overwrite)
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
               isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite);
         } else {
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-          copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid,
+          copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
         }
       }
@@ -1891,6 +1900,38 @@ public class Hive {
     }
   }
 
+  /**
+   * Load Data commands for fullAcid tables write to base_x (if there is an OVERWRITE clause) or
+   * delta_x_x directory - same as any other Acid write.  This method modifies the destPath to add
+   * this path component.
+   * @param txnId - id of current transaction (in which this operation is running)
+   * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()}
+   * @return appropriately modified path
+   */
+  private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException {
+    switch (loadFileType) {
+      case REPLACE_ALL:
+        destPath = new Path(destPath, AcidUtils.baseDir(txnId));
+        break;
+      case KEEP_EXISTING:
+        destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+        break;
+      case OVERWRITE_EXISTING:
+        //should not happen here - this is for replication
+      default:
+        throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType);
+    }
+    try {
+      FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf());
+      if(!FileUtils.mkdir(fs, destPath, conf)) {
+        LOG.warn(destPath + " already exists?!?!");
+      }
+      AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true);
+    } catch (IOException e) {
+      throw new HiveException("load: error while creating " + destPath + ";loadFileType=" + loadFileType, e);
+    }
+    return destPath;
+  }
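
fixFullAcidPathForLoadData() above routes Load Data for a full-acid table into a transactional
subdirectory: OVERWRITE (REPLACE_ALL) targets a new base_x, a plain LOAD (KEEP_EXISTING) targets
delta_x_x, and a metadata marker file is created so readers recognize the raw, non-acid-schema
contents.  A small sketch of the resulting target paths using the AcidUtils helpers referenced
above (transaction/statement ids are hypothetical; exact directory-name padding is whatever
AcidUtils produces):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    // Target path selection for a Load Data into a full-acid partition at
    // /warehouse/t/p=1, running in transaction 17, statement 0.
    public class LoadDataTargetPathSketch {
      public static void main(String[] args) {
        Path partition = new Path("/warehouse/t/p=1");
        // LOAD DATA ... OVERWRITE  -> new base directory for txn 17
        Path overwriteTarget = new Path(partition, AcidUtils.baseDir(17));
        // LOAD DATA (no OVERWRITE) -> delta directory for txn 17, statement 0
        Path appendTarget = new Path(partition, AcidUtils.deltaSubdir(17, 17, 0));
        System.out.println(overwriteTarget);
        System.out.println(appendTarget);
      }
    }
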
 
   private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
     return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
@@ -2125,7 +2166,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param partSpec
    * @param loadFileType
    * @param numDP number of dynamic partitions
-   * @param listBucketingEnabled
    * @param isAcid true if this is an ACID operation
    * @param txnId txnId, can be 0 unless isAcid == true
    * @return partition map details (PartitionSpec and Partition)
@@ -2273,14 +2313,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          if list bucketing enabled
    * @param hasFollowingStatsTask
    *          if there is any following stats task
-   * @param isAcid true if this is an ACID based write
+   * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete
    */
   public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
-      Long txnId, int stmtId, boolean isMmTable) throws HiveException {
-
+      boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+      Long txnId, int stmtId) throws HiveException {
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
+    assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+    boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
+    boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
     HiveConf sessionConf = SessionState.getSessionConf();
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -2298,24 +2340,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
       newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
-      Path tblPath = tbl.getPath(), destPath = tblPath;
+      Path tblPath = tbl.getPath();
+      Path destPath = tblPath;
       PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
       if (isMmTable) {
+        assert !isAcidIUDoperation;
         // We will load into MM directory, and delete from the parent if needed.
         destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
         filter = loadFileType == LoadFileType.REPLACE_ALL
             ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
       }
+      else if(!isAcidIUDoperation && isFullAcidTable) {
+        destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+      }
       Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
           + " (replace = " + loadFileType + ")");
-      if (loadFileType == LoadFileType.REPLACE_ALL) {
+      if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) {
+        //for full ACID we don't want to delete any files, even for OVERWRITE; see HIVE-14988/HIVE-17361
+        //todo:  should probably do the same for MM IOW
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tblPath, loadPath, destPath, tblPath,
             sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable);
       } else {
         try {
           FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
-          copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid,
+          copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles);
         } catch (IOException e) {
           throw new HiveException("addFiles: filesystem error in check phase", e);
@@ -2358,7 +2407,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
   }
-
   /**
    * Creates a partition.
    *

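A note for readers following the new loadTable() path above: LOAD DATA into a full ACID table now lands files in a fresh delta directory derived from the transaction id and statement id (via fixFullAcidPathForLoadData), plus a metadata marker recording that the files are in "raw" non-ACID format, instead of replacing existing data. The snippet below is only an illustrative, self-contained sketch of the delta directory naming convention; the authoritative implementation is AcidUtils.deltaSubdir(), and the digit widths used here are an assumption.

    // Illustrative sketch only, not part of this patch.  AcidUtils.deltaSubdir()
    // is the real implementation; the 7/4 digit widths are assumptions.
    public class DeltaDirSketch {
      /** e.g. deltaSubdir(17, 17, 0) -> "delta_0000017_0000017_0000" */
      static String deltaSubdir(long minTxnId, long maxTxnId, int stmtId) {
        return String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, stmtId);
      }

      public static void main(String[] args) {
        System.out.println(deltaSubdir(17, 17, 0));
      }
    }
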
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cd75130..a1b6cda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
         Utilities.getTableDesc(table), new TreeMap<>(),
         replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
-    loadTableWork.setTxnId(txnId);
     loadTableWork.setStmtId(stmtId);
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
@@ -400,6 +399,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return loadTableTask;
   }
 
+  /**
+   * todo: this is odd: transactions are now opened for all statements, so what is this supposed to check?
+   */
+  @Deprecated
   private static boolean isAcid(Long txnId) {
     return (txnId != null) && (txnId != 0);
   }
@@ -490,7 +493,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           partSpec.getPartSpec(),
           replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
           txnId);
-      loadTableWork.setTxnId(txnId);
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(

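The two removed setTxnId(txnId) calls in this file are possible because the LoadTableDesc constructor used just above already receives the transaction id, so the follow-up setter was redundant. A generic, hypothetical sketch of that pattern (the names below are illustrative, not Hive's API):

    // Hypothetical sketch: supplying the transaction id at construction time
    // removes the need for a separate setter call and the risk of forgetting it.
    final class LoadDescSketch {
      private final long txnId;   // fixed once, at construction
      private int stmtId = -1;    // still mutable; see the todo in LoadTableDesc below

      LoadDescSketch(long txnId) { this.txnId = txnId; }

      void setStmtId(int stmtId) { this.stmtId = stmtId; }
      long getTxnId() { return txnId; }
      int getStmtId() { return stmtId; }
    }
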
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 238fbd6..cc956da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -136,7 +136,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast,
-      boolean isLocal) throws SemanticException {
+      boolean isLocal, Table table) throws SemanticException {
 
     FileStatus[] srcs = null;
 
@@ -159,6 +159,14 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
           throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
               "source contains directory: " + oneSrc.getPath().toString()));
         }
+        if(AcidUtils.isFullAcidTable(table)) {
+          if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) {
+            //acid files (e.g. bucket_0000) have ROW_IDs embedded in them and so can't simply be
+            //copied into another table; only allow non-acid files for now
+            throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME,
+              oneSrc.getPath().getName(), table.getDbName() + "." + table.getTableName());
+          }
+        }
       }
     } catch (IOException e) {
       // Has to use full name to make sure it does not conflict with
@@ -230,11 +238,8 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
-    if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) {
-      throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName());
-    }
     // make sure the arguments make sense
-    List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
+    List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal, ts.tableHandle);
 
     // for managed tables, make sure the file formats match
     if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())
@@ -277,17 +282,16 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     Long txnId = null;
-    int stmtId = 0;
-    Table tbl = ts.tableHandle;
-    if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) {
+    int stmtId = -1;
+    if (AcidUtils.isAcidTable(ts.tableHandle)) {
       txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+      stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement();
     }
 
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
       Utilities.getTableDesc(ts.tableHandle), partSpec,
       isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId);
-    loadTableWork.setTxnId(txnId);
     loadTableWork.setStmtId(stmtId);
     if (preservePartitionSpecs){
       // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but

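The new check in applyConstraintsAndGetFiles() only admits files whose names look like pre-ACID ("original") bucket files, because files written by an ACID writer already carry ROW_IDs and cannot simply be copied into another table. Below is a standalone sketch of that kind of filename test; the regexes are assumptions that mirror the usual naming, while the real filters are AcidUtils.originalBucketFilter and AcidUtils.bucketFileFilter.

    // Illustrative only; the regexes are assumptions: "000000_0[_copy_1]" for
    // pre-ACID files from non-transactional writers vs "bucket_00000" for
    // ACID-written files that embed ROW_IDs and so can't be LOADed elsewhere.
    import java.util.regex.Pattern;

    public class LoadFileNameCheck {
      private static final Pattern ORIGINAL =
          Pattern.compile("[0-9]+_[0-9]+(_copy_[0-9]+)?");
      private static final Pattern ACID_BUCKET =
          Pattern.compile("bucket_[0-9]+");

      static boolean looksLikeOriginalFile(String name) {
        return ORIGINAL.matcher(name).matches();
      }
      static boolean looksLikeAcidBucketFile(String name) {
        return ACID_BUCKET.matcher(name).matches();
      }

      public static void main(String[] args) {
        System.out.println(looksLikeOriginalFile("000000_0"));       // true
        System.out.println(looksLikeAcidBucketFile("bucket_00000")); // true
        System.out.println(looksLikeOriginalFile("bucket_00000"));   // false
      }
    }
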
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1fa7b40..4683c9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -47,9 +47,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
 
   public enum LoadFileType {
-    REPLACE_ALL,        // Remove all existing data before copy/move
-    KEEP_EXISTING,      // If any file exist while copy, then just duplicate the file
-    OVERWRITE_EXISTING  // If any file exist while copy, then just overwrite the file
+    /**
+     * This corresponds to INSERT OVERWRITE and REPL LOAD for an INSERT OVERWRITE event.
+     * Remove all existing data before copy/move
+     */
+    REPLACE_ALL,
+    /**
+     * This corresponds to INSERT INTO and LOAD DATA.
+     * If a file with the same name already exists during the copy, then just duplicate the file
+     */
+    KEEP_EXISTING,
+    /**
+     * This corresponds to REPL LOAD where, if we re-apply the same event, we need to overwrite
+     * the file instead of making a duplicate copy.
+     * If a file with the same name already exists during the copy, then just overwrite the file
+     */
+    OVERWRITE_EXISTING
   }
   public LoadTableDesc(final LoadTableDesc o) {
     super(o.getSourcePath(), o.getWriteType());
@@ -215,14 +228,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
     return currentTransactionId == null ? 0 : currentTransactionId;
   }
 
-  public void setTxnId(Long txnId) {
-    this.currentTransactionId = txnId;
-  }
-
   public int getStmtId() {
     return stmtId;
   }
-
+  //todo: should this not be passed in the c'tor?
   public void setStmtId(int stmtId) {
     this.stmtId = stmtId;
   }

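The expanded javadoc on LoadFileType maps each mode to the statement that produces it. The sketch below is an illustrative restatement of what the three modes mean for a single destination directory; it is not Hive's implementation (the real logic lives in Hive.copyFiles / Hive.replaceFiles), and the helper names are hypothetical.

    // Illustrative sketch only: the operational meaning of the three copy modes.
    import java.io.IOException;
    import java.nio.file.*;

    final class CopyModeSketch {
      enum Mode { REPLACE_ALL, KEEP_EXISTING, OVERWRITE_EXISTING }

      static void load(Path src, Path destDir, Mode mode) throws IOException {
        if (mode == Mode.REPLACE_ALL) {
          // INSERT OVERWRITE: wipe the destination first, then copy
          try (DirectoryStream<Path> old = Files.newDirectoryStream(destDir)) {
            for (Path p : old) Files.delete(p);
          }
        }
        Path target = destDir.resolve(src.getFileName().toString());
        if (mode == Mode.KEEP_EXISTING && Files.exists(target)) {
          // INSERT INTO / LOAD DATA: on a name clash, keep both by writing a duplicate
          target = destDir.resolve(src.getFileName() + "_copy_1");
        }
        // OVERWRITE_EXISTING (REPL LOAD re-applying an event) silently replaces the file
        Files.copy(src, target, StandardCopyOption.REPLACE_EXISTING);
      }
    }
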
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d4d379..a804527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -577,11 +576,16 @@ public class CompactorMR {
             dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
             dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+          boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+            && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//delete deltas can't be raw format
 
-          FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+          FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
+            : AcidUtils.bucketFileFilter);
           for(FileStatus f : files) {
             // For each file, figure out which bucket it is.
-            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+            Matcher matcher = isRawFormat ?
+              AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName())
+              : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
             addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
           }
         } else {
@@ -612,8 +616,12 @@ public class CompactorMR {
     private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
                               Map<Integer, BucketTracker> splitToBucketMap) {
       if (!matcher.find()) {
-        LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
-            file.toString() + " Matcher=" + matcher.toString());
+        String msg = "Found a non-bucket file that we thought matched the bucket pattern! " +
+          file.toString() + " Matcher=" + matcher.toString();
+        LOG.error(msg);
+        //the following matcher.group() would fail anyway, and we don't want to skip files since
+        //that may be a data loss scenario
+        throw new IllegalArgumentException(msg);
       }
       int bucketNum = Integer.parseInt(matcher.group());
       BucketTracker bt = splitToBucketMap.get(bucketNum);

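In the compactor hunk above, the bucket id is parsed out of the file name with a different pattern depending on whether the directory holds raw-format (pre-ACID style) files or ACID bucket files, and a non-matching name is now a hard error rather than a warning. The sketch below is self-contained and only illustrative; the regexes are assumptions about the naming, while the real patterns are AcidUtils.BUCKET_DIGIT_PATTERN and AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.

    // Illustrative sketch: ACID files like "bucket_00000" vs raw/legacy files
    // like "000000_0"; in both cases the digit run identified below is assumed
    // to be the bucket id.
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BucketIdSketch {
      private static final Pattern ACID_BUCKET_DIGITS   = Pattern.compile("(?<=bucket_)[0-9]+");
      private static final Pattern LEGACY_BUCKET_DIGITS = Pattern.compile("^[0-9]+");

      static int bucketId(String fileName, boolean isRawFormat) {
        Matcher m = (isRawFormat ? LEGACY_BUCKET_DIGITS : ACID_BUCKET_DIGITS).matcher(fileName);
        if (!m.find()) {
          // mirror the patch: fail loudly rather than silently skipping a file
          throw new IllegalArgumentException("Not a bucket file: " + fileName);
        }
        return Integer.parseInt(m.group());
      }

      public static void main(String[] args) {
        System.out.println(bucketId("bucket_00001", false)); // 1
        System.out.println(bucketId("000002_0", true));      // 2
      }
    }
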
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 52257c4..319e0ee 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -344,7 +344,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     //this should fail because txn aborted due to timeout
     CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
     Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
-    
+
     //now test that we don't timeout locks we should not
     //heartbeater should be running in the background every 1/2 second
     hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
@@ -354,9 +354,9 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     runStatementOnDriver("start transaction");
     runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
     pause(750);
-    
+
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
-    
+
     //since there is txn open, we are heartbeating the txn not individual locks
     GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
     Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
@@ -377,7 +377,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     //these 2 values are equal when TXN entry is made.  Should never be equal after 1st heartbeat, which we
     //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
     Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
-    
+
     ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
     TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
     pause(750);
@@ -525,7 +525,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " +
       "when matched then update set cur=0 " +
       "when not matched then insert values(s.key,s.data,1)";
-
+    //to allow cross join from 'teeCurMatch'
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
     runStatementOnDriver(stmt);
     int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
     List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
@@ -569,7 +570,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
     Assert.assertEquals(stringifyValues(resultVals), r);
   }
-  
+
   @Test
   public void testMergeOnTezEdges() throws Exception {
     String query = "merge into " + Table.ACIDTBL +

http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 17d976a..ab5f969 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -77,7 +77,7 @@ public class TestTxnCommands2 {
   ).getPath().replaceAll("\\\\", "/");
   protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  protected static int BUCKET_COUNT = 2;
+  static int BUCKET_COUNT = 2;
   @Rule
   public TestName testName = new TestName();
 
@@ -117,12 +117,11 @@ public class TestTxnCommands2 {
     setUpWithTableProperties("'transactional'='true'");
   }
 
-  protected void setUpWithTableProperties(String tableProperties) throws Exception {
+  void setUpWithTableProperties(String tableProperties) throws Exception {
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
-    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf
         .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
@@ -406,7 +405,7 @@ public class TestTxnCommands2 {
     expectedException.expect(RuntimeException.class);
     expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
     runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+    runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')");
   }
   /**
    * Test the query correctness and directory layout for ACID table conversion