Posted to commits@hive.apache.org by lp...@apache.org on 2020/02/17 13:54:27 UTC

[hive] branch master updated: HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9251663  HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter)
9251663 is described below

commit 92516631ab39f39df5d0692f98ac32c2cd320997
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Mon Feb 17 14:29:02 2020 +0100

    HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter)
---
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  | 240 +++++++++
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 216 +++-----
 .../ql/txn/compactor/TestMmCompactorOnTez.java     | 564 +++++++++++++++++++++
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |  15 -
 .../ql/txn/compactor/MmMajorQueryCompactor.java    | 151 +-----
 .../ql/txn/compactor/MmMinorQueryCompactor.java    | 211 ++++++++
 .../ql/txn/compactor/MmQueryCompactorUtils.java    | 200 ++++++++
 .../ql/txn/compactor/QueryCompactorFactory.java    |   6 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  37 --
 .../hadoop/hive/ql/TestTxnCommandsForMmTable.java  | 126 -----
 10 files changed, 1295 insertions(+), 471 deletions(-)
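
For background: a minor compaction merges a table's accumulated delta directories into a single delta without rewriting the base, and this commit adds a query-based implementation of that for insert-only ("micro-managed", MM) transactional tables. A minimal sketch of the flow the new code handles, assuming a hypothetical MM table t and an IDriver named driver (both illustrative, mirroring the test helpers below):

    // Illustrative HiveQL, executed the same way the tests below do; not part of the commit.
    executeStatementOnDriver("create table t (a string, b int) stored as orc"
        + " TBLPROPERTIES('transactional'='true',"
        + " 'transactional_properties'='insert_only')", driver);
    executeStatementOnDriver("insert into t values('1',2)", driver); // delta_0000001_0000001_0000
    executeStatementOnDriver("insert into t values('2',3)", driver); // delta_0000002_0000002_0000
    executeStatementOnDriver("alter table t compact 'minor'", driver);
    // Once the compactor Worker and Cleaner have run, the two insert deltas are
    // replaced by a single merged delta such as delta_0000001_0000002_v0000005.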

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
new file mode 100644
index 0000000..78174f3
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+
+/**
+ * Superclass for Test[Crud|Mm]CompactorOnTez; provides common setup and test helper classes.
+ */
+public class CompactorOnTezTest {
+  private static final AtomicInteger RANDOM_INT = new AtomicInteger(new Random().nextInt());
+  private static final String TEST_DATA_DIR = new File(
+      System.getProperty("java.io.tmpdir") + File.separator + TestCrudCompactorOnTez.class
+          .getCanonicalName() + "-" + System.currentTimeMillis() + "_" + RANDOM_INT
+          .getAndIncrement()).getPath().replaceAll("\\\\", "/");
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  protected HiveConf conf;
+  protected IMetaStoreClient msClient;
+  protected IDriver driver;
+
+  @Before
+  // Note: we create a new conf and driver object before every test
+  public void setup() throws Exception {
+    File f = new File(TEST_WAREHOUSE_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+    }
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.cleanDb(hiveConf);
+    TxnDbUtil.prepDb(hiveConf);
+    conf = hiveConf;
+    // Use tez as execution engine for this test class
+    setupTez(conf);
+    msClient = new HiveMetaStoreClient(conf);
+    driver = DriverFactory.newDriver(conf);
+    SessionState.start(new CliSessionState(conf));
+  }
+
+  private void setupTez(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+    conf.set("tez.am.resource.memory.mb", "128");
+    conf.set("tez.am.dag.scheduler.class",
+        "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
+    conf.setBoolean("tez.local.mode", true);
+    conf.set("fs.defaultFS", "file:///");
+    conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+    conf.set("tez.staging-dir", TEST_DATA_DIR);
+    conf.setBoolean("tez.ignore.lib.uris", true);
+    conf.set("hive.tez.container.size", "128");
+    conf.setBoolean("hive.merge.tezfiles", false);
+    conf.setBoolean("hive.in.tez.test", true);
+  }
+
+  @After public void tearDown() {
+    if (msClient != null) {
+      msClient.close();
+    }
+    if (driver != null) {
+      driver.close();
+    }
+    conf = null;
+  }
+
+  protected class TestDataProvider {
+
+    void createFullAcidTable(String tblName, boolean isPartitioned, boolean isBucketed)
+        throws Exception {
+      createTable(tblName, isPartitioned, isBucketed, false, "orc");
+    }
+
+    void createMmTable(String tblName, boolean isPartitioned, boolean isBucketed)
+        throws Exception {
+      createMmTable(tblName, isPartitioned, isBucketed, "orc");
+    }
+
+    void createMmTable(String tblName, boolean isPartitioned, boolean isBucketed, String fileFormat)
+        throws Exception {
+      createTable(tblName, isPartitioned, isBucketed, true, fileFormat);
+    }
+
+    private void createTable(String tblName, boolean isPartitioned, boolean isBucketed,
+        boolean insertOnly, String fileFormat) throws Exception {
+
+      executeStatementOnDriver("drop table if exists " + tblName, driver);
+      StringBuilder query = new StringBuilder();
+      query.append("create table ").append(tblName).append(" (a string, b int)");
+      if (isPartitioned) {
+        query.append(" partitioned by (ds string)");
+      }
+      if (isBucketed) {
+        query.append(" clustered by (a) into 2 buckets");
+      }
+      query.append(" stored as ").append(fileFormat);
+      query.append(" TBLPROPERTIES('transactional'='true',");
+      if (insertOnly) {
+        query.append(" 'transactional_properties'='insert_only')");
+      } else {
+        query.append(" 'transactional_properties'='default')");
+      }
+      executeStatementOnDriver(query.toString(), driver);
+    }
+
+    /**
+     * Inserts test data in 5 txns: 3 inserts and 2 deletes.
+     */
+    void insertTestDataPartitioned(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName
+          + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow'),"
+          + "('2',3, 'yesterday'),('2',4, 'today')", driver);
+      executeStatementOnDriver("insert into " + tblName
+          + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today'),"
+          + "('4',3, 'tomorrow'),('4',4, 'today')", driver);
+      executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday'),"
+          + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver);
+      executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
+    }
+
+    /**
+     * Inserts test data in 3 insert txns.
+     */
+    protected void insertMmTestDataPartitioned(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName
+          + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow'),"
+          + "('2',3, 'yesterday'),('2',4, 'today')", driver);
+      executeStatementOnDriver("insert into " + tblName
+          + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today'),"
+          + "('4',3, 'tomorrow'),('4',4, 'today')", driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday'),"
+          + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver);
+    }
+
+    /**
+     * Inserts test data in 5 txns: 3 inserts and 2 deletes.
+     */
+    protected void insertTestData(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)",
+          driver);
+      executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)",
+          driver);
+      executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)",
+          driver);
+      executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
+    }
+
+    /**
+     * Inserts test data in 3 insert txns.
+     */
+    protected void insertMmTestData(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)",
+          driver);
+      executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)",
+          driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)",
+          driver);
+    }
+
+    /**
+     * {@code iterations * 1.5} txns: {@code iterations} inserts, then {@code iterations / 2} deletes.
+     */
+    protected void insertTestData(String tblName, int iterations) throws Exception {
+      for (int i = 0; i < iterations; i++) {
+        executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver);
+      }
+      for (int i = 0; i < iterations; i += 2) {
+        executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver);
+      }
+    }
+
+    /**
+     * {@code iterations} insert txns.
+     */
+    protected void insertMmTestData(String tblName, int iterations) throws Exception {
+      for (int i = 0; i < iterations; i++) {
+        executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver);
+      }
+    }
+
+    protected List<String> getAllData(String tblName) throws Exception {
+      List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
+      Collections.sort(result);
+      return result;
+    }
+
+    protected List<String> getBucketData(String tblName, String bucketId) throws Exception {
+      return executeStatementOnDriverAndReturnResults(
+          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver);
+    }
+
+    protected void dropTable(String tblName) throws Exception {
+      executeStatementOnDriver("drop table " + tblName, driver);
+    }
+  }
+}
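
Condensed, the usage pattern this superclass enables for its subclasses (the subclass and table names here are illustrative; the helpers and the conf field are the ones defined above, and imports would match TestMmCompactorOnTez below):

    // Hypothetical subclass, shown only to illustrate the shared setup.
    public class MySketchCompactorTest extends CompactorOnTezTest {
      @Test public void sketchMinorCompaction() throws Exception {
        TestDataProvider provider = new TestDataProvider();
        provider.createMmTable("sketch_tbl", false, false); // unpartitioned, unbucketed
        provider.insertMmTestData("sketch_tbl");            // 3 insert txns -> 3 deltas
        List<String> expected = provider.getAllData("sketch_tbl"); // sorted by getAllData
        CompactorTestUtil.runCompaction(conf, "default", "sketch_tbl", CompactionType.MINOR, true);
        CompactorTestUtil.runCleaner(conf);
        Assert.assertEquals(expected, provider.getAllData("sketch_tbl"));
        provider.dropTable("sketch_tbl");
      }
    }
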
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 4c01311..9659a3f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -17,21 +17,16 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,20 +39,14 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StrictDelimitedInputWriter;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
@@ -66,68 +55,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 @SuppressWarnings("deprecation")
-public class TestCrudCompactorOnTez {
-  private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt());
-  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator
-      + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt
-          .getAndIncrement()).getPath().replaceAll("\\\\", "/");
-  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
-  private HiveConf conf;
-  private IMetaStoreClient msClient;
-  private IDriver driver;
-
-  @Before
-  // Note: we create a new conf and driver object before every test
-  public void setup() throws Exception {
-    File f = new File(TEST_WAREHOUSE_DIR);
-    if (f.exists()) {
-      FileUtil.fullyDelete(f);
-    }
-    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
-      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
-    }
-    HiveConf hiveConf = new HiveConf(this.getClass());
-    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
-    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
-    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
-    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
-    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
-    TxnDbUtil.setConfValues(hiveConf);
-    TxnDbUtil.cleanDb(hiveConf);
-    TxnDbUtil.prepDb(hiveConf);
-    conf = hiveConf;
-    // Use tez as execution engine for this test class
-    setupTez(conf);
-    msClient = new HiveMetaStoreClient(conf);
-    driver = DriverFactory.newDriver(conf);
-    SessionState.start(new CliSessionState(conf));
-  }
-
-  private void setupTez(HiveConf conf) {
-    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
-    conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
-    conf.set("tez.am.resource.memory.mb", "128");
-    conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
-    conf.setBoolean("tez.local.mode", true);
-    conf.set("fs.defaultFS", "file:///");
-    conf.setBoolean("tez.runtime.optimize.local.fetch", true);
-    conf.set("tez.staging-dir", TEST_DATA_DIR);
-    conf.setBoolean("tez.ignore.lib.uris", true);
-    conf.set("hive.tez.container.size", "128");
-    conf.setBoolean("hive.merge.tezfiles", false);
-    conf.setBoolean("hive.in.tez.test", true);
-  }
-
-  @After
-  public void tearDown() {
-    if (msClient != null) {
-      msClient.close();
-    }
-    if (driver != null) {
-      driver.close();
-    }
-    conf = null;
-  }
+public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMajorCompaction() throws Exception {
@@ -217,7 +145,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, false, false);
+    dataProvider.createFullAcidTable(tableName, false, false);
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
     Table table = msClient.getTable(dbName, tableName);
@@ -287,7 +215,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, false, true);
+    dataProvider.createFullAcidTable(tableName, false, true);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
     Table table = metaStoreClient.getTable(dbName, tableName);
@@ -360,7 +288,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, true, false);
+    dataProvider.createFullAcidTable(tableName, true, false);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
     Table table = metaStoreClient.getTable(dbName, tableName);
@@ -440,7 +368,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, true, true);
+    dataProvider.createFullAcidTable(tableName, true, true);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
     Table table = metaStoreClient.getTable(dbName, tableName);
@@ -522,7 +450,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, false, false);
+    dataProvider.createFullAcidTable(tableName, false, false);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
     Table table = metaStoreClient.getTable(dbName, tableName);
@@ -577,7 +505,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, false, true);
+    dataProvider.createFullAcidTable(tableName, false, true);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
     Table table = metaStoreClient.getTable(dbName, tableName);
@@ -757,7 +685,7 @@ public class TestCrudCompactorOnTez {
     String tableName = "testMinorCompaction";
     // Create test table
     TestDataProvider dataProvider = new TestDataProvider();
-    dataProvider.createTable(tableName, false, false);
+    dataProvider.createFullAcidTable(tableName, false, false);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
     Table table = metaStoreClient.getTable(dbName, tableName);
@@ -786,6 +714,10 @@ public class TestCrudCompactorOnTez {
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
+    // Insert another round of test data
+    dataProvider.insertTestData(tableName);
+    expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
     // Run a compaction
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
     // Clean up resources
@@ -796,7 +728,7 @@ public class TestCrudCompactorOnTez {
     Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
     // Verify base directory after compaction
     Assert.assertEquals("Base directory does not match after major compaction",
-        Collections.singletonList("base_0000005_v0000023"),
+        Collections.singletonList("base_0000010_v0000029"),
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
     // Verify all contents
     actualData = dataProvider.getAllData(tableName);
@@ -804,6 +736,62 @@ public class TestCrudCompactorOnTez {
   }
 
   @Test
+  public void testMinorCompactionAfterMajor() throws Exception {
+    String dbName = "default";
+    String tableName = "testMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createFullAcidTable(tableName, false, false);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertTestData(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+    // Only 1 compaction should be in the response queue with succeeded state
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    // Verify base directory after compaction
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000005_v0000009"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    // Verify all contents
+    List<String> actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+    // Insert another round of test data
+    dataProvider.insertTestData(tableName);
+    expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+    // 2 compactions should be in the response queue with succeeded state
+    compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
+    // Verify base directory after compaction
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000005_v0000009"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    Assert.assertEquals("Delta directories do not match after major compaction",
+        Collections.singletonList("delta_0000001_0000010_v0000020"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Verify all contents
+    actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+  }
+
+  @Test
   public void testMinorCompactionWhileStreamingWithSplitUpdate() throws Exception {
     String dbName = "default";
     String tableName = "testMinorCompaction";
@@ -967,71 +955,5 @@ public class TestCrudCompactorOnTez {
     hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none");
     qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
     Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
-
-  }
-
-  private class TestDataProvider {
-
-    private void createTable(String tblName, boolean isPartitioned, boolean isBucketed) throws Exception {
-      executeStatementOnDriver("drop table if exists " + tblName, driver);
-      StringBuilder query = new StringBuilder();
-      query.append("create table ").append(tblName).append(" (a string, b int)");
-      if (isPartitioned) {
-        query.append(" partitioned by (ds string)");
-      }
-      if (isBucketed) {
-        query.append(" clustered by (a) into 2 buckets");
-      }
-      query.append(" stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')");
-      executeStatementOnDriver(query.toString(), driver);
-    }
-
-    private void insertTestDataPartitioned(String tblName) throws Exception {
-      executeStatementOnDriver("insert into " + tblName
-          + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow'),"
-          + "('2',3, 'yesterday'),('2',4, 'today')", driver);
-      executeStatementOnDriver("insert into " + tblName
-          + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today'),"
-          + "('4',3, 'tomorrow'),('4',4, 'today')", driver);
-      executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
-      executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday'),"
-          + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver);
-      executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
-    }
-
-    private void insertTestData(String tblName) throws Exception {
-      executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)",
-          driver);
-      executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)",
-          driver);
-      executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
-      executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)",
-          driver);
-      executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
-    }
-
-    private void insertTestData(String tblName, int iterations) throws Exception {
-      for (int i = 0; i < iterations; i++) {
-        executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver);
-      }
-      for (int i = 0; i < iterations; i += 2) {
-        executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver);
-      }
-    }
-
-    private List<String> getAllData(String tblName) throws Exception {
-      List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
-      Collections.sort(result);
-      return result;
-    }
-
-    private List<String> getBucketData(String tblName, String bucketId) throws Exception {
-      return executeStatementOnDriverAndReturnResults(
-          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver);
-    }
-
-    private void dropTable(String tblName) throws Exception {
-      executeStatementOnDriver("drop table " + tblName, driver);
-    }
   }
 }
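
A note on the directory names asserted throughout these tests, as a reading aid (the padding width is inferred from the names above): delta_<minWriteId>_<maxWriteId> covers a range of write IDs, an extra _<statementId> suffix marks deltas written by individual statements, and directories produced by the compactor instead carry a _v<visibilityTxnId> suffix identifying the compactor's transaction. A worked example:

    // How a compacted delta name such as "delta_0000001_0000003_v0000007" decomposes:
    // write IDs 1 through 3 merged, made visible by compactor txn 7.
    long minWriteId = 1, maxWriteId = 3, visibilityTxnId = 7;
    String name = String.format("delta_%07d_%07d_v%07d", minWriteId, maxWriteId, visibilityTxnId);
    assert "delta_0000001_0000003_v0000007".equals(name);
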
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
new file mode 100644
index 0000000..074430c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+
+/**
+ * Tests the functionality of MmMinorQueryCompactor.
+ */
+public class TestMmCompactorOnTez extends CompactorOnTezTest {
+
+  @Test public void testMmMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createMmTable(tableName, false, false);
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    testDataProvider.insertMmTestData(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = testDataProvider.getAllData(tableName);
+    // Verify deltas
+    Assert.assertEquals("Delta directories does not match", Arrays
+            .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+                "delta_0000003_0000003_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(1);
+    // Verify delta directories after compaction
+    List<String> actualDeltasAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList("delta_0000001_0000003_v0000007"), actualDeltasAfterComp);
+    // Verify bucket files in delta dirs
+    List<String> expectedBucketFiles = Collections.singletonList("000000_0");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+    verifyAllContents(tableName, testDataProvider, expectedData);
+    // Clean up
+    testDataProvider.dropTable(tableName);
+  }
+
+  @Test public void testMmMinorCompactionNotPartitionedWithBuckets() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // expected name of the delta dir that will be created with minor compaction
+    String newDeltaName = "delta_0000001_0000003_v0000007";
+    // Create test table
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createMmTable(tableName, false, true);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    testDataProvider.insertMmTestData(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = testDataProvider.getAllData(tableName);
+    // Verify deltas
+    Assert.assertEquals("Delta directories does not match", Arrays
+            .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+                "delta_0000003_0000003_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(1);
+    // Verify delta directories after compaction
+    List<String> actualDeltasAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList(newDeltaName), actualDeltasAfterComp);
+    // Verify number of files in directory
+    FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), newDeltaName),
+        AcidUtils.hiddenFileFilter);
+    Assert.assertEquals("Incorrect number of bucket files", 2, files.length);
+    // Verify bucket files in delta dirs
+    List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+    verifyAllContents(tableName, testDataProvider, expectedData);
+    // Clean up
+    testDataProvider.dropTable(tableName);
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithoutBuckets() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createMmTable(tableName, true, false);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestDataPartitioned(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    // Verify deltas
+    String partitionToday = "ds=today";
+    String partitionTomorrow = "ds=tomorrow";
+    String partitionYesterday = "ds=yesterday";
+    Assert.assertEquals("Delta directories does not match", Arrays
+        .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+            "delta_0000003_0000003_0000"), CompactorTestUtil
+        .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday));
+    // Run a compaction
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday,
+            partitionTomorrow, partitionYesterday);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(3);
+    // Verify delta directories after compaction in each partition
+    List<String> actualDeltasAfterCompPartToday =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList("delta_0000001_0000003_v0000007"),
+        actualDeltasAfterCompPartToday);
+    // Verify bucket files in delta dirs
+    List<String> expectedBucketFiles = Collections.singletonList("000000_0");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+    verifyAllContents(tableName, dataProvider, expectedData);
+    // Clean up
+    dataProvider.dropTable(tableName);
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsOrc() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("orc");
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsParquet() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("parquet");
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsAvro() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("avro");
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsTextFile() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("textfile");
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsSequenceFile() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("sequencefile");
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsRcFile() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("RcFile");
+  }
+
+  @Test public void testMmMinorCompactionPartitionedWithBucketsJsonFile() throws Exception {
+    testMmMinorCompactionPartitionedWithBuckets("JsonFile");
+  }
+
+  private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createMmTable(tableName, true, true, fileFormat);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestDataPartitioned(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    // Verify deltas
+    String partitionToday = "ds=today";
+    String partitionTomorrow = "ds=tomorrow";
+    String partitionYesterday = "ds=yesterday";
+    Assert.assertEquals("Delta directories does not match", Arrays
+        .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+            "delta_0000003_0000003_0000"), CompactorTestUtil
+        .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday));
+    // Run a compaction
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday,
+            partitionTomorrow, partitionYesterday);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(3);
+    // Verify delta directories after compaction in each partition
+    List<String> actualDeltasAfterCompPartToday =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList("delta_0000001_0000003_v0000007"),
+        actualDeltasAfterCompPartToday);
+    // Verify bucket files in delta dirs
+    List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+    verifyAllContents(tableName, dataProvider, expectedData);
+    // Clean up
+    dataProvider.dropTable(tableName);
+  }
+
+  @Test public void testMmMinorCompaction10DeltaDirs() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createMmTable(tableName, false, false);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName, 10);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Verify deltas
+    List<String> deltaNames =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals(10, deltaNames.size());
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain 3 element", 1, compacts.size());
+    compacts.forEach(
+        c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+    // Verify delta directories after compaction
+    List<String> actualDeltasAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals(Collections.singletonList("delta_0000001_0000010_v0000014"),
+        actualDeltasAfterComp);
+    // Verify bucket file in delta dir
+    List<String> expectedBucketFile = Collections.singletonList("000000_0");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile,
+        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+    verifyAllContents(tableName, dataProvider, expectedData);
+    // Clean up
+    dataProvider.dropTable(tableName);
+  }
+
+  @Test public void testMultipleMmMinorCompactions() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createMmTable(tableName, false, true);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(1);
+    List<ShowCompactResponseElement> compacts;
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(2);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(3);
+    // Verify delta directories after compaction
+    List<String> actualDeltasAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList("delta_0000001_0000009_v0000026"), actualDeltasAfterComp);
+
+  }
+
+  @Test public void testMmMajorCompactionAfterMinor() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createMmTable(tableName, false, false);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(1);
+    List<ShowCompactResponseElement> compacts;
+    // Verify delta directories after compaction
+    Assert.assertEquals("Delta directories does not match after minor compaction",
+        Collections.singletonList("delta_0000001_0000003_v0000007"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    verifyAllContents(tableName, dataProvider, expectedData);
+    List<String> actualData;
+    // Insert a second round of test data into test table; update expectedData
+    dataProvider.insertMmTestData(tableName);
+    expectedData = dataProvider.getAllData(tableName);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(2);
+    // Verify base directory after compaction
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000006_v0000019"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+  }
+
+  @Test public void testMmMinorCompactionAfterMajor() throws Exception {
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createMmTable(tableName, false, false);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a major compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(1);
+    List<ShowCompactResponseElement> compacts;
+    // Verify base directory after compaction
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000003_v0000007"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    verifyAllContents(tableName, dataProvider, expectedData);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName);
+    expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(2);
+    // Verify base/delta directories after compaction
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000003_v0000007"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    Assert.assertEquals("Delta directories does not match after minor compaction",
+        Collections.singletonList("delta_0000001_0000006_v0000016"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    verifyAllContents(tableName, dataProvider, expectedData);
+  }
+
+  @Test public void testMmMinorCompactionWithSchemaEvolutionAndBuckets() throws Exception {
+    String dbName = "default";
+    String tblName = "testMmMinorCompactionWithSchemaEvolutionAndBuckets";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create transactional table " + tblName
+        + " (a int, b int) partitioned by(ds string) clustered by (a) into 2 buckets"
+        + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+        + " 'transactional_properties'='insert_only')", driver);
+    // Insert some data
+    executeStatementOnDriver("insert into " + tblName
+            + " partition (ds) values"
+            + "(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),"
+            + "(2,2,'yesterday'),(2,3,'today'),(2,4,'today')",
+        driver);
+    // Add a new column
+    executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+    // TODO uncomment this line after HIVE-22826 fixed:
+    // executeStatementOnDriver("alter table " + tblName + " change column a aa int", driver);
+    // Insert more data
+    executeStatementOnDriver("insert into " + tblName
+        + " partition (ds) values"
+        + "(3,2,1000,'yesterday'),(3,3,1001,'today'),"
+        + "(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+        + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+    // Get all data before compaction is run
+    TestDataProvider dataProvider = new TestDataProvider();
+    List<String> expectedData = dataProvider.getAllData(tblName);
+    Collections.sort(expectedData);
+    //  Run minor compaction and cleaner
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tblName, CompactionType.MINOR, true, "ds=yesterday",
+            "ds=today");
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(2);
+    verifyAllContents(tblName, dataProvider, expectedData);
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
+  @Test public void testMmMinorCompactionWithSchemaEvolutionNoBucketsMultipleReducers()
+      throws Exception {
+    HiveConf hiveConf = new HiveConf(conf);
+    hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+    hiveConf.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+    driver = DriverFactory.newDriver(hiveConf);
+    String dbName = "default";
+    String tblName = "testMmMinorCompactionWithSchemaEvolutionNoBucketsMultipleReducers";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver(
+        "create transactional table " + tblName + " (a int, b int) partitioned by(ds string)"
+            + " stored as ORC TBLPROPERTIES('transactional'='true',"
+            + " 'transactional_properties'='insert_only')", driver);
+    // Insert some data
+    executeStatementOnDriver("insert into " + tblName + " partition (ds) values"
+            + "(1,2,'today'),(1,3,'today'),"
+            + "(1,4,'yesterday'),(2,2,'yesterday'),"
+            + "(2,3,'today'),(2,4,'today')",
+        driver);
+    // Add a new column
+    executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+    // Insert more data
+    executeStatementOnDriver("insert into " + tblName
+        + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+        + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+    // Get all data before compaction is run
+    TestDataProvider dataProvider = new TestDataProvider();
+    List<String> expectedData = dataProvider.getAllData(tblName);
+    Collections.sort(expectedData);
+    //  Run minor compaction and cleaner
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tblName, CompactionType.MINOR, true, "ds=yesterday",
+            "ds=today");
+    CompactorTestUtil.runCleaner(hiveConf);
+    verifySuccessulTxn(2);
+
+    verifyAllContents(tblName, dataProvider, expectedData);
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
+  @Test public void testMinorMmCompactionRemovesAbortedDirs()
+      throws Exception { // see mmTableOpenWriteId
+    String dbName = "default";
+    String tableName = "testMmMinorCompaction";
+    // Create test table
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createMmTable(tableName, false, false);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    testDataProvider.insertMmTestData(tableName);
+    // Build the expected data: two successful rounds of MM test data (the aborted round below adds nothing)
+    List<String> expectedData = new ArrayList<>();
+    List<String> oneMmTestDataInsertion = testDataProvider.getAllData(tableName);
+    expectedData.addAll(oneMmTestDataInsertion);
+    expectedData.addAll(oneMmTestDataInsertion);
+    Collections.sort(expectedData);
+    // Insert an aborted directory (txns 4-6)
+    rollbackAllTxns(true, driver);
+    testDataProvider.insertMmTestData(tableName);
+    rollbackAllTxns(false, driver);
+    // Check that delta dirs 1-6 all exist, including the aborted 4-6
+    List<String> deltasBeforeComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals(Lists
+        .newArrayList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+            "delta_0000003_0000003_0000", "delta_0000004_0000004_0000",
+            "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), deltasBeforeComp);
+    // Insert another round of test data (txns 7-9)
+    testDataProvider.insertMmTestData(tableName);
+    verifyAllContents(tableName, testDataProvider, expectedData);
+    // Run a minor compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessulTxn(1);
+    // Verify delta directories after compaction
+    Assert.assertEquals("Delta directories does not match after minor compaction",
+        Collections.singletonList("delta_0000001_0000009_v0000014"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    verifyAllContents(tableName, testDataProvider, expectedData);
+  }
+
+  /**
+   * Verify that the expected number of compactions have run, and that their state is
+   * "succeeded".
+   *
+   * @param expectedCompleteCompacts number of compactions expected to have completed
+   * @throws MetaException
+   */
+  private void verifySuccessfulTxn(int expectedCompleteCompacts) throws MetaException {
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain one element",
+        expectedCompleteCompacts, compacts.size());
+    compacts.forEach(
+        c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+  }
+
+  /**
+   * Verify that a select on the table returns the same data as expectedData.
+   */
+  private void verifyAllContents(String tblName, TestDataProvider dataProvider,
+      List<String> expectedData) throws Exception {
+    List<String> actualData = dataProvider.getAllData(tblName);
+    Collections.sort(actualData);
+    Assert.assertEquals(expectedData, actualData);
+  }
+
+  /**
+   * Set to true to cause all transactions to be rolled back, until set back to false.
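+   * Typical usage in these tests: enable, run the insert that should be aborted, disable.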
+   */
+  private static void rollbackAllTxns(boolean val, IDriver driver) {
+    driver.getConf().setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, val);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index bb70db4..739f2b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -394,21 +394,6 @@ public class CompactorMR {
     HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
   }
 
-  // Remove the directories for aborted transactions only
-  private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
-    // For MM table, we only want to delete delta dirs for aborted txns.
-    List<Path> filesToDelete = dir.getAbortedDirectories();
-    if (filesToDelete.size() < 1) {
-      return;
-    }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
-    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
-    for (Path dead : filesToDelete) {
-      LOG.debug("Going to delete path " + dead.toString());
-      fs.delete(dead, true);
-    }
-  }
-
   public JobConf getMrJob() {
     return mrJob;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index bad5d00..48387c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -20,37 +20,23 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * Class responsible to run query based major compaction on insert only tables.
@@ -66,14 +52,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
     AcidUtils.Directory dir = AcidUtils
         .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
             table.getParameters(), false);
-    removeFilesForMmTable(hiveConf, dir);
-
-    // Then, actually do the compaction.
-    if (!compactionInfo.isMajorCompaction()) {
-      // Not supported for MM tables right now.
-      LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction");
-      return;
-    }
+    MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
 
     if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
       return;
@@ -129,107 +108,10 @@ final class MmMajorQueryCompactor extends QueryCompactor {
     fs.delete(fromPath, true);
   }
 
-  // Remove the directories for aborted transactions only
-  private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
-    // For MM table, we only want to delete delta dirs for aborted txns.
-    List<Path> filesToDelete = dir.getAbortedDirectories();
-    if (filesToDelete.size() < 1) {
-      return;
-    }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
-    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
-    for (Path dead : filesToDelete) {
-      LOG.debug("Going to delete path " + dead.toString());
-      fs.delete(dead, true);
-    }
-  }
-
-  private List<String> getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) {
-    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
-    List<FieldSchema> cols = t.getSd().getCols();
-    boolean isFirst = true;
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(col.getType());
-    }
-    query.append(") ");
-
-    // Bucketing.
-    List<String> buckCols = t.getSd().getBucketCols();
-    if (buckCols.size() > 0) {
-      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
-      List<Order> sortCols = t.getSd().getSortCols();
-      if (sortCols.size() > 0) {
-        query.append("SORTED BY (");
-        isFirst = true;
-        for (Order sortCol : sortCols) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
-        }
-        query.append(") ");
-      }
-      query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
-    }
-
-    // Stored as directories. We don't care about the skew otherwise.
-    if (t.getSd().isStoredAsSubDirectories()) {
-      SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
-      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
-        query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
-        isFirst = true;
-        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append("('").append(StringUtils.join("','", colValues)).append("')");
-        }
-        query.append(") STORED AS DIRECTORIES");
-      }
-    }
-
-    SerDeInfo serdeInfo = sd.getSerdeInfo();
-    Map<String, String> serdeParams = serdeInfo.getParameters();
-    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
-        .append("'");
-    String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
-    assert sh == null; // Not supposed to be a compactable table.
-    if (!serdeParams.isEmpty()) {
-      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
-    }
-    query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
-        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
-        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
-    // Exclude all standard table properties.
-    Set<String> excludes = getHiveMetastoreConstants();
-    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
-    isFirst = true;
-    for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
-      if (e.getValue() == null) {
-        continue;
-      }
-      if (excludes.contains(e.getKey())) {
-        continue;
-      }
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
-          .append("'");
-    }
-    if (!isFirst) {
-      query.append(", ");
-    }
-    query.append("'transactional'='false')");
-    return Lists.newArrayList(query.toString());
-
+  private List<String> getCreateQueries(String tmpTableName, Table table,
+      StorageDescriptor storageDescriptor, String baseLocation) {
+    return Lists.newArrayList(MmQueryCompactorUtils
+        .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false));
   }
 
   private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
@@ -263,28 +145,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
   }
 
   private List<String> getDropQueries(String tmpTableName) {
-    return Lists.newArrayList("drop table if exists " + tmpTableName);
+    return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName);
   }
 
-  private static Set<String> getHiveMetastoreConstants() {
-    Set<String> result = new HashSet<>();
-    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
-      if (!Modifier.isStatic(f.getModifiers())) {
-        continue;
-      }
-      if (!Modifier.isFinal(f.getModifiers())) {
-        continue;
-      }
-      if (!String.class.equals(f.getType())) {
-        continue;
-      }
-      f.setAccessible(true);
-      try {
-        result.add((String) f.get(null));
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return result;
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
new file mode 100644
index 0000000..feb667c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Runs a query based minor compaction on insert-only (MM) tables.
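+ * The approach, in outline: a temporary external table is created with one partition per
+ * delta directory of the table/partition to compact; its contents are then inserted into a
+ * temporary result table, whose files are finally moved into place as a single new delta
+ * directory (see {@link #commitCompaction}).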
+ */
+final class MmMinorQueryCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MmMinorQueryCompactor.class.getName());
+
+  @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition,
+      StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo)
+      throws IOException {
+    LOG.debug(
+        "Going to delete directories for aborted transactions for MM table " + table.getDbName()
+            + "." + table.getTableName());
+
+    AcidUtils.Directory dir = AcidUtils
+        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
+            Ref.from(false), false, table.getParameters(), false);
+    MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
+    String tmpLocation = Util.generateTmpPath(storageDescriptor);
+    Path sourceTabLocation = new Path(tmpLocation);
+    Path resultTabLocation = new Path(tmpLocation, "_result");
+
+    HiveConf driverConf = setUpDriverSession(hiveConf);
+
+    String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
+    String tmpTableBase = tmpPrefix + System.currentTimeMillis();
+
+    List<String> createTableQueries =
+        getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(),
+            sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
+    List<String> compactionQueries = getCompactionQueries(tmpTableBase, table.getSd());
+    List<String> dropQueries = getDropQueries(tmpTableBase);
+    runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo,
+        createTableQueries, compactionQueries, dropQueries);
+  }
+
+  /**
+   * Move files from "result table" directory to table/partition to compact's directory.
+   */
+  @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+    org.apache.hadoop.hive.ql.metadata.Table resultTable =
+        Hive.get().getTable(tmpTableName + "_result");
+    String from = resultTable.getSd().getLocation();
+    Path fromPath = new Path(from);
+    Path toPath = new Path(dest);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    long maxTxn = actualWriteIds.getHighWatermark();
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false)
+            .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1)
+            .visibilityTxnId(compactorTxnId);
+    Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newDeltaDir);
+      fs.mkdirs(newDeltaDir);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + dest);
+    fs.rename(fromPath, newDeltaDir);
+    fs.delete(fromPath, true);
+  }
+
+  /**
+   * Get a list of create/alter table queries. These tables serve as temporary data sources
+   * for query based minor compaction. The following tables are created:
+   * <ol>
+   *   <li>tmpTable - "source table": temporary, external, partitioned table. Each partition
+   *     points to exactly one delta directory in the table/partition to compact</li>
+   *   <li>tmpTable_result - "result table" : temporary table which stores the aggregated
+   *     results of the minor compaction query until the compaction can be committed</li>
+   * </ol>
+   *
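+   * A hypothetical example of the two generated create statements (names and paths are
+   * made up for illustration):
+   * <pre>
+   *   create temporary external table db.tmp_minor_compactor_tbl_123(`a` int, `b` int)
+   *     PARTITIONED BY (`file_name` STRING) ... LOCATION '/tmp/...'
+   *   create temporary table db.tmp_minor_compactor_tbl_123_result(`a` int, `b` int)
+   *     ... LOCATION '/tmp/..._result'
+   * </pre>
+   * The alter table statement is described in {@link #buildAlterTableQuery}.
+   *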
+   * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result)
+   * @param t Table to compact
+   * @param sd storage descriptor of table or partition to compact
+   * @param sourceTabLocation location the "source table" (temp table 1) should go
+   * @param resultTabLocation location the "result table" (temp table 2) should go
+   * @param dir the parent directory of delta directories
+   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @return list of query strings: 2 create table statements, plus 1 alter table statement
+   *         if there are delta directories to add as partitions
+   */
+  private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
+      String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
+      ValidWriteIdList validWriteIdList) {
+    List<String> queries = new ArrayList<>();
+    queries.add(
+        MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true));
+    buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add);
+    queries.add(MmQueryCompactorUtils
+        .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false));
+    return queries;
+  }
+
+  /**
+   * Builds an alter table query, which adds partitions pointing to location of delta directories.
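+   * For example, with hypothetical delta names:
+   * <pre>
+   *   alter table db.tmp_minor_compactor_tbl_123 add
+   *     partition (file_name='delta_0000001_0000001_0000') location '.../delta_0000001_0000001_0000'
+   *     partition (file_name='delta_0000002_0000002_0000') location '.../delta_0000002_0000002_0000'
+   * </pre>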
+   *
+   * @param tableName name of the temp table to be altered
+   * @param dir the parent directory of delta directories
+   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @return alter table statement wrapped in {@link Optional}.
+   */
+  private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+      ValidWriteIdList validWriteIdList) {
+    if (!dir.getCurrentDirectories().isEmpty()) {
+      long minWriteID =
+          validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+      long highWatermark = validWriteIdList.getHighWatermark();
+      List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
+          delta -> delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
+          .collect(Collectors.toList());
+      if (!deltas.isEmpty()) {
+        StringBuilder query = new StringBuilder().append("alter table ").append(tableName);
+        query.append(" add ");
+        deltas.forEach(
+            delta -> query.append("partition (file_name='").append(delta.getPath().getName())
+                .append("') location '").append(delta.getPath()).append("' "));
+        return Optional.of(query.toString());
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Get a list containing just the minor compaction query. The query selects the content of
+   * the source temporary table and inserts it into the result table. It will look like:
+   * <ol>
+   *  <li>insert into table $tmpTableBase_result select `col_1`, .. from tmpTableBase</li>
+   * </ol>
+   *
+   * @param tmpTableBase a unique identifier that helps to find all the temporary tables
+   * @return list of compaction queries, always non-null
+   */
+  private List<String> getCompactionQueries(String tmpTableBase, StorageDescriptor sd) {
+    String resultTableName = tmpTableBase + "_result";
+    StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName)
+        .append(" select ");
+    List<FieldSchema> cols = sd.getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("`");
+    }
+    query.append(" from ").append(tmpTableBase);
+    return Lists.newArrayList(query.toString());
+  }
+
+  /**
+   * Get list of drop table statements.
+   * @param tmpTableBase a unique identifier that helps to find all the temp tables
+   * @return list of drop table statements, always non-null
+   */
+  private List<String> getDropQueries(String tmpTableBase) {
+    return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase,
+        MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase + "_result");
+  }
+
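+  /**
+   * Session level overrides for the compaction queries: quoted identifier support is set to
+   * "column" because the generated queries back-quote column names; column stats fetching
+   * and stats estimation are disabled, presumably because the temporary tables are dropped
+   * as soon as the compaction finishes.
+   */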
+  private HiveConf setUpDriverSession(HiveConf hiveConf) {
+    HiveConf driverConf = new HiveConf(hiveConf);
+    driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+    driverConf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false);
+    driverConf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false);
+    return driverConf;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
new file mode 100644
index 0000000..891696d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
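+/**
+ * Helpers shared by the query based major and minor compactors for MM (insert-only) tables.
+ */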
+final class MmQueryCompactorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MmQueryCompactorUtils.class.getName());
+  static final String DROP_IF_EXISTS = "drop table if exists ";
+
+  private MmQueryCompactorUtils() {}
+
+  /**
+   * Creates a command to create a new table based on an example table (sourceTab).
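+   * Note that the new table is always created with 'transactional'='false' in its
+   * TBLPROPERTIES, presumably so the temporary tables are not themselves ACID tables.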
+   *
+   * @param fullName of new table
+   * @param sourceTab the table we are modeling the new table on
+   * @param sd StorageDescriptor of the table or partition we are modeling the new table on
+   * @param location of the new table
+   * @param isPartitioned should the new table be partitioned
+   * @param isExternal should the new table be external
+   * @return query string creating the new table
+   */
+  static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd,
+      String location, boolean isPartitioned, boolean isExternal) {
+    StringBuilder query = new StringBuilder("create temporary ");
+    if (isExternal) {
+      query.append("external ");
+    }
+    query.append("table ").append(fullName).append("(");
+    List<FieldSchema> cols = sourceTab.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(col.getType());
+    }
+    query.append(") ");
+
+    // Partitioning. Used for minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // Bucketing.
+    List<String> buckCols = sourceTab.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = sourceTab.getSd().getSortCols();
+      if (sortCols.size() > 0) {
+        query.append("SORTED BY (");
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+
+    // Stored as directories. We don't care about the skew otherwise.
+    if (sourceTab.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+        .append("'");
+    String sh = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    assert sh == null; // Not supposed to be a compactable table.
+    if (!serdeParams.isEmpty()) {
+      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+    // Exclude all standard table properties.
+    Set<String> excludes = getHiveMetastoreConstants();
+    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+    isFirst = true;
+    for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
+      if (e.getValue() == null) {
+        continue;
+      }
+      if (excludes.contains(e.getKey())) {
+        continue;
+      }
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
+          .append("'");
+    }
+    if (!isFirst) {
+      query.append(", ");
+    }
+    query.append("'transactional'='false')");
+    return query.toString();
+
+  }
+
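+  /**
+   * Collects the values of all static final String fields of hive_metastoreConstants via
+   * reflection. These are treated as standard table properties and excluded from the
+   * generated TBLPROPERTIES clause.
+   */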
+  private static Set<String> getHiveMetastoreConstants() {
+    Set<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isStatic(f.getModifiers())) {
+        continue;
+      }
+      if (!Modifier.isFinal(f.getModifiers())) {
+        continue;
+      }
+      if (!String.class.equals(f.getType())) {
+        continue;
+      }
+      f.setAccessible(true);
+      try {
+        result.add((String) f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Remove the delta directories of aborted transactions.
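+   * Valid delta directories are left in place; they are the input of the compaction queries.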
+   */
+  static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
+    List<Path> filesToDelete = dir.getAbortedDirectories();
+    if (filesToDelete.size() < 1) {
+      return;
+    }
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
+    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+    for (Path dead : filesToDelete) {
+      LOG.debug("Going to delete path " + dead.toString());
+      fs.delete(dead, true);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 2f2bb21..6542eef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -62,7 +62,11 @@ final class QueryCompactorFactory {
 
     if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf
         .getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
-      return new MmMajorQueryCompactor();
+      if (compactionInfo.isMajorCompaction()) {
+        return new MmMajorQueryCompactor();
+      } else {
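+        // HIVE-22610: minor compactions of MM tables are now also query based.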
+        return new MmMinorQueryCompactor();
+      }
     }
 
     return null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 88ca683..e56d831 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1978,43 +1978,6 @@ public class TestTxnCommands2 {
   }
 
   /**
-   * Test compaction for Micro-managed table
-   * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
-   * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
-   * @throws Exception
-   */
-  @Test
-  public void testMmTableCompaction() throws Exception {
-    // 1. Insert some rows into MM table
-    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
-    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
-    // There should be 2 delta directories
-    verifyDirAndResult(2);
-
-    // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
-    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
-    runWorker(hiveConf);
-    verifyDirAndResult(2);
-
-    // 3. Let a transaction be aborted
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
-    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-    // There should be 3 delta directories. The new one is the aborted one.
-    verifyDirAndResult(3);
-
-    // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
-    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
-    runWorker(hiveConf);
-    // The worker should remove the subdir for aborted transaction
-    verifyDirAndResult(2);
-
-    // 5. Run Cleaner. Shouldn't impact anything.
-    runCleaner(hiveConf);
-    verifyDirAndResult(2);
-  }
-
-  /**
    * Test cleaner for TXN_TO_WRITE_ID table.
    * @throws Exception
    */
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index aabf15c..1de25cf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -105,42 +105,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
       runStatementOnDriver("drop table if exists " + t);
     }
   }
-  /**
-   * Test compaction for Micro-managed table
-   * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
-   * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
-   * @throws Exception
-   */
-  @Test
-  public void testMmTableCompaction() throws Exception {
-    // 1. Insert some rows into MM table
-    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
-    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
-    // There should be 2 delta directories
-    verifyDirAndResult(2);
-
-    // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
-    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
-    runWorker(hiveConf);
-    verifyDirAndResult(2);
-
-    // 3. Let a transaction be aborted
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
-    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(5,6)");
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-    // There should be 3 delta directories. The new one is the aborted one.
-    verifyDirAndResult(3);
-
-    // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
-    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
-    runWorker(hiveConf);
-    // The worker should remove the subdir for aborted transaction
-    verifyDirAndResult(2);
-
-    // 5. Run Cleaner. Shouldn't impact anything.
-    runCleaner(hiveConf);
-    verifyDirAndResult(2);
-  }
 
   /**
    * Test a scenario, on a micro-managed table, where an IOW comes in
@@ -523,96 +487,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
     verifyDirAndResult(0, true);
   }
 
-  @Test
-  public void testSnapshotIsolationWithAbortedTxnOnMmTable() throws Exception {
-
-    // Insert two rows into the table.
-    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
-    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
-    // There should be 2 delta directories
-    verifyDirAndResult(2);
-
-    // Initiate a minor compaction request on the table.
-    runStatementOnDriver("alter table " + TableExtended.MMTBL  + " compact 'MINOR'");
-
-    // Run Compaction Worker to do compaction.
-    // But we do not compact a MM table but only transit the compaction request to
-    // "ready for cleaning" state in this case.
-    runWorker(hiveConf);
-    verifyDirAndResult(2);
-
-    // Start an INSERT statement transaction and roll back this transaction.
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
-    runStatementOnDriver("insert into " + TableExtended.MMTBL  + " values (5, 6)");
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-    /**
-     * There should be 3 delta directories. The new one is the aborted one.
-     *
-     * target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1541637725613/warehouse/mmtbl/
-     ├── delta_0000001_0000001_0000
-     │   └── 000000_0
-     ├── delta_0000002_0000002_0000
-     │   └── 000000_0
-     └── delta_0000003_0000003_0000
-         └── 000000_0
-      */
-    verifyDirAndResult(3);
-
-    // Execute SELECT statement and verify the result set (should be two rows).
-    int[][] expected = new int[][] {{1, 2}, {3, 4}};
-    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(expected), rs);
-
-    // Run Cleaner.
-    // delta_0000003_0000003_0000 produced by the aborted txn is removed even though it is
-    // above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID since all data in it is aborted
-    // This run does transition the entry "successful".
-    runCleaner(hiveConf);
-    verifyDirAndResult(2);
-
-    // Execute SELECT and verify that aborted operation is not counted for MM table.
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(expected), rs);
-
-    // Run initiator to execute CompactionTxnHandler.cleanEmptyAbortedTxns()
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
-            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
-    TestTxnCommands2.runInitiator(hiveConf);
-    // This run of Initiator doesn't add any compaction_queue entry
-    // since we only have one MM table with data - we don't compact MM tables.
-    verifyDirAndResult(2);
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
-            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
-
-    // Execute SELECT statement and verify that aborted INSERT statement is not counted.
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(expected), rs);
-
-    // Initiate a minor compaction request on the table.
-    runStatementOnDriver("alter table " + TableExtended.MMTBL  + " compact 'MINOR'");
-
-    // Run worker to delete aborted transaction's delta directory.
-    runWorker(hiveConf);
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
-            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_COMPONENTS"),
-            1,
-            TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_COMPONENTS"));
-    verifyDirAndResult(2);
-
-    // Run Cleaner to delete rows for the aborted transaction
-    // from TXN_COMPONENTS.
-    runCleaner(hiveConf);
-
-    // Run initiator to clean the row fro the aborted transaction from TXNS.
-    TestTxnCommands2.runInitiator(hiveConf);
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
-            0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_COMPONENTS"),
-            0,
-            TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_COMPONENTS"));
-  }
-
   private void verifyDirAndResult(int expectedDeltas) throws Exception {
     verifyDirAndResult(expectedDeltas, false);
   }