Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/07 08:56:59 UTC

[hive] branch master updated: HIVE-22401: ACID: Refactor CompactorMR (Laszlo Pinter reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 358e5a9  HIVE-22401: ACID: Refactor CompactorMR (Laszlo Pinter reviewed by Peter Vary)
358e5a9 is described below

commit 358e5a93f3c1e33662bf35d75343b31f4d1246bf
Author: Laszlo Pinter <lp...@cloudera.com>
AuthorDate: Thu Nov 7 09:46:55 2019 +0100

    HIVE-22401: ACID: Refactor CompactorMR (Laszlo Pinter reviewed by Peter Vary)
---
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 451 +--------------------
 .../hive/ql/txn/compactor/MajorQueryCompactor.java | 225 ++++++++++
 .../ql/txn/compactor/MmMajorQueryCompactor.java    | 324 +++++++++++++++
 .../hive/ql/txn/compactor/QueryCompactor.java      | 106 +++++
 .../ql/txn/compactor/QueryCompactorFactory.java    |  58 +++
 5 files changed, 720 insertions(+), 444 deletions(-)

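In short, this refactor replaces the inline runCrudCompaction()/runMmCompaction() branches of CompactorMR.run() with a factory lookup that picks a query-based compactor implementation. A minimal sketch of the new dispatch path (variable names t, p, sd, writeIds, ci as in the CompactorMR hunk below; surrounding method context simplified):

    // CompactorMR.run(): delegate to a query-based compactor when one applies.
    // QueryCompactorFactory.getQueryCompactor() returns a MajorQueryCompactor for
    // full CRUD ACID tables (COMPACTOR_CRUD_QUERY_BASED enabled, major compaction),
    // an MmMajorQueryCompactor for insert-only tables (HIVE_COMPACTOR_COMPACT_MM
    // enabled), and null otherwise, in which case the MR-based compaction runs.
    QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
    if (queryCompactor != null) {
      queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
      return;
    }
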
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 0f1579a..ee2c0f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,31 +17,24 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashSet;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -50,19 +43,13 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.DriverUtils;
-import org.apache.hadoop.hive.ql.ddl.table.creation.ShowCreateTableOperation;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -71,10 +58,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -95,9 +78,7 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
 import org.apache.parquet.Strings;
 import org.apache.thrift.TException;
@@ -238,26 +219,16 @@ public class CompactorMR {
     }
     
     /**
-     * Run major compaction in a HiveQL query (compaction for MM tables handled in runMmCompaction method).
-     * TODO: 
+     * Run major compaction in a HiveQL query (compaction for MM tables handled in {@link MmMajorQueryCompactor}
+     * class).
+     * Still needed:
      * 1. A good way to run minor compaction (currently disabled when this config is enabled)
      * 2. More generic approach to collecting files in the same logical bucket to compact within the same task
      * (currently we're using Tez split grouping).
      */
-    if (!AcidUtils.isInsertOnlyTable(t.getParameters()) && HiveConf.getBoolVar(conf,
-        ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
-      if (ci.isMajorCompaction()) {
-        runCrudCompaction(conf, t, p, sd, writeIds, ci);
-        return;
-      } else {
-        throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
-      }
-    }
-
-    if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
-      if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
-        runMmCompaction(conf, t, p, sd, writeIds, ci);
-      }
+    QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
+    if (queryCompactor != null) {
+      queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
       return;
     }
 
@@ -334,82 +305,6 @@ public class CompactorMR {
     su.gatherStats();
   }
 
-  /**
-   * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition)
-   * @param writeIds (valid write ids used to filter rows while they're being read for compaction)
-   * @throws IOException
-   */
-  private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
-      CompactionInfo ci) throws IOException {
-    AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(
-        false), false, t.getParameters(), false);
-
-    if (!isEnoughToCompact(dir, sd)) {
-      return;
-    }
-
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
-    // Set up the session for driver.
-    HiveConf conf = new HiveConf(hiveConf);
-    conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
-    /**
-     * For now, we will group splits on tez so that we end up with all bucket files, 
-     * with same bucket number in one map task.
-     */
-    conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
-    String tmpPrefix = t.getDbName() + "_tmp_compactor_" + t.getTableName() + "_";
-    String tmpTableName = tmpPrefix + System.currentTimeMillis();
-    long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
-    try {
-      // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
-      String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd);
-      LOG.info("Running major compaction query into temp table with create definition: {}", query);
-      try {
-        DriverUtils.runOnDriver(conf, user, sessionState, query);
-      } catch (Exception ex) {
-        Throwable cause = ex;
-        while (cause != null && !(cause instanceof AlreadyExistsException)) {
-          cause = cause.getCause();
-        }
-        if (cause == null) {
-          throw new IOException(ex);
-        }
-      }
-      query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName);
-      LOG.info("Running major compaction via query: {}", query);
-      /**
-       * This will create bucket files like:
-       * db/db_tmp_compactor_tbl_1234/00000_0
-       * db/db_tmp_compactor_tbl_1234/00001_0
-       */
-      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
-      /**
-       * This achieves a final layout like (wid is the highest valid write id for this major compaction):
-       * db/tbl/ptn/base_wid/bucket_00000
-       * db/tbl/ptn/base_wid/bucket_00001
-       */
-      org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
-      String tmpLocation = tempTable.getSd().getLocation();
-      commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds, compactorTxnId);
-    } catch (HiveException e) {
-      LOG.error("Error doing query based major compaction", e);
-      throw new IOException(e);
-    } finally {
-      try {
-        DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
-      } catch (HiveException e) {
-        LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName);
-        LOG.error(ExceptionUtils.getStackTrace(e));
-      }
-    }
-  }
-
-  private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) {
-    return isEnoughToCompact(true, dir, sd);
-  }
-
   private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
     int deltaCount = dir.getCurrentDirectories().size();
     int origCount = dir.getOriginalFiles().size();
@@ -442,308 +337,9 @@ public class CompactorMR {
     return isEnoughToCompact;
   }
 
-  private void runMmCompaction(HiveConf conf, Table t, Partition p,
-      StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
-    LOG.debug("Going to delete directories for aborted transactions for MM table "
-        + t.getDbName() + "." + t.getTableName());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false),
-        false, t.getParameters(), false);
-    removeFilesForMmTable(conf, dir);
-
-    // Then, actually do the compaction.
-    if (!ci.isMajorCompaction()) {
-      // Not supported for MM tables right now.
-      LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
-      return;
-    }
-
-    if (!isEnoughToCompact(dir, sd)) {
-      return;
-    }
-
-    try {
-      String tmpLocation = generateTmpPath(sd);
-      Path baseLocation = new Path(tmpLocation, "_base");
-
-      // Set up the session for driver.
-      HiveConf driverConf = new HiveConf(conf);
-      driverConf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
-      driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
-      //thinking it already has a txn opened
-
-      String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true);
-
-      // Note: we could skip creating the table and just add table type stuff directly to the
-      //       "insert overwrite directory" command if there were no bucketing or list bucketing.
-      String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_";
-      String tmpTableName = null;
-      while (true) {
-        tmpTableName = tmpPrefix + System.currentTimeMillis();
-        String query = buildMmCompactionCtQuery(tmpTableName, t,
-            p == null ? t.getSd() : p.getSd(), baseLocation.toString());
-        LOG.info("Compacting a MM table into " + query);
-        try {
-          DriverUtils.runOnDriver(driverConf, user, sessionState, query);
-          break;
-        } catch (Exception ex) {
-          Throwable cause = ex;
-          while (cause != null && !(cause instanceof AlreadyExistsException)) {
-            cause = cause.getCause();
-          }
-          if (cause == null) {
-            throw new IOException(ex);
-          }
-        }
-      }
-      String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
-      LOG.info("Compacting a MM table via " + query);
-      long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
-      DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
-      commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds, compactorTxnId);
-      DriverUtils.runOnDriver(driverConf, user, sessionState,
-          "drop table if exists " + tmpTableName);
-    } catch (HiveException e) {
-      LOG.error("Error compacting a MM table", e);
-      throw new IOException(e);
-    }
-  }
-
   private String generateTmpPath(StorageDescriptor sd) {
     return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
   }
-  
-  /**
-   * Note on ordering of rows in the temp table:
-   * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending). 
-   * (current write id will be the same as original write id). 
-   * We will be achieving the ordering via a custom split grouper for compactor.
-   * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
-   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
-   *  for details on the mechanism.
-   */
-  private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) {
-    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
-    // Acid virtual columns
-    query.append(
-        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<");
-    List<FieldSchema> cols = t.getSd().getCols();
-    boolean isFirst = true;
-    // Actual columns
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
-    }
-    query.append(">)");
-    query.append(" stored as orc");
-    query.append(" tblproperties ('transactional'='false')");
-    return query.toString();
-  }
-
-  private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
-    String fullName = t.getDbName() + "." + t.getTableName();
-    String query = "insert into table " + tmpName + " ";
-    String filter = "";
-    if (p != null) {
-      filter = filter + " where ";
-      List<String> vals = p.getValues();
-      List<FieldSchema> keys = t.getPartitionKeys();
-      assert keys.size() == vals.size();
-      for (int i = 0; i < keys.size(); ++i) {
-        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
-      }
-    }
-    query += " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
-        + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(";
-    List<FieldSchema> cols = t.getSd().getCols();
-    for (int i = 0; i < cols.size(); ++i) {
-      query += (i == 0 ? "'" : ", '") + cols.get(i).getName() + "', " + cols.get(i).getName();
-    }
-    query += ") from " + fullName + filter;
-    return query;
-  }
-
-  /**
-   * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
-   * Since the temp table is a non-transactional table, it has file names in the "original" format.
-   * Also, due to split grouping in
-   * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)},
-   * we will end up with one file per bucket.
-   */
-  private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, HiveConf conf,
-      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    Path fromPath = new Path(from);
-    Path toPath = new Path(to);
-    Path tmpTablePath = new Path(fromPath, tmpTableName);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    // Assume the high watermark can be used as maximum transaction ID.
-    long maxTxn = actualWriteIds.getHighWatermark();
-    // Get a base_wid path which will be the new compacted base
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false)
-        .maximumWriteId(maxTxn).bucket(0).statementId(-1);
-    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info("{} not found.  Assuming 0 splits. Creating {}", from, newBaseDir);
-      fs.mkdirs(newBaseDir);
-      return;
-    }
-    LOG.info("Moving contents of {} to {}", tmpTablePath, to);
-    /**
-     * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on
-     * TODO/ToThink:
-     * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination?
-     */
-    //    List<String> buckCols = t.getSd().getBucketCols();
-    FileStatus[] children = fs.listStatus(fromPath);
-    for (FileStatus filestatus : children) {
-      String originalFileName = filestatus.getPath().getName();
-      // This if() may not be required I think...
-      if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
-        int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
-        options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
-            .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
-        Path finalBucketFile = AcidUtils.createFilename(toPath, options);
-        Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false);
-      }
-    }
-    fs.delete(fromPath, true);
-  }
-
-  private String buildMmCompactionCtQuery(
-      String fullName, Table t, StorageDescriptor sd, String location) {
-    StringBuilder query = new StringBuilder("create temporary table ")
-      .append(fullName).append("(");
-    List<FieldSchema> cols = t.getSd().getCols();
-    boolean isFirst = true;
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(col.getType());
-    }
-    query.append(") ");
-
-    // Bucketing.
-    List<String> buckCols = t.getSd().getBucketCols();
-    if (buckCols.size() > 0) {
-      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
-      List<Order> sortCols = t.getSd().getSortCols();
-      if (sortCols.size() > 0) {
-        query.append("SORTED BY (");
-        isFirst = true;
-        for (Order sortCol : sortCols) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
-        }
-        query.append(") ");
-      }
-      query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
-    }
-
-    // Stored as directories. We don't care about the skew otherwise.
-    if (t.getSd().isStoredAsSubDirectories()) {
-      SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
-      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
-        query.append(" SKEWED BY (").append(
-            StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
-        isFirst = true;
-        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append("('").append(StringUtils.join("','", colValues)).append("')");
-        }
-        query.append(") STORED AS DIRECTORIES");
-      }
-    }
-
-    SerDeInfo serdeInfo = sd.getSerdeInfo();
-    Map<String, String> serdeParams = serdeInfo.getParameters();
-    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(
-        serdeInfo.getSerializationLib())).append("'");
-    String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
-    assert sh == null; // Not supposed to be a compactable table.
-    if (!serdeParams.isEmpty()) {
-      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
-    }
-    query.append("STORED AS INPUTFORMAT '").append(
-        HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append(
-        HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append(
-        HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
-    // Exclude all standard table properties.
-    Set<String> excludes = getHiveMetastoreConstants();
-    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
-    isFirst = true;
-    for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
-      if (e.getValue() == null) continue;
-      if (excludes.contains(e.getKey())) continue;
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("'").append(e.getKey()).append("'='").append(
-          HiveStringUtils.escapeHiveCommand(e.getValue())).append("'");
-    }
-    if (!isFirst) {
-      query.append(", ");
-    }
-    query.append("'transactional'='false')");
-    return query.toString();
-
-  }
-
-  private static Set<String> getHiveMetastoreConstants() {
-    HashSet<String> result = new HashSet<>();
-    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
-      if (!Modifier.isStatic(f.getModifiers())) continue;
-      if (!Modifier.isFinal(f.getModifiers())) continue;
-      if (!String.class.equals(f.getType())) continue;
-      f.setAccessible(true);
-      try {
-        result.add((String)f.get(null));
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return result;
-  }
-
-  private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
-    String fullName = t.getDbName() + "." + t.getTableName();
-    // TODO: ideally we should make a special form of insert overwrite so that we:
-    //       1) Could use fast merge path for ORC and RC.
-    //       2) Didn't have to create a table.
-
-    String query = "insert overwrite table " + tmpName + " ";
-    String filter = "";
-    if (p != null) {
-      filter = " where ";
-      List<String> vals = p.getValues();
-      List<FieldSchema> keys = t.getPartitionKeys();
-      assert keys.size() == vals.size();
-      for (int i = 0; i < keys.size(); ++i) {
-        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
-      }
-      query += " select ";
-      // Use table descriptor for columns.
-      List<FieldSchema> cols = t.getSd().getCols();
-      for (int i = 0; i < cols.size(); ++i) {
-        query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`");
-      }
-    } else {
-      query += "select *";
-    }
-    query += " from "  + fullName + filter;
-    return query;
-  }
 
   /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
@@ -1221,7 +817,7 @@ public class CompactorMR {
         deleteEventWriter.close(false);
       }
     }
-    private static long getCompactorTxnId(Configuration jobConf) {
+    static long getCompactorTxnId(Configuration jobConf) {
       String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY);
       if(Strings.isNullOrEmpty(snapshot)) {
         throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to "
@@ -1423,37 +1019,4 @@ public class CompactorMR {
       fs.delete(tmpLocation, true);
     }
   }
-
-  /**
-   * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
-   * @param from The temp directory used for compactor output. Not the actual base/delta.
-   * @param to The final directory; basically a SD directory. Not the actual base/delta.
-   * @param compactorTxnId txn that the compactor started
-   */
-  private void commitMmCompaction(String from, String to, Configuration conf,
-      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
-    Path fromPath = new Path(from), toPath = new Path(to);
-    FileSystem fs = fromPath.getFileSystem(conf);
-    // Assume the high watermark can be used as maximum transaction ID.
-    //todo: is that true?  can it be aborted? does it matter for compaction? probably OK since
-    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
-    long maxTxn = actualWriteIds.getHighWatermark();
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
-        .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1)
-        .visibilityTxnId(compactorTxnId);
-    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
-    if (!fs.exists(fromPath)) {
-      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newBaseDir);
-      fs.mkdirs(newBaseDir);
-      return;
-    }
-    LOG.info("Moving contents of " + from + " to " + to);
-    FileStatus[] children = fs.listStatus(fromPath);
-    if (children.length != 1) {
-      throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
-    }
-    FileStatus dirPath = children[0];
-    fs.rename(dirPath.getPath(), newBaseDir);
-    fs.delete(fromPath, true);
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
new file mode 100644
index 0000000..10681c0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class responsible for running query based major compaction.
+ */
+class MajorQueryCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorQueryCompactor.class.getName());
+
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+    AcidUtils
+        .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
+    AcidUtils.Directory dir = AcidUtils
+        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
+            table.getParameters(), false);
+
+    if (!Util.isEnoughToCompact(true, dir, storageDescriptor)) {
+      return;
+    }
+
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
+    // Set up the session for driver.
+    HiveConf conf = new HiveConf(hiveConf);
+    conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+    /*
+     * For now, we will group splits on tez so that we end up with all bucket files,
+     * with same bucket number in one map task.
+     */
+    conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+    String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
+    String tmpTableName = tmpPrefix + System.currentTimeMillis();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+    try {
+      // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
+      String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, table);
+      LOG.info("Running major compaction query into temp table with create definition: {}", query);
+      try {
+        DriverUtils.runOnDriver(conf, user, sessionState, query);
+      } catch (Exception ex) {
+        Throwable cause = ex;
+        while (cause != null && !(cause instanceof AlreadyExistsException)) {
+          cause = cause.getCause();
+        }
+        if (cause == null) {
+          throw new IOException(ex);
+        }
+      }
+      query = buildCrudMajorCompactionQuery(table, partition, tmpTableName);
+      LOG.info("Running major compaction via query: {}", query);
+      /*
+       * This will create bucket files like:
+       * db/db_tmp_compactor_tbl_1234/00000_0
+       * db/db_tmp_compactor_tbl_1234/00001_0
+       */
+      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
+      /*
+       * This achieves a final layout like (wid is the highest valid write id for this major compaction):
+       * db/tbl/ptn/base_wid/bucket_00000
+       * db/tbl/ptn/base_wid/bucket_00001
+       */
+      org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+      String tmpLocation = tempTable.getSd().getLocation();
+      commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds,
+          compactorTxnId);
+    } catch (HiveException e) {
+      LOG.error("Error doing query based major compaction", e);
+      throw new IOException(e);
+    } finally {
+      try {
+        DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
+      } catch (HiveException e) {
+        LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName);
+        LOG.error(ExceptionUtils.getStackTrace(e));
+      }
+    }
+  }
+
+  /**
+   * Note on ordering of rows in the temp table:
+   * We need each final bucket file sorted by original write id (ascending), bucket (ascending) and row id (ascending).
+   * (current write id will be the same as original write id).
+   * We will be achieving the ordering via a custom split grouper for compactor.
+   * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
+   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
+   *  for details on the mechanism.
+   */
+  private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t) {
+    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
+    // Acid virtual columns
+    query.append(
+        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
+            + "`row` struct<");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    // Actual columns
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+    }
+    query.append(">)");
+    query.append(" stored as orc");
+    query.append(" tblproperties ('transactional'='false')");
+    return query.toString();
+  }
+
+  private String buildCrudMajorCompactionQuery(Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    StringBuilder query = new StringBuilder("insert into table " + tmpName + " ");
+    StringBuilder filter = new StringBuilder();
+    if (p != null) {
+      filter.append(" where ");
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
+            .append("'");
+      }
+    }
+    query.append(" select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
+        + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(");
+    List<FieldSchema> cols = t.getSd().getCols();
+    for (int i = 0; i < cols.size(); ++i) {
+      query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName());
+    }
+    query.append(") from ").append(fullName).append(filter);
+    return query.toString();
+  }
+
+  /**
+   * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
+   * Since the temp table is a non-transactional table, it has file names in the "original" format.
+   * Also, due to split grouping in
+   * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)},
+   * we will end up with one file per bucket.
+   */
+  private void commitCrudMajorCompaction(String from, String tmpTableName, String to, HiveConf conf,
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+    Path fromPath = new Path(from);
+    Path toPath = new Path(to);
+    Path tmpTablePath = new Path(fromPath, tmpTableName);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as maximum transaction ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    // Get a base_wid path which will be the new compacted base
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+            .statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info("{} not found.  Assuming 0 splits. Creating {}", from, newBaseDir);
+      fs.mkdirs(newBaseDir);
+      return;
+    }
+    LOG.info("Moving contents of {} to {}", tmpTablePath, to);
+    /*
+     * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on
+     * TODO/ToThink:
+     * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination?
+     */
+    //    List<String> buckCols = t.getSd().getBucketCols();
+    FileStatus[] children = fs.listStatus(fromPath);
+    for (FileStatus filestatus : children) {
+      String originalFileName = filestatus.getPath().getName();
+      // This if() may not be required I think...
+      if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
+        int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
+        options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
+            .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
+        Path finalBucketFile = AcidUtils.createFilename(toPath, options);
+        Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false);
+      }
+    }
+    fs.delete(fromPath, true);
+  }
+}
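
Putting the comments in MajorQueryCompactor together: because the temp table is non-transactional, the compaction query writes files in the "original" naming format (00000_0, 00001_0, ...); commitCrudMajorCompaction() then parses the bucket id from each such file name (AcidUtils.ORIGINAL_PATTERN / parseBucketId) and moves it into the new compacted base under the table/partition location, roughly (illustrative paths, wid being the write id high watermark):

    .../db_tmp_compactor_tbl_1234/00000_0  ->  db/tbl/ptn/base_wid/bucket_00000
    .../db_tmp_compactor_tbl_1234/00001_0  ->  db/tbl/ptn/base_wid/bucket_00001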
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
new file mode 100644
index 0000000..f7e0a85
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.ddl.table.creation.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class responsible for running query based major compaction on insert only tables.
+ */
+class MmMajorQueryCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MmMajorQueryCompactor.class.getName());
+
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+    LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table
+        .getTableName());
+    AcidUtils.Directory dir = AcidUtils
+        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
+            table.getParameters(), false);
+    removeFilesForMmTable(hiveConf, dir);
+
+    // Then, actually do the compaction.
+    if (!compactionInfo.isMajorCompaction()) {
+      // Not supported for MM tables right now.
+      LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction");
+      return;
+    }
+
+    if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
+      return;
+    }
+
+    try {
+      String tmpLocation = Util.generateTmpPath(storageDescriptor);
+      Path baseLocation = new Path(tmpLocation, "_base");
+
+      // Set up the session for driver.
+      HiveConf driverConf = new HiveConf(hiveConf);
+      driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+      driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
+      //thinking it already has a txn opened
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true);
+
+      // Note: we could skip creating the table and just add table type stuff directly to the
+      //       "insert overwrite directory" command if there were no bucketing or list bucketing.
+      String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
+      String tmpTableName;
+      while (true) {
+        tmpTableName = tmpPrefix + System.currentTimeMillis();
+        String query =
+            buildMmCompactionCtQuery(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
+                baseLocation.toString());
+        LOG.info("Compacting a MM table into " + query);
+        try {
+          DriverUtils.runOnDriver(driverConf, user, sessionState, query);
+          break;
+        } catch (Exception ex) {
+          Throwable cause = ex;
+          while (cause != null && !(cause instanceof AlreadyExistsException)) {
+            cause = cause.getCause();
+          }
+          if (cause == null) {
+            throw new IOException(ex);
+          }
+        }
+      }
+      String query = buildMmCompactionQuery(table, partition, tmpTableName);
+      LOG.info("Compacting a MM table via " + query);
+      long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf);
+      DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
+      commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId);
+      DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName);
+    } catch (HiveException e) {
+      LOG.error("Error compacting a MM table", e);
+      throw new IOException(e);
+    }
+  }
+
+  // Remove the directories for aborted transactions only
+  private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
+    // For MM table, we only want to delete delta dirs for aborted txns.
+    List<Path> filesToDelete = dir.getAbortedDirectories();
+    if (filesToDelete.size() < 1) {
+      return;
+    }
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
+    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+    for (Path dead : filesToDelete) {
+      LOG.debug("Going to delete path " + dead.toString());
+      fs.delete(dead, true);
+    }
+  }
+
+  private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescriptor sd, String location) {
+    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(col.getType());
+    }
+    query.append(") ");
+
+    // Bucketing.
+    List<String> buckCols = t.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = t.getSd().getSortCols();
+      if (sortCols.size() > 0) {
+        query.append("SORTED BY (");
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+
+    // Stored as directories. We don't care about the skew otherwise.
+    if (t.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+        .append("'");
+    String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    assert sh == null; // Not supposed to be a compactable table.
+    if (!serdeParams.isEmpty()) {
+      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+    // Exclude all standard table properties.
+    Set<String> excludes = getHiveMetastoreConstants();
+    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+    isFirst = true;
+    for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+      if (e.getValue() == null) {
+        continue;
+      }
+      if (excludes.contains(e.getKey())) {
+        continue;
+      }
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
+          .append("'");
+    }
+    if (!isFirst) {
+      query.append(", ");
+    }
+    query.append("'transactional'='false')");
+    return query.toString();
+
+  }
+
+  private String buildMmCompactionQuery(Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    // ideally we should make a special form of insert overwrite so that we:
+    // 1) Could use fast merge path for ORC and RC.
+    // 2) Didn't have to create a table.
+
+    StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " ");
+    StringBuilder filter = new StringBuilder();
+    if (p != null) {
+      filter = new StringBuilder(" where ");
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
+            .append("'");
+      }
+      query.append(" select ");
+      // Use table descriptor for columns.
+      List<FieldSchema> cols = t.getSd().getCols();
+      for (int i = 0; i < cols.size(); ++i) {
+        query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+      }
+    } else {
+      query.append("select *");
+    }
+    query.append(" from ").append(fullName).append(filter);
+    return query.toString();
+  }
+
+  /**
+   * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+   * @param from The temp directory used for compactor output. Not the actual base/delta.
+   * @param to The final directory; basically a SD directory. Not the actual base/delta.
+   * @param compactorTxnId txn that the compactor started
+   */
+  private void commitMmCompaction(String from, String to, Configuration conf, ValidWriteIdList actualWriteIds,
+      long compactorTxnId) throws IOException {
+    Path fromPath = new Path(from), toPath = new Path(to);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as maximum transaction ID.
+    //todo: is that true?  can it be aborted? does it matter for compaction? probably OK since
+    //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    AcidOutputFormat.Options options =
+        new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+            .statementId(-1).visibilityTxnId(compactorTxnId);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newBaseDir);
+      fs.mkdirs(newBaseDir);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + to);
+    FileStatus[] children = fs.listStatus(fromPath);
+    if (children.length != 1) {
+      throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
+    }
+    FileStatus dirPath = children[0];
+    fs.rename(dirPath.getPath(), newBaseDir);
+    fs.delete(fromPath, true);
+  }
+
+  private static Set<String> getHiveMetastoreConstants() {
+    Set<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isStatic(f.getModifiers())) {
+        continue;
+      }
+      if (!Modifier.isFinal(f.getModifiers())) {
+        continue;
+      }
+      if (!String.class.equals(f.getType())) {
+        continue;
+      }
+      f.setAccessible(true);
+      try {
+        result.add((String) f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+}
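
Tracing the MM (insert-only) path above end to end for a table at location db/tbl: the temp table is created with its data directory at db/tbl/_tmp_<uuid>/_base, the insert overwrite query fills it, and commitMmCompaction() then expects exactly one child under db/tbl/_tmp_<uuid> (that _base directory), renames it to the new compacted base under db/tbl (named from the write id high watermark plus the compactor's visibility txn id), and finally deletes the _tmp_<uuid> directory. Paths here are illustrative; the names and steps follow the code above.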
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
new file mode 100644
index 0000000..80119de
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Common interface for query based compactions.
+ */
+abstract class QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class.getName());
+  private static final String TMPDIR = "_tmp";
+
+  /**
+   * Start a query based compaction.
+   * @param hiveConf hive configuration
+   * @param table the table where the compaction should run
+   * @param partition the partition where the compaction should run
+   * @param storageDescriptor this is the resolved storage descriptor
+   * @param writeIds valid write IDs used to filter rows while they're being read for compaction
+   * @param compactionInfo provides info about the type of compaction
+   * @throws IOException if the compaction cannot be finished.
+   */
+  abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException;
+
+  /**
+   * Collection of some helper functions.
+   */
+  static class Util {
+    /**
+     * Determine if compaction can run in a specified directory.
+     * @param isMajorCompaction type of compaction.
+     * @param dir the delta directory
+     * @param sd resolved storage descriptor
+     * @return true, if compaction can run.
+     */
+    static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
+      int deltaCount = dir.getCurrentDirectories().size();
+      int origCount = dir.getOriginalFiles().size();
+
+      StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
+      boolean isEnoughToCompact;
+
+      if (isMajorCompaction) {
+        isEnoughToCompact = (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
+
+      } else {
+        isEnoughToCompact = (deltaCount > 1);
+
+        if (deltaCount == 2) {
+          Map<String, Long> deltaByType = dir.getCurrentDirectories().stream().collect(Collectors
+              .groupingBy(delta -> (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
+                  Collectors.counting()));
+
+          isEnoughToCompact = (deltaByType.size() != deltaCount);
+          deltaInfo.append(" ").append(deltaByType);
+        }
+      }
+
+      if (!isEnoughToCompact) {
+        LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(),
+            dir.getBaseDirectory(), deltaInfo, origCount);
+      }
+      return isEnoughToCompact;
+    }
+
+    /**
+     * Generate a random tmp path, under the provided storage.
+     * @param sd storage descriptor, must not be null.
+     * @return path, always not null
+     */
+    static String generateTmpPath(StorageDescriptor sd) {
+      return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+    }
+  }
+}
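
For orientation, the isEnoughToCompact() helper above resolves a few common cases as follows (worked directly from the code, not from separate documentation):

    major: base + 1 delta, no originals         -> 1 + 1 > 1, compact
    major: base only, no deltas or originals    -> 0 + 1 > 1 is false, skip
    minor: 2 insert deltas (or 2 delete deltas) -> one delta type for 2 deltas, compact
    minor: 1 insert delta + 1 delete delta      -> deltaByType.size() == deltaCount, skip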
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
new file mode 100644
index 0000000..41cb4b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+/**
+ * Simple factory class, which returns an instance of {@link QueryCompactor}.
+ */
+final class QueryCompactorFactory {
+
+  private QueryCompactorFactory() {
+  }
+
+  /**
+   * Get an instance of {@link QueryCompactor}.
+   * @param table the table on which the compaction should run; must not be null.
+   * @param configuration the hive configuration; must not be null.
+   * @param compactionInfo provides insight about the type of compaction; must not be null.
+   * @return {@link QueryCompactor} or null.
+   */
+  static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, CompactionInfo compactionInfo) {
+    if (!AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf
+        .getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+      if (compactionInfo.isMajorCompaction()) {
+        return new MajorQueryCompactor();
+      } else {
+        throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
+      }
+    }
+
+    if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf
+        .getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+      return new MmMajorQueryCompactor();
+    }
+
+    return null;
+  }
+
+}