You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/03/26 09:35:55 UTC

[hive] branch master updated: HIVE-22875: Refactor query creation in QueryCompactor implementations (Karen Coppage, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d71480  HIVE-22875: Refactor query creation in QueryCompactor implementations (Karen Coppage, reviewed by Laszlo Pinter)
8d71480 is described below

commit 8d71480aad8b13a67932ce01701235dc85fcab4e
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Thu Mar 26 10:28:09 2020 +0100

    HIVE-22875: Refactor query creation in QueryCompactor implementations (Karen Coppage, reviewed by Laszlo Pinter)
---
 .../ql/txn/compactor/TestMmCompactorOnTez.java     |   3 -
 .../ql/txn/compactor/CompactionQueryBuilder.java   | 601 +++++++++++++++++++++
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  71 +--
 .../hive/ql/txn/compactor/MinorQueryCompactor.java | 147 +++--
 .../ql/txn/compactor/MmMajorQueryCompactor.java    |  59 +-
 .../ql/txn/compactor/MmMinorQueryCompactor.java    | 112 ++--
 .../ql/txn/compactor/MmQueryCompactorUtils.java    | 200 -------
 .../hive/ql/txn/compactor/QueryCompactor.java      |  42 +-
 8 files changed, 787 insertions(+), 448 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
index 074430c..43a216b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -301,7 +301,6 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
     CompactorTestUtil.runCleaner(conf);
     verifySuccessulTxn(1);
-    List<ShowCompactResponseElement> compacts;
     // Insert test data into test table
     dataProvider.insertMmTestData(tableName);
     // Run a compaction
@@ -341,7 +340,6 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
     CompactorTestUtil.runCleaner(conf);
     verifySuccessulTxn(1);
-    List<ShowCompactResponseElement> compacts;
     // Verify delta directories after compaction
     Assert.assertEquals("Delta directories does not match after minor compaction",
         Collections.singletonList("delta_0000001_0000003_v0000007"),
@@ -382,7 +380,6 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
     CompactorTestUtil.runCleaner(conf);
     verifySuccessulTxn(1);
-    List<ShowCompactResponseElement> compacts;
     // Verify base directory after compaction
     Assert.assertEquals("Base directory does not match after major compaction",
         Collections.singletonList("base_0000003_v0000007"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
new file mode 100644
index 0000000..6b3a4db
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Builds query strings that help with query-based compaction of CRUD and insert-only tables.
+ */
+class CompactionQueryBuilder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class.getName());
+
+  // required fields, set in constructor
+  private final Operation operation;
+  private String resultTableName;
+
+  // required for some types of compaction. Required...
+  private Table sourceTab; // for Create and for Insert in CRUD and insert-only major
+  private StorageDescriptor storageDescriptor; // for Create in insert-only
+  private String location; // for Create
+  private ValidWriteIdList validWriteIdList; // for Alter/Insert in minor and CRUD
+  private AcidUtils.Directory dir; // for Alter in minor
+  private Partition sourcePartition; // for Insert in major and insert-only minor
+  private String fromTableName; // for Insert
+
+  // settable booleans
+  private boolean isPartitioned; // for Create
+  private boolean isBucketed; // for Create in CRUD
+  private boolean isDeleteDelta; // for Alter in CRUD minor
+
+  // internal use only, for legibility
+  private final boolean major;
+  private final boolean minor;
+  private final boolean crud;
+  private final boolean insertOnly;
+
+  enum CompactionType {
+    MAJOR_CRUD, MINOR_CRUD, MAJOR_INSERT_ONLY, MINOR_INSERT_ONLY
+  }
+
+  enum Operation {
+    CREATE, ALTER, INSERT, DROP
+  }
+
+  /**
+   * Set source table – the table to compact.
+   * Required for Create operations and for Insert operations in crud and insert-only major
+   * compaction.
+   *
+   * @param sourceTab table to compact, not null
+   */
+  CompactionQueryBuilder setSourceTab(Table sourceTab) {
+    this.sourceTab = sourceTab;
+    return this;
+  }
+
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  /**
+   * Set the location of the temp tables.
+   * Used for Create operations.
+   *
+   * @param location location of the temp tables
+   */
+  CompactionQueryBuilder setLocation(String location) {
+    this.location = location;
+    return this;
+  }
+
+  /**
+   * Set list of valid write ids.
+   * Required for Alter and Insert operations in crud minor compaction.
+   *
+   * @param validWriteIdList list of valid write ids, not null
+   */
+  CompactionQueryBuilder setValidWriteIdList(ValidWriteIdList validWriteIdList) {
+    this.validWriteIdList = validWriteIdList;
+    return this;
+  }
+
+  /**
+   * Set Acid Directory.
+   * Required for Alter operations in minor compaction.
+   *
+   * @param dir Acid Directory, not null
+   */
+  CompactionQueryBuilder setDir(AcidUtils.Directory dir) {
+    this.dir = dir;
+    return this;
+  }
+
+  /**
+   * Set partition to compact, if we are compacting a partition.
+   * Required for Insert operations in major and insert-only minor compaction.
+   */
+  CompactionQueryBuilder setSourcePartition(Partition sourcePartition) {
+    this.sourcePartition = sourcePartition;
+    return this;
+  }
+
+  /**
+   * Set table to select from.
+   * Required for Insert operations.
+   *
+   * @param fromTableName name of table to select from, not null
+   */
+  CompactionQueryBuilder setFromTableName(String fromTableName) {
+    this.fromTableName = fromTableName;
+    return this;
+  }
+
+  /**
+   * If true, Create operations will result in a table with partition column `file_name`.
+   */
+  CompactionQueryBuilder setPartitioned(boolean partitioned) {
+    isPartitioned = partitioned;
+    return this;
+  }
+
+  /**
+   * If true, Create operations for CRUD minor compaction will result in a bucketed table.
+   */
+  CompactionQueryBuilder setBucketed(boolean bucketed) {
+    isBucketed = bucketed;
+    return this;
+  }
+
+  /**
+   * If true, during CRUD minor compaction, Alter operations will result in the temp table's
+   * partitions pointing to delete delta directories as opposed to insert deltas' directories (see
+   * MinorQueryCompactor for details).
+   */
+  CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) {
+    isDeleteDelta = deleteDelta;
+    return this;
+  }
+
+  /**
+   * Construct a CompactionQueryBuilder with required params.
+   *
+   * @param compactionType major or minor; crud or insert-only, e.g. CompactionType.MAJOR_CRUD.
+   *                       Cannot be null.
+   * @param operation query's Operation e.g. Operation.CREATE.
+   * @param resultTableName the name of the table we are running the operation on
+   * @throws IllegalArgumentException if compactionType is null
+   */
+  CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+      String resultTableName) {
+    if (compactionType == null) {
+      throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
+    }
+    this.operation = operation;
+    this.resultTableName = resultTableName;
+    major = compactionType == CompactionType.MAJOR_CRUD
+        || compactionType == CompactionType.MAJOR_INSERT_ONLY;
+    crud =
+        compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD;
+    minor = !major;
+    insertOnly = !crud;
+  }
+
+  /**
+   * Build the query string based on parameters.
+   *
+   * @return query String
+   */
+  String build() {
+    StringBuilder query = new StringBuilder(operation.toString());
+
+    if (operation == Operation.CREATE) {
+      query.append(" temporary external");
+    }
+    if (operation == Operation.INSERT) {
+      query.append(" into");
+    }
+    query.append(" table ");
+
+    if (operation == Operation.DROP) {
+      query.append("if exists ");
+    }
+
+    query.append(resultTableName);
+
+    switch (operation) {
+    case CREATE:
+      getDdlForCreate(query);
+      break;
+    case ALTER:
+      buildAddClauseForAlter(query);
+      break;
+    case INSERT:
+      query.append(" select ");
+      buildSelectClauseForInsert(query);
+      query.append(" from ")
+          .append(fromTableName);
+      buildWhereClauseForInsert(query);
+      break;
+    case DROP:
+    default:
+    }
+
+    return query.toString();
+  }
+
+  private void buildAddClauseForAlter(StringBuilder query) {
+    if (validWriteIdList == null || dir == null) {
+      query.setLength(0);
+      return;  // avoid NPEs, don't throw an exception but return an empty query
+    }
+    long minWriteID =
+        validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+    long highWatermark = validWriteIdList.getHighWatermark();
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
+        delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark
+            && delta.getMinWriteId() >= minWriteID)
+        .collect(Collectors.toList());
+    if (deltas.isEmpty()) {
+      query.setLength(0); // no alter query needed; clear StringBuilder
+      return;
+    }
+    query.append(" add ");
+    deltas.forEach(delta -> query.append("partition (file_name='")
+        .append(delta.getPath().getName()).append("')"
+            + " location '").append(delta.getPath()).append("' "));
+  }
+
+
+  private void buildSelectClauseForInsert(StringBuilder query) {
+    // Need list of columns for major crud, mmmajor partitioned, mmminor
+    List<FieldSchema> cols;
+    if (major && crud || major && insertOnly && sourcePartition != null || minor && insertOnly) {
+      if (sourceTab == null) {
+        return; // avoid NPEs, don't throw an exception but skip this part of the query
+      }
+      cols = sourceTab.getSd().getCols();
+    } else {
+      cols = null;
+    }
+
+    if (crud) {
+      if (major) {
+        query.append(
+            "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), "
+                + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, "
+                + "NAMED_STRUCT(");
+        for (int i = 0; i < cols.size(); ++i) {
+          query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ")
+              .append(cols.get(i).getName());
+        }
+        query.append(") ");
+      } else { //minor
+        query.append(
+            "`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`");
+      }
+
+    } else { // mm
+      if (major) {
+        if (sourcePartition != null) { //mmmajor and partitioned
+          for (int i = 0; i < cols.size(); ++i) {
+            query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+          }
+        } else { // mmmajor and unpartitioned
+          query.append("*");
+        }
+      } else { // mmminor
+        for (int i = 0; i < cols.size(); ++i) {
+          query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+        }
+      }
+    }
+  }
+
+  private void buildWhereClauseForInsert(StringBuilder query) {
+    if (major && sourcePartition != null && sourceTab != null) {
+      List<String> vals = sourcePartition.getValues();
+      List<FieldSchema> keys = sourceTab.getPartitionKeys();
+      if (keys.size() != vals.size()) {
+        throw new IllegalStateException("source partition values ("
+            + Arrays.toString(vals.toArray()) + ") do not match source table values ("
+            + Arrays.toString(keys.toArray()) + "). Failing compaction.");
+      }
+
+      query.append(" where ");
+      for (int i = 0; i < keys.size(); ++i) {
+        query.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='")
+            .append(vals.get(i)).append("'");
+      }
+    }
+
+    if (minor && crud && validWriteIdList != null) {
+      long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
+      if (invalidWriteIds.length > 0) {
+        query.append(" where `originalTransaction` not in (").append(
+            org.apache.commons.lang3.StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ","))
+            .append(")");
+      }
+    }
+  }
+
+  private void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // CLUSTERED BY. (bucketing)
+    int bucketingVersion = 0;
+    if (crud && minor) {
+      bucketingVersion = getMinorCrudBucketing(query, bucketingVersion);
+    } else if (insertOnly) {
+      getMmBucketing(query);
+    }
+
+    // SKEWED BY
+    if (insertOnly) {
+      getSkewedByClause(query);
+    }
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    if (crud) {
+      query.append(" stored as orc");
+    } else {
+      copySerdeFromSourceTable(query);
+    }
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    // TBLPROPERTIES
+    addTblProperties(query, bucketingVersion);
+  }
+
+  /**
+   * Define columns of the create query.
+   */
+  private void defineColumns(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of the query
+    }
+    query.append("(");
+    if (crud) {
+      query.append(
+          "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
+              + "`currentTransaction` bigint, `row` struct<");
+    }
+    List<FieldSchema> cols = sourceTab.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ");
+      query.append(crud ? ":" : "");
+      query.append(col.getType());
+    }
+    query.append(crud ? ">" : "");
+    query.append(") ");
+  }
+
+  /**
+   * Part of Create operation. Copy source table bucketing for insert-only compaction.
+   */
+  private void getMmBucketing(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of the query
+    }
+    boolean isFirst;
+    List<String> buckCols = sourceTab.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = sourceTab.getSd().getSortCols();
+      if (sortCols.size() > 0) {
+        query.append("SORTED BY (");
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ")
+              .append(DirectionUtils.codeToText(sortCol.getOrder()));
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+  }
+
+  /**
+   * Part of Create operation. Minor crud compaction uses its own bucketing system.
+   */
+  private int getMinorCrudBucketing(StringBuilder query, int bucketingVersion) {
+    if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs
+      int numBuckets = 1;
+      try {
+        org.apache.hadoop.hive.ql.metadata.Table t =
+            Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName());
+        numBuckets = Math.max(t.getNumBuckets(), numBuckets);
+        bucketingVersion = t.getBucketingVersion();
+      } catch (HiveException e) {
+        LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.",
+            sourceTab.getTableName());
+      } finally {
+        query.append(" clustered by (`bucket`)")
+            .append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
+            .append(" into ").append(numBuckets).append(" buckets");
+      }
+    }
+    return bucketingVersion;
+  }
+
+  /**
+   * Part of Create operation. Insert-only compaction tables copy source tables.
+   */
+  private void getSkewedByClause(StringBuilder query) {
+    if (sourceTab == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of the query
+    }
+    boolean isFirst; // Stored as directories. We don't care about the skew otherwise.
+    if (sourceTab.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        query.append(" SKEWED BY (")
+            .append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+  }
+
+  /**
+   * Part of Create operation. Insert-only compaction tables copy source tables' serde.
+   */
+  private void copySerdeFromSourceTable(StringBuilder query) {
+    if (storageDescriptor == null) {
+      return; // avoid NPEs, don't throw an exception but skip this part of the query
+    }
+    ensureTableToCompactIsNative();
+    SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '")
+        .append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())).append("'");
+    // WITH SERDEPROPERTIES
+    if (!serdeParams.isEmpty()) {
+      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '")
+        .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'")
+        .append(" OUTPUTFORMAT '")
+        .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat()))
+        .append("'");
+  }
+
+  /**
+   * Part of Create operation. All tmp tables are not transactional and are marked as
+   * compaction tables. Additionally...
+   * - Crud compaction temp tables need tblproperty, "compactiontable."
+   * - Minor crud compaction temp tables need bucketing version tblproperty, if table is bucketed.
+   * - Insert-only compaction tables copy source tables' tblproperties, except metastore/statistics
+   *   properties.
+   */
+  private void addTblProperties(StringBuilder query, int bucketingVersion) {
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("transactional", "false");
+    if (crud) {
+      tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, "true");
+    }
+    if (crud && minor && isBucketed) {
+      tblProperties.put("bucketing_version", String.valueOf(bucketingVersion));
+    }
+    if (insertOnly && sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null
+      // Exclude all standard table properties.
+      Set<String> excludes = getHiveMetastoreConstants();
+      excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+      for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
+        if (e.getValue() == null) {
+          continue;
+        }
+        if (excludes.contains(e.getKey())) {
+          continue;
+        }
+        tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()));
+      }
+    }
+
+    // add TBLPROPERTIES clause to query
+    boolean isFirst;
+    query.append(" TBLPROPERTIES (");
+    isFirst = true;
+    for (Map.Entry<String, String> property : tblProperties.entrySet()) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      query.append("'").append(property.getKey()).append("'='").append(property.getValue())
+          .append("'");
+      isFirst = false;
+    }
+    query.append(")");
+  }
+
+  private static Set<String> getHiveMetastoreConstants() {
+    Set<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isFinal(f.getModifiers())) {
+        continue;
+      }
+      if (!String.class.equals(f.getType())) {
+        continue;
+      }
+      f.setAccessible(true);
+      try {
+        result.add((String) f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+
+  private void ensureTableToCompactIsNative() {
+    if (sourceTab == null) {
+      return;
+    }
+    String storageHandler =
+        sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    if (storageHandler != null) {
+      String message = "Table " + sourceTab.getTableName() + "has a storage handler ("
+          + storageHandler + "). Failing compaction for this non-native table.";
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 9385080..f47c23a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -21,14 +21,12 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import java.io.IOException;
 import java.util.List;
@@ -40,7 +38,7 @@ final class MajorQueryCompactor extends QueryCompactor {
 
   @Override
   void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
 
@@ -83,57 +81,34 @@ final class MajorQueryCompactor extends QueryCompactor {
    * (current write id will be the same as original write id).
    * We will be achieving the ordering via a custom split grouper for compactor.
    * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
-   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
-   *  for details on the mechanism.
    */
-  private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) throws HiveException {
-    StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(fullName, t));
-    org.apache.hadoop.hive.ql.metadata.Table table = Hive.get().getTable(t.getDbName(), t.getTableName(), false);
-    int numBuckets = 1;
-    int bucketingVersion = 0;
-    if (table != null) {
-      numBuckets = Math.max(table.getNumBuckets(), numBuckets);
-      bucketingVersion = table.getBucketingVersion();
-    }
-    query.append(" clustered by (`bucket`) into ").append(numBuckets).append(" buckets");
-    query.append(" stored as orc");
-    query.append(" location '");
-    query.append(tmpTableLocation);
-    query.append("' tblproperties ('transactional'='false',");
-    query.append(" 'bucketing_version'='");
-    query.append(bucketingVersion);
-    query.append("','");
-    query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
-    query.append("'='true'");
-    query.append(")");
-    return Lists.newArrayList(query.toString());
+  private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) {
+    // one CREATE query: temp table fullName with t's ACID row layout, stored at tmpTableLocation
+    CompactionQueryBuilder createBuilder = new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
+        CompactionQueryBuilder.Operation.CREATE,
+        fullName);
+    createBuilder.setSourceTab(t).setLocation(tmpTableLocation);
+    return Lists.newArrayList(createBuilder.build());
   }
 
   private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
-    String fullName = t.getDbName() + "." + t.getTableName();
-    StringBuilder query = new StringBuilder("insert into table " + tmpName + " ");
-    StringBuilder filter = new StringBuilder();
-    if (p != null) {
-      filter.append(" where ");
-      List<String> vals = p.getValues();
-      List<FieldSchema> keys = t.getPartitionKeys();
-      assert keys.size() == vals.size();
-      for (int i = 0; i < keys.size(); ++i) {
-        filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
-            .append("'");
-      }
-    }
-    query.append(" select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
-        + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(");
-    List<FieldSchema> cols = t.getSd().getCols();
-    for (int i = 0; i < cols.size(); ++i) {
-      query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName());
-    }
-    query.append(") from ").append(fullName).append(filter);
-    return Lists.newArrayList(query.toString());
+    return Lists.newArrayList( // single INSERT ... SELECT from the table/partition being compacted
+        new CompactionQueryBuilder(
+            CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
+            CompactionQueryBuilder.Operation.INSERT,
+            tmpName)
+            .setFromTableName(t.getDbName() + "." + t.getTableName()) // db-qualified, as fullName was
+            .setSourceTab(t)
+            .setSourcePartition(p)
+            .build());
   }
 
   private List<String> getDropQueries(String tmpTableName) {
-    return Lists.newArrayList("drop table if exists " + tmpTableName);
+    return Lists.newArrayList(
+        new CompactionQueryBuilder(
+            CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
+            CompactionQueryBuilder.Operation.DROP,
+            tmpTableName).build());
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 01cd2fc..1bf0bee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -17,8 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
@@ -37,8 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * Class responsible for handling query based minor compaction.
@@ -66,7 +62,7 @@ final class MinorQueryCompactor extends QueryCompactor {
         table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
 
     List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
-    List<String> compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds());
+    List<String> compactionQueries = getCompactionQueries(tmpTableName, table, writeIds);
     List<String> dropQueries = getDropQueries(tmpTableName);
 
     runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
@@ -98,7 +94,7 @@ final class MinorQueryCompactor extends QueryCompactor {
    * @return list of create/alter queries, always non-null
    */
   private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
-      ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) throws HiveException {
+      ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) {
     List<String> queries = new ArrayList<>();
     long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
     long highWatermark = writeIds.getHighWatermark();
@@ -106,7 +102,10 @@ final class MinorQueryCompactor extends QueryCompactor {
     // create delta temp table
     String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
     queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
-    buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add);
+    String alterQuery = buildAlterTableQuery(tmpTableName, dir, writeIds, false);
+    if (!alterQuery.isEmpty()) {
+      queries.add(alterQuery);
+    }
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
         .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
         .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
@@ -120,7 +119,11 @@ final class MinorQueryCompactor extends QueryCompactor {
     // create delete delta temp tables
     String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
     queries.add(buildCreateTableQuery(table, tmpDeleteTableName,  true, false, null));
-    buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add);
+
+    alterQuery = buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true);
+    if (!alterQuery.isEmpty()) {
+      queries.add(alterQuery);
+    }
     options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
         .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
     String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
@@ -151,40 +154,16 @@ final class MinorQueryCompactor extends QueryCompactor {
    * </p>
    */
   private String buildCreateTableQuery(Table table, String newTableName, boolean isPartitioned,
-      boolean isBucketed, String location) throws HiveException {
-    StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(newTableName, table));
-    if (isPartitioned) {
-      query.append(" partitioned by (`file_name` string)");
-    }
-    int bucketingVersion = 0;
-    if (isBucketed) {
-      int numBuckets = 1;
-      org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName(), false);
-      if (t != null) {
-        numBuckets = Math.max(t.getNumBuckets(), numBuckets);
-        bucketingVersion = t.getBucketingVersion();
-      }
-      query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
-              .append(" into ").append(numBuckets).append(" buckets");
-    }
-    query.append(" stored as orc");
-    if (location != null && !location.isEmpty()) {
-      query.append(" location '");
-      query.append(location);
-      query.append("'");
-    }
-    query.append(" tblproperties ('transactional'='false'");
-    query.append(", '");
-    query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
-    query.append("'='true'");
-    if (isBucketed) {
-      query.append(", 'bucketing_version'='")
-          .append(bucketingVersion)
-          .append("')");
-    } else {
-      query.append(")");
-    }
-    return query.toString();
+      boolean isBucketed, String location) {
+    return new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+        CompactionQueryBuilder.Operation.CREATE,
+        newTableName)
+        .setSourceTab(table)
+        .setBucketed(isBucketed)
+        .setPartitioned(isPartitioned)
+        .setLocation(location)
+        .build();
   }
 
   /**
@@ -194,44 +173,36 @@ final class MinorQueryCompactor extends QueryCompactor {
    * @param validWriteIdList list of valid write IDs
    * @param isDeleteDelta if true, only the delete delta directories will be mapped as new partitions, otherwise only
    *                      the delta directories
-   * @return alter table statement wrapped in {@link Optional}.
+   * @return alter table statement; may be empty, in which case callers skip it
    */
-  private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+  private String buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
       ValidWriteIdList validWriteIdList, boolean isDeleteDelta) {
-    // add partitions
-    if (!dir.getCurrentDirectories().isEmpty()) {
-      long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
-      long highWatermark = validWriteIdList.getHighWatermark();
-      List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
-          delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark
-              && delta.getMinWriteId() >= minWriteID)
-          .collect(Collectors.toList());
-      if (!deltas.isEmpty()) {
-        StringBuilder query = new StringBuilder().append("alter table ").append(tableName);
-        query.append(" add ");
-        deltas.forEach(
-            delta -> query.append("partition (file_name='").append(delta.getPath().getName()).append("') location '")
-                .append(delta.getPath()).append("' "));
-        return Optional.of(query.toString());
-      }
-    }
-    return Optional.empty();
+    return new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+        CompactionQueryBuilder.Operation.ALTER,
+        tableName)
+        .setDir(dir)
+        .setValidWriteIdList(validWriteIdList)
+        .setIsDeleteDelta(isDeleteDelta)
+        .build();
   }
 
   /**
    * Get a list of compaction queries which fills up the delta/delete-delta temporary result tables.
    * @param tmpTableBase an unique identifier, which helps to find all the temporary tables
-   * @param invalidWriteIds list of invalid write IDs. This list is used to filter out aborted/open transactions
+   * @param table the table to compact
+   * @param validWriteIdList list of valid write IDs. This list is used to filter out aborted/open
+   *                         transactions
    * @return list of compaction queries, always non-null
    */
