You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2019/01/11 20:54:45 UTC

hive git commit: HIVE-20960 Make MM compactor run in a transaction and remove CompactorMR.createCompactorMarker() (Eugene Koifman, reviewed by Vaibhav Gumashta)

Repository: hive
Updated Branches:
  refs/heads/master f713140ba -> cb9d5ccd8


HIVE-20960 Make MM compactor run in a transaction and remove CompactorMR.createCompactorMarker() (Eugene Koifman, reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb9d5ccd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb9d5ccd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb9d5ccd

Branch: refs/heads/master
Commit: cb9d5ccd87997985b4623369247aead32bd91390
Parents: f713140
Author: Eugene Koifman <ek...@apache.org>
Authored: Fri Jan 11 12:54:28 2019 -0800
Committer: Eugene Koifman <ek...@apache.org>
Committed: Fri Jan 11 12:54:28 2019 -0800

----------------------------------------------------------------------
 .../hive/ql/txn/compactor/TestCompactor.java    | 76 ++++++++++++-------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 13 +++-
 .../org/apache/hadoop/hive/ql/DriverUtils.java  | 18 ++++-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 78 ++++++++++----------
 .../hive/ql/stats/StatsUpdaterThread.java       |  2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      | 59 +++++++--------
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  7 ++
 .../hadoop/hive/ql/TestTxnConcatenate.java      | 28 +++----
 .../hive/ql/stats/TestStatsUpdaterThread.java   |  2 +-
 9 files changed, 160 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 2b22a62..dc7b287 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -97,12 +96,10 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@RunWith(Parameterized.class)
 public class TestCompactor {
   private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt());
   private static final Logger LOG = LoggerFactory.getLogger(TestCompactor.class);
@@ -117,12 +114,6 @@ public class TestCompactor {
     return Arrays.asList(new Object[][]{{true}, {false}});
   }
 
-  private boolean newStreamingAPI;
-
-  public TestCompactor(boolean newStreamingAPI) {
-    this.newStreamingAPI = newStreamingAPI;
-  }
-
   @Rule
   public TemporaryFolder stagingFolder = new TemporaryFolder();
 
@@ -366,6 +357,13 @@ public class TestCompactor {
    */
   @Test
   public void testStatsAfterCompactionPartTbl() throws Exception {
+    testStatsAfterCompactionPartTbl(false);
+  }
+  @Test
+  public void testStatsAfterCompactionPartTblNew() throws Exception {
+    testStatsAfterCompactionPartTbl(true);
+  }
+  private void testStatsAfterCompactionPartTbl(boolean newStreamingAPI) throws Exception {
     //as of (8/27/2014) Hive 0.14, ACID/Orc requires HiveInputFormat
     String tblName = "compaction_test";
     String tblNameStg = tblName + "_stg";
@@ -710,6 +708,13 @@ public class TestCompactor {
 
   @Test
   public void minorCompactAfterAbort() throws Exception {
+    minorCompactAfterAbort(false);
+  }
+  @Test
+  public void minorCompactAfterAbortNew() throws Exception {
+    minorCompactAfterAbort(true);
+  }
+  private void minorCompactAfterAbort(boolean newStreamingAPI) throws Exception {
     String dbName = "default";
     String tblName = "cws";
     String columnNamesProperty = "a,b";
@@ -719,7 +724,7 @@ public class TestCompactor {
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
       " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
-    processStreamingAPI(dbName, tblName);
+    processStreamingAPI(dbName, tblName, newStreamingAPI);
     // Now, compact
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
@@ -750,6 +755,13 @@ public class TestCompactor {
 
   @Test
   public void majorCompactAfterAbort() throws Exception {
+    majorCompactAfterAbort(false);
+  }
+  @Test
+  public void majorCompactAfterAbortNew() throws Exception {
+    majorCompactAfterAbort(true);
+  }
+  private void majorCompactAfterAbort(boolean newStreamingAPI) throws Exception {
     String dbName = "default";
     String tblName = "cws";
     String columnNamesProperty = "a,b";
@@ -759,7 +771,7 @@ public class TestCompactor {
       " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
       " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
 
-    processStreamingAPI(dbName, tblName);
+    processStreamingAPI(dbName, tblName, newStreamingAPI);
     runMajorCompaction(dbName, tblName);
 
     // Find the location of the table
@@ -805,12 +817,12 @@ public class TestCompactor {
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 1);
-    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
 
     // Make sure we don't compact if we don't need to compact.
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 1);
-    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
   }
 
   @Test
@@ -849,7 +861,7 @@ public class TestCompactor {
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 3);
-    verifyHasBase(table.getSd(), fs, "base_0000001");
+    verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
 
     // Try with an extra delta.
     executeStatementOnDriver("drop table if exists " + tblName, driver);
@@ -875,7 +887,7 @@ public class TestCompactor {
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 9);
-    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyHasBase(table.getSd(), fs, "base_0000002_v0000023");
 
     // Try with an extra base.
     executeStatementOnDriver("drop table if exists " + tblName, driver);
@@ -926,7 +938,7 @@ public class TestCompactor {
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 1);
-    String baseDir = "base_0000002";
+    String baseDir = "base_0000002_v0000006";
     verifyHasBase(table.getSd(), fs, baseDir);
 
     FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir),
@@ -962,7 +974,7 @@ public class TestCompactor {
 
     runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
     FileSystem fs = FileSystem.get(conf);
-    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyHasBase(table.getSd(), fs, "base_0000002_v0000010");
     verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
     verifyFooBarResult(tblName, 2);
 
@@ -974,7 +986,7 @@ public class TestCompactor {
     msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
     runMajorCompaction(dbName, tblName); // Compact 4 and 5.
     verifyFooBarResult(tblName, 2);
-    verifyHasBase(table.getSd(), fs, "base_0000005");
+    verifyHasBase(table.getSd(), fs, "base_0000005_v0000016");
     runCleaner(conf);
     verifyDeltaCount(table.getSd(), fs, 0);
   }
@@ -1038,8 +1050,8 @@ public class TestCompactor {
 
     verifyFooBarResult(tblName, 3);
     verifyDeltaCount(p3.getSd(), fs, 1);
-    verifyHasBase(p1.getSd(), fs, "base_0000006");
-    verifyHasBase(p2.getSd(), fs, "base_0000006");
+    verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
+    verifyHasBase(p2.getSd(), fs, "base_0000006_v0000014");
 
     executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
     executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
@@ -1049,8 +1061,8 @@ public class TestCompactor {
     // Make sure we don't compact if we don't need to compact; but do if we do.
     verifyFooBarResult(tblName, 4);
     verifyDeltaCount(p3.getSd(), fs, 1);
-    verifyHasBase(p1.getSd(), fs, "base_0000006");
-    verifyHasBase(p2.getSd(), fs, "base_0000008");
+    verifyHasBase(p1.getSd(), fs, "base_0000006_v0000010");
+    verifyHasBase(p2.getSd(), fs, "base_0000008_v0000023");
 
   }
 
@@ -1095,6 +1107,13 @@ public class TestCompactor {
 
   @Test
   public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
+    majorCompactWhileStreamingForSplitUpdate(false);
+  }
+  @Test
+  public void majorCompactWhileStreamingForSplitUpdateNew() throws Exception {
+    majorCompactWhileStreamingForSplitUpdate(true);
+  }
+  private void majorCompactWhileStreamingForSplitUpdate(boolean newStreamingAPI) throws Exception {
     String dbName = "default";
     String tblName = "cws";
     String columnNamesProperty = "a,b";
@@ -1283,6 +1302,13 @@ public class TestCompactor {
 
   @Test
   public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
+    minorCompactWhileStreamingWithSplitUpdate(true);
+  }
+  @Test
+  public void minorCompactWhileStreamingWithSplitUpdateNew() throws Exception {
+    minorCompactWhileStreamingWithSplitUpdate(true);
+  }
+  private void minorCompactWhileStreamingWithSplitUpdate(boolean newStreamingAPI) throws Exception {
     String dbName = "default";
     String tblName = "cws";
     String columnNamesProperty = "a,b";
@@ -1713,9 +1739,9 @@ public class TestCompactor {
   }
 
 
-  private void processStreamingAPI(String dbName, String tblName)
-      throws StreamingException, ClassNotFoundException, org.apache.hive.hcatalog.streaming.StreamingException,
-      InterruptedException {
+  private void processStreamingAPI(String dbName, String tblName, boolean newStreamingAPI)
+      throws StreamingException, ClassNotFoundException,
+      org.apache.hive.hcatalog.streaming.StreamingException, InterruptedException {
     if (newStreamingAPI) {
       StreamingConnection connection = null;
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 608cbd5..01ecf0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -223,6 +223,7 @@ public class Driver implements IDriver {
   private CacheUsage cacheUsage;
   private CacheEntry usedCacheEntry;
   private ValidWriteIdList compactionWriteIds = null;
+  private long compactorTxnId = 0;
 
   private Context backupContext = null;
   private boolean retrial = false;
@@ -1481,11 +1482,18 @@ public class Driver implements IDriver {
     List<String> txnTables = getTransactionalTableList(plan);
     ValidTxnWriteIdList txnWriteIds = null;
     if (compactionWriteIds != null) {
+      /**
+       * This is kludgy: here we need to read with Compactor's snapshot/txn
+       * rather than the snapshot of the current {@code txnMgr}, in effect
+       * simulating a "flashback query" but can't actually share compactor's
+       * txn since it would run multiple statements.  See more comments in
+       * {@link org.apache.hadoop.hive.ql.txn.compactor.Worker} where it start
+       * the compactor txn*/
       if (txnTables.size() != 1) {
         throw new LockException("Unexpected tables in compaction: " + txnTables);
       }
       String fullTableName = txnTables.get(0);
-      txnWriteIds = new ValidTxnWriteIdList(0L); // No transaction for the compaction for now. todo: Since MM compaction is a query, a txn has been opened at this point
+      txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
       txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
     } else {
       txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
@@ -3015,7 +3023,8 @@ public class Driver implements IDriver {
     }
   }
 
-  public void setCompactionWriteIds(ValidWriteIdList val) {
+  public void setCompactionWriteIds(ValidWriteIdList val, long compactorTxnId) {
     this.compactionWriteIds = val;
+    this.compactorTxnId = compactorTxnId;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
index 8228109..a2c6862 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql;
 
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -28,14 +29,27 @@ import org.slf4j.LoggerFactory;
 public class DriverUtils {
   private static final Logger LOG = LoggerFactory.getLogger(DriverUtils.class);
 
+  public static void runOnDriver(HiveConf conf, String user, SessionState sessionState,
+      String query) throws HiveException {
+    runOnDriver(conf, user, sessionState, query, null, -1);
+  }
+
+  /**
+   * For Query Based compaction to run the query to generate the compacted data.
+   */
   public static void runOnDriver(HiveConf conf, String user,
-      SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+      SessionState sessionState, String query, ValidWriteIdList writeIds, long compactorTxnId)
+      throws HiveException {
+    if(writeIds != null && compactorTxnId < 0) {
+      throw new IllegalArgumentException(JavaUtils.txnIdToString(compactorTxnId) +
+          " is not valid. Context: " + query);
+    }
     SessionState.setCurrentSessionState(sessionState);
     boolean isOk = false;
     try {
       QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
       Driver driver = new Driver(qs, user, null, null);
-      driver.setCompactionWriteIds(writeIds);
+      driver.setCompactionWriteIds(writeIds, compactorTxnId);
       try {
         CommandProcessorResponse cpr = driver.run(query);
         if (cpr.getResponseCode() != 0) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index badcc55..5dbf634 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -27,7 +27,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -774,12 +773,14 @@ public class AcidUtils {
   public static final class ParsedBase {
     private final long writeId;
     private final long visibilityTxnId;
-    ParsedBase(long writeId) {
-      this(writeId, 0);
+    private final Path baseDirPath;
+    ParsedBase(long writeId, Path baseDirPath) {
+      this(writeId, 0, baseDirPath);
     }
-    ParsedBase(long writeId, long visibilityTxnId) {
+    ParsedBase(long writeId, long visibilityTxnId, Path baseDirPath) {
       this.writeId = writeId;
       this.visibilityTxnId = visibilityTxnId;
+      this.baseDirPath = baseDirPath;
     }
     public long getWriteId() {
       return writeId;
@@ -787,6 +788,9 @@ public class AcidUtils {
     public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+    public Path getBaseDirPath() {
+      return baseDirPath;
+    }
     public static ParsedBase parseBase(Path path) {
       String filename = path.getName();
       if(!filename.startsWith(BASE_PREFIX)) {
@@ -794,10 +798,10 @@ public class AcidUtils {
       }
       int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
       if(idxOfv < 0) {
-        return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())));
+        return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())), path);
       }
       return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)),
-          Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())));
+          Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path);
     }
   }
   /**
@@ -1229,7 +1233,7 @@ public class AcidUtils {
     }
 
     if(bestBase.oldestBase != null && bestBase.status == null &&
-        MetaDataFile.isCompacted(bestBase.oldestBase, fs)) {
+        isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs)) {
       /**
        * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
        * {@link writeIdList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
@@ -1276,23 +1280,33 @@ public class AcidUtils {
    * Note that such base is NOT obsolete.  Obsolete files are those that are "covered" by other
    * files within the snapshot.
    * A base produced by Insert Overwrite is different.  Logically it's a delta file but one that
-   * causes anything written previously is ignored (hence the overwrite).  In this case, base_x
+   * causes anything written previously to be ignored (hence the overwrite).  In this case, base_x
    * is visible if writeid:x is committed for current reader.
    */
-  private static boolean isValidBase(long baseWriteId, ValidWriteIdList writeIdList, Path baseDir,
+  private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList,
             FileSystem fs) throws IOException {
-    if(baseWriteId == Long.MIN_VALUE) {
+    if(parsedBase.getWriteId() == Long.MIN_VALUE) {
       //such base is created by 1st compaction in case of non-acid to acid table conversion
       //By definition there are no open txns with id < 1.
       return true;
     }
-    if(!MetaDataFile.isCompacted(baseDir, fs)) {
-      //this is the IOW case
-      return writeIdList.isWriteIdValid(baseWriteId);
+    if(isCompactedBase(parsedBase, fs)) {
+      return writeIdList.isValidBase(parsedBase.getWriteId());
     }
-    return writeIdList.isValidBase(baseWriteId);
+    //if here, it's a result of IOW
+    return writeIdList.isWriteIdValid(parsedBase.getWriteId());
   }
+  /**
+   * Returns {@code true} if {@code parsedBase} was created by compaction.
+   * As of Hive 4.0 we can tell if a directory is a result of compaction based on the
+   * presence of {@link AcidUtils#VISIBILITY_PATTERN} suffix.  Base directories written prior to
+   * that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first
+   * since that is the cheaper test.*/
+  private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException {
+    return parsedBase.getVisibilityTxnId() > 0 ||
+        MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
 
+  }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidWriteIdList writeIdList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
@@ -1307,23 +1321,23 @@ public class AcidUtils {
       return;
     }
     if (fn.startsWith(BASE_PREFIX)) {
-      ParsedBase  parsedBase = ParsedBase.parseBase(p);
+      ParsedBase parsedBase = ParsedBase.parseBase(p);
       if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
         return;
       }
-      long writeId = parsedBase.getWriteId();
+      final long writeId = parsedBase.getWriteId();
       if(bestBase.oldestBaseWriteId > writeId) {
         //keep track for error reporting
         bestBase.oldestBase = p;
         bestBase.oldestBaseWriteId = writeId;
       }
       if (bestBase.status == null) {
-        if(isValidBase(writeId, writeIdList, p, fs)) {
+        if(isValidBase(parsedBase, writeIdList, fs)) {
           bestBase.status = child;
           bestBase.writeId = writeId;
         }
       } else if (bestBase.writeId < writeId) {
-        if(isValidBase(writeId, writeIdList, p, fs)) {
+        if(isValidBase(parsedBase, writeIdList, fs)) {
           obsolete.add(bestBase.status);
           bestBase.status = child;
           bestBase.writeId = writeId;
@@ -1943,31 +1957,13 @@ public class AcidUtils {
       String COMPACTED = "compacted";
     }
 
-    /**
-     * @param baseOrDeltaDir detla or base dir, must exist
-     */
-    public static void createCompactorMarker(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+    static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException {
       /**
-       * create _meta_data json file in baseOrDeltaDir
-       * write thisFileVersion, dataFormat
-       *
-       * on read if the file is not there, assume version 0 and dataFormat=acid
+       * this file was written by Hive versions before 4.0 into a base_x/ dir
+       * created by compactor so that it can be distinguished from the one
+       * created by Insert Overwrite
        */
       Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
-      Map<String, String> metaData = new HashMap<>();
-      metaData.put(Field.VERSION, CURRENT_VERSION);
-      metaData.put(Field.DATA_FORMAT, Value.COMPACTED);
-      try (FSDataOutputStream strm = fs.create(formatFile, false)) {
-        new ObjectMapper().writeValue(strm, metaData);
-      } catch (IOException ioe) {
-        String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE
-            + ": " + ioe.getMessage();
-        LOG.error(msg, ioe);
-        throw ioe;
-      }
-    }
-    static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs) throws IOException {
-      Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
       if(!fs.exists(formatFile)) {
         return false;
       }
@@ -1995,7 +1991,7 @@ public class AcidUtils {
     }
 
     /**
-     * Chooses 1 representantive file from {@code baseOrDeltaDir}
+     * Chooses 1 representative file from {@code baseOrDeltaDir}
      * This assumes that all files in the dir are of the same type: either written by an acid
      * write or Load Data.  This should always be the case for an Acid table.
      */

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 302269b..c028e12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -616,7 +616,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
       if (doWait) {
         SessionState.start(ss); // This is the first call, open the session
       }
-      DriverUtils.runOnDriver(conf, user, ss, cmd, null);
+      DriverUtils.runOnDriver(conf, user, ss, cmd);
     } catch (Exception e) {
       LOG.error("Analyze command failed: " + cmd, e);
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d5ee4a..dc05e19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -346,12 +346,13 @@ public class CompactorMR {
       Path baseLocation = new Path(tmpLocation, "_base");
 
       // Set up the session for driver.
-      conf = new HiveConf(conf);
-      conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
-      conf.unset(ValidTxnList.VALID_TXNS_KEY);//so Driver doesn't get confused
+      HiveConf driverConf = new HiveConf(conf);
+      driverConf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+      driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
+      //thinking it already has a txn opened
 
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
+      SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, false);
 
       // Note: we could skip creating the table and just add table type stuff directly to the
       //       "insert overwrite directory" command if there were no bucketing or list bucketing.
@@ -363,7 +364,7 @@ public class CompactorMR {
             p == null ? t.getSd() : p.getSd(), baseLocation.toString());
         LOG.info("Compacting a MM table into " + query);
         try {
-          DriverUtils.runOnDriver(conf, user, sessionState, query, null);
+          DriverUtils.runOnDriver(driverConf, user, sessionState, query);
           break;
         } catch (Exception ex) {
           Throwable cause = ex;
@@ -376,12 +377,13 @@ public class CompactorMR {
         }
       }
 
-      String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
+      String query = buildMmCompactionQuery(driverConf, t, p, tmpTableName);
       LOG.info("Compacting a MM table via " + query);
-      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds);
-      commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
-      DriverUtils.runOnDriver(conf, user, sessionState,
-          "drop table if exists " + tmpTableName, null);
+      long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
+      DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
+      commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds, compactorTxnId);
+      DriverUtils.runOnDriver(driverConf, user, sessionState,
+          "drop table if exists " + tmpTableName);
     } catch (HiveException e) {
       LOG.error("Error compacting a MM table", e);
       throw new IOException(e);
@@ -1002,7 +1004,7 @@ public class CompactorMR {
         deleteEventWriter.close(false);
       }
     }
-    private long getCompactorTxnId() {
+    private static long getCompactorTxnId(Configuration jobConf) {
       String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY);
       if(Strings.isNullOrEmpty(snapshot)) {
         throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to "
@@ -1010,7 +1012,7 @@ public class CompactorMR {
       }
       ValidTxnList validTxnList = new ValidReadTxnList();
       validTxnList.readFromString(snapshot);
-      //this is id of the current txn
+      //this is id of the current (compactor) txn
       return validTxnList.getHighWatermark();
     }
     private void getWriter(Reporter reporter, ObjectInspector inspector,
@@ -1026,7 +1028,7 @@ public class CompactorMR {
             .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
             .bucket(bucket)
             .statementId(-1)//setting statementId == -1 makes compacted delta files use
-            .visibilityTxnId(getCompactorTxnId());
+            .visibilityTxnId(getCompactorTxnId(jobConf));
       //delta_xxxx_yyyy format
 
         // Instantiate the underlying output format
@@ -1050,7 +1052,7 @@ public class CompactorMR {
           .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE)).bucket(bucket)
           .statementId(-1)//setting statementId == -1 makes compacted delta files use
           // delta_xxxx_yyyy format
-          .visibilityTxnId(getCompactorTxnId());
+          .visibilityTxnId(getCompactorTxnId(jobConf));
 
       // Instantiate the underlying output format
       @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
@@ -1171,19 +1173,17 @@ public class CompactorMR {
             .minimumWriteId(conf.getLong(MIN_TXN, Long.MAX_VALUE))
             .maximumWriteId(conf.getLong(MAX_TXN, Long.MIN_VALUE))
             .bucket(0)
-            .statementId(-1);
+            .statementId(-1)
+            .visibilityTxnId(CompactorMap.getCompactorTxnId(conf));
         Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();
         LOG.info(context.getJobID() + ": " + tmpLocation +
             " not found.  Assuming 0 splits.  Creating " + newDeltaDir);
         fs.mkdirs(newDeltaDir);
-        createCompactorMarker(conf, newDeltaDir, fs);
         AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs);
         return;
       }
-      FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
-      //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
-      //name is that we want to rename; leave it for another day
-      //todo: may actually have delta_x_y and delete_delta_x_y
+      FileStatus[] contents = fs.listStatus(tmpLocation);
+      //minor compaction may actually have delta_x_y and delete_delta_x_y
       for (FileStatus fileStatus : contents) {
         //newPath is the base/delta dir
         Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
@@ -1193,16 +1193,9 @@ public class CompactorMR {
         * meta files which will create base_x/ (i.e. B)...*/
         fs.rename(fileStatus.getPath(), newPath);
         AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs);
-        createCompactorMarker(conf, newPath, fs);
       }
       fs.delete(tmpLocation, true);
     }
-    private void createCompactorMarker(JobConf conf, Path finalLocation, FileSystem fs)
-        throws IOException {
-      if(conf.getBoolean(IS_MAJOR, false)) {
-        AcidUtils.MetaDataFile.createCompactorMarker(finalLocation, fs);
-      }
-    }
 
     @Override
     public void abortJob(JobContext context, int status) throws IOException {
@@ -1218,22 +1211,23 @@ public class CompactorMR {
    * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
    * @param from The temp directory used for compactor output. Not the actual base/delta.
    * @param to The final directory; basically a SD directory. Not the actual base/delta.
+   * @param compactorTxnId txn that the compactor started
    */
   private void commitMmCompaction(String from, String to, Configuration conf,
-      ValidWriteIdList actualWriteIds) throws IOException {
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
     Path fromPath = new Path(from), toPath = new Path(to);
     FileSystem fs = fromPath.getFileSystem(conf);
-    //todo: is that true?  can it be aborted? does it matter for compaction? probably OK since
-    //getAcidState() doesn't check if X is valid in base_X_cY for compacted base dirs.
     // Assume the high watermark can be used as maximum transaction ID.
+    //todo: is that true?  can it be aborted? does it matter for compaction? probably OK since
+    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
     long maxTxn = actualWriteIds.getHighWatermark();
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
-      .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1);
+        .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1)
+        .visibilityTxnId(compactorTxnId);
     Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
     if (!fs.exists(fromPath)) {
       LOG.info(from + " not found.  Assuming 0 splits. Creating " + newBaseDir);
       fs.mkdirs(newBaseDir);
-      AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs);
       return;
     }
     LOG.info("Moving contents of " + from + " to " + to);
@@ -1243,7 +1237,6 @@ public class CompactorMR {
     }
     FileStatus dirPath = children[0];
     fs.rename(dirPath.getPath(), newBaseDir);
-    AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
     fs.delete(fromPath, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e3fab69..42ccfdc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
@@ -156,6 +157,12 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         if (ci.runAs == null) {
           ci.runAs = findUserToRunAs(sd.getLocation(), t);
         }
+        /**
+         * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) since
+         * then the Driver would already have the an open txn but then this txn would have
+         * multiple statements in it (for query based compactor) which is not supported (and since
+         * this case some of the statements are DDL, even in the future will not be allowed in a
+         * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
         long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
 
         heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index eb0bfd9..05b5f3e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -142,18 +142,14 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
     runStatementOnDriver("insert into T values(5,6),(8,8)");
     String testQuery = "select a, b, INPUT__FILE__NAME from T order by a, b";
     String[][] expected = new String[][] {
-        {"1\t2",
-            "t/delta_0000001_0000001_0000/000000_0"},
-        {"4\t5",
-            "t/delta_0000001_0000001_0000/000000_0"},
-        {"5\t6",
-            "t/delta_0000002_0000002_0000/000000_0"},
-        {"8\t8",
-            "t/delta_0000002_0000002_0000/000000_0"}};
+        {"1\t2", "t/delta_0000001_0000001_0000/000000_0"},
+        {"4\t5", "t/delta_0000001_0000001_0000/000000_0"},
+        {"5\t6", "t/delta_0000002_0000002_0000/000000_0"},
+        {"8\t8", "t/delta_0000002_0000002_0000/000000_0"}};
     checkResult(expected, testQuery, false, "check data", LOG);
 
-        /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
-     but in normal usage 'concatenate' is blocking, */
+    /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+      but in normal usage 'concatenate' is blocking, */
     hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
     runStatementOnDriver("alter table T concatenate");
 
@@ -166,14 +162,10 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
     Assert.assertEquals(1, rsp.getCompactsSize());
     Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
     String[][] expected2 = new String[][] {
-        {"1\t2",
-            "t/base_0000002/000000_0"},
-        {"4\t5",
-            "t/base_0000002/000000_0"},
-        {"5\t6",
-            "t/base_0000002/000000_0"},
-        {"8\t8",
-            "t/base_0000002/000000_0"}};
+        {"1\t2", "t/base_0000002_v0000020/000000_0"},
+        {"4\t5", "t/base_0000002_v0000020/000000_0"},
+        {"5\t6", "t/base_0000002_v0000020/000000_0"},
+        {"8\t8", "t/base_0000002_v0000020/000000_0"}};
     checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb9d5ccd/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index 55131f3..2512579 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -610,7 +610,7 @@ public class TestStatsUpdaterThread {
   }
 
   private void executeQuery(String query) throws HiveException {
-    DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query, null);
+    DriverUtils.runOnDriver(hiveConf, ss.getUserName(), ss, query);
   }
 
   private StatsUpdaterThread createUpdater() throws MetaException {