Posted to commits@hive.apache.org by dk...@apache.org on 2022/01/05 08:54:32 UTC

[hive] branch master updated: HIVE-25688: Non blocking DROP PARTITION implementation (Denys Kuzmenko, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f7540  HIVE-25688: Non blocking DROP PARTITION implementation (Denys Kuzmenko, reviewed by Peter Vary)
66f7540 is described below

commit 66f75403c27898e296cd085fe14a1d4b75c636bd
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Wed Jan 5 10:54:13 2022 +0200

    HIVE-25688: Non blocking DROP PARTITION implementation (Denys Kuzmenko, reviewed by Peter Vary)
    
    Closes #2780
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +
 .../drop/AbstractDropPartitionAnalyzer.java        |  34 +--
 .../drop/AlterTableDropPartitionAnalyzer.java      |   9 -
 .../drop/AlterTableDropPartitionDesc.java          |  30 ++-
 .../drop/AlterTableDropPartitionOperation.java     |  16 +-
 .../drop/AlterViewDropPartitionAnalyzer.java       |   9 -
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   6 +
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  12 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      | 236 ++++++++++++---------
 .../hive/ql/txn/compactor/CompactorThread.java     | 105 +++------
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   2 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  30 +--
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java | 117 ++++++++--
 .../apache/hadoop/hive/ql/TestTxnCommands3.java    |   8 +-
 .../apache/hadoop/hive/ql/TestTxnConcatenate.java  |  20 +-
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java     |   6 +-
 .../org/apache/hadoop/hive/ql/TestTxnLoadData.java |  50 ++---
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java    |  52 ++---
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   2 +
 .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java   |  38 +++-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 100 +++++++++
 .../hive/ql/txn/compactor/TestInitiator.java       |   7 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |   8 +-
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |   3 +-
 .../apache/hadoop/hive/metastore/api/TxnType.java  |   5 +-
 .../src/gen/thrift/gen-php/metastore/TxnType.php   |   3 +
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |   3 +
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   5 +-
 .../hadoop/hive/metastore/HiveMetaStoreClient.java |  15 +-
 .../hive/metastore/LockComponentBuilder.java       |   4 +-
 .../hive/metastore/PartitionDropOptions.java       |  12 ++
 .../src/main/thrift/hive_metastore.thrift          |   3 +-
 .../hadoop/hive/metastore/AcidEventListener.java   |  53 ++++-
 .../apache/hadoop/hive/metastore/HMSHandler.java   | 152 +++++++------
 .../hive/metastore/txn/CompactionTxnHandler.java   |   3 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  71 +++++++
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   3 +
 .../apache/hadoop/hive/metastore/txn/TxnUtils.java |  67 ++++++
 .../hadoop/hive/common/AcidMetaDataFile.java       |   4 +-
 39 files changed, 898 insertions(+), 410 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b0b9b4f..fe88e04 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3071,6 +3071,11 @@ public class HiveConf extends Configuration {
         "If enabled, truncate for transactional tables will not delete the data directories,\n" +
         "rather create a new base directory with no datafiles."),
     
+    HIVE_ACID_DROP_PARTITION_USE_BASE("hive.acid.droppartition.usebase", false,
+        "Enables non-blocking DROP PARTITION operation.\n" +
+        "If enabled, drop for transactional tables will not delete the data directories,\n" +
+        "rather create a new base directory with no datafiles."),
+    
     // Configs having to do with DeltaFilesMetricReporter, which collects lists of most recently active tables
     // with the most number of active/obsolete deltas.
     HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE("hive.txn.acid.metrics.max.cache.size", 100,
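
A minimal usage sketch, mirroring the new TestTxnCommands cases further down: with the flag on, an ordinary DROP PARTITION on a transactional table keeps the data directories and only writes a new, empty base_N directory, which the compactor Cleaner removes later. The acidtblpart table and the executeStatement callback are illustrative assumptions, not part of the patch.

    import java.util.function.Consumer;

    import org.apache.hadoop.hive.conf.HiveConf;

    class SoftDropPartitionSketch {
      // executeStatement stands in for whatever runs HiveQL in the caller's setup (assumed).
      static void dropSoftly(HiveConf conf, Consumer<String> executeStatement) {
        // Enable the non-blocking (soft delete) drop behaviour introduced by this change.
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
        // Data files stay on disk behind an empty base_N; the Cleaner purges the directory later.
        executeStatement.accept("alter table acidtblpart drop partition (p='b')");
      }
    }
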
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java
index b727492..692732f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java
@@ -31,10 +31,11 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
  * Analyzer for drop partition commands.
  */
 abstract class AbstractDropPartitionAnalyzer extends AbstractAlterTableAnalyzer {
+
   AbstractDropPartitionAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
@@ -103,34 +105,40 @@ abstract class AbstractDropPartitionAnalyzer extends AbstractAlterTableAnalyzer
     re.noLockNeeded();
     inputs.add(re);
 
-    addTableDropPartsOutputs(table, partitionSpecs.values(), !ifExists);
-
+    boolean dropPartUseBase = HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE)
+        || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED)
+      && AcidUtils.isTransactionalTable(table);
+    
+    addTableDropPartsOutputs(table, partitionSpecs.values(), !ifExists, dropPartUseBase);
+    
     AlterTableDropPartitionDesc desc =
-        new AlterTableDropPartitionDesc(tableName, partitionSpecs, mustPurge, replicationSpec);
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
-
-    postProcess(tableName, table, desc);
+        new AlterTableDropPartitionDesc(tableName, partitionSpecs, mustPurge, replicationSpec, !dropPartUseBase, table);
 
+    if (desc.mayNeedWriteId()) {
+      setAcidDdlDesc(desc);
+    }
+    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
   }
 
   protected abstract boolean expectView();
 
-  protected abstract void postProcess(TableName tableName, Table table, AlterTableDropPartitionDesc desc);
-
   /**
    * Add the table partitions to be modified in the output, so that it is available for the
    * pre-execution hook. If the partition does not exist, throw an error if
    * throwIfNonExistent is true, otherwise ignore it.
    */
-  private void addTableDropPartsOutputs(Table tab, Collection<List<ExprNodeGenericFuncDesc>> partitionSpecs,
-      boolean throwIfNonExistent) throws SemanticException {
+  private void addTableDropPartsOutputs(Table table, Collection<List<ExprNodeGenericFuncDesc>> partitionSpecs,
+      boolean throwIfNonExistent, boolean  dropPartUseBase) throws SemanticException {
+    WriteType writeType =
+        dropPartUseBase ? WriteType.DDL_EXCL_WRITE : WriteType.DDL_EXCLUSIVE;
+    
     for (List<ExprNodeGenericFuncDesc> specs : partitionSpecs) {
       for (ExprNodeGenericFuncDesc partitionSpec : specs) {
         List<Partition> parts = new ArrayList<>();
 
         boolean hasUnknown = false;
         try {
-          hasUnknown = db.getPartitionsByExpr(tab, partitionSpec, conf, parts);
+          hasUnknown = db.getPartitionsByExpr(table, partitionSpec, conf, parts);
         } catch (Exception e) {
           throw new SemanticException(ErrorMsg.INVALID_PARTITION.getMsg(partitionSpec.getExprString()), e);
         }
@@ -147,7 +155,7 @@ abstract class AbstractDropPartitionAnalyzer extends AbstractAlterTableAnalyzer
           }
         }
         for (Partition p : parts) {
-          outputs.add(new WriteEntity(p, WriteEntity.WriteType.DDL_EXCLUSIVE));
+          outputs.add(new WriteEntity(p, writeType));
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java
index f8632c7..a1c038d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java
@@ -41,13 +41,4 @@ public class AlterTableDropPartitionAnalyzer extends AbstractDropPartitionAnalyz
   protected boolean expectView() {
     return false;
   }
-
-  @Override
-  protected void postProcess(TableName tableName, Table table, AlterTableDropPartitionDesc desc) {
-    if (!AcidUtils.isTransactionalTable(table)) {
-      return;
-    }
-
-    setAcidDdlDesc(desc);
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java
index bb64bd6..1b3df43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId;
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -65,10 +67,18 @@ public class AlterTableDropPartitionDesc implements DDLDescWithWriteId, Serializ
   private final ArrayList<PartitionDesc> partSpecs;
   private final boolean ifPurge;
   private final ReplicationSpec replicationSpec;
-  private Long writeId;
+  private final boolean deleteData;
+  private final boolean isTransactional;
+
+  private long writeId = 0;
 
   public AlterTableDropPartitionDesc(TableName tableName, Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs,
       boolean ifPurge, ReplicationSpec replicationSpec) {
+    this(tableName, partSpecs, ifPurge, replicationSpec, true, null);
+  }
+
+  public AlterTableDropPartitionDesc(TableName tableName, Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs,
+      boolean ifPurge, ReplicationSpec replicationSpec, boolean deleteData, Table table) {
     this.tableName = tableName;
     this.partSpecs = new ArrayList<PartitionDesc>(partSpecs.size());
     for (Map.Entry<Integer, List<ExprNodeGenericFuncDesc>> partSpec : partSpecs.entrySet()) {
@@ -79,6 +89,8 @@ public class AlterTableDropPartitionDesc implements DDLDescWithWriteId, Serializ
     }
     this.ifPurge = ifPurge;
     this.replicationSpec = replicationSpec == null ? new ReplicationSpec() : replicationSpec;
+    this.isTransactional = AcidUtils.isTransactionalTable(table);
+    this.deleteData = deleteData;
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -108,13 +120,21 @@ public class AlterTableDropPartitionDesc implements DDLDescWithWriteId, Serializ
   }
 
   @Override
-  public String getFullTableName() {
-    return getTableName();
+  public boolean mayNeedWriteId() {
+    return isTransactional;
+  }
+
+  public long getWriteId() {
+    return writeId;
+  }
+
+  public boolean getDeleteData() {
+    return deleteData;
   }
 
   @Override
-  public boolean mayNeedWriteId() {
-    return true;
+  public String getFullTableName() {
+    return getTableName();
   }
 
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
index e579b0e..6e702b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.HiveTableName;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 
-import com.google.common.collect.Iterables;
-
 /**
  * Operation process of dropping some partitions of a table.
  */
@@ -103,7 +101,10 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
         } else {
           for (Partition p : partitions) {
             if (replicationSpec.allowEventReplacementInto(dbParams)) {
-              context.getDb().dropPartition(table.getDbName(), table.getTableName(), p.getValues(), true);
+              PartitionDropOptions options =
+                  PartitionDropOptions.instance().deleteData(desc.getDeleteData())
+                    .setWriteId(desc.getWriteId());
+              context.getDb().dropPartition(table.getDbName(), table.getTableName(), p.getValues(), options);
             }
           }
         }
@@ -117,7 +118,7 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
 
   private void dropPartitions(boolean isRepl) throws HiveException {
     // ifExists is currently verified in AlterTableDropPartitionAnalyzer
-    TableName tablenName = HiveTableName.of(desc.getTableName());
+    TableName tableName = HiveTableName.of(desc.getTableName());
 
     List<Pair<Integer, byte[]>> partitionExpressions = new ArrayList<>(desc.getPartSpecs().size());
     for (AlterTableDropPartitionDesc.PartitionDesc partSpec : desc.getPartSpecs()) {
@@ -126,8 +127,9 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
     }
 
     PartitionDropOptions options =
-        PartitionDropOptions.instance().deleteData(true).ifExists(true).purgeData(desc.getIfPurge());
-    List<Partition> droppedPartitions = context.getDb().dropPartitions(tablenName.getDb(), tablenName.getTable(),
+        PartitionDropOptions.instance().deleteData(desc.getDeleteData())
+          .ifExists(true).purgeData(desc.getIfPurge());
+    List<Partition> droppedPartitions = context.getDb().dropPartitions(tableName.getDb(), tableName.getTable(),
         partitionExpressions, options);
 
     if (isRepl) {
@@ -145,7 +147,7 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
       DDLUtils.addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK), context);
 
       if (llapEvictRequestBuilder != null) {
-        llapEvictRequestBuilder.addPartitionOfATable(tablenName.getDb(), tablenName.getTable(), partition.getSpec());
+        llapEvictRequestBuilder.addPartitionOfATable(tableName.getDb(), tableName.getTable(), partition.getSpec());
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java
index 9f8961e..9fdeb56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java
@@ -36,15 +36,6 @@ public class AlterViewDropPartitionAnalyzer extends AbstractDropPartitionAnalyze
   }
 
   @Override
-  protected void postProcess(TableName tableName, Table table, AlterTableDropPartitionDesc desc) {
-    if (!AcidUtils.isTransactionalTable(table)) {
-      return;
-    }
-
-    setAcidDdlDesc(desc);
-  }
-
-  @Override
   protected boolean expectView() {
     return true;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 222e984..519c8f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -3109,6 +3109,12 @@ public class AcidUtils {
     if (tree.getFirstChildWithType(HiveParser.TOK_ALTERTABLE_COMPACT) != null){
       return TxnType.COMPACTION;
     }
+    // check if soft delete
+    if (tree.getToken().getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS
+        && (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE)
+        || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))) {
+      return TxnType.SOFT_DELETE;
+    }
     return TxnType.DEFAULT;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 96c60fe..e3b0997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -58,6 +58,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -1530,7 +1531,7 @@ public class Hive {
                         final String metaTableName, boolean throwException) throws HiveException {
     return this.getTable(dbName, tableName, metaTableName, throwException, false);
   }
-  
+
   /**
    * Returns metadata of the table
    *
@@ -3804,6 +3805,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
       PartitionDropOptions dropOptions) throws HiveException {
     try {
       Table table = getTable(dbName, tableName);
+      if (!dropOptions.deleteData) {
+        AcidUtils.TableSnapshot snapshot = AcidUtils.getTableSnapshot(conf, table, true);
+        if (snapshot != null) {
+          dropOptions.setWriteId(snapshot.getWriteId());
+        }
+        long txnId = Optional.ofNullable(SessionState.get())
+          .map(ss -> ss.getTxnMgr().getCurrentTxnId()).orElse(0L);
+        dropOptions.setTxnId(txnId);
+      }
       List<org.apache.hadoop.hive.metastore.api.Partition> partitions = getMSC().dropPartitions(dbName, tableName,
           partitionExpressions, dropOptions);
       return convertFromMetastore(table, partitions);
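
For orientation, a minimal sketch of how a soft-delete drop request is assembled before it is handed to the metastore client, using only the PartitionDropOptions calls visible in the hunks above; setWriteId and setTxnId are the setters this patch adds, and the writeId/txnId parameters are assumed to come from the table snapshot and the current transaction, as in Hive.dropPartitions.

    import org.apache.hadoop.hive.metastore.PartitionDropOptions;

    class DropOptionsSketch {
      static PartitionDropOptions softDelete(long writeId, long txnId) {
        PartitionDropOptions options = PartitionDropOptions.instance()
            .deleteData(false)   // keep data files; an empty base marks the partition as dropped
            .ifExists(true)
            .purgeData(false)
            .setWriteId(writeId);
        options.setTxnId(txnId); // id of the transaction issuing the drop
        return options;
      }
    }
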
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 38f4fec..221e99c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -19,13 +19,10 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -35,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +40,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,10 +51,13 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
@@ -137,8 +133,8 @@ public class Cleaner extends MetaStoreCompactorThread {
             // when min_history_level is finally dropped, than every HMS will commit compaction the new way
             // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
             for (CompactionInfo compactionInfo : readyToClean) {
-              cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
-                      clean(compactionInfo, cleanerWaterMark, metricsEnabled)), cleanerExecutor));
+              cleanerList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(
+                  () -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)), cleanerExecutor));
             }
             CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
           }
@@ -184,91 +180,58 @@ public class Cleaner extends MetaStoreCompactorThread {
       if (metricsEnabled) {
         perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
       }
+      String location = Optional.ofNullable(ci.properties).map(StringableMap::new)
+          .map(config -> config.get("location")).orElse(null);
+      
+      Callable<Boolean> cleanUpTask;
       Table t = resolveTable(ci);
-      if (t == null) {
-        // The table was dropped before we got around to cleaning it.
-        LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
-            idWatermark(ci));
-        txnHandler.markCleaned(ci);
-        return;
-      }
-      if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
-        // The table was marked no clean up true.
-        LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
-        txnHandler.markCleaned(ci);
-        return;
-      }
+      Partition p = resolvePartition(ci);
 
-      Partition p = null;
-      if (ci.partName != null) {
-        p = resolvePartition(ci);
-        if (p == null) {
-          // The partition was dropped before we got around to cleaning it.
-          LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-              ", assuming it was dropped." + idWatermark(ci));
+      if (location == null) {
+        if (t == null) {
+          // The table was dropped before we got around to cleaning it.
+          LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
+            idWatermark(ci));
           txnHandler.markCleaned(ci);
           return;
         }
-        if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
-          // The partition was marked no clean up true.
-          LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
+        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+          // The table was marked no clean up true.
+          LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
           txnHandler.markCleaned(ci);
           return;
         }
+        if (ci.partName != null) {
+          if (p == null) {
+            // The partition was dropped before we got around to cleaning it.
+            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
+              ", assuming it was dropped." + idWatermark(ci));
+            txnHandler.markCleaned(ci);
+            return;
+          }
+          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+            // The partition was marked no clean up true.
+            LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
+            txnHandler.markCleaned(ci);
+            return;
+          }
+        }
       }
-
       txnHandler.markCleanerStart(ci);
-
+      
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
-      final String location = sd.getLocation();
-      ValidTxnList validTxnList =
-          TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
-      //save it so that getAcidState() sees it
-      conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-      /**
-       * {@code validTxnList} is capped by minOpenTxnGLB so if
-       * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
-       * produced by a compactor, that means every reader that could be active right now see it
-       * as well.  That means if this base/delta shadows some earlier base/delta, the it will be
-       * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
-       *
-       *
-       * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
-       * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
-       * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
-       * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
-       * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
-       * writeId:17.  Compactor will produce base_17_c160.
-       * Suppose txnid:149 writes delta_18_18
-       * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
-       * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
-       * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by
-       * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
-       * metadata that says that 18 is aborted.
-       * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
-       * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
-       * it (since it has nothing but aborted data).  But we can't tell which actually happened
-       * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
-       *
-       * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
-       * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
-       * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
-       * useful if there is all of a sudden a flood of aborted txns.  (For another day).
-       */
-
-      // Creating 'reader' list since we are interested in the set of 'obsolete' files
-      final ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, t, validTxnList);
-      LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+      cleanUpTask = () -> removeFiles(Optional.ofNullable(location).orElse(sd.getLocation()), 
+          minOpenTxnGLB, ci, ci.partName != null && p == null);
 
       Ref<Boolean> removedFiles = Ref.from(false);
       if (runJobAsSelf(ci.runAs)) {
-        removedFiles.value = removeFiles(location, validWriteIdList, ci);
+        removedFiles.value = cleanUpTask.call();
       } else {
         LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
             UserGroupInformation.getLoginUser());
-        ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
-          removedFiles.value = removeFiles(location, validWriteIdList, ci);
+        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+          removedFiles.value = cleanUpTask.call();
           return null;
         });
         try {
@@ -296,9 +259,9 @@ public class Cleaner extends MetaStoreCompactorThread {
     }
   }
 
-  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, Table t, ValidTxnList validTxnList)
+  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
       throws NoSuchTxnException, MetaException {
-    List<String> tblNames = Collections.singletonList(TableName.getDbTable(t.getDbName(), t.getTableName()));
+    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
     GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
     request.setValidTxnList(validTxnList.writeToString());
     GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
@@ -320,7 +283,7 @@ public class Cleaner extends MetaStoreCompactorThread {
   }
 
   private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
-    return t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0
+    return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> pk.size() > 0).isPresent()
         && ci.partName == null;
   }
 
@@ -328,14 +291,89 @@ public class Cleaner extends MetaStoreCompactorThread {
     return " id=" + ci.id;
   }
 
+  private boolean removeFiles(String location, long minOpenTxnGLB, CompactionInfo ci, boolean dropPartition)
+      throws MetaException, IOException, NoSuchObjectException, NoSuchTxnException {
+
+    if (dropPartition) {
+      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+      LockResponse res = null;
+      
+      try {
+        res = txnHandler.lock(lockRequest);
+        if (res.getState() == LockState.ACQUIRED) {
+          if (resolvePartition(ci) == null) {
+            Path path = new Path(location);
+            StringBuilder extraDebugInfo = new StringBuilder("[").append(path.getName()).append(",");
+
+            boolean ifPurge = Optional.ofNullable(ci.properties).map(StringableMap::new)
+              .map(config -> config.get("ifPurge")).map(Boolean::valueOf).orElse(true);
+
+            return remove(location, ci, Collections.singletonList(path), ifPurge,
+              path.getFileSystem(conf), extraDebugInfo);
+          }
+        }
+      } catch (NoSuchTxnException | TxnAbortedException e) {
+        LOG.error(e.getMessage());
+      } finally {
+        if (res != null && res.getState() != LockState.NOT_ACQUIRED) {
+          try {
+            txnHandler.unlock(new UnlockRequest(res.getLockid()));
+          } catch (NoSuchLockException | TxnOpenException e) {
+            LOG.error(e.getMessage());
+          }
+        }
+      }
+    }
+      
+    ValidTxnList validTxnList =
+      TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    /**
+     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+     * produced by a compactor, that means every reader that could be active right now sees it
+     * as well.  That means if this base/delta shadows some earlier base/delta, then it will be
+     * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
+     *
+     *
+     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+     * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+     * writeId:17.  Compactor will produce base_17_c160.
+     * Suppose txnid:149 writes delta_18_18
+     * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
+     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+     * not visible based on 'validTxnList' capped at minOpenTxn so it will not be cleaned by
+     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+     * metadata that says that 18 is aborted.
+     * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
+     * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
+     * it (since it has nothing but aborted data).  But we can't tell which actually happened
+     * in markCleaned() so make sure it doesn't delete meta above CQ_HIGHEST_WRITE_ID.
+     *
+     * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
+     * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
+     * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
+     * useful if there is all of a sudden a flood of aborted txns.  (For another day).
+     */
+
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+    
+    return removeFiles(location, validWriteIdList, ci);
+  }
   /**
    * @return true if any files were removed
    */
   private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
       throws IOException, NoSuchObjectException, MetaException {
-    Path locPath = new Path(location);
-    AcidDirectory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from(
-        false), false);
+    Path path = new Path(location);
+    FileSystem fs = path.getFileSystem(conf);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false);
     List<Path> obsoleteDirs = dir.getObsolete();
     /**
      * add anything in 'dir'  that only has data from aborted transactions - no one should be
@@ -356,16 +394,15 @@ public class Cleaner extends MetaStoreCompactorThread {
       // Including obsolete directories for partitioned tables can result in data loss.
       obsoleteDirs = dir.getAbortedDirectories();
     }
-    List<Path> filesToDelete = new ArrayList<>(obsoleteDirs.size());
+    StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
+        .map(Path::getName).collect(Collectors.joining(",")));
+    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+  }
 
-    StringBuilder extraDebugInfo = new StringBuilder("[");
-    for (Path stat : obsoleteDirs) {
-      filesToDelete.add(stat);
-      extraDebugInfo.append(stat.getName()).append(",");
-      if (!FileUtils.isPathWithinSubtree(stat, locPath)) {
-        LOG.info(idWatermark(ci) + " found unexpected file: " + stat);
-      }
-    }
+  private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge, 
+      FileSystem fs, StringBuilder extraDebugInfo) 
+      throws NoSuchObjectException, MetaException, IOException {
+    
     extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
     LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
          " obsolete directories from " + location + ". " + extraDebugInfo.toString());
@@ -374,16 +411,15 @@ public class Cleaner extends MetaStoreCompactorThread {
           ", that hardly seems right.");
       return false;
     }
-
-    FileSystem fs = dir.getFs();
     Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
-
+    boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
+    
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
-      if (ReplChangeManager.shouldEnableCm(db, table)) {
-        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
+      if (needCmRecycle) {
+        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
       }
-      fs.delete(dead, true);
+      FileUtils.moveToTrash(fs, dead, conf, ifPurge);
     }
     return true;
   }
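
A minimal decoding sketch of the hand-off the reworked Cleaner relies on: for a soft-deleted partition the compaction queue entry carries "location" and "ifPurge" hints in CompactionInfo.properties, and a missing "location" selects the classic writeId-based cleanup path. Only calls visible in the hunks above are used; the class and method names are illustrative assumptions.

    import java.util.Optional;

    import org.apache.hadoop.hive.common.StringableMap;
    import org.apache.hadoop.hive.metastore.txn.CompactionInfo;

    class CleanerHintsSketch {
      // Partition location recorded for a soft-deleted partition, or null for regular compaction cleanup.
      static String location(CompactionInfo ci) {
        return Optional.ofNullable(ci.properties).map(StringableMap::new)
            .map(props -> props.get("location")).orElse(null);
      }

      // Whether dropped data is purged (skips the trash); defaults to true when the hint is absent.
      static boolean ifPurge(CompactionInfo ci) {
        return Optional.ofNullable(ci.properties).map(StringableMap::new)
            .map(props -> props.get("ifPurge")).map(Boolean::valueOf).orElse(true);
      }
    }
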
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 7ee8f8b..9b5affa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -20,28 +20,26 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,74 +146,12 @@ public abstract class CompactorThread extends Thread implements Configurable {
   protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
     return (p == null) ? t.getSd() : p.getSd();
   }
-
-  /**
-   * Determine which user to run an operation as. If metastore.compactor.run.as.user is set, that user will be
-   * returned; if not: the the owner of the directory to be compacted.
-   * It is asserted that either the user running the hive metastore or the table
-   * owner must be able to stat the directory and determine the owner.
-   * @param location directory that will be read or written to.
-   * @param t metastore table object
-   * @return metastore.compactor.run.as.user value; or if that is not set: username of the owner of the location.
-   * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat
-   * the location.
-   */
-  protected String findUserToRunAs(String location, Table t) throws IOException,
-      InterruptedException {
-    LOG.debug("Determining who to run the job as.");
-
-    // check if a specific user is set in config
-    String runUserAs = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER);
-    if (runUserAs != null && !"".equals(runUserAs)) {
-      return runUserAs;
-    }
-
-    // get table directory owner
-    final Path p = new Path(location);
-    final FileSystem fs = p.getFileSystem(conf);
-    try {
-      FileStatus stat = fs.getFileStatus(p);
-      LOG.debug("Running job as " + stat.getOwner());
-      return stat.getOwner();
-    } catch (AccessControlException e) {
-      // TODO not sure this is the right exception
-      LOG.debug("Unable to stat file as current user, trying as table owner");
-
-      // Now, try it as the table owner and see if we get better luck.
-      final List<String> wrapper = new ArrayList<>(1);
-      UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
-          UserGroupInformation.getLoginUser());
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          // need to use a new filesystem object here to have the correct ugi
-          FileSystem proxyFs = p.getFileSystem(conf);
-          FileStatus stat = proxyFs.getFileStatus(p);
-          wrapper.add(stat.getOwner());
-          return null;
-        }
-      });
-      try {
-        FileSystem.closeAllForUGI(ugi);
-      } catch (IOException exception) {
-        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
-      }
-
-      if (wrapper.size() == 1) {
-        LOG.debug("Running job as " + wrapper.get(0));
-        return wrapper.get(0);
-      }
-    }
-    LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() +
-      ") or table owner(" + t.getOwner() + "), giving up");
-    throw new IOException("Unable to stat file: " + p);
-  }
-
+  
   /**
    * Determine whether to run this job as the current user or whether we need a doAs to switch
    * users.
    * @param owner of the directory we will be working in, as determined by
-   * {@link #findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)}
+   * {@link TxnUtils#findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)}
    * @return true if the job should run as the current user, false if a doAs is needed.
    */
   protected boolean runJobAsSelf(String owner) {
@@ -250,4 +186,27 @@ public abstract class CompactorThread extends Thread implements Configurable {
   protected String getRuntimeVersion() {
     return this.getClass().getPackage().getImplementationVersion();
   }
+  
+  protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {
+    String agentInfo = Thread.currentThread().getName();
+    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+    requestBuilder.setUser(ci.runAs);
+    requestBuilder.setTransactionId(txnId);
+
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+      .setLock(lockType)
+      .setOperationType(opType)
+      .setDbName(ci.dbname)
+      .setTableName(ci.tableName)
+      .setIsTransactional(true);
+
+    if (ci.partName != null) {
+      lockCompBuilder.setPartitionName(ci.partName);
+    }
+    requestBuilder.addLockComponent(lockCompBuilder.build());
+
+    requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
+      !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
+    return requestBuilder.build();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index bffa773..071cbbe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -262,7 +262,7 @@ public class Initiator extends MetaStoreCompactorThread {
 
     String user = cache.get(fullTableName);
     if (user == null) {
-      user = findUserToRunAs(sd.getLocation(), t);
+      user = TxnUtils.findUserToRunAs(sd.getLocation(), t, conf);
       cache.put(fullTableName, user);
     }
     return user;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index a90e307..08910ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -27,12 +27,11 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -438,7 +437,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
       }
 
       if (ci.runAs == null) {
-        ci.runAs = findUserToRunAs(sd.getLocation(), t);
+        ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), t, conf);
       }
 
       checkInterrupt();
@@ -560,29 +559,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     return true;
   }
 
-  private LockRequest createLockRequest(CompactionInfo ci, long txnId) {
-    String agentInfo = Thread.currentThread().getName();
-    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
-    requestBuilder.setUser(ci.runAs);
-    requestBuilder.setTransactionId(txnId);
-
-    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
-      .setSharedRead()
-      .setOperationType(DataOperationType.SELECT)
-      .setDbName(ci.dbname)
-      .setTableName(ci.tableName)
-      .setIsTransactional(true);
-
-    if (ci.partName != null) {
-      lockCompBuilder.setPartitionName(ci.partName);
-    }
-    requestBuilder.addLockComponent(lockCompBuilder.build());
-
-    requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
-      !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
-    return requestBuilder.build();
-  }
-
   /**
    * Just AcidUtils.getAcidState, but with impersonation if needed.
    */
@@ -719,7 +695,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
       this.txnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
       status = TxnStatus.OPEN;
 
-      LockRequest lockRequest = createLockRequest(ci, txnId);
+      LockRequest lockRequest = createLockRequest(ci, txnId, LockType.SHARED_READ, DataOperationType.SELECT);
       LockResponse res = msc.lock(lockRequest);
       if (res.getState() != LockState.ACQUIRED) {
         throw new TException("Unable to acquire lock(s) on {" + ci.getFullPartitionName()
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 5b18b8d..ddc7530 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -107,6 +107,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     //TestTxnCommandsWithSplitUpdateAndVectorization has the vectorized version
     //of these tests.
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, false);
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false);
   }
 
@@ -975,7 +976,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
       }
     }
     Assert.assertNotNull(txnInfo);
-    Assert.assertEquals(14, txnInfo.getId());
+    Assert.assertEquals(16, txnInfo.getId());
     Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
     String s = TestTxnDbUtil
         .queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
@@ -1402,10 +1403,10 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     query = "select ROW__ID, a, b" + (isVectorized ? "" : ", INPUT__FILE__NAME") + " from "
         + Table.NONACIDORCTBL + " order by ROW__ID";
     String[][] expected2 = new String[][] {
-        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000019/bucket_00001"},
-        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000019/bucket_00001"},
-        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000019/bucket_00001"},
-        {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000019/bucket_00001"}
+        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000021/bucket_00001"},
+        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000021/bucket_00001"},
+        {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000021/bucket_00001"},
+        {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000021/bucket_00001"}
     };
     checkResult(expected2, query, isVectorized, "after major compact", LOG);
     //make sure they are the same before and after compaction
@@ -1531,8 +1532,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     runStatementOnDriver("truncate table " + Table.ACIDTBL);
 
     FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] stat =
-        fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBL.toString().toLowerCase()), AcidUtils.baseFileFilter);
+    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBL.toString().toLowerCase()),
+        AcidUtils.baseFileFilter);
     if (1 != stat.length) {
       Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
     }
@@ -1552,8 +1553,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     runStatementOnDriver("truncate table " + Table.ACIDTBLPART);
 
     FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] stat =
-        fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), AcidUtils.baseFileFilter);
+    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
+        AcidUtils.baseFileFilter);
     if (1 != stat.length) {
       Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
     }
@@ -1573,15 +1574,15 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     runStatementOnDriver("truncate table " + Table.ACIDTBLPART + " partition(p='b')");
 
     FileSystem fs = FileSystem.get(hiveConf);
-    FileStatus[] stat =
-        fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"), AcidUtils.baseFileFilter);
+    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
+        AcidUtils.baseFileFilter);
     if (1 != stat.length) {
       Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
     }
     String name = stat[0].getPath().getName();
     Assert.assertEquals("base_0000003", name);
-    stat =
-        fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), AcidUtils.deltaFileFilter);
+    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
+        AcidUtils.deltaFileFilter);
     if (1 != stat.length) {
       Assert.fail("Expecting 1 delta and found " + stat.length + " files " + Arrays.toString(stat));
     }
@@ -1589,4 +1590,94 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
     Assert.assertEquals(2, r.size());
   }
+
+  @Test
+  public void testDropWithBaseOnePartition() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)");
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)");
+
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " drop partition (p='b')");
+
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
+        AcidUtils.baseFileFilter);
+    if (1 != stat.length) {
+      Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
+    }
+    String name = stat[0].getPath().getName();
+    Assert.assertEquals("base_0000003", name);
+    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
+        AcidUtils.baseFileFilter);
+    if (0 != stat.length) {
+      Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
+    }
+    
+    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
+    Assert.assertEquals(2, r.size());
+    
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertTrue(resp.getCompacts().stream().anyMatch(
+        ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname())));
+    
+    runCleaner(hiveConf);
+
+    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()),
+        path -> path.getName().equals("p=b"));
+    if (0 != stat.length) {
+      Assert.fail("Expecting partition data to be removed from FS");
+    }
+  }
+
+  @Test
+  public void testDropWithBaseMultiplePartitions() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='a') values (1,1),(2,2)");
+    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='b') values (3,3),(4,4)");
+    runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='c') values (7,7),(8,8)");
+    
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBLNESTEDPART + " drop partition (p2='a')");
+    
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
+    
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] stat;
+
+    for (char p : Arrays.asList('a', 'b')) {
+      String partName = "p1=a/p2=a/p3=" + p;
+      Assert.assertTrue(resp.getCompacts().stream().anyMatch(
+          ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname())));
+      
+      stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/" + partName),
+          AcidUtils.baseFileFilter);
+      if (1 != stat.length) {
+        Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
+      }
+      String name = stat[0].getPath().getName();
+      Assert.assertEquals("base_0000004", name);
+    }
+    stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=b/p3=c"),
+        AcidUtils.baseFileFilter);
+    if (0 != stat.length) {
+      Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
+    }
+    
+    List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLNESTEDPART);
+    Assert.assertEquals(2, r.size());
+    
+    runCleaner(hiveConf);
+
+    for (char p : Arrays.asList('a', 'b')) {
+      stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=a"),
+          path -> path.getName().equals("p3=" + p));
+      if (0 != stat.length) {
+        Assert.fail("Expecting partition data to be removed from FS");
+      }
+    }
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 8345832..64ac224 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -346,7 +346,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
      ├── delta_0000001_0000001_0000
      │   ├── _orc_acid_version
      │   └── bucket_00000
-     ├── delta_0000001_0000002_v0000018
+     ├── delta_0000001_0000002_v0000020
      │   ├── _orc_acid_version
      │   └── bucket_00000
      └── delta_0000002_0000002_0000
@@ -358,7 +358,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
         FileUtils.HIDDEN_FILES_PATH_FILTER);
 
     String[] expectedList = new String[] {
-        "/t/delta_0000001_0000002_v0000018",
+        "/t/delta_0000001_0000002_v0000020",
         "/t/delta_0000001_0000001_0000",
         "/t/delta_0000002_0000002_0000",
     };
@@ -385,14 +385,14 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'minor'");
     runWorker(hiveConf);
     /*
-    at this point delta_0000001_0000003_v0000020 is visible to everyone
+    at this point delta_0000001_0000003_v0000022 is visible to everyone
     so cleaner removes all files shadowed by it (which is everything in this case)
     */
     runCleaner(hiveConf);
     runCleaner(hiveConf);
 
     expectedList = new String[] {
-        "/t/delta_0000001_0000003_v0000020"
+        "/t/delta_0000001_0000003_v0000022"
     };
     actualList = fs.listStatus(new Path(warehousePath + "/t"),
         FileUtils.HIDDEN_FILES_PATH_FILTER);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index 5daf5ac..1c16bb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -79,13 +79,13 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
     Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
     String[][] expected2 = new String[][] {
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
-            "acidtbl/base_0000003_v0000019/bucket_00001"},
+            "acidtbl/base_0000003_v0000021/bucket_00001"},
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4",
-            "acidtbl/base_0000003_v0000019/bucket_00001"},
+            "acidtbl/base_0000003_v0000021/bucket_00001"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
-            "acidtbl/base_0000003_v0000019/bucket_00001"},
+            "acidtbl/base_0000003_v0000021/bucket_00001"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t8\t8",
-            "acidtbl/base_0000003_v0000019/bucket_00001"}};
+            "acidtbl/base_0000003_v0000021/bucket_00001"}};
     checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
   }
   @Test
@@ -120,11 +120,11 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
     Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
     String[][] expected2 = new String[][] {
         {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
-            "acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
+            "acidtblpart/p=p1/base_0000003_v0000021/bucket_00001"},
         {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
             "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001_0"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
-            "acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
+            "acidtblpart/p=p1/base_0000003_v0000021/bucket_00001"},
         {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
             "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}};
 
@@ -160,10 +160,10 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
     Assert.assertEquals(1, rsp.getCompactsSize());
     Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
     String[][] expected2 = new String[][] {
-        {"1\t2", "t/base_0000003_v0000020/000000_0"},
-        {"4\t5", "t/base_0000003_v0000020/000000_0"},
-        {"5\t6", "t/base_0000003_v0000020/000000_0"},
-        {"8\t8", "t/base_0000003_v0000020/000000_0"}};
+        {"1\t2", "t/base_0000003_v0000022/000000_0"},
+        {"4\t5", "t/base_0000003_v0000022/000000_0"},
+        {"5\t6", "t/base_0000003_v0000022/000000_0"},
+        {"8\t8", "t/base_0000003_v0000022/000000_0"}};
     checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 80948dc..34affbf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -346,11 +346,11 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected3 = new String[][] {
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
-            ".*t/delta_0000001_0000002_v000002[4-5]/bucket_00000"},
+            ".*t/delta_0000001_0000002_v000002[6-7]/bucket_00000"},
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
-            ".*t/delta_0000001_0000002_v000002[4-5]/bucket_00000"},
+            ".*t/delta_0000001_0000002_v000002[6-7]/bucket_00000"},
         {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6",
-            ".*t/delta_0000001_0000002_v000002[4-5]/bucket_00000"}};
+            ".*t/delta_0000001_0000002_v000002[6-7]/bucket_00000"}};
     checkResult(expected3, testQuery, isVectorized, "minor compact imported table");
 
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 67787a6..9d3714e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -139,8 +139,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected3 = new String[][] {
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000032/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000032/bucket_00000"}
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000034/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000034/bucket_00000"}
     };
     checkResult(expected3, testQuery, isVectorized, "delete compact minor");
 
@@ -173,9 +173,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected6 = new String[][]{
-        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000046/bucket_00000"},
-        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000046/bucket_00000"},
-        {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000046/bucket_00000"}
+        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000048/bucket_00000"},
+        {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000048/bucket_00000"},
+        {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000048/bucket_00000"}
     };
     checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
   }
@@ -210,10 +210,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected1 = new String[][] {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002_v0000024/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002_v0000024/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002_v0000024/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002_v0000024/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002_v0000026/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002_v0000026/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002_v0000026/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002_v0000026/bucket_00000"}
     };
     checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
 
@@ -222,11 +222,11 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected2 = new String[][] {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003_v0000028/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003_v0000028/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003_v0000028/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003_v0000028/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003_v0000028/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003_v0000030/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003_v0000030/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003_v0000030/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003_v0000030/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003_v0000030/bucket_00000"}
     };
     checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
 
@@ -244,9 +244,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected4 = new String[][] {
-        {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005_v0000037/bucket_00000"},
-        {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005_v0000037/bucket_00000"},
-        {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005_v0000037/bucket_00000"}};
+        {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005_v0000039/bucket_00000"},
+        {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005_v0000039/bucket_00000"},
+        {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005_v0000039/bucket_00000"}};
     checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
   }
   /**
@@ -325,13 +325,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
 
     String[][] expected3 = new String[][] {
         {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
-            "t/base_10000003_v0000034/bucket_00000"},
+            "t/base_10000003_v0000036/bucket_00000"},
         {"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
-            "t/base_10000003_v0000034/bucket_00000"},
+            "t/base_10000003_v0000036/bucket_00000"},
         {"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
-            "t/base_10000003_v0000034/bucket_00001"},
+            "t/base_10000003_v0000036/bucket_00001"},
         {"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t9\t9",
-            "t/base_10000003_v0000034/bucket_00000"}
+            "t/base_10000003_v0000036/bucket_00000"}
     };
     checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
   }
@@ -454,10 +454,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
     String[][] expected2 = new String[][] {
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001_v0000022/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001_v0000022/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001_v0000022/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001_v0000022/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001_v0000024/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001_v0000024/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001_v0000024/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001_v0000024/bucket_00000"}
     };
     checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
     //at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index da670c8..2aa8840 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -168,10 +168,10 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     */
 
     String expected[][] = {
-        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"},
-        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"},
-        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"}
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000"}
     };
     checkResult(expected,
         "select ROW__ID, c1, c2, c3" + (shouldVectorize() ? "" : ", INPUT__FILE__NAME")
@@ -186,8 +186,8 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0");
     expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000_0");
     expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001_0");
-    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000");
-    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001");
     assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
 
     TestTxnCommands2.runCleaner(hiveConf);
@@ -196,8 +196,8 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
     Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs);
 
     expectedFiles.clear();
-    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000");
-    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000");
+    expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001");
     assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
   }
 
@@ -478,23 +478,23 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     /*Compaction preserves location of rows wrt buckets/tranches (for now)*/
     String expected4[][] = {
         {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2",
-            "warehouse/t/base_10000002_v0000028/bucket_00002"},
+            "warehouse/t/base_10000002_v0000030/bucket_00002"},
         {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4",
-            "warehouse/t/base_10000002_v0000028/bucket_00002"},
+            "warehouse/t/base_10000002_v0000030/bucket_00002"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
-            "warehouse/t/base_10000002_v0000028/bucket_00000"},
+            "warehouse/t/base_10000002_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10",
-            "warehouse/t/base_10000002_v0000028/bucket_00000"},
+            "warehouse/t/base_10000002_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20",
-            "warehouse/t/base_10000002_v0000028/bucket_00000"},
+            "warehouse/t/base_10000002_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12",
-            "warehouse/t/base_10000002_v0000028/bucket_00000"},
+            "warehouse/t/base_10000002_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40",
-            "warehouse/t/base_10000002_v0000028/bucket_00000"},
+            "warehouse/t/base_10000002_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60",
-            "warehouse/t/base_10000002_v0000028/bucket_00000"},
+            "warehouse/t/base_10000002_v0000030/bucket_00000"},
         {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88",
-            "warehouse/t/base_10000002_v0000028/bucket_00001"},
+            "warehouse/t/base_10000002_v0000030/bucket_00001"},
     };
     checkExpected(rs, expected4,"after major compact");
   }
@@ -750,15 +750,15 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
     rs = runStatementOnDriver(query);
     String[][] expected5 = {//the row__ids are the same after compaction
         {"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t1\t17",
-            "warehouse/t/base_10000001_v0000028/bucket_00000"},
+            "warehouse/t/base_10000001_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4",
-            "warehouse/t/base_10000001_v0000028/bucket_00000"},
+            "warehouse/t/base_10000001_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6",
-            "warehouse/t/base_10000001_v0000028/bucket_00000"},
+            "warehouse/t/base_10000001_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8",
-            "warehouse/t/base_10000001_v0000028/bucket_00000"},
+            "warehouse/t/base_10000001_v0000030/bucket_00000"},
         {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10",
-            "warehouse/t/base_10000001_v0000028/bucket_00000"}
+            "warehouse/t/base_10000001_v0000030/bucket_00000"}
     };
     checkExpected(rs, expected5, "After major compaction");
     //vectorized because there is INPUT__FILE__NAME
@@ -820,10 +820,10 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
         {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
         {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
         {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"},
-        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"},
-        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"}
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"},
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"},
+        {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"}
     };
     checkExpected(rs, expected2, "after major compaction");
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index e86e2d5..05e45f5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -72,6 +72,7 @@ public abstract class TxnCommandsBaseForTests {
   public enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
+    ACIDTBLNESTEDPART("acidTblNestedPart"),
     ACIDTBL2("acidTbl2"),
     NONACIDORCTBL("nonAcidOrcTbl"),
     NONACIDORCTBL2("nonAcidOrcTbl2"),
@@ -147,6 +148,7 @@ public abstract class TxnCommandsBaseForTests {
   protected void setUpSchema() throws Exception {
     runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.ACIDTBLNESTEDPART + "(a int, b int) partitioned by (p1 string, p2 string, p3 string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create temporary  table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index 12e2348..9eb9e1b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.lockmgr;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClientWithLocalCache;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -32,11 +34,18 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
+import java.io.File;
+
 /**
  * Base class for "end-to-end" tests for DbTxnManager and simulate concurrent queries.
  */
 public abstract class DbTxnManagerEndToEndTestBase {
 
+  private static final String TEST_DATA_DIR = new File(
+    System.getProperty("java.io.tmpdir") + File.separator +
+      DbTxnManagerEndToEndTestBase.class.getCanonicalName() + "-" + System.currentTimeMillis())
+    .getPath().replaceAll("\\\\", "/");
+
   protected static HiveConf conf = new HiveConf(Driver.class);
   protected HiveTxnManager txnMgr;
   protected Context ctx;
@@ -44,10 +53,12 @@ public abstract class DbTxnManagerEndToEndTestBase {
   protected TxnStore txnHandler;
 
   public DbTxnManagerEndToEndTestBase() {
-    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-            "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-    conf.setBoolVar(HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+      "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, true);
+
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, getWarehouseDir());
     TestTxnDbUtil.setConfValues(conf);
   }
   @BeforeClass
@@ -58,15 +69,19 @@ public abstract class DbTxnManagerEndToEndTestBase {
   @Before
   public void setUp() throws Exception {
     // set up metastore client cache
-    if (conf.getBoolVar(HiveConf.ConfVars.MSC_CACHE_ENABLED)) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.MSC_CACHE_ENABLED)) {
       HiveMetaStoreClientWithLocalCache.init(conf);
     }
     SessionState.start(conf);
     ctx = new Context(conf);
     driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
     driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, -1);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false);
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+    
     TestTxnDbUtil.cleanDb(conf);
     SessionState ss = SessionState.get();
     ss.initTxnMgr(conf);
@@ -74,7 +89,15 @@ public abstract class DbTxnManagerEndToEndTestBase {
     Assert.assertTrue(txnMgr instanceof DbTxnManager);
     txnHandler = TxnUtils.getTxnStore(conf);
 
+    File f = new File(getWarehouseDir());
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(getWarehouseDir()).mkdirs())) {
+      throw new RuntimeException("Could not create " + getWarehouseDir());
+    }
   }
+  
   @After
   public void tearDown() throws Exception {
     driver.close();
@@ -82,5 +105,10 @@ public abstract class DbTxnManagerEndToEndTestBase {
     if (txnMgr != null) {
       txnMgr.closeTxnManager();
     }
+    FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
+  }
+
+  protected String getWarehouseDir() {
+    return TEST_DATA_DIR + "/warehouse";
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index a8cb6aa..c3db57d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.Assert;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,6 +52,8 @@ import org.junit.ComparisonFailure;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.FieldSetter;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -3361,4 +3367,98 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
     Assert.assertEquals("1\t2", res.get(0));
   }
 
+  @Test
+  public void testDropPartitionNonBlocking() throws Exception {
+    testDropPartition(false);
+  }
+  @Test
+  public void testDropPartitionBlocking() throws Exception {
+    testDropPartition(true);
+  }
+
+  private void testDropPartition(boolean blocking) throws Exception {
+    dropTable(new String[] {"tab_acid"});
+    FileSystem fs = FileSystem.get(conf);
+    
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, !blocking);
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, 1);
+    driver = Mockito.spy(driver);
+
+    HiveConf.setBoolVar(driver2.getConf(), HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, !blocking);
+    driver2 = Mockito.spy(driver2);
+
+    driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
+      "stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')");
+
+    driver.compileAndRespond("select * from tab_acid");
+    List<String> res = new ArrayList<>();
+
+    driver.lockAndRespond();
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+
+    checkLock(LockType.SHARED_READ,
+      LockState.ACQUIRED, "default", "tab_acid", null, locks);
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.compileAndRespond("alter table tab_acid drop partition (p='foo')");
+
+    if (blocking) {
+      txnMgr2.acquireLocks(driver2.getPlan(), ctx, null, false);
+      locks = getLocks();
+
+      ShowLocksResponseElement checkLock = checkLock(LockType.EXCLUSIVE,
+        LockState.WAITING, "default", "tab_acid", "p=foo", locks);
+
+      swapTxnManager(txnMgr);
+      Mockito.doNothing().when(driver).lockAndRespond();
+      driver.run();
+
+      driver.getFetchTask().fetch(res);
+      swapTxnManager(txnMgr2);
+
+      FieldSetter.setField(txnMgr2, txnMgr2.getClass().getDeclaredField("numStatements"), 0);
+      txnMgr2.getMS().unlock(checkLock.getLockid());
+    }
+    driver2.lockAndRespond();
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", blocking ? 1 : 2, locks.size());
+
+    checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
+      LockState.ACQUIRED, "default", "tab_acid", "p=foo", locks);
+
+    Mockito.doNothing().when(driver2).lockAndRespond();
+    driver2.run();
+
+    if (!blocking) {
+      swapTxnManager(txnMgr);
+      Mockito.doNothing().when(driver).lockAndRespond();
+      driver.run();
+    }
+    Mockito.reset(driver, driver2);
+    
+    FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), "tab_acid" + (blocking ? "" : "/p=foo")),
+      (blocking ? path -> path.getName().equals("p=foo") : AcidUtils.baseFileFilter));
+    if ((blocking ? 0 : 1) != stat.length) {
+      Assert.fail("Partition data was " + (blocking ? "not " : "") + "removed from FS");
+    }
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals("Expecting 2 rows and found " + res.size(), 2, res.size());
+    
+    driver.run("select * from tab_acid where p='foo'");
+    res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals("Expecting 0 rows and found " + res.size(), 0, res.size());
+
+    //re-create partition with the same name
+    driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo')");
+    
+    driver.run("select * from tab_acid where p='foo'");
+    res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals("Expecting 1 rows and found " + res.size(), 1, res.size());
+  }
+  
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index ca8e1e0..33871fa 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -899,7 +900,7 @@ public class TestInitiator extends CompactorTest {
   }
 
   /**
-   * Tests org.apache.hadoop.hive.ql.txn.compactor.CompactorThread#findUserToRunAs(java.lang.String, org.apache.hadoop
+   * Tests org.apache.hadoop.hive.metastore.txn.TxnUtils#findUserToRunAs(java.lang.String, org.apache.hadoop
    * .hive.metastore.api.Table).
    * Used by Worker and Initiator.
    * Initiator caches this via Initiator#resolveUserToRunAs.
@@ -917,13 +918,13 @@ public class TestInitiator extends CompactorTest {
     // user set in config
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER, userFromConf);
     initiator.setConf(conf);
-    Assert.assertEquals(userFromConf, initiator.findUserToRunAs(t.getSd().getLocation(), t));
+    Assert.assertEquals(userFromConf, TxnUtils.findUserToRunAs(t.getSd().getLocation(), t, conf));
 
     // table dir owner (is probably not "randomUser123")
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER, "");
     // simulate restarting Initiator
     initiator.setConf(conf);
-    Assert.assertNotEquals(userFromConf, initiator.findUserToRunAs(t.getSd().getLocation(), t));
+    Assert.assertNotEquals(userFromConf, TxnUtils.findUserToRunAs(t.getSd().getLocation(), t, conf));
   }
 
   /**
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 8ae55a3..3d13a9e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -615,16 +615,18 @@ int _kTxnTypeValues[] = {
   TxnType::REPL_CREATED,
   TxnType::READ_ONLY,
   TxnType::COMPACTION,
-  TxnType::MATER_VIEW_REBUILD
+  TxnType::MATER_VIEW_REBUILD,
+  TxnType::SOFT_DELETE
 };
 const char* _kTxnTypeNames[] = {
   "DEFAULT",
   "REPL_CREATED",
   "READ_ONLY",
   "COMPACTION",
-  "MATER_VIEW_REBUILD"
+  "MATER_VIEW_REBUILD",
+  "SOFT_DELETE"
 };
-const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(5, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
+const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
 
 std::ostream& operator<<(std::ostream& out, const TxnType::type& val) {
   std::map<int, const char*>::const_iterator it = _TxnType_VALUES_TO_NAMES.find(val);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index a0a105c..6f3d42d 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -298,7 +298,8 @@ struct TxnType {
     REPL_CREATED = 1,
     READ_ONLY = 2,
     COMPACTION = 3,
-    MATER_VIEW_REBUILD = 4
+    MATER_VIEW_REBUILD = 4,
+    SOFT_DELETE = 5
   };
 };
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
index ea2b2fc..39b4248 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
@@ -13,7 +13,8 @@ public enum TxnType implements org.apache.thrift.TEnum {
   REPL_CREATED(1),
   READ_ONLY(2),
   COMPACTION(3),
-  MATER_VIEW_REBUILD(4);
+  MATER_VIEW_REBUILD(4),
+  SOFT_DELETE(5);
 
   private final int value;
 
@@ -45,6 +46,8 @@ public enum TxnType implements org.apache.thrift.TEnum {
         return COMPACTION;
       case 4:
         return MATER_VIEW_REBUILD;
+      case 5:
+        return SOFT_DELETE;
       default:
         return null;
     }
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
index 6c2e294..5179566 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
@@ -28,12 +28,15 @@ final class TxnType
 
     const MATER_VIEW_REBUILD = 4;
 
+    const SOFT_DELETE = 5;
+
     static public $__names = array(
         0 => 'DEFAULT',
         1 => 'REPL_CREATED',
         2 => 'READ_ONLY',
         3 => 'COMPACTION',
         4 => 'MATER_VIEW_REBUILD',
+        5 => 'SOFT_DELETE',
     );
 }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 3e34455..5c15eb1 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -377,6 +377,7 @@ class TxnType(object):
     READ_ONLY = 2
     COMPACTION = 3
     MATER_VIEW_REBUILD = 4
+    SOFT_DELETE = 5
 
     _VALUES_TO_NAMES = {
         0: "DEFAULT",
@@ -384,6 +385,7 @@ class TxnType(object):
         2: "READ_ONLY",
         3: "COMPACTION",
         4: "MATER_VIEW_REBUILD",
+        5: "SOFT_DELETE",
     }
 
     _NAMES_TO_VALUES = {
@@ -392,6 +394,7 @@ class TxnType(object):
         "READ_ONLY": 2,
         "COMPACTION": 3,
         "MATER_VIEW_REBUILD": 4,
+        "SOFT_DELETE": 5,
     }
 
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 8cecd8f..195de51 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -170,8 +170,9 @@ module TxnType
   READ_ONLY = 2
   COMPACTION = 3
   MATER_VIEW_REBUILD = 4
-  VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD"}
-  VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD]).freeze
+  SOFT_DELETE = 5
+  VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD", 5 => "SOFT_DELETE"}
+  VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD, SOFT_DELETE]).freeze
 end
 
 module GetTablesExtRequestFields
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index a077ab1..8e373e4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1759,10 +1760,22 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     req.setDeleteData(options.deleteData);
     req.setNeedResult(options.returnResults);
     req.setIfExists(options.ifExists);
+    
+    EnvironmentContext context = null;
     if (options.purgeData) {
       LOG.info("Dropped partitions will be purged!");
-      req.setEnvironmentContext(getEnvironmentContextWithIfPurgeSet());
+      context = getEnvironmentContextWithIfPurgeSet();
     }
+    if (options.writeId != null) {
+      context = Optional.ofNullable(context).orElse(new EnvironmentContext());
+      context.putToProperties("writeId", options.writeId.toString());
+    }
+    if (options.txnId != null) {
+      context = Optional.ofNullable(context).orElse(new EnvironmentContext());
+      context.putToProperties("txnId", options.txnId.toString());
+    }
+    req.setEnvironmentContext(context);
+    
     return client.drop_partitions_req(req).getPartitions();
   }
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
index 3488062..da6d634 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
@@ -128,8 +128,8 @@ public class LockComponentBuilder {
     return component;
   }
 
-  public LockComponent setLock(LockType type) {
+  public LockComponentBuilder setLock(LockType type) {
     component.setType(type);
-    return component;
+    return this;
   }
 }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java
index 40018c9..db61aeb 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java
@@ -28,6 +28,9 @@ public class PartitionDropOptions {
   public boolean returnResults = true;
   public boolean purgeData = false;
 
+  public Long writeId;
+  public Long txnId;
+
   public static PartitionDropOptions instance() { return new PartitionDropOptions(); }
 
   public PartitionDropOptions deleteData(boolean deleteData) {
@@ -50,5 +53,14 @@ public class PartitionDropOptions {
     return this;
   }
 
+  public PartitionDropOptions setWriteId(Long writeId) {
+    this.writeId = writeId;
+    return this;
+  }
+
+  public void setTxnId(Long txnId) {
+    this.txnId = txnId;
+  }
+
 } // class PartitionDropSwitches;
 
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index fa5cd19..737b7b5 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -997,7 +997,8 @@ enum TxnType {
     REPL_CREATED = 1,
     READ_ONLY    = 2,
     COMPACTION   = 3,
-    MATER_VIEW_REBUILD = 4
+    MATER_VIEW_REBUILD = 4,
+    SOFT_DELETE = 5
 }
 
 // specifies which info to return with GetTablesExtRequest
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 2ecdb96..2034d85 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -19,7 +19,11 @@
 package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -35,6 +39,14 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.isMustPurge;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.throwMetaException;
+
 
 /**
  * It handles cleanup of dropped partition/table/database in ACID related metastore tables
@@ -68,10 +80,45 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
 
   @Override
   public void onDropPartition(DropPartitionEvent partitionEvent)  throws MetaException {
-    if (TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
+    Table table = partitionEvent.getTable();
+    EnvironmentContext context = partitionEvent.getEnvironmentContext();
+
+    if (TxnUtils.isTransactionalTable(table)) {
       txnHandler = getTxnHandler();
-      txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
-          partitionEvent.getPartitionIterator());
+      txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, table, partitionEvent.getPartitionIterator());
+
+      if (!partitionEvent.getDeleteData()) {
+        long currentTxn = Optional.ofNullable(context).map(EnvironmentContext::getProperties)
+          .map(prop -> prop.get("txnId")).map(Long::parseLong)
+          .orElse(0L);
+
+        long writeId = Optional.ofNullable(context).map(EnvironmentContext::getProperties)
+          .map(prop -> prop.get("writeId")).map(Long::parseLong)
+          .orElse(0L);
+
+        try {
+          if (currentTxn > 0) {
+            CompactionRequest rqst = new CompactionRequest(
+                table.getDbName(), table.getTableName(), CompactionType.MAJOR);
+            rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf));
+            rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(context, table)));
+
+            Iterator<Partition> partitionIterator = partitionEvent.getPartitionIterator();
+            while (partitionIterator.hasNext()) {
+              Partition p = partitionIterator.next();
+              
+              List<FieldSchema> partCols = partitionEvent.getTable().getPartitionKeys();  // partition columns
+              List<String> partVals = p.getValues();
+              rqst.setPartitionname(Warehouse.makePartName(partCols, partVals));
+              rqst.putToProperties("location", p.getSd().getLocation());
+              
+              txnHandler.submitForCleanup(rqst, writeId, currentTxn);
+            }
+          }
+        } catch (InterruptedException | IOException e) {
+          throwMetaException(e);
+        }
+      }
     }
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index fab149c..ff79412 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.AcidConstants;
 import org.apache.hadoop.hive.common.AcidMetaDataFile;
+import org.apache.hadoop.hive.common.AcidMetaDataFile.DataFormat;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
@@ -104,6 +105,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.join;
+
 import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
@@ -3401,7 +3403,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
   }
 
   private void truncateTableInternal(String dbName, String tableName, List<String> partNames,
-                                     String validWriteIds, long writeId, EnvironmentContext context) throws MetaException, NoSuchObjectException {
+      String validWriteIds, long writeId, EnvironmentContext context) throws MetaException, NoSuchObjectException {
     boolean isSkipTrash = false, needCmRecycle = false;
     try {
       String[] parsedDbName = parseDbName(dbName, conf);
@@ -3423,13 +3425,12 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
         // This is not transactional
         for (Path location : getLocationsForTruncate(getMS(), parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
             tbl, partNames)) {
-          FileSystem fs = location.getFileSystem(getConf());
           if (!skipDataDeletion) {
-            truncateDataFiles(location, fs, isSkipTrash, needCmRecycle);
+            truncateDataFiles(location, isSkipTrash, needCmRecycle);
           } else {
             // For Acid tables we don't need to delete the old files, only write an empty baseDir.
             // Compaction and cleaner will take care of the rest
-            addTruncateBaseFile(location, writeId, fs);
+            addTruncateBaseFile(location, writeId, DataFormat.TRUNCATED);
           }
         }
       }
@@ -3448,24 +3449,35 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
    * Add an empty baseDir with a truncate metadatafile
    * @param location partition or table directory
    * @param writeId allocated writeId
-   * @param fs FileSystem
    * @throws Exception
    */
-  private void addTruncateBaseFile(Path location, long writeId, FileSystem fs) throws Exception {
+  private void addTruncateBaseFile(Path location, long writeId, DataFormat dataFormat) 
+      throws MetaException {
+    if (location == null) 
+      return;
+    
     Path basePath = new Path(location, AcidConstants.baseDir(writeId));
-    fs.mkdirs(basePath);
-    // We can not leave the folder empty, otherwise it will be skipped at some file listing in AcidUtils
-    // No need for a data file, a simple metadata is enough
-    AcidMetaDataFile.writeToFile(fs, basePath, AcidMetaDataFile.DataFormat.TRUNCATED);
+    try {
+      FileSystem fs = location.getFileSystem(getConf());
+      fs.mkdirs(basePath);
+      // We can not leave the folder empty, otherwise it will be skipped at some file listing in AcidUtils
+      // No need for a data file, a simple metadata is enough
+      AcidMetaDataFile.writeToFile(fs, basePath, dataFormat);
+    } catch (Exception e) {
+      throw newMetaException(e);
+    }
   }
 
-  private void truncateDataFiles(Path location, FileSystem fs, boolean isSkipTrash, boolean needCmRecycle)
-      throws IOException, MetaException, NoSuchObjectException {
+  private void truncateDataFiles(Path location, boolean isSkipTrash, boolean needCmRecycle)
+      throws IOException, MetaException {
+    FileSystem fs = location.getFileSystem(getConf());
+    
     if (!HdfsUtils.isPathEncrypted(getConf(), fs.getUri(), location) &&
         !FileUtils.pathHasSnapshotSubDir(location, fs)) {
       HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
       FileStatus targetStatus = fs.getFileStatus(location);
       String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
+      
       wh.deleteDir(location, true, isSkipTrash, needCmRecycle);
       fs.mkdirs(location);
       HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
@@ -5029,21 +5041,21 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     }
   }
 
-  private boolean drop_partition_common(RawStore ms, String catName, String db_name,
-                                        String tbl_name, List<String> part_vals,
-                                        final boolean deleteData, final EnvironmentContext envContext)
-      throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
-      InvalidInputException {
-    boolean success = false;
+  private boolean drop_partition_common(RawStore ms, String catName, String db_name, String tbl_name, 
+      List<String> part_vals, boolean deleteData, final EnvironmentContext envContext)
+      throws MetaException, NoSuchObjectException, IOException, InvalidObjectException, InvalidInputException {
     Path partPath = null;
-    Table tbl = null;
-    Partition part = null;
     boolean isArchived = false;
     Path archiveParentDir = null;
+    boolean success = false;
+
+    Table tbl = null;
+    Partition part = null;
     boolean mustPurge = false;
-    boolean tableDataShouldBeDeleted = false;
-    boolean needsCm = false;
+    long writeId = 0;
+    
     Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+    boolean needsCm = false;
 
     if (db_name == null) {
       throw new MetaException("The DB name cannot be null.");
@@ -5057,19 +5069,19 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
 
     try {
       ms.openTransaction();
+      
       part = ms.getPartition(catName, db_name, tbl_name, part_vals);
       GetTableRequest request = new GetTableRequest(db_name,tbl_name);
       request.setCatName(catName);
       tbl = get_table_core(request);
-      tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(tbl, deleteData);
       firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
+      
       mustPurge = isMustPurge(envContext, tbl);
-
+      writeId = getWriteId(envContext);
+            
       if (part == null) {
-        throw new NoSuchObjectException("Partition doesn't exist. "
-            + part_vals);
+        throw new NoSuchObjectException("Partition doesn't exist. " + part_vals);
       }
-
       isArchived = MetaStoreUtils.isArchived(part);
       if (isArchived) {
         archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
@@ -5098,28 +5110,26 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     } finally {
       if (!success) {
         ms.rollbackTransaction();
-      } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
-        if (tableDataShouldBeDeleted) {
-          if (mustPurge) {
-            LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
-          }
-          else {
-            LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
-          }
-          // Archived partitions have har:/to_har_file as their location.
-          // The original directory was saved in params
+      } else if (checkTableDataShouldBeDeleted(tbl, deleteData) && 
+          (partPath != null || archiveParentDir != null)) {
 
-          if (isArchived) {
-            assert (archiveParentDir != null);
-            wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
-          } else {
-            assert (partPath != null);
-            wh.deleteDir(partPath, true, mustPurge, needsCm);
-            deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
-          }
-          // ok even if the data is not deleted
+        LOG.info(mustPurge ?
+          "dropPartition() will purge " + partPath + " directly, skipping trash." :
+          "dropPartition() will move " + partPath + " to trash-directory.");
+          
+        // Archived partitions have har:/to_har_file as their location.
+        // The original directory was saved in params
+        if (isArchived) {
+          wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
+        } else {
+          wh.deleteDir(partPath, true, mustPurge, needsCm);
+          deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
         }
+        // ok even if the data is not deleted
+      } else if (TxnUtils.isTransactionalTable(tbl) && writeId > 0) {
+        addTruncateBaseFile(partPath, writeId, DataFormat.DROPPED);
       }
+    
       if (!listeners.isEmpty()) {
         MetaStoreListenerNotifier.notifyEvent(listeners,
             EventType.DROP_PARTITION,
@@ -5131,7 +5141,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     return true;
   }
 
-  private static boolean isMustPurge(EnvironmentContext envContext, Table tbl) {
+  static boolean isMustPurge(EnvironmentContext envContext, Table tbl) {
     // Data needs deletion. Check if trash may be skipped.
     // Trash may be skipped iff:
     //  1. deleteData == true, obviously.
@@ -5143,6 +5153,14 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
         || MetaStoreUtils.isSkipTrash(tbl.getParameters());
   }
 
+  private long getWriteId(EnvironmentContext context) {
+    return Optional.ofNullable(context)
+      .map(EnvironmentContext::getProperties)
+      .map(prop -> prop.get("writeId"))
+      .map(Long::parseLong)
+      .orElse(0L);
+  }
+
   private void throwUnsupportedExceptionIfRemoteDB(String dbName, String operationName) throws MetaException {
     if (isDatabaseRemote(dbName)) {
       throw new MetaException("Operation " + operationName + " not supported for REMOTE database " + dbName);
@@ -5218,32 +5236,38 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     RawStore ms = getMS();
     String dbName = request.getDbName(), tblName = request.getTblName();
     String catName = request.isSetCatName() ? request.getCatName() : getDefaultCatalog(conf);
+    
     boolean ifExists = request.isSetIfExists() && request.isIfExists();
     boolean deleteData = request.isSetDeleteData() && request.isDeleteData();
     boolean ignoreProtection = request.isSetIgnoreProtection() && request.isIgnoreProtection();
     boolean needResult = !request.isSetNeedResult() || request.isNeedResult();
+    
     List<PathAndDepth> dirsToDelete = new ArrayList<>();
     List<Path> archToDelete = new ArrayList<>();
-    EnvironmentContext envContext = request.isSetEnvironmentContext()
-        ? request.getEnvironmentContext() : null;
-
+    EnvironmentContext envContext = 
+        request.isSetEnvironmentContext() ? request.getEnvironmentContext() : null;
     boolean success = false;
-    ms.openTransaction();
+    
     Table tbl = null;
     List<Partition> parts = null;
     boolean mustPurge = false;
+    long writeId = 0;
+    
     Map<String, String> transactionalListenerResponses = null;
-    boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName),
-        ms.getTable(catName, dbName, tblName));
-
+    boolean needsCm = false;
+    
     try {
+      ms.openTransaction();
       // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
       // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
       tbl = get_table_core(catName, dbName, tblName);
       mustPurge = isMustPurge(envContext, tbl);
+      writeId = getWriteId(envContext);
+      
       int minCount = 0;
       RequestPartsSpec spec = request.getParts();
       List<String> partNames = null;
+      
       if (spec.isSetExprs()) {
         // Dropping by expressions.
         parts = new ArrayList<>(spec.getExprs().size());
@@ -5291,7 +5315,6 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
       }
 
       for (Partition part : parts) {
-
         // TODO - we need to speed this up for the normal path where all partitions are under
         // the table and we don't have to stat every partition
 
@@ -5314,24 +5337,24 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
       }
 
       ms.dropPartitions(catName, dbName, tblName, partNames);
-      if (parts != null && !parts.isEmpty() && !transactionalListeners.isEmpty()) {
+      if (!parts.isEmpty() && !transactionalListeners.isEmpty()) {
         transactionalListenerResponses = MetaStoreListenerNotifier
             .notifyEvent(transactionalListeners, EventType.DROP_PARTITION,
                 new DropPartitionEvent(tbl, parts, true, deleteData, this), envContext);
       }
-
       success = ms.commitTransaction();
+      needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName), tbl);
+      
       DropPartitionsResult result = new DropPartitionsResult();
       if (needResult) {
         result.setPartitions(parts);
       }
-
       return result;
     } finally {
       if (!success) {
         ms.rollbackTransaction();
       } else if (checkTableDataShouldBeDeleted(tbl, deleteData)) {
-        LOG.info( mustPurge?
+        LOG.info(mustPurge ?
             "dropPartition() will purge partition-directories directly, skipping trash."
             :  "dropPartition() will move partition-directories to trash-directory.");
         // Archived partitions have har:/to_har_file as their location.
@@ -5369,10 +5392,19 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
             throw new MetaException("Failed to delete parent: " + ex.getMessage());
           }
         }
+      } else if (TxnUtils.isTransactionalTable(tbl) && writeId > 0) {
+        for (Partition part : parts) {
+          if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
+            Path partPath = new Path(part.getSd().getLocation());
+            verifyIsWritablePath(partPath);
+            
+            addTruncateBaseFile(partPath, writeId, DataFormat.DROPPED);
+          }
+        }
       }
 
       if (parts != null) {
-        if (parts != null && !parts.isEmpty() && !listeners.isEmpty()) {
+        if (!parts.isEmpty() && !listeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners,
                 EventType.DROP_PARTITION,
                 new DropPartitionEvent(tbl, parts, success, deleteData, this),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 9cece8a..3a159f7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -366,6 +366,7 @@ class CompactionTxnHandler extends TxnHandler {
             info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
             info.runAs = rs.getString(6);
             info.highestWriteId = rs.getLong(7);
+            info.properties = rs.getString(8);
             if (LOG.isDebugEnabled()) {
               LOG.debug("Found ready to clean: " + info.toString());
             }
@@ -1503,7 +1504,7 @@ class CompactionTxnHandler extends TxnHandler {
   protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId,
       long tempId) throws SQLException, MetaException {
     super.updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType, commitId, tempId);
-    if (txnType == TxnType.COMPACTION) {
+    if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
       stmt.executeUpdate(
           "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + ", \"CQ_COMMIT_TIME\" = " +
               getEpochFn(dbProduct) + " WHERE \"CQ_TXN_ID\" = " + txnid);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 887da84..2a9d6bb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -55,6 +55,7 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.sql.DataSource;
 
@@ -167,6 +168,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Splitter;
 
+import static org.apache.commons.lang3.StringUtils.repeat;
 import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
 import static org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatchNoCount;
 import static org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatch;
@@ -3814,6 +3816,74 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  @Override
+  @RetrySemantics.Idempotent
+  public boolean submitForCleanup(CompactionRequest rqst, long highestWriteId, long txnId) throws MetaException {
+    // Put an entry directly into the compaction queue, already in the "ready for cleaning" state.
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        lockInternal();
+
+        List<String> params = new ArrayList<String>() {{
+          add(rqst.getDbname());
+          add(rqst.getTablename());
+        }};
+        long cqId;
+        try (Statement stmt = dbConn.createStatement()) {
+          cqId = generateCompactionQueueId(stmt);
+        }
+        StringBuilder buf = new StringBuilder(
+          "INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TXN_ID\", \"CQ_ENQUEUE_TIME\", \"CQ_DATABASE\", \"CQ_TABLE\", ");
+        String partName = rqst.getPartitionname();
+        if (partName != null) {
+          buf.append("\"CQ_PARTITION\", ");
+          params.add(partName);
+        }
+        buf.append("\"CQ_STATE\", \"CQ_TYPE\"");
+        params.add(String.valueOf(READY_FOR_CLEANING));
+        params.add(thriftCompactionType2DbType(rqst.getType()).toString());
+
+        if (rqst.getProperties() != null) {
+          buf.append(", \"CQ_TBLPROPERTIES\"");
+          params.add(new StringableMap(rqst.getProperties()).toString());
+        }
+        if (rqst.getRunas() != null) {
+          buf.append(", \"CQ_RUN_AS\"");
+          params.add(rqst.getRunas());
+        }
+        buf.append(") values (")
+          .append(
+            Stream.of(cqId, highestWriteId, txnId, getEpochFn(dbProduct))
+              .map(Object::toString)
+              .collect(Collectors.joining(", ")))
+          .append(repeat(", ?", params.size()))
+          .append(")");
+
+        String s = buf.toString();
+        try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params)) {
+          LOG.debug("Going to execute update <" + s + ">");
+          pst.executeUpdate();
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        return true;
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback: ", e);
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "submitForCleanup(" + rqst + ")");
+        throw new MetaException("Failed to submit cleanup request: " +
+          StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      return submitForCleanup(rqst, highestWriteId, txnId);
+    }
+  }
+  
   private static String compactorStateToResponse(char s) {
     switch (s) {
       case INITIATED_STATE: return INITIATED_RESPONSE;
@@ -4274,6 +4344,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               partVals = p.getValues();
               partName = Warehouse.makePartName(partCols, partVals);
 
+              buff.setLength(0);
               buff.append("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_DATABASE\"='");
               buff.append(dbName);
               buff.append("' AND \"TC_TABLE\"='");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 58e0999..5837727 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -346,6 +346,9 @@ public interface TxnStore extends Configurable {
   @RetrySemantics.Idempotent
   CompactionResponse compact(CompactionRequest rqst) throws MetaException;
 
+  @RetrySemantics.Idempotent
+  boolean submitForCleanup(CompactionRequest rqst, long highestWriteId, long txnId) throws MetaException;
+
   /**
    * Show list of current compactions.
    * @param rqst info on which compactions to show
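
The new submitForCleanup entry point (implemented in TxnHandler above) inserts a COMPACTION_QUEUE row directly in READY_FOR_CLEANING state, so the Cleaner can pick up a dropped partition without going through the Initiator or a Worker. A hedged usage sketch; the database, table, partition name and compaction type below are chosen purely for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.CompactionRequest;
    import org.apache.hadoop.hive.metastore.api.CompactionType;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class SubmitForCleanupSketch {
      // Enqueues a ready-for-cleaning entry for a dropped partition of a
      // transactional table; highestWriteId and txnId come from the DROP PARTITION transaction.
      public static void enqueue(Configuration conf, long highestWriteId, long txnId)
          throws MetaException {
        TxnStore txnStore = TxnUtils.getTxnStore(conf);
        CompactionRequest rqst = new CompactionRequest("default", "acid_tbl", CompactionType.MAJOR);
        rqst.setPartitionname("p=1");
        txnStore.submitForCleanup(rqst, highestWriteId, txnId);
      }
    }
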
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 13d45d1..eca6caa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -31,9 +34,13 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -495,4 +502,64 @@ public class TxnUtils {
     DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf);
     stmt.execute(databaseProduct.getTxnSeedFn(seedTxnId));
   }
+
+  /**
+   * Determine which user to run an operation as. If metastore.compactor.run.as.user is set, that user will be
+   * returned; otherwise, the owner of the directory to be compacted.
+   * It is asserted that either the user running the hive metastore or the table
+   * owner must be able to stat the directory and determine the owner.
+   * @param location directory that will be read or written to.
+   * @param t metastore table object
+   * @return metastore.compactor.run.as.user value; or if that is not set: username of the owner of the location.
+   * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat
+   * the location.
+   */
+  public static String findUserToRunAs(String location, Table t, Configuration conf)
+    throws IOException, InterruptedException {
+    LOG.debug("Determining who to run the job as.");
+
+    // check if a specific user is set in config
+    String runUserAs = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER);
+    if (runUserAs != null && !"".equals(runUserAs)) {
+      return runUserAs;
+    }
+    // get table directory owner
+    Path p = new Path(location);
+    FileSystem fs = p.getFileSystem(conf);
+
+    try {
+      FileStatus stat = fs.getFileStatus(p);
+      LOG.debug("Running job as " + stat.getOwner());
+      return stat.getOwner();
+    } catch (AccessControlException e) {
+      // TODO not sure this is the right exception
+      LOG.debug("Unable to stat file as current user, trying as table owner");
+
+      // Now, try it as the table owner and see if we get better luck.
+      List<String> wrapper = new ArrayList<>(1);
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+        UserGroupInformation.getLoginUser());
+
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        // need to use a new filesystem object here to have the correct ugi
+        FileSystem proxyFs = p.getFileSystem(conf);
+        FileStatus stat = proxyFs.getFileStatus(p);
+        wrapper.add(stat.getOwner());
+        return null;
+      });
+
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException exception) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+      }
+      if (wrapper.size() == 1) {
+        LOG.debug("Running job as " + wrapper.get(0));
+        return wrapper.get(0);
+      }
+    }
+    LOG.error("Unable to stat file " + p + " as either current user(" +
+      UserGroupInformation.getLoginUser() + ") or table owner(" + t.getOwner() + "), giving up");
+    throw new IOException("Unable to stat file: " + p);
+  }
 }
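
findUserToRunAs, which appears to have been lifted into TxnUtils so that callers outside the compactor threads can reuse it, resolves the impersonation user exactly as its javadoc describes: the configured metastore.compactor.run.as.user wins, otherwise the owner of the location. A hedged call-site sketch, assuming the Table object and Configuration are already at hand:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class FindUserToRunAsSketch {
      // Resolves the user to impersonate for filesystem work on the table's location.
      public static String resolveRunAs(Configuration conf, Table table)
          throws IOException, InterruptedException {
        return TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf);
      }
    }
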
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java b/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java
index e4a9abd..e833060 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java
@@ -55,7 +55,9 @@ public class AcidMetaDataFile {
     // written by Major compaction
     COMPACTED,
     // written by truncate
-    TRUNCATED;
+    TRUNCATED,
+    // written by drop partition
+    DROPPED;
 
     @Override
     public String toString() {
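
The new DROPPED value is what addTruncateBaseFile writes on the non-blocking drop path. A minimal sketch of producing such a marker directory, mirroring the HMSHandler code earlier in this patch (the paths are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.AcidConstants;
    import org.apache.hadoop.hive.common.AcidMetaDataFile;

    public class DroppedMarkerSketch {
      // Creates base_<writeId> under the partition location and writes only the ACID
      // metadata file, flagging the partition as DROPPED for readers and the Cleaner.
      public static void markDropped(Configuration conf, Path partitionLocation, long writeId)
          throws Exception {
        Path basePath = new Path(partitionLocation, AcidConstants.baseDir(writeId));
        FileSystem fs = partitionLocation.getFileSystem(conf);
        fs.mkdirs(basePath);
        AcidMetaDataFile.writeToFile(fs, basePath, AcidMetaDataFile.DataFormat.DROPPED);
      }
    }
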