-  private List<String> getCompactionQueries(String tmpTableBase, long[] invalidWriteIds) {
+  private List<String> getCompactionQueries(String tmpTableBase, Table table, ValidWriteIdList validWriteIdList) {
     List<String> queries = new ArrayList<>();
     String sourceTableName = AcidUtils.DELTA_PREFIX + tmpTableBase;
     String resultTableName = sourceTableName + "_result";
-    queries.add(buildCompactionQuery(sourceTableName, resultTableName, invalidWriteIds));
+    queries.add(buildCompactionQuery(sourceTableName, resultTableName, table, validWriteIdList));
     String sourceDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase;
     String resultDeleteTableName = sourceDeleteTableName + "_result";
-    queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, invalidWriteIds));
+    queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, table, validWriteIdList));
     return queries;
   }
 
@@ -240,19 +211,20 @@ final class MinorQueryCompactor extends QueryCompactor {
    * it into the result table, filtering out all rows which belong to open/aborted transactions.
    * @param sourceTableName the name of the source table
    * @param resultTableName the name of the result table
-   * @param invalidWriteIds list of invalid write IDs
+   * @param table the table to compact
+   * @param validWriteIdList list of valid write IDs
    * @return compaction query, always non-null
    */
-  private String buildCompactionQuery(String sourceTableName, String resultTableName, long[] invalidWriteIds) {
-    StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName)
-        .append(" select `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from ")
-        .append(sourceTableName);
-    if (invalidWriteIds.length > 0) {
-      query.append(" where `originalTransaction` not in (")
-          .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")");
-    }
-
-    return query.toString();
+  private String buildCompactionQuery(String sourceTableName, String resultTableName, Table table,
+      ValidWriteIdList validWriteIdList) {
+    return new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+        CompactionQueryBuilder.Operation.INSERT,
+        resultTableName)
+        .setFromTableName(sourceTableName)
+        .setSourceTab(table)
+        .setValidWriteIdList(validWriteIdList)
+        .build();
   }
 
   /**
@@ -261,12 +233,17 @@ final class MinorQueryCompactor extends QueryCompactor {
    * @return list of drop table statements, always non-null
    */
   private List<String> getDropQueries(String tmpTableBase) {
-    List<String> queries = new ArrayList<>();
-    String dropStm = "drop table if exists ";
-    queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase);
-    queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase);
-    queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase + "_result");
-    queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result");
-    return queries;
+    return Lists.newArrayList(
+        getDropQuery(AcidUtils.DELTA_PREFIX + tmpTableBase),
+        getDropQuery(AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase),
+        getDropQuery(AcidUtils.DELTA_PREFIX + tmpTableBase + "_result"),
+        getDropQuery(AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result"));
+  }
+
+  private String getDropQuery(String tableToDrop) {
+    return new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+        CompactionQueryBuilder.Operation.DROP,
+        tableToDrop).build();
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 41fdd7e..114b6f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -52,7 +51,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
     AcidUtils.Directory dir = AcidUtils
         .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
             table.getParameters(), false);
