Posted to commits@hive.apache.org by pv...@apache.org on 2020/05/21 07:17:38 UTC

[hive] branch master updated: HIVE-22971: Eliminate file rename in insert-only compactor (Karen Coppage via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 29db9e6  HIVE-22971: Eliminate file rename in insert-only compactor (Karen Coppage via Peter Vary)
29db9e6 is described below

commit 29db9e61c6164fe8d2e21c10e57fe00df0b6cea4
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Thu May 21 09:17:03 2020 +0200

    HIVE-22971: Eliminate file rename in insert-only compactor (Karen Coppage via Peter Vary)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +-
 .../hive/ql/txn/compactor/TestCompactor.java       | 131 +++++++++++++--------
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   |   4 +-
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |  13 +-
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  16 +--
 .../hive/ql/txn/compactor/MinorQueryCompactor.java |  38 +++---
 .../ql/txn/compactor/MmMajorQueryCompactor.java    |  37 ++----
 .../ql/txn/compactor/MmMinorQueryCompactor.java    |  60 +++-------
 .../hive/ql/txn/compactor/QueryCompactor.java      |  54 +++++++--
 .../ql/txn/compactor/QueryCompactorFactory.java    |   6 +-
 10 files changed, 195 insertions(+), 169 deletions(-)
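For readers skimming the diff: the core of the change is that the query-based (insert-only) compactors now compute the final ACID base/delta directory up front and let the compaction queries write their results there directly, instead of writing to a tmp location and renaming afterwards. A minimal sketch of the idea, condensed from the new QueryCompactor.Util.getCompactionResultDir further down (the visibilityTxnId plumbing is omitted for brevity; this is not the verbatim implementation):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    // Resolve the final base_/delta_ directory before running the compaction
    // queries, so their output needs no rename into place afterwards.
    static Path resultDir(StorageDescriptor sd, ValidWriteIdList writeIds,
        HiveConf conf, boolean writingBase) {
      long minOpenWriteId =
          writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
      AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
          .writingBase(writingBase)   // base_N_vM if true, delta_N_M_vK if false
          .isCompressed(false)
          .minimumWriteId(minOpenWriteId)
          .maximumWriteId(writeIds.getHighWatermark())
          .statementId(-1);
      return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), options);
    }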

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5a39006..26eeeb7 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2665,8 +2665,9 @@ public class HiveConf extends Configuration {
         "The default value is 1000000, since the data limit of a znode is 1MB"),
     HIVE_MM_ALLOW_ORIGINALS("hive.mm.allow.originals", false,
         "Whether to allow original files in MM tables. Conversion to MM may be expensive if\n" +
-        "this is set to false, however unless MAPREDUCE-7086 fix is present, queries that\n" +
-        "read MM tables with original files will fail. The default in Hive 3.0 is false."),
+        "this is set to false, however unless MAPREDUCE-7086 fix is present (hadoop 3.1.1+),\n" +
+        "queries that read non-orc MM tables with original files will fail. The default in\n" +
+        "Hive 3.0 is false."),
 
     // Zookeeper related configs
     HIVE_ZOOKEEPER_USE_KERBEROS("hive.zookeeper.kerberos.enabled", true,
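The updated tests below exercise both settings of this flag from Java; the SQL-level equivalent (assuming a session where the property is settable) would be a plain SET command:

    // As in the updated tests:
    driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_MM_ALLOW_ORIGINALS, false);
    // SQL equivalent: SET hive.mm.allow.originals=false;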
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index b9db1d1..c687f14 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -818,96 +818,131 @@ public class TestCompactor {
     verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
   }
 
-  @Test
-  public void mmTableOriginalsOrc() throws Exception {
-    mmTableOriginals("ORC");
+  @Test public void mmTableOriginalsMajorOrc() throws Exception {
+    mmTableOriginalsMajor("orc", true);
   }
 
-  @Test
-  public void mmTableOriginalsText() throws Exception {
-    mmTableOriginals("TEXTFILE");
+  @Test public void mmTableOriginalsMajorText() throws Exception {
+    mmTableOriginalsMajor("textfile", false);
   }
 
-  private void mmTableOriginals(String format) throws Exception {
-    // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat.
-    boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format);
+  /**
+   * Major compact an mm table that contains original files.
+   */
+  private void mmTableOriginalsMajor(String format, boolean allowOriginals) throws Exception {
+    driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
     String dbName = "default";
     String tblName = "mm_nonpart";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
-        format + " TBLPROPERTIES ('transactional'='false')", driver);
-    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+        + " TBLPROPERTIES ('transactional'='false')", driver);
     Table table = msClient.getTable(dbName, tblName);
 
     executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
 
     verifyFooBarResult(tblName, 3);
 
     FileSystem fs = FileSystem.get(conf);
     executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
-       + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+        + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
 
-    verifyFooBarResult(tblName, 3);
+    if (allowOriginals) {
+      verifyDeltaCount(table.getSd(), fs, 0);
+      verifyFooBarResult(tblName, 3);
 
-    runMajorCompaction(dbName, tblName);
-    verifyFooBarResult(tblName, 3);
-    verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
+      runMajorCompaction(dbName, tblName);
+      verifyFooBarResult(tblName, 3);
+      verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
+    } else {
+      verifyDeltaCount(table.getSd(), fs, 1);
+      // A single delta dir won't be compacted; skip testing major compaction.
+    }
+  }
+
+  @Test public void mmMajorOriginalsDeltasOrc() throws Exception {
+    mmMajorOriginalsDeltas("orc", true);
+  }
+
+  @Test public void mmMajorOriginalsDeltasText() throws Exception {
+    mmMajorOriginalsDeltas("textfile", false);
+  }
 
-    // Try with an extra delta.
+  /**
+   * Major compact an mm table with both original and delta files.
+   */
+  private void mmMajorOriginalsDeltas(String format, boolean allowOriginals) throws Exception {
+    driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    FileSystem fs = FileSystem.get(conf);
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
-        format + " TBLPROPERTIES ('transactional'='false')", driver);
-    table = msClient.getTable(dbName, tblName);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+        + " TBLPROPERTIES ('transactional'='false')", driver);
+    Table table = msClient.getTable(dbName, tblName);
 
     executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 3);
 
     executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
-       + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
-
-    // Neither select nor compaction (which is a select) wil work after this.
-    if (isBrokenUntilMapreduce7086) return;
+        + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
 
     verifyFooBarResult(tblName, 9);
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 9);
-    verifyHasBase(table.getSd(), fs, "base_0000002_v0000023");
+    verifyHasBase(table.getSd(), fs, "base_0000002_v0000010");
+  }
 
+  @Test public void mmMajorOriginalsBaseOrc() throws Exception {
+    mmMajorOriginalsBase("orc", true);
+  }
+
+  @Test public void mmMajorOriginalsBaseText() throws Exception {
+    mmMajorOriginalsBase("textfile", false);
+  }
+
+  /**
+   * Insert overwrite and major compact an mm table with only original files.
+   *
+   * @param format file format for the table
+   * @param allowOriginals value for hive.mm.allow.originals
+   */
+  private void mmMajorOriginalsBase(String format, boolean allowOriginals) throws Exception {
+    driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
     // Try with an extra base.
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    FileSystem fs = FileSystem.get(conf);
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
-        format + " TBLPROPERTIES ('transactional'='false')", driver);
-    table = msClient.getTable(dbName, tblName);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+        + " TBLPROPERTIES ('transactional'='false')", driver);
+    Table table = msClient.getTable(dbName, tblName);
 
     executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
-    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 3);
 
     executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
-       + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
-    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM "
-        + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+        + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + tblName
+        + " UNION ALL SELECT a,b FROM " + tblName, driver);
     verifyFooBarResult(tblName, 6);
 
     runMajorCompaction(dbName, tblName);
     verifyFooBarResult(tblName, 6);
     verifyHasBase(table.getSd(), fs, "base_0000002");
-
-    msClient.close();
   }
 
-
   @Test
   public void mmTableBucketed() throws Exception {
     String dbName = "default";
@@ -1059,8 +1094,8 @@ public class TestCompactor {
 
   }
 
-  private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
-    List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+  private void verifyFooBarResult(String tblName, int count) throws Exception {
+    List<String> valuesReadFromHiveDriver = new ArrayList<>();
     executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
     driver.getResults(valuesReadFromHiveDriver);
     Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 89920cc..7bbc4bc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -1008,12 +1008,12 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "8:9223372036854775807::");
 
     // Check for default case.
-    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
+    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries);
     Assert.assertEquals("all", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
 
     // Check for case where  hive.llap.io.etl.skip.format is explicitly set to none - as to always use cache.
     hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none");
-    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
+    qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries);
     Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
   }
 }
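(The extra null in these two calls is the new resultDirs parameter of runCompactionQueries, added in QueryCompactor.java below; after this patch its full signature is:

    void runCompactionQueries(HiveConf conf, String tmpTableName,
        StorageDescriptor storageDescriptor, ValidWriteIdList writeIds,
        CompactionInfo compactionInfo, List<Path> resultDirs,
        List<String> createQueries, List<String> compactionQueries,
        List<String> dropQueries) throws IOException

Passing null is presumably safe here because these tests run only empty query lists, so the failure path that consults resultDirs is never reached.)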
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 05ea38c..018c733 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.regex.Matcher;
 
 import org.apache.hadoop.conf.Configuration;
@@ -133,7 +134,7 @@ public class CompactorMR {
     job.setOutputCommitter(CompactorOutputCommitter.class);
 
     job.set(FINAL_LOCATION, sd.getLocation());
-    job.set(TMP_LOCATION, QueryCompactor.Util.generateTmpPath(sd));
+    job.set(TMP_LOCATION, generateTmpPath(sd));
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
     job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -230,6 +231,7 @@ public class CompactorMR {
      */
     QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
     if (queryCompactor != null) {
+      LOG.info("Will compact with  " + queryCompactor.getClass().getName());
       queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
       return;
     }
@@ -296,6 +298,15 @@ public class CompactorMR {
   }
 
   /**
+   * Generate a random tmp path under the provided storage location.
+   * @param sd storage descriptor; must not be null
+   * @return the generated tmp path; never null
+   */
+  private static String generateTmpPath(StorageDescriptor sd) {
+    return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+  }
+
+  /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    *                If it's base_xxxx, it's in dirsToSearch, else the actual original files
    *                (all leaves recursively) are in the dirsToSearch list
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index c70d4f3..9558409 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -53,20 +53,14 @@ final class MajorQueryCompactor extends QueryCompactor {
 
     String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-
-    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
-    long highWaterMark = writeIds.getHighWatermark();
-    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true)
-        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
-        .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId);
-    Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options);
+    Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
+        conf, true, false, false);
 
     List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
-    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
-        compactionQueries, dropQueries);
+    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+        Lists.newArrayList(tmpTablePath), createQueries, compactionQueries, dropQueries);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 4d0e5f7..d83a50f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
@@ -45,7 +44,7 @@ final class MinorQueryCompactor extends QueryCompactor {
 
   @Override
   void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
     LOG.info("Running query based minor compaction");
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
@@ -61,14 +60,22 @@ final class MinorQueryCompactor extends QueryCompactor {
     String tmpTableName =
         table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
 
-    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
+    Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
+        writeIds, conf, false, false, false);
+    Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
+        writeIds, conf, false, true, false);
+
+    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds,
+        resultDeltaDir, resultDeleteDeltaDir);
     List<String> compactionQueries = getCompactionQueries(tmpTableName, table, writeIds);
     List<String> dropQueries = getDropQueries(tmpTableName);
 
-    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
+    runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+        Lists.newArrayList(resultDeltaDir, resultDeleteDeltaDir), createQueries,
         compactionQueries, dropQueries);
   }
 
+
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
@@ -89,16 +96,13 @@ final class MinorQueryCompactor extends QueryCompactor {
    * @param tempTableBase an unique identifier which is used to create delta/delete-delta temp tables
    * @param dir the directory, where the delta directories resides
    * @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction
-   * @param conf hive configuration
-   * @param storageDescriptor this is the resolved storage descriptor
+   * @param tmpTableResultLocation result delta dir
+   * @param tmpTableDeleteResultLocation result delete delta dir
    * @return list of create/alter queries, always non-null
    */
   private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
-      ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) {
+      ValidWriteIdList writeIds, Path tmpTableResultLocation, Path tmpTableDeleteResultLocation) {
     List<String> queries = new ArrayList<>();
-    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
-    long highWatermark = writeIds.getHighWatermark();
-    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
     // create delta temp table
     String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
     queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
@@ -106,15 +110,9 @@ final class MinorQueryCompactor extends QueryCompactor {
     if (!alterQuery.isEmpty()) {
       queries.add(alterQuery);
     }
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
-        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
-        .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
-    Path location = new Path(storageDescriptor.getLocation());
-    String tmpTableResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
-        options).toString();
     // create delta result temp table
     queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, true,
-        tmpTableResultLocation));
+        tmpTableResultLocation.toString()));
 
     // create delete delta temp tables
     String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
@@ -124,13 +122,9 @@ final class MinorQueryCompactor extends QueryCompactor {
     if (!alterQuery.isEmpty()) {
       queries.add(alterQuery);
     }
-    options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
-        .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
-    String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
-        options).toString();
     // create delete delta result temp table
     queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result",  false, true,
-        tmpTableDeleteResultLocation));
+        tmpTableDeleteResultLocation.toString()));
     return queries;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 724a437..5e11d8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,9 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -53,9 +50,6 @@ final class MmMajorQueryCompactor extends QueryCompactor {
             table.getParameters(), false);
     QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
 
-    String tmpLocation = Util.generateTmpPath(storageDescriptor);
-    Path baseLocation = new Path(tmpLocation, "_base");
-
     // Set up the session for driver.
     HiveConf driverConf = new HiveConf(hiveConf);
     driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
@@ -64,13 +58,15 @@ final class MmMajorQueryCompactor extends QueryCompactor {
     //       "insert overwrite directory" command if there were no bucketing or list bucketing.
     String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-    List<String> createTableQueries =
-        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
-            baseLocation.toString());
+    Path resultBaseDir = QueryCompactor.Util.getCompactionResultDir(
+        storageDescriptor, writeIds, driverConf, true, true, false);
+
+    List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor,
+        resultBaseDir.toString());
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
-        createTableQueries, compactionQueries, dropQueries);
+        Lists.newArrayList(resultBaseDir), createTableQueries, compactionQueries, dropQueries);
   }
 
   /**
@@ -81,26 +77,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
   @Override
   protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
-    String from = tempTable.getSd().getLocation();
-    Path fromPath = new Path(from), toPath = new Path(dest);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    // Assume the high watermark can be used as maximum transaction ID.
-    //todo: is that true?  can it be aborted? does it matter for compaction? probably OK since
-    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
-            .statementId(-1).visibilityTxnId(compactorTxnId);
-    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newBaseDir);
-      fs.mkdirs(newBaseDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + dest);
-    fs.rename(fromPath, newBaseDir);
-    fs.delete(fromPath, true);
+    Util.cleanupEmptyDir(conf, tmpTableName);
   }
 
   private List<String> getCreateQueries(String tmpTableName, Table table,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 1cd95f80..1bdec7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,9 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -51,27 +48,25 @@ final class MmMinorQueryCompactor extends QueryCompactor {
         "Going to delete directories for aborted transactions for MM table " + table.getDbName()
             + "." + table.getTableName());
 
-    AcidUtils.Directory dir = AcidUtils
-        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
-            Ref.from(false), false, table.getParameters(), false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null,
+        new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
+        Ref.from(false), false, table.getParameters(), false);
     QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
-    String tmpLocation = Util.generateTmpPath(storageDescriptor);
-    Path sourceTabLocation = new Path(tmpLocation);
-    Path resultTabLocation = new Path(tmpLocation, "_result");
 
     HiveConf driverConf = setUpDriverSession(hiveConf);
 
     String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
     String resultTmpTableName = tmpTableName + "_result";
+    Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, driverConf,
+        false, false, false);
 
-    List<String> createTableQueries =
-        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
-            sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
+    List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, dir,
+        writeIds, resultDeltaDir);
     List<String> compactionQueries = getCompactionQueries(tmpTableName, resultTmpTableName, table);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
-        createTableQueries, compactionQueries, dropQueries);
+        Lists.newArrayList(resultDeltaDir), createTableQueries, compactionQueries, dropQueries);
   }
 
   /**
@@ -79,26 +74,7 @@ final class MmMinorQueryCompactor extends QueryCompactor {
    */
   @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table resultTable =
-        Hive.get().getTable(tmpTableName + "_result");
-    String from = resultTable.getSd().getLocation();
-    Path fromPath = new Path(from);
-    Path toPath = new Path(dest);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false)
-            .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1)
-            .visibilityTxnId(compactorTxnId);
-    Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newDeltaDir);
-      fs.mkdirs(newDeltaDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + dest);
-    fs.rename(fromPath, newDeltaDir);
-    fs.delete(fromPath, true);
+    Util.cleanupEmptyDir(conf, tmpTableName);
   }
 
   /**
@@ -114,32 +90,30 @@ final class MmMinorQueryCompactor extends QueryCompactor {
    * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result)
    * @param t Table to compact
    * @param sd storage descriptor of table or partition to compact
-   * @param sourceTabLocation location the "source table" (temp table 1) should go
-   * @param resultTabLocation location the "result table (temp table 2) should go
    * @param dir the parent directory of delta directories
-   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @param writeIds ValidWriteIdList for the table/partition we are compacting
+   * @param resultDeltaDir the final location of the compacted (result) delta directory
    * @return List of 3 query strings: 2 create table, 1 alter table
    */
   private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
-      String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
-      ValidWriteIdList validWriteIdList) {
+      AcidUtils.Directory dir, ValidWriteIdList writeIds, Path resultDeltaDir) {
     List<String> queries = Lists.newArrayList(
-        getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true),
-        getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false)
+        getCreateQuery(tmpTableBase, t, sd, null, true),
+        getCreateQuery(tmpTableBase + "_result", t, sd, resultDeltaDir.toString(), false)
     );
-    String alterQuery = buildAlterTableQuery(tmpTableBase, dir, validWriteIdList);
+    String alterQuery = buildAlterTableQuery(tmpTableBase, dir, writeIds);
     if (!alterQuery.isEmpty()) {
       queries.add(alterQuery);
     }
     return queries;
   }
 
-  private String getCreateQuery(String resultTableName, Table t, StorageDescriptor sd,
+  private String getCreateQuery(String newTableName, Table t, StorageDescriptor sd,
       String location, boolean isPartitioned) {
     return new CompactionQueryBuilder(
         CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
         CompactionQueryBuilder.Operation.CREATE,
-        resultTableName)
+        newTableName)
         .setSourceTab(t)
         .setStorageDescriptor(sd)
         .setLocation(location)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 7f3ccfa..1f732f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 /**
  * Common interface for query based compactions.
@@ -82,6 +82,7 @@ abstract class QueryCompactor {
    * @param storageDescriptor this is the resolved storage descriptor.
    * @param writeIds valid write IDs used to filter rows while they're being read for compaction.
    * @param compactionInfo provides info about the type of compaction.
+   * @param resultDirs the delta/base directory that is created as the result of compaction.
    * @param createQueries collection of queries which creates the temporary tables.
    * @param compactionQueries collection of queries which uses data from the original table and writes in temporary
    *                          tables.
@@ -89,8 +90,9 @@ abstract class QueryCompactor {
    * @throws IOException error during the run of the compaction.
    */
   void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries,
-      List<String> compactionQueries, List<String> dropQueries) throws IOException {
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<Path> resultDirs,
+      List<String> createQueries, List<String> compactionQueries, List<String> dropQueries)
+      throws IOException {
     Util.disableLlapCaching(conf);
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true);
@@ -118,6 +120,7 @@ abstract class QueryCompactor {
       commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);
     } catch (HiveException e) {
       LOG.error("Error doing query based {} compaction", compactionInfo.isMajorCompaction() ? "major" : "minor", e);
+      removeResultDirs(resultDirs, conf);
       throw new IOException(e);
     } finally {
       try {
@@ -135,16 +138,51 @@ abstract class QueryCompactor {
   }
 
   /**
+   * Called when compaction has failed: removes the new, still-empty compacted delta/base dirs.
+   * The Cleaner would handle these later, but clean up now just in case.
+   */
+  private void removeResultDirs(List<Path> resultDirPaths, HiveConf conf) throws IOException {
+    for (Path path : resultDirPaths) {
+      LOG.info("Compaction failed, removing directory: " + path.toString());
+      FileSystem fs = path.getFileSystem(conf);
+      if (!fs.listFiles(path, false).hasNext()) {
+        fs.delete(path, true);
+      }
+    }
+  }
+
+  /**
    * Collection of some helper functions.
    */
   static class Util {
+
     /**
-     * Generate a random tmp path, under the provided storage.
-     * @param sd storage descriptor, must be not null.
-     * @return path, always not null
+     * Get the path of the base, delta, or delete delta directory that will be the final
+     * destination of the files during compaction.
+     *
+     * @param sd storage descriptor of table or partition to compact
+     * @param writeIds list of valid writeids
+     * @param conf HiveConf
+     * @param writingBase if true, we are creating a base directory, otherwise a delta
+     * @param createDeleteDelta if true, the delta dir we are creating is a delete delta
+     * @param bucket0 whether to specify 0 as the bucketid
+     *
+     * @return Path of new base/delta/delete delta directory
      */
-    static String generateTmpPath(StorageDescriptor sd) {
-      return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+    static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf,
+        boolean writingBase, boolean createDeleteDelta, boolean bucket0) {
+      long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+      long highWatermark = writeIds.getHighWatermark();
+      long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+      AcidOutputFormat.Options options =
+          new AcidOutputFormat.Options(conf).isCompressed(false).minimumWriteId(minOpenWriteId)
+              .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId)
+              .writingBase(writingBase).writingDeleteDelta(createDeleteDelta);
+      if (bucket0) {
+        options = options.bucket(0);
+      }
+      Path location = new Path(sd.getLocation());
+      return AcidUtils.baseOrDeltaSubdirPath(location, options);
     }
 
     /**
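A usage sketch of the new helper (the values are illustrative, taken from the updated test expectations above rather than from running this snippet):

    // Assuming a table whose ValidWriteIdList has highWatermark = 2 and a
    // compactor txn id of 10, as in TestCompactor above:
    Path base = QueryCompactor.Util.getCompactionResultDir(
        sd, writeIds, conf, true /* writingBase */, false /* deleteDelta */, false /* bucket0 */);
    // -> <table location>/base_0000002_v0000010

    Path delta = QueryCompactor.Util.getCompactionResultDir(
        sd, writeIds, conf, false, false, false);
    // -> something like <table location>/delta_0000001_0000002_v0000010

Note the failure handling that goes with this: if the compaction queries throw, runCompactionQueries now calls removeResultDirs, which deletes a result directory only while it is still empty; anything partially written is left for the Cleaner, as the javadoc above says.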
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 6542eef..93dd85b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -40,7 +40,9 @@ final class QueryCompactorFactory {
    * <br>
    * {@link MinorQueryCompactor} - handles query based minor compaction
    * <br>
-   * {@link MmMajorQueryCompactor} - handles query based minor compaction for micro-managed tables
+   * {@link MmMajorQueryCompactor} - handles query based major compaction for micro-managed tables
+   * <br>
+   * {@link MmMinorQueryCompactor} - handles query based minor compaction for micro-managed tables
    * <br>
    * </p>
    * @param table the table, on which the compaction should be running, must be not null.
@@ -55,7 +57,7 @@ final class QueryCompactorFactory {
         return new MajorQueryCompactor();
       } else if (!compactionInfo.isMajorCompaction() && "tez"
           .equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
-        // query based minor compaction is only supported on tez
+        // query based minor compaction is only supported on tez
         return new MinorQueryCompactor();
       }
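Net effect on disk for an insert-only major compaction (illustrative layout based on the test expectations above, not captured output):

    before:  <table dir>/delta_0000001_0000001_0000/
             <table dir>/delta_0000002_0000002_0000/
    after:   <table dir>/base_0000002_v0000010/

Previously the query-based MM compactors wrote under a tmp directory generated by Util.generateTmpPath and renamed the result into place in commitCompaction; with this patch the queries write straight into the final base/delta directory, commitCompaction shrinks to Util.cleanupEmptyDir on the temp table, and only the MR code path keeps a tmp location (generateTmpPath moved into CompactorMR).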
     }