Posted to commits@hive.apache.org by de...@apache.org on 2023/03/23 15:05:11 UTC

[hive] branch master updated: HIVE-25032: Optimise PartitionManagementTask (#4028) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Henri Biestro)

This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a1ff44ccd43 HIVE-25032: Optimise PartitionManagementTask (#4028) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Henri Biestro)
a1ff44ccd43 is described below

commit a1ff44ccd434373c7eef56fc081b40c343a23f33
Author: dengzh <de...@gmail.com>
AuthorDate: Thu Mar 23 23:05:01 2023 +0800

    HIVE-25032: Optimise PartitionManagementTask (#4028) (Zhihua Deng, reviewed by Sai Hemanth Gantasala, Henri Biestro)
---
 .../ql/exec/TestMsckDropPartitionsInBatches.java   |   7 +-
 .../thrift/gen-cpp/hive_metastore_constants.cpp    |   4 +
 .../gen/thrift/gen-cpp/hive_metastore_constants.h  |   2 +
 .../metastore/api/hive_metastoreConstants.java     |   4 +
 .../src/gen/thrift/gen-php/metastore/Constant.php  |  12 +++
 .../gen/thrift/gen-py/hive_metastore/constants.py  |   2 +
 .../gen/thrift/gen-rb/hive_metastore_constants.rb  |   4 +
 .../src/main/thrift/hive_metastore.thrift          |   2 +
 .../apache/hadoop/hive/metastore/HMSHandler.java   |   2 +-
 .../hive/metastore/HiveMetaStoreChecker.java       |  21 +++-
 .../org/apache/hadoop/hive/metastore/Msck.java     |  22 ++--
 .../metastore/MsckPartitionExpressionProxy.java    |  37 ++++---
 .../hadoop/hive/metastore/PartitionIterable.java   |  27 ++++-
 .../hive/metastore/PartitionManagementTask.java    | 117 ++++++++++-----------
 .../GetPartitionProjectionsSpecBuilder.java        |  16 ++-
 .../hive/metastore/parser/ExpressionTree.java      |  12 ++-
 .../hive/metastore/utils/MetaStoreServerUtils.java |  39 +++++++
 .../hive/metastore/TestPartitionManagement.java    |  16 +++
 .../hadoop/hive/metastore/tools/BenchmarkTool.java |  30 +++++-
 .../hadoop/hive/metastore/tools/HMSBenchmarks.java |  92 ++++++++++++++++
 .../hadoop/hive/metastore/tools/HMSClient.java     |   6 +-
 21 files changed, 360 insertions(+), 114 deletions(-)

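At a glance, the optimisation has three parts: PartitionManagementTask now pushes candidate-table selection down to the metastore via listTableNamesByFilter (backed by the two new HIVE_FILTER_FIELD_TABLE_NAME / HIVE_FILTER_FIELD_TABLE_TYPE filter constants) instead of fetching a TableMeta and then a full Table per candidate; MSCK reads partitions through projection specs, so only the fields it actually uses travel over the wire; and each drop_partitions batch is folded into a single OR-joined partition expression instead of one expression per partition.
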
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java
index e7318bf6d34..a61e066266a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.CheckResult.PartitionResult;
@@ -274,10 +275,12 @@ public class TestMsckDropPartitionsInBatches {
 
     assertEquals(expectedCallCount, droppedParts.size());
     for (int i = 0; i < expectedCallCount; i++) {
+      List<Pair<Integer, byte[]>> actualArgs = droppedParts.get(i);
+      int actualPartitionSize = actualArgs.get(0).getLeft();
       Assert.assertEquals(
         String.format("Unexpected batch size in attempt %d.  Expected: %d.  Found: %d", i + 1,
-          expectedBatchSizes[i], droppedParts.get(i).size()),
-        expectedBatchSizes[i], droppedParts.get(i).size());
+          expectedBatchSizes[i], actualPartitionSize),
+        expectedBatchSizes[i], actualPartitionSize);
     }
   }
 
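The contract under test changed here: drop_partitions is now invoked once per batch with a single serialized expression, so each captured call holds one Pair whose left side is the number of partitions folded into that expression. A minimal sketch of reading the captured arguments (variable names hypothetical):

    // droppedParts: List<List<Pair<Integer, byte[]>>>, one inner list per drop call
    List<Pair<Integer, byte[]>> call = droppedParts.get(i);
    int partitionsInBatch = call.get(0).getLeft();   // partitions OR-ed into the expression
    byte[] serializedExpr = call.get(0).getRight();  // UTF-8 bytes of the combined filter
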
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
index a94b58fa2f5..9f0b0c8cf8d 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
@@ -27,6 +27,10 @@ hive_metastoreConstants::hive_metastoreConstants() {
 
   HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__";
 
+  HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__";
+
+  HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__";
+
   IS_ARCHIVED = "is_archived";
 
   ORIGINAL_LOCATION = "original_location";
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h
index ddff39d4735..504b54a01d9 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_constants.h
@@ -23,6 +23,8 @@ class hive_metastoreConstants {
   std::string HIVE_FILTER_FIELD_OWNER;
   std::string HIVE_FILTER_FIELD_PARAMS;
   std::string HIVE_FILTER_FIELD_LAST_ACCESS;
+  std::string HIVE_FILTER_FIELD_TABLE_NAME;
+  std::string HIVE_FILTER_FIELD_TABLE_TYPE;
   std::string IS_ARCHIVED;
   std::string ORIGINAL_LOCATION;
   std::string IS_IMMUTABLE;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index 02eed33ce8d..f5a102ab964 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -25,6 +25,10 @@ package org.apache.hadoop.hive.metastore.api;
 
   public static final java.lang.String HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__";
 
+  public static final java.lang.String HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__";
+
+  public static final java.lang.String HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__";
+
   public static final java.lang.String IS_ARCHIVED = "is_archived";
 
   public static final java.lang.String ORIGINAL_LOCATION = "original_location";
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php
index 84809315937..84961065fd5 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Constant.php
@@ -26,6 +26,8 @@ final class Constant extends \Thrift\Type\TConstant
     static protected $HIVE_FILTER_FIELD_OWNER;
     static protected $HIVE_FILTER_FIELD_PARAMS;
     static protected $HIVE_FILTER_FIELD_LAST_ACCESS;
+    static protected $HIVE_FILTER_FIELD_TABLE_NAME;
+    static protected $HIVE_FILTER_FIELD_TABLE_TYPE;
     static protected $IS_ARCHIVED;
     static protected $ORIGINAL_LOCATION;
     static protected $IS_IMMUTABLE;
@@ -101,6 +103,16 @@ final class Constant extends \Thrift\Type\TConstant
         return "hive_filter_field_last_access__";
     }
 
+    protected static function init_HIVE_FILTER_FIELD_TABLE_NAME()
+    {
+        return "hive_filter_field_tableName__";
+    }
+
+    protected static function init_HIVE_FILTER_FIELD_TABLE_TYPE()
+    {
+        return "hive_filter_field_tableType__";
+    }
+
     protected static function init_IS_ARCHIVED()
     {
         return "is_archived";
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
index 7c98dd25fbe..b5891397a6e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/constants.py
@@ -20,6 +20,8 @@ ACCESSTYPE_READWRITE = 8
 HIVE_FILTER_FIELD_OWNER = "hive_filter_field_owner__"
 HIVE_FILTER_FIELD_PARAMS = "hive_filter_field_params__"
 HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__"
+HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__"
+HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__"
 IS_ARCHIVED = "is_archived"
 ORIGINAL_LOCATION = "original_location"
 IS_IMMUTABLE = "immutable"
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index 05135527cd1..e7c30a2c4dc 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -23,6 +23,10 @@ HIVE_FILTER_FIELD_PARAMS = %q"hive_filter_field_params__"
 
 HIVE_FILTER_FIELD_LAST_ACCESS = %q"hive_filter_field_last_access__"
 
+HIVE_FILTER_FIELD_TABLE_NAME = %q"hive_filter_field_tableName__"
+
+HIVE_FILTER_FIELD_TABLE_TYPE = %q"hive_filter_field_tableType__"
+
 IS_ARCHIVED = %q"is_archived"
 
 ORIGINAL_LOCATION = %q"original_location"
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index bca23c8111c..20ea52b71e1 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -164,6 +164,8 @@ enum PrincipalType {
 const string HIVE_FILTER_FIELD_OWNER = "hive_filter_field_owner__"
 const string HIVE_FILTER_FIELD_PARAMS = "hive_filter_field_params__"
 const string HIVE_FILTER_FIELD_LAST_ACCESS = "hive_filter_field_last_access__"
+const string HIVE_FILTER_FIELD_TABLE_NAME = "hive_filter_field_tableName__"
+const string HIVE_FILTER_FIELD_TABLE_TYPE = "hive_filter_field_tableType__"
 
 enum PartitionEventType {
   LOAD_DONE = 1,
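
The two new constants follow the existing hive_filter_field_* convention: they are marker keys that the server-side filter parser rewrites into JDO field references (this.tableName and this.tableType; see the ExpressionTree hunk further down). A sketch of a table filter built from them, assuming a hypothetical name pattern:

    // assumes: IMetaStoreClient msc plus catName/dbName from surrounding code;
    // the ".*sales.*" pattern is illustrative only
    String filter = hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE
        + " = \"" + TableType.EXTERNAL_TABLE.name() + "\" and "
        + hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME
        + " like \".*sales.*\"";
    List<String> names = msc.listTableNamesByFilter(catName, dbName, filter, -1);
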
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index ffc6a162796..5c091836d81 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -5708,7 +5708,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     GetPartitionsResponse response = null;
     Exception ex = null;
     try {
-      Table table = get_table_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName);
+      Table table = get_table_core(catName, parsedDbName[DB_NAME], tableName);
       List<Partition> partitions = getMS()
           .getPartitionSpecsByFilterAndProjection(table, request.getProjectionSpec(),
               request.getFilterSpec());
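
A small correctness fix rides along here: get_table_core now receives the catName variable resolved earlier in the method (not shown in this hunk) rather than re-deriving the catalog from parsedDbName, keeping the table lookup consistent with the rest of the request handling.
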
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
index 88d761dc800..63497e7036f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
@@ -29,12 +29,14 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPar
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionListByFilterExp;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionName;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionColtoTypeMap;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionsByProjectSpec;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned;
 
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -58,12 +60,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.MetastoreException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.client.builder.GetPartitionProjectionsSpecBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
@@ -264,12 +270,17 @@ public class HiveMetaStoreChecker {
             MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DEFAULTPARTITIONNAME), results);
         parts = new PartitionIterable(results);
       } else {
+        GetPartitionsRequest request = new GetPartitionsRequest(table.getDbName(), table.getTableName(),
+            null, null);
+        request.setProjectionSpec(new GetPartitionProjectionsSpecBuilder().addProjectField("sd.location")
+            .addProjectField("createTime").addProjectField("tableName")
+            .addProjectField("values").build());
+        request.setCatName(table.getCatName());
         int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
         if (batchSize > 0) {
-          parts = new PartitionIterable(getMsc(), table, batchSize);
+          parts = new PartitionIterable(getMsc(), table, batchSize).withProjectSpec(request);
         } else {
-          List<Partition> loadedPartitions = getAllPartitionsOf(getMsc(), table);
-          parts = new PartitionIterable(loadedPartitions);
+          parts = new PartitionIterable(getPartitionsByProjectSpec(msc, request));
         }
       }
     } else {
@@ -383,8 +394,8 @@ public class HiveMetaStoreChecker {
           pr.setTableName(partition.getTableName());
           result.getExpiredPartitions().add(pr);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
-                partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", table.getCatName(),
+                table.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
                 partitionAgeSeconds, partitionExpirySeconds);
           }
         }
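
Two details in this hunk are worth spelling out. First, the checker now requests only the four partition fields it consumes (sd.location, createTime, tableName, values), either batched through PartitionIterable.withProjectSpec or in one call through getPartitionsByProjectSpec. Second, the debug log switches from partition.getCatName()/getDbName() to the Table's values: a projected Partition carries only the requested fields, so catalog and database names may be unset on it. A condensed sketch of the new retrieval path:

    // assumes: IMetaStoreClient msc, Table table, int batchSize from surrounding code
    GetPartitionsRequest request =
        new GetPartitionsRequest(table.getDbName(), table.getTableName(), null, null);
    request.setCatName(table.getCatName());
    request.setProjectionSpec(new GetPartitionProjectionsSpecBuilder()
        .addProjectField("sd.location").addProjectField("createTime")
        .addProjectField("tableName").addProjectField("values").build());
    Iterable<Partition> parts = (batchSize > 0)
        ? new PartitionIterable(msc, table, batchSize).withProjectSpec(request)
        : new PartitionIterable(MetaStoreServerUtils.getPartitionsByProjectSpec(msc, request));
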
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
index 926875e514c..79bc4b49d1e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -130,9 +131,10 @@ public class Msck {
     String qualifiedTableName = null;
     boolean success = false;
     long txnId = -1;
-    long partitionExpirySeconds = msckInfo.getPartitionExpirySeconds();
+    long partitionExpirySeconds = -1;
     try {
       Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName());
+      partitionExpirySeconds = PartitionManagementTask.getRetentionPeriodInSeconds(table);
       qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
       HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), partitionExpirySeconds);
       // checkMetastore call will fill in result with partitions that are present in filesystem
@@ -502,9 +504,9 @@ public class Msck {
     }.run();
   }
 
-  public static String makePartExpr(Map<String, String> spec)
+  private static String makePartExpr(Map<String, String> spec)
     throws MetaException {
-    StringBuilder suffixBuf = new StringBuilder();
+    StringBuilder suffixBuf = new StringBuilder("(");
     int i = 0;
     for (Map.Entry<String, String> e : spec.entrySet()) {
       if (e.getValue() == null || e.getValue().length() == 0) {
@@ -518,6 +520,7 @@ public class Msck {
       suffixBuf.append("'").append(Warehouse.escapePathName(e.getValue())).append("'");
       i++;
     }
+    suffixBuf.append(")");
     return suffixBuf.toString();
   }
 
@@ -536,7 +539,8 @@ public class Msck {
     if (expiredPartitions != null && !expiredPartitions.isEmpty()) {
       batchWork.addAll(expiredPartitions);
     }
-    PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData).ifExists(true);
+    PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData)
+        .ifExists(true).returnResults(false);
     new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
       @Override
       public Void execute(int size) throws MetastoreException {
@@ -589,7 +593,7 @@ public class Msck {
       }
 
       private List<Pair<Integer, byte[]>> getPartitionExpr(final List<String> parts) throws MetaException {
-        List<Pair<Integer, byte[]>> expr = new ArrayList<>(parts.size());
+        StringBuilder exprBuilder = new StringBuilder();
         for (int i = 0; i < parts.size(); i++) {
           String partName = parts.get(i);
           Map<String, String> partSpec = Warehouse.makeSpecFromName(partName);
@@ -597,9 +601,13 @@ public class Msck {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Generated partExpr: {} for partName: {}", partExpr, partName);
           }
-          expr.add(Pair.of(i, partExpr.getBytes(StandardCharsets.UTF_8)));
+          if (i > 0) {
+            exprBuilder.append(" OR ");
+          }
+          exprBuilder.append(partExpr);
         }
-        return expr;
+        return Lists.newArrayList(Pair.of(parts.size(),
+            exprBuilder.toString().getBytes(StandardCharsets.UTF_8)));
       }
     }.run();
   }
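
makePartExpr now wraps each partition's conjunction in parentheses (values path-escaped via Warehouse.escapePathName) so that getPartitionExpr can OR a whole batch into one expression and ship it as a single drop_partitions call; the returned Pair's left side records how many partitions the expression covers. For a two-partition batch the generated filter looks roughly like:

    (year='2001' AND month='1') OR (year='2002' AND month='2')

The old code produced one (index, expression) pair per partition instead.
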
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java
index fcc0d8a2648..094a80cac1c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java
@@ -60,31 +60,30 @@ public class MsckPartitionExpressionProxy implements PartitionExpressionProxy {
     }
     //This is to find in partitionNames all that match expr
     //reverse of the Msck.makePartExpr
-    Set<String> partValueSet = new HashSet<>();
-    String[] parts = partExpr.split(" AND ");
-    for ( String part : parts){
-      String[] colAndValue = part.split("=");
-      String key = FileUtils.unescapePathName(colAndValue[0]);
-      //take the value inside without the single quote marks '2018-10-30' becomes 2018-10-31
-      String value = FileUtils.unescapePathName(colAndValue[1].substring(1, colAndValue[1].length()-1));
-      partValueSet.add(key+"="+value);
+    Set<String> filterParts = new HashSet<>();
+    String[] partitions = partExpr.split(" OR ");
+    for (String part : partitions) {
+      part = part.substring(1, part.length() - 1);
+      String[] pKeyValues = part.split(" AND ");
+      StringBuilder builder = new StringBuilder();
+      for (String pKeyValue : pKeyValues) {
+        String[] colAndValue = pKeyValue.split("=");
+        String key = FileUtils.unescapePathName(colAndValue[0]);
+        //take the value inside without the single quote marks '2018-10-30' becomes 2018-10-31
+        String value = FileUtils.unescapePathName(colAndValue[1].substring(1, colAndValue[1].length() - 1));
+        builder.append(key + "=" + value).append("/");
+      }
+      builder.setLength(builder.length() - 1);
+      filterParts.add(builder.toString());
     }
 
     List<String> partNamesSeq =  new ArrayList<>();
-    for (String partition : partitionNames){
-      boolean isMatch = true;
+    for (String part : partitionNames) {
       //list of partitions [year=2001/month=1, year=2002/month=2, year=2001/month=3]
       //Given expr: e.g. year='2001' AND month='1'. Only when all the expressions in the expr can be found,
       //do we add the partition to the filtered result [year=2001/month=1]
-      String [] partnames = partition.split("/");
-      for (String part: partnames) {
-        if (!partValueSet.contains(FileUtils.unescapePathName(part))){
-          isMatch = false;
-          break;
-        }
-      }
-      if (isMatch){
-        partNamesSeq.add(partition);
+      if (filterParts.contains(FileUtils.unescapePathName(part))) {
+        partNamesSeq.add(part);
       }
     }
     partitionNames.clear();
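
The proxy is the inverse of the batch expression built in Msck: it splits on " OR ", strips the parentheses from each disjunct, splits on " AND ", and normalizes each key='value' conjunct back into path form (unescaping via FileUtils.unescapePathName), so matching partition names reduces to a set lookup. A worked example (values illustrative):

    // partExpr:    (year='2001' AND month='1') OR (year='2002' AND month='2')
    // filterParts: { "year=2001/month=1", "year=2002/month=2" }
    // a candidate partition name matches iff its unescaped form is in filterParts
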
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java
index 127313eb256..30a7ffcc3f6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionIterable.java
@@ -24,9 +24,13 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsFilterSpec;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
 import org.apache.hadoop.hive.metastore.api.MetastoreException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionFilterMode;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.convertToGetPartitionsByNamesRequest;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependCatalogToDbName;
@@ -103,11 +107,18 @@ public class PartitionIterable implements Iterable<Partition> {
           batch_counter++;
         }
         try {
-          String dbName = prependCatalogToDbName(table.getCatName(), table.getDbName(), null);
-          GetPartitionsByNamesRequest req =
-              convertToGetPartitionsByNamesRequest(dbName, table.getTableName(), nameBatch);
-          batchIter =
-            msc.getPartitionsByNames(req).getPartitionsIterator();
+          if (request != null) {
+            GetPartitionsFilterSpec getPartitionsFilterSpec = new GetPartitionsFilterSpec();
+            getPartitionsFilterSpec.setFilterMode(PartitionFilterMode.BY_NAMES);
+            getPartitionsFilterSpec.setFilters(nameBatch);
+            request.setFilterSpec(getPartitionsFilterSpec);
+            batchIter = MetaStoreServerUtils.getPartitionsByProjectSpec(msc, request).iterator();
+          } else {
+            String dbName = prependCatalogToDbName(table.getCatName(), table.getDbName(), null);
+            GetPartitionsByNamesRequest req = convertToGetPartitionsByNamesRequest(dbName, table.getTableName(),
+                nameBatch);
+            batchIter = msc.getPartitionsByNames(req).getPartitionsIterator();
+          }
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -135,6 +146,7 @@ public class PartitionIterable implements Iterable<Partition> {
   private IMetaStoreClient msc = null; // Assumes one instance of this + single-threaded compilation for each query.
   private Table table = null;
   private List<String> partitionNames = null;
+  private GetPartitionsRequest request = null;
   private int batch_size;
 
   /**
@@ -167,4 +179,9 @@ public class PartitionIterable implements Iterable<Partition> {
       throw new MetastoreException(e);
     }
   }
+
+  public PartitionIterable withProjectSpec(GetPartitionsRequest request) {
+    this.request = request;
+    return this;
+  }
 }
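
withProjectSpec switches the iterator's batch fetch from getPartitionsByNames to getPartitionsWithSpecs: each batch of names is attached to the stored request as a BY_NAMES filter spec, so the projection fields are honoured batch by batch. Typical construction, mirroring the checker hunk above:

    // assumes: IMetaStoreClient msc, Table table, and a GetPartitionsRequest
    // carrying a projection spec, as built in HiveMetaStoreChecker
    PartitionIterable parts = new PartitionIterable(msc, table, batchSize).withProjectSpec(request);
    for (Partition p : parts) {
      // only the projected fields of p are guaranteed to be populated
    }
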
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index a9ef163a9b0..ef0c4dcac47 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -32,9 +31,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.TimeValidator;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -97,77 +96,84 @@ public class PartitionManagementTask implements MetastoreTaskThread {
       IMetaStoreClient msc = null;
       try {
         msc = new HiveMetaStoreClient(conf);
-        List<Table> candidateTables = new ArrayList<>();
         String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
         String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
         String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
         String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
         Set<String> tableTypesSet = new HashSet<>();
-        List<String> tableTypesList;
+        for (String type : tableTypes.split(",")) {
+          try {
+            tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
+          } catch (IllegalArgumentException e) {
+            // ignore
+            LOG.warn("Unknown table type: {}", type);
+          }
+        }
         // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
         // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
-        if (tableTypes.isEmpty()) {
-          tableTypesList = Lists.newArrayList("");
-        } else {
-          for (String type : tableTypes.split(",")) {
-            try {
-              tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
-            } catch (IllegalArgumentException e) {
-              // ignore
-              LOG.warn("Unknown table type: {}", type);
-            }
-          }
-          tableTypesList = Lists.newArrayList(tableTypesSet);
+        if (tableTypesSet.isEmpty()) {
+          LOG.info("Skipping partition management as no table types specified");
+          return;
         }
-        List<TableMeta> foundTableMetas = msc.getTableMeta(catalogName, dbPattern, tablePattern, tableTypesList);
-        LOG.info("Looking for tables using catalog: {} dbPattern: {} tablePattern: {} found: {}", catalogName,
-          dbPattern, tablePattern, foundTableMetas.size());
 
-        Map<String, Boolean> databasesToSkip = new HashMap<>();
+        StringBuilder filterBuilder = new StringBuilder()
+            .append(hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS)
+            .append("discover__partitions").append(" like \"true\" ");
+        boolean external = tableTypesSet.contains(TableType.EXTERNAL_TABLE.name());
+        boolean managed = tableTypesSet.contains(TableType.MANAGED_TABLE.name());
+        if (!managed && external) {
+          // only for external tables
+          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
+              .append(" = \"").append(TableType.EXTERNAL_TABLE.name()).append("\" ");
+        } else if (managed && !external) {
+          // only for managed tables
+          filterBuilder.append(" and ").append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)
+              .append(" = \"").append(TableType.MANAGED_TABLE.name()).append("\" ");
+        }
+        if (!tablePattern.trim().isEmpty()) {
+          filterBuilder.append(" and ")
+              .append(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)
+              .append(" like \"").append(tablePattern.replaceAll("\\*", ".*")).append("\"");
+        }
 
-        for (TableMeta tableMeta : foundTableMetas) {
-          try {
-            String dbName = MetaStoreUtils.prependCatalogToDbName(tableMeta.getCatName(), tableMeta.getDbName(), conf);
-            if (!databasesToSkip.containsKey(dbName)) {
-              databasesToSkip.put(dbName, MetaStoreUtils.checkIfDbNeedsToBeSkipped(
-                              msc.getDatabase(tableMeta.getCatName(), tableMeta.getDbName())));
-            }
-            if (databasesToSkip.get(dbName)) {
-              LOG.debug("Skipping table : {}", tableMeta.getTableName());
-              continue;
-            }
-            Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
-            if (partitionDiscoveryEnabled(table.getParameters())) {
-              candidateTables.add(table);
-            }
-          } catch (NoSuchObjectException e) {
-            // Ignore dropped tables after fetching TableMeta.
-            LOG.warn(e.getMessage());
+        List<String> databases = msc.getDatabases(catalogName, dbPattern);
+        List<TableName> candidates = new ArrayList<>();
+        for (String db : databases) {
+          Database database = msc.getDatabase(catalogName, db);
+          if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
+            LOG.debug("Skipping table under database: {}", db);
+            continue;
+          }
+          if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
+            LOG.info("Skipping table belongs to database {} being failed over.", db);
+            continue;
           }
+          List<String> tablesNames = msc.listTableNamesByFilter(catalogName, db,
+              filterBuilder.toString(), -1);
+          tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
         }
-        if (candidateTables.isEmpty()) {
+
+        if (candidates.isEmpty()) {
+          LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);
           return;
         }
+
         // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
         // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
         // defeats the purpose of thread pooled msck repair.
         int threadPoolSize = MetastoreConf.getIntVar(conf,
           MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE);
         final ExecutorService executorService = Executors
-          .newFixedThreadPool(Math.min(candidateTables.size(), threadPoolSize),
+          .newFixedThreadPool(Math.min(candidates.size(), threadPoolSize),
             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
-        CountDownLatch countDownLatch = new CountDownLatch(candidateTables.size());
-        LOG.info("Found {} candidate tables for partition discovery", candidateTables.size());
+        CountDownLatch countDownLatch = new CountDownLatch(candidates.size());
+        LOG.info("Found {} candidate tables for partition discovery", candidates.size());
         setupMsckPathInvalidation();
         Configuration msckConf = Msck.getMsckConf(conf);
-        for (Table table : candidateTables) {
-          qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
-          long retentionSeconds = getRetentionPeriodInSeconds(table);
-          LOG.info("Running partition discovery for table {} retentionPeriod: {}s", qualifiedTableName,
-            retentionSeconds);
+        for (TableName table : candidates) {
           // this always runs in 'sync' mode where partitions can be added and dropped
-          MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(),
-            null, null, true, true, true, retentionSeconds);
+          MsckInfo msckInfo = new MsckInfo(table.getCat(), table.getDb(), table.getTable(),
+            null, null, true, true, true, -1);
           executorService.submit(new MsckThread(msckInfo, msckConf, qualifiedTableName, countDownLatch));
         }
         countDownLatch.await();
@@ -231,13 +237,7 @@ public class PartitionManagementTask implements MetastoreTaskThread {
 
     @Override
     public void run() {
-      IMetaStoreClient msc = null;
       try {
-        msc = new HiveMetaStoreClient(conf);
-        if (MetaStoreUtils.isDbBeingPlannedFailedOver((msc.getDatabase(msckInfo.getCatalogName(), msckInfo.getDbName())))) {
-          LOG.info("Skipping table: {} as it belongs to database being failed over." + msckInfo.getTableName());
-          return;
-        }
         Msck msck = new Msck( true, true);
         msck.init(conf);
         msck.repair(msckInfo);
@@ -246,9 +246,6 @@ public class PartitionManagementTask implements MetastoreTaskThread {
       } finally {
         // there is no recovery from exception, so we always count down and retry in next attempt
         countDownLatch.countDown();
-        if (msc != null) {
-          msc.close();
-        }
       }
     }
   }
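
The restructured task builds one server-side filter and asks each database for matching table names directly, replacing the old getTableMeta plus per-table getTable round trips. With both table types enabled and a table pattern of "*", the assembled filter is roughly:

    hive_filter_field_params__discover__partitions like "true" and hive_filter_field_tableName__ like ".*"

plus a hive_filter_field_tableType__ = "EXTERNAL_TABLE" (or MANAGED_TABLE) clause when only one type is configured. The double underscore in the parameter key is deliberate: the filter grammar cannot parse a dot in a parameter name, so ExpressionTree (below) maps discover__partitions back to the real table property discover.partitions and upper-cases both sides of the comparison. Other behavioural notes: an empty table-types setting now logs and returns up front; databases being failed over are skipped once per database rather than per table inside MsckThread (which no longer needs its own metastore client); and the retention period is no longer computed here, since MsckInfo is created with -1 and Msck.repair derives it from the table (see the Msck hunk above).
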
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java
index 1511f63cf13..6bf898b09af 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/client/builder/GetPartitionProjectionsSpecBuilder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.client.builder;
 
 import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -27,19 +28,16 @@ import java.util.List;
  */
 public class GetPartitionProjectionsSpecBuilder {
 
-    private List<String> partitionList = null;
+    private List<String> fieldList = new ArrayList<>();
     private String includePartitionPattern = null;
     private String excludePartitionPattern = null;
 
-    public GetPartitionProjectionsSpecBuilder(List<String> partitionList, String includePartitionPattern,
-                                              String excludePartitionPattern) {
-        this.partitionList = partitionList;
-        this.includePartitionPattern = includePartitionPattern;
-        this.excludePartitionPattern = excludePartitionPattern;
+    public GetPartitionProjectionsSpecBuilder() {
+
     }
 
-    public GetPartitionProjectionsSpecBuilder setPartitionList(List<String> partitionList) {
-        this.partitionList = partitionList;
+    public GetPartitionProjectionsSpecBuilder addProjectField(String field) {
+        fieldList.add(field);
         return this;
     }
 
@@ -54,6 +52,6 @@ public class GetPartitionProjectionsSpecBuilder {
     }
 
     public GetProjectionsSpec build() {
-        return new GetProjectionsSpec(partitionList, includePartitionPattern, excludePartitionPattern);
+        return new GetProjectionsSpec(fieldList, includePartitionPattern, excludePartitionPattern);
     }
 }
\ No newline at end of file
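
The builder is reworked from an all-args constructor to incremental addProjectField calls; usage from the checker hunk:

    GetProjectionsSpec spec = new GetPartitionProjectionsSpecBuilder()
        .addProjectField("sd.location")
        .addProjectField("createTime")
        .addProjectField("values")
        .build();
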
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
index b3cd8c33212..46098d39fa6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java
@@ -284,7 +284,11 @@ public class ExpressionTree {
 
     private void generateJDOFilterOverTables(Map<String, Object> params,
         FilterBuilder filterBuilder) throws MetaException {
-      if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_OWNER)) {
+      if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_NAME)) {
+        keyName = "this.tableName";
+      } else if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_TABLE_TYPE)) {
+        keyName = "this.tableType";
+      } else if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_OWNER)) {
         keyName = "this.owner";
       } else if (keyName.equals(hive_metastoreConstants.HIVE_FILTER_FIELD_LAST_ACCESS)) {
         //lastAccessTime expects an integer, so we cannot use the "like operator"
@@ -304,6 +308,12 @@ public class ExpressionTree {
         //value is persisted as a string in the db, so make sure it's a string here
         // in case we get a long.
         value = value.toString();
+        // dot in parameter is not supported when parsing the tree.
+        if ("discover__partitions".equals(paramKeyName)) {
+          paramKeyName = "discover.partitions";
+          keyName = "this.parameters.get(\"" + paramKeyName + "\").toUpperCase()";
+          value = value.toString().toUpperCase();
+        }
       } else {
         filterBuilder.setError("Invalid key name in filter.  " +
           "Use constants from org.apache.hadoop.hive.metastore.api");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
index 5c417893590..d4bb8d47024 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
@@ -75,6 +75,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.MetastoreException;
@@ -1397,6 +1399,43 @@ public class MetaStoreServerUtils {
     }
   }
 
+  public static List<Partition> getPartitionsByProjectSpec(IMetaStoreClient msc, GetPartitionsRequest request)
+      throws MetastoreException {
+    try {
+      GetPartitionsResponse response = msc.getPartitionsWithSpecs(request);
+      List<PartitionSpec> partitionSpecList = response.getPartitionSpec();
+      List<Partition> result = new ArrayList<>();
+      for (PartitionSpec spec : partitionSpecList) {
+        if (spec.getPartitionList() != null && spec.getPartitionList().getPartitions() != null) {
+          spec.getPartitionList().getPartitions().forEach(partition -> {
+            partition.setCatName(spec.getCatName());
+            partition.setDbName(spec.getDbName());
+            partition.setTableName(spec.getTableName());
+            result.add(partition);
+          });
+        }
+        PartitionSpecWithSharedSD pSpecWithSharedSD = spec.getSharedSDPartitionSpec();
+        if (pSpecWithSharedSD == null) {
+          continue;
+        }
+        List<PartitionWithoutSD> withoutSDList = pSpecWithSharedSD.getPartitions();
+        StorageDescriptor descriptor = pSpecWithSharedSD.getSd();
+        if (withoutSDList != null) {
+          for (PartitionWithoutSD psd : withoutSDList) {
+            StorageDescriptor newSD = new StorageDescriptor(descriptor);
+            Partition partition = new Partition(psd.getValues(), spec.getDbName(), spec.getTableName(),
+                psd.getCreateTime(), psd.getLastAccessTime(), newSD, psd.getParameters());
+            partition.getSd().setLocation(newSD.getLocation() + psd.getRelativePath());
+            result.add(partition);
+          }
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
+
   public static void getPartitionListByFilterExp(IMetaStoreClient msc, Table table, byte[] filterExp,
                                                  String defaultPartName, List<Partition> results)
       throws MetastoreException {
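
getPartitionsWithSpecs may return partitions in two shapes, and this helper flattens both into plain Partition objects: entries from a direct partition list get the catalog, database, and table names copied down from the enclosing spec, while entries from a shared-storage-descriptor list each get a copy of the shared SD with their relative path appended to the shared location. Callers such as HiveMetaStoreChecker and PartitionIterable then work against a uniform list:

    List<Partition> parts = MetaStoreServerUtils.getPartitionsByProjectSpec(msc, request);
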
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index f1f063490ce..2b1f65a6724 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.thrift.TException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -302,6 +303,21 @@ public class TestPartitionManagement {
     runPartitionManagementTask(conf);
     partitions = client.listPartitions(dbName, tableName, (short) -1);
     assertEquals(3, partitions.size());
+
+    // only MANAGED table type
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), TableType.MANAGED_TABLE.name());
+    table.getParameters().remove("EXTERNAL");
+    table.setTableType(TableType.MANAGED_TABLE.name());
+    client.alter_table(dbName, tableName, table);
+    Assert.assertTrue(fs.mkdirs(newPart1));
+    Assert.assertTrue(fs.mkdirs(newPart2));
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+    Assert.assertTrue(fs.delete(newPart1, true));
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
   }
 
   @Test
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index e973db7d88e..08661b18950 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -35,10 +35,12 @@ import java.io.PrintStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Formatter;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
@@ -61,6 +63,7 @@ import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkList
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListPartition;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkListTables;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkOpenTxns;
+import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkPartitionManagement;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkRenameTable;
 import static org.apache.hadoop.hive.metastore.tools.HMSBenchmarks.benchmarkTableCreate;
 import static org.apache.hadoop.hive.metastore.tools.Util.getServerUri;
@@ -83,7 +86,8 @@ public class BenchmarkTool implements Runnable {
   private enum RunModes {
     ACID,
     NONACID,
-    ALL
+    ALL,
+    MSCK // test PartitionManagementTask
   }
 
 
@@ -142,7 +146,7 @@ public class BenchmarkTool implements Runnable {
   private Pattern[] exclude;
 
   @Option(names = {"--runMode"},
-      description = "flag for setting the mode for the benchmark, acceptable values are: ACID, NONACID, ALL")
+      description = "flag for setting the mode for the benchmark, acceptable values are: ACID, NONACID, ALL, MSCK")
   private RunModes runMode = RunModes.ALL;
 
   public static void main(String[] args) {
@@ -181,10 +185,12 @@ public class BenchmarkTool implements Runnable {
         + nThreads);
     HMSConfig.getInstance().init(host, port, confDir);
 
+    preRunMsck(runMode == RunModes.MSCK);
     switch (runMode) {
       case ACID:
         runAcidBenchmarks();
         break;
+      case MSCK:
       case NONACID:
         runNonAcidBenchmarks();
         break;
@@ -196,6 +202,18 @@ public class BenchmarkTool implements Runnable {
     }
   }
 
+  private void preRunMsck(boolean isMsck) {
+    if (isMsck) {
+      matches = new Pattern[]{Pattern.compile("PartitionManagementTask.*")};
+    } else {
+      List<Pattern> excludes = new ArrayList<>();
+      Optional.ofNullable(exclude)
+          .ifPresent(patterns -> Arrays.stream(patterns).forEach(p -> excludes.add(p)));
+      excludes.add(Pattern.compile("PartitionManagementTask.*"));
+      exclude = excludes.toArray(new Pattern[0]);
+    }
+  }
+
   private void runAcidBenchmarks() {
     ChainedOptionsBuilder optsBuilder =
         new OptionsBuilder()
@@ -267,7 +285,9 @@ public class BenchmarkTool implements Runnable {
         .add("dropDatabase",
             () -> benchmarkDropDatabase(bench, bData, 1))
         .add("openTxn",
-            () -> benchmarkOpenTxns(bench, bData, 1));
+            () -> benchmarkOpenTxns(bench, bData, 1))
+        .add("PartitionManagementTask",
+            () -> benchmarkPartitionManagement(bench, bData, 1));
 
     for (int howMany: instances) {
       suite.add("listTables" + '.' + howMany,
@@ -291,7 +311,9 @@ public class BenchmarkTool implements Runnable {
           .add("dropDatabase" + '.' + howMany,
               () -> benchmarkDropDatabase(bench, bData, howMany))
           .add("openTxns" + '.' + howMany,
-              () -> benchmarkOpenTxns(bench, bData, howMany));
+              () -> benchmarkOpenTxns(bench, bData, howMany))
+          .add("PartitionManagementTask" + "." + howMany,
+              () -> benchmarkPartitionManagement(bench, bData, howMany));
     }
 
     List<String> toRun = suite.listMatching(matches, exclude);
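
The MSCK run mode is a convenience switch: it pins the match pattern to PartitionManagementTask.* so only the new benchmark executes, while every other mode appends that pattern to the exclude list so existing runs are unaffected. Only the --runMode flag appears in this diff; a hypothetical invocation would be along the lines of:

    java ... org.apache.hadoop.hive.metastore.tools.BenchmarkTool --runMode MSCK
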
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index 7c6f3431b0a..e91ab78fbb3 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hive.metastore.tools;
 
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.PartitionManagementTask;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
@@ -27,13 +31,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitions;
 import static org.apache.hadoop.hive.metastore.tools.Util.addManyPartitionsNoException;
+import static org.apache.hadoop.hive.metastore.tools.Util.createSchema;
 import static org.apache.hadoop.hive.metastore.tools.Util.throwingSupplierWrapper;
 
 /**
@@ -447,4 +458,85 @@ final class HMSBenchmarks {
         throwingSupplierWrapper(client::getCurrentNotificationId));
   }
 
+  static DescriptiveStatistics benchmarkPartitionManagement(@NotNull MicroBenchmark bench,
+                                                            @NotNull BenchData data,
+                                                            int tableCount) {
+
+    String dbName = data.dbName + "_" + tableCount, tableNamePrefix = data.tableName;
+    final HMSClient client = data.getClient();
+    final PartitionManagementTask partitionManagementTask = new PartitionManagementTask();
+    final List<Path> paths = new ArrayList<>();
+    final FileSystem fs;
+    try {
+      fs = FileSystem.get(client.getHadoopConf());
+      client.getHadoopConf().set("hive.metastore.uris", client.getServerURI().toString());
+      client.getHadoopConf().set("metastore.partition.management.database.pattern", dbName);
+      partitionManagementTask.setConf(client.getHadoopConf());
+
+      client.createDatabase(dbName);
+      for (int i = 0; i < tableCount; i++) {
+        String tableName = tableNamePrefix + "_" + i;
+        Util.TableBuilder tableBuilder = new Util.TableBuilder(dbName, tableName).withType(TableType.MANAGED_TABLE)
+            .withColumns(createSchema(Arrays.asList(new String[] {"astring:string", "aint:int", "adouble:double", "abigint:bigint"})))
+            .withPartitionKeys(createSchema(Collections.singletonList("d")));
+        boolean enableDynamicPart = i % 5 == 0;
+        if (enableDynamicPart) {
+          tableBuilder.withParameter("discover.partitions", "true");
+        }
+        client.createTable(tableBuilder.build());
+        addManyPartitionsNoException(client, dbName, tableName, null, Collections.singletonList("d"), 500);
+        if (enableDynamicPart) {
+          Table t = client.getTable(dbName, tableName);
+          Path tabLoc = new Path(t.getSd().getLocation());
+          for (int j = 501; j <= 1000; j++) {
+            Path path = new Path(tabLoc, "d=d" + j + "_1");
+            paths.add(path);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    final AtomicLong id = new AtomicLong(0);
+    ExecutorService service = Executors.newFixedThreadPool(20);
+    Runnable preRun = () -> {
+      int len = paths.size() / 20;
+      id.getAndIncrement();
+      List<Future> futures = new ArrayList<>();
+      for (int i = 0; i <= 20; i++) {
+        int k = i;
+        futures.add(service.submit((Callable<Void>) () -> {
+          for (int j = k * len; j < (k + 1) * len && j < paths.size(); j++) {
+            Path path = paths.get(j);
+            if (id.get() == 1) {
+              fs.mkdirs(path);
+            } else {
+              String fileName = path.getName().split("_")[0];
+              long seq = id.get();
+              Path destPath = new Path(path.getParent(), fileName + "_" + seq);
+              Path sourcePath = new Path(path.getParent(), fileName + "_" + (seq-1));
+              fs.rename(sourcePath, destPath);
+            }
+          }
+          return null;
+        }));
+      }
+      for (Future future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          service.shutdown();
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    try {
+      return bench.measure(preRun, partitionManagementTask, null);
+    } finally {
+      service.shutdown();
+    }
+  }
+
 }
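
For context on the benchmark's shape: it creates tableCount tables of 500 registered partitions each, opts every fifth table into discovery via discover.partitions=true, and records 500 extra unregistered partition directories per such table. The preRun hook runs before every measured iteration on a 20-thread pool: the first iteration creates those directories, and later iterations rename them to a new suffix, so each measured PartitionManagementTask run finds fresh partitions to add and stale ones to drop.
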
diff --git a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
index cd4032be425..7da4ddb68b6 100644
--- a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
+++ b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
@@ -92,6 +92,7 @@ final class HMSClient implements AutoCloseable {
   private ThriftHiveMetastore.Iface client;
   private TTransport transport;
   private URI serverURI;
+  private Configuration hadoopConf;
 
   public URI getServerURI() {
     return serverURI;
@@ -155,7 +156,7 @@ final class HMSClient implements AutoCloseable {
     LOG.debug("Opening kerberos connection to HMS");
     addResource(conf, CORE_SITE);
 
-    Configuration hadoopConf = new Configuration();
+    this.hadoopConf = new Configuration();
     addResource(hadoopConf, HIVE_SITE);
     addResource(hadoopConf, CORE_SITE);
 
@@ -516,4 +517,7 @@ final class HMSClient implements AutoCloseable {
     }
   }
 
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
 }
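
The exposed hadoopConf is captured when the connection is opened (the kerberos path is the one shown in this hunk); the new benchmark reads it to point PartitionManagementTask at the test database, as in HMSBenchmarks above:

    // reuse the client's Configuration for the task
    Configuration conf = client.getHadoopConf();
    conf.set("hive.metastore.uris", client.getServerURI().toString());
    conf.set("metastore.partition.management.database.pattern", dbName);
    partitionManagementTask.setConf(conf);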