-    MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
+    QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
 
     String tmpLocation = Util.generateTmpPath(storageDescriptor);
     Path baseLocation = new Path(tmpLocation, "_base");
@@ -106,42 +105,36 @@ final class MmMajorQueryCompactor extends QueryCompactor {
 
   private List<String> getCreateQueries(String tmpTableName, Table table,
       StorageDescriptor storageDescriptor, String baseLocation) {
-    return Lists.newArrayList(MmQueryCompactorUtils
-        .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false));
+    return Lists.newArrayList(
+        new CompactionQueryBuilder(
+            CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY,
+            CompactionQueryBuilder.Operation.CREATE,
+            tmpTableName)
+            .setSourceTab(table)
+            .setStorageDescriptor(storageDescriptor)
+            .setLocation(baseLocation)
+            .build()
+    );
   }
 
   private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
-    String fullName = t.getDbName() + "." + t.getTableName();
-    // ideally we should make a special form of insert overwrite so that we:
-    // 1) Could use fast merge path for ORC and RC.
-    // 2) Didn't have to create a table.
-
-    StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " ");
-    StringBuilder filter = new StringBuilder();
-    if (p != null) {
-      filter = new StringBuilder(" where ");
-      List<String> vals = p.getValues();
-      List<FieldSchema> keys = t.getPartitionKeys();
-      assert keys.size() == vals.size();
-      for (int i = 0; i < keys.size(); ++i) {
-        filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
-            .append("'");
-      }
-      query.append(" select ");
-      // Use table descriptor for columns.
-      List<FieldSchema> cols = t.getSd().getCols();
-      for (int i = 0; i < cols.size(); ++i) {
-        query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
-      }
-    } else {
-      query.append("select *");
-    }
-    query.append(" from ").append(fullName).append(filter);
-    return Lists.newArrayList(query.toString());
+    return Lists.newArrayList(
+        new CompactionQueryBuilder(
+            CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY,
+            CompactionQueryBuilder.Operation.INSERT,
+            tmpName)
+            .setSourceTab(t)
+            .setFromTableName(t.getTableName())
+            .setSourcePartition(p)
+            .build()
+    );
   }
 
   private List<String> getDropQueries(String tmpTableName) {
-    return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName);
+    return Lists.newArrayList(
+        new CompactionQueryBuilder(
+            CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY,
+            CompactionQueryBuilder.Operation.DROP,
+            tmpTableName).build());
   }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index feb667c..383891b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -36,10 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * Run a minor query compaction on an insert only (MM) table.
@@ -58,7 +54,7 @@ final class MmMinorQueryCompactor extends QueryCompactor {
     AcidUtils.Directory dir = AcidUtils
         .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
             Ref.from(false), false, table.getParameters(), false);
-    MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
+    QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
     String tmpLocation = Util.generateTmpPath(storageDescriptor);
     Path sourceTabLocation = new Path(tmpLocation);
     Path resultTabLocation = new Path(tmpLocation, "_result");
@@ -66,14 +62,15 @@ final class MmMinorQueryCompactor extends QueryCompactor {
     HiveConf driverConf = setUpDriverSession(hiveConf);
 
     String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
-    String tmpTableBase = tmpPrefix + System.currentTimeMillis();
+    String tmpTableName = tmpPrefix + System.currentTimeMillis();
+    String resultTmpTableName = tmpTableName + "_result";
 
     List<String> createTableQueries =
-        getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(),
+        getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
             sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
-    List<String> compactionQueries = getCompactionQueries(tmpTableBase, table.getSd());
-    List<String> dropQueries = getDropQueries(tmpTableBase);
-    runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo,
+    List<String> compactionQueries = getCompactionQueries(tmpTableName, resultTmpTableName, table);
+    List<String> dropQueries = getDropQueries(tmpTableName);
+    runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
         createTableQueries, compactionQueries, dropQueries);
   }
 
@@ -126,42 +123,45 @@ final class MmMinorQueryCompactor extends QueryCompactor {
   private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
       String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
       ValidWriteIdList validWriteIdList) {
-    List<String> queries = new ArrayList<>();
-    queries.add(
-        MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true));
-    buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add);
-    queries.add(MmQueryCompactorUtils
-        .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false));
+    List<String> queries = Lists.newArrayList(
+        getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true),
+        getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false)
+    );
+    String alterQuery = buildAlterTableQuery(tmpTableBase, dir, validWriteIdList);
+    if (!alterQuery.isEmpty()) {
+      queries.add(alterQuery);
+    }
     return queries;
   }
 
+  private String getCreateQuery(String resultTableName, Table t, StorageDescriptor sd,
+      String location, boolean isPartitioned) {
+    return new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+        CompactionQueryBuilder.Operation.CREATE,
+        resultTableName)
+        .setSourceTab(t)
+        .setStorageDescriptor(sd)
+        .setLocation(location)
+        .setPartitioned(isPartitioned)
+        .build();
+  }
+
   /**
    * Builds an alter table query, which adds partitions pointing to location of delta directories.
    *
    * @param tableName name of the temp table to be altered
    * @param dir the parent directory of delta directories
    * @param validWriteIdList valid write ids for the table/partition to compact
-   * @return alter table statement wrapped in {@link Optional}.
+   * @return alter table statement; may be empty, in which case the caller skips it
    */
-  private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+  private String buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
       ValidWriteIdList validWriteIdList) {
-    if (!dir.getCurrentDirectories().isEmpty()) {
-      long minWriteID =
-          validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
-      long highWatermark = validWriteIdList.getHighWatermark();
-      List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
-          delta -> delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
-          .collect(Collectors.toList());
-      if (!deltas.isEmpty()) {
-        StringBuilder query = new StringBuilder().append("alter table ").append(tableName);
-        query.append(" add ");
-        deltas.forEach(
-            delta -> query.append("partition (file_name='").append(delta.getPath().getName())
-                .append("') location '").append(delta.getPath()).append("' "));
-        return Optional.of(query.toString());
-      }
-    }
-    return Optional.empty();
+    return new CompactionQueryBuilder(CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+        CompactionQueryBuilder.Operation.ALTER, tableName)
+        .setDir(dir)
+        .setValidWriteIdList(validWriteIdList)
+        .build();
   }
 
   /**
@@ -171,24 +171,21 @@ final class MmMinorQueryCompactor extends QueryCompactor {
    *  <li>insert into table $tmpTableBase_result select `col_1`, .. from tmpTableBase</li>
    * </ol>
    *
-   * @param tmpTableBase an unique identifier, which helps to find all the temporary tables
+   * @param sourceTmpTableName name of the temporary table the rows are selected from
+   * @param resultTmpTableName name of the temporary table the rows are inserted into
    * @return list of compaction queries, always non-null
    */
-  private List<String> getCompactionQueries(String tmpTableBase, StorageDescriptor sd) {
-    String resultTableName = tmpTableBase + "_result";
-    StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName)
-        .append(" select ");
-    List<FieldSchema> cols = sd.getCols();
-    boolean isFirst = true;
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("`");
-    }
-    query.append(" from ").append(tmpTableBase);
-    return Lists.newArrayList(query.toString());
+  private List<String> getCompactionQueries(String sourceTmpTableName, String resultTmpTableName,
+      Table sourceTable) {
+    return Lists.newArrayList(
+        new CompactionQueryBuilder(
+            CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+            CompactionQueryBuilder.Operation.INSERT,
+            resultTmpTableName)
+        .setFromTableName(sourceTmpTableName)
+        .setSourceTab(sourceTable)
+        .build()
+    );
   }
 
   /**
@@ -197,8 +194,17 @@ final class MmMinorQueryCompactor extends QueryCompactor {
    * @return list of drop table statements, always non-null
    */
   private List<String> getDropQueries(String tmpTableBase) {
-    return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase,
-        MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase + "_result");
+    return Lists.newArrayList(
+        getDropQuery(tmpTableBase),
+        getDropQuery(tmpTableBase + "_result")
+        );
+  }
+
+  private String getDropQuery(String tableToDrop) {
+    return new CompactionQueryBuilder(
+        CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+        CompactionQueryBuilder.Operation.DROP,
+        tableToDrop).build();
   }
 
   private HiveConf setUpDriverSession(HiveConf hiveConf) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
deleted file mode 100644
index 891696d..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.txn.compactor;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.HiveStringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-final class MmQueryCompactorUtils {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MmQueryCompactorUtils.class.getName());
-  static final String DROP_IF_EXISTS = "drop table if exists ";
-
-  private MmQueryCompactorUtils() {}
-
-  /**
-   * Creates a command to create a new table based on an example table (sourceTab).
-   *
-   * @param fullName of new table
-   * @param sourceTab the table we are modeling the new table on
-   * @param sd StorageDescriptor of the table or partition we are modeling the new table on
-   * @param location of the new table
-   * @param isPartitioned should the new table be partitioned
-   * @param isExternal should the new table be external
-   * @return query string creating the new table
-   */
-  static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd,
-      String location, boolean isPartitioned, boolean isExternal) {
-    StringBuilder query = new StringBuilder("create temporary ");
-    if (isExternal) {
-      query.append("external ");
-    }
-    query.append("table ").append(fullName).append("(");
-    List<FieldSchema> cols = sourceTab.getSd().getCols();
-    boolean isFirst = true;
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(col.getType());
-    }
-    query.append(") ");
-
-    // Partitioning. Used for minor compaction.
-    if (isPartitioned) {
-      query.append(" PARTITIONED BY (`file_name` STRING) ");
-    }
-
-    // Bucketing.
-    List<String> buckCols = sourceTab.getSd().getBucketCols();
-    if (buckCols.size() > 0) {
-      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
-      List<Order> sortCols = sourceTab.getSd().getSortCols();
-      if (sortCols.size() > 0) {
-        query.append("SORTED BY (");
-        isFirst = true;
-        for (Order sortCol : sortCols) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
-        }
-        query.append(") ");
-      }
-      query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
-    }
-
-    // Stored as directories. We don't care about the skew otherwise.
-    if (sourceTab.getSd().isStoredAsSubDirectories()) {
-      SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
-      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
-        query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
-        isFirst = true;
-        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append("('").append(StringUtils.join("','", colValues)).append("')");
-        }
-        query.append(") STORED AS DIRECTORIES");
-      }
-    }
-
-    SerDeInfo serdeInfo = sd.getSerdeInfo();
-    Map<String, String> serdeParams = serdeInfo.getParameters();
-    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
-        .append("'");
-    String sh = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
-    assert sh == null; // Not supposed to be a compactable table.
-    if (!serdeParams.isEmpty()) {
-      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
-    }
-    query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
-        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
-        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
-    // Exclude all standard table properties.
-    Set<String> excludes = getHiveMetastoreConstants();
-    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
-    isFirst = true;
-    for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
-      if (e.getValue() == null) {
-        continue;
-      }
-      if (excludes.contains(e.getKey())) {
-        continue;
-      }
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
-          .append("'");
-    }
-    if (!isFirst) {
-      query.append(", ");
-    }
-    query.append("'transactional'='false')");
-    return query.toString();
-
-  }
-
-  private static Set<String> getHiveMetastoreConstants() {
-    Set<String> result = new HashSet<>();
-    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
-      if (!Modifier.isStatic(f.getModifiers())) {
-        continue;
-      }
-      if (!Modifier.isFinal(f.getModifiers())) {
-        continue;
-      }
-      if (!String.class.equals(f.getType())) {
-        continue;
-      }
-      f.setAccessible(true);
-      try {
-        result.add((String) f.get(null));
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Remove the delta directories of aborted transactions.
-   */
-  static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
-    List<Path> filesToDelete = dir.getAbortedDirectories();
-    if (filesToDelete.size() < 1) {
-      return;
-    }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
-    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
-    for (Path dead : filesToDelete) {
-      LOG.debug("Going to delete path " + dead.toString());
-      fs.delete(dead, true);
-    }
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 7a9e48f..7f3ccfa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -23,12 +23,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -161,31 +161,6 @@ abstract class QueryCompactor {
       }
     }
 
-      /**
-       * Get a create temporary table query string with Orc ACID columns.
-       * @param tableName name of the new temporary table
-       * @param table the table where the compaction is running
-       * @return create query
-       */
-    static String getCreateTempTableQueryWithAcidColumns(String tableName, Table table) {
-      StringBuilder query = new StringBuilder("create temporary external table ").append(tableName).append(" (");
-      // Acid virtual columns
-      query.append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` "
-              + "bigint, `row` struct<");
-      List<FieldSchema> cols = table.getSd().getCols();
-      boolean isFirst = true;
-      // Actual columns
-      for (FieldSchema col : cols) {
-        if (!isFirst) {
-          query.append(", ");
-        }
-        isFirst = false;
-        query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
-      }
-      query.append(">)");
-      return query.toString();
-    }
-
     /**
      * Remove the root directory of a table if it's empty.
      * @param conf the Hive configuration
@@ -203,5 +178,20 @@ abstract class QueryCompactor {
         }
       }
     }
+    /** Delete the delta directories of aborted transactions reported by {@code dir}, if any. */
+    static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
+      List<Path> filesToDelete = dir.getAbortedDirectories();
+      if (filesToDelete.isEmpty()) {
+        return;
+      }
+      LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
+      FileSystem fs = filesToDelete.get(0).getFileSystem(conf); // assumes all dirs share one FS
+      for (Path dead : filesToDelete) {
+        LOG.debug("Going to delete path " + dead);
+        if (!fs.delete(dead, true)) {
+          LOG.warn("Failed to delete aborted directory " + dead);
+        }
+      }
+    }
   }
 }