You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2018/10/29 22:08:01 UTC

[1/4] hive git commit: HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master 54bba9cbf -> 64bea0354
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..2837ff4
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,163 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+ * PartitionIterable - effectively a lazy Iterable<Partition>
+ * Sometimes, we have a need for iterating through a list of partitions,
+ * but the list of partitions can be too big to fetch as a single object.
+ * Thus, the goal of PartitionIterable is to act as an Iterable<Partition>
+ * while lazily fetching each relevant partition, one after the other as
+ * independent metadata calls.
+ * It is very likely that any calls to PartitionIterable are going to result
+ * in a large number of calls, so use sparingly only when the memory cost
+ * of fetching all the partitions in one shot is too prohibitive.
+ * This is still pretty costly in that it would retain a list of partition
+ * names, but that should be far less expensive than the entire partition
+ * objects.
+ * Note that remove() is an illegal call on this, and will result in an
+ * IllegalStateException.
+ */
+public class PartitionIterable implements Iterable<Partition> {
+  /**
+   * Returns a new read-only iterator over the partitions.
+   * In LIST_PROVIDED mode it simply walks the supplied collection. In
+   * LAZY_FETCH_PARTITIONS mode it walks the pre-fetched partition names and
+   * fetches the partition objects from the metastore client in batches of
+   * at most batch_size names.
+   */
+  @Override
+  public Iterator<Partition> iterator() {
+    return new Iterator<Partition>() {
+      private boolean initialized = false;
+      // used only in LIST_PROVIDED mode
+      private Iterator<Partition> ptnsIterator = null;
+      // used only in LAZY_FETCH_PARTITIONS mode
+      private Iterator<String> partitionNamesIter = null;
+      private Iterator<Partition> batchIter = null;
+      // Lazily creates the underlying iterator for the active mode on first use.
+      private void initialize() {
+        if (!initialized) {
+          if (currType == Type.LIST_PROVIDED) {
+            ptnsIterator = ptnsProvided.iterator();
+          } else {
+            partitionNamesIter = partitionNames.iterator();
+          }
+          initialized = true;
+        }
+      }
+      @Override
+      public boolean hasNext() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED) {
+          return ptnsIterator.hasNext();
+        } else {
+          // more to return if the current batch is not drained or more names remain to be fetched
+          return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext();
+        }
+      }
+      @Override
+      public Partition next() {
+        initialize();
+        if (currType == Type.LIST_PROVIDED) {
+          return ptnsIterator.next();
+        }
+        // fetch the next batch only when the current one is missing or exhausted
+        if ((batchIter == null) || !batchIter.hasNext()) {
+          getNextBatch();
+        }
+        return batchIter.next();
+      }
+      // Pulls up to batch_size names and replaces batchIter with the partitions fetched for them.
+      private void getNextBatch() {
+        int batch_counter = 0;
+        List<String> nameBatch = new ArrayList<String>();
+        while (batch_counter < batch_size && partitionNamesIter.hasNext()) {
+          nameBatch.add(partitionNamesIter.next());
+          batch_counter++;
+        }
+        try {
+          batchIter =
+            msc.getPartitionsByNames(table.getCatName(), table.getDbName(), table.getTableName(), nameBatch).iterator();
+        } catch (Exception e) {
+          // Iterator methods cannot throw checked exceptions, so wrap and keep the cause
+          throw new RuntimeException(e);
+        }
+      }
+      @Override
+      public void remove() {
+        throw new IllegalStateException(
+          "PartitionIterable is a read-only iterable and remove() is unsupported");
+      }
+    };
+  }
+  // Selects how the iterator obtains its partitions.
+  enum Type {
+    LIST_PROVIDED,  // Where a List<Partition> is already provided
+    LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed.
+  }
+  // Mode of this iterable; set once by whichever constructor was used.
+  private final Type currType;
+  // used for LIST_PROVIDED cases
+  private Collection<Partition> ptnsProvided = null;
+  // used for LAZY_FETCH_PARTITIONS cases
+  private IMetaStoreClient msc = null; // Assumes one instance of this + single-threaded compilation for each query.
+  private Table table = null;
+  private List<String> partitionNames = null;
+  // upper bound on the number of partition names passed to a single getPartitionsByNames call
+  private int batch_size;
+  /**
+   * Dummy constructor, which simply acts as an iterator on an already-present
+   * collection of partitions; allows for easy drop-in replacement for other methods
+   * that already have a List<Partition>.
+   *
+   * @param ptnsProvided partitions to iterate over; the reference is stored as-is (not copied)
+   */
+  public PartitionIterable(Collection<Partition> ptnsProvided) {
+    this.currType = Type.LIST_PROVIDED;
+    this.ptnsProvided = ptnsProvided;
+  }
+  /**
+   * Primary constructor for lazy iteration over the partitions of a table.
+   * The partition names of the table are listed up front through the given
+   * metastore client; the partition objects themselves are fetched later by
+   * the iterator, in batches of at most batch_size names.
+   *
+   * @param msc metastore client used to list names and fetch partitions
+   * @param table table whose partitions are iterated
+   * @param batch_size maximum number of partition names per fetch
+   * @throws MetastoreException if listing the partition names fails
+   */
+  public PartitionIterable(IMetaStoreClient msc, Table table, int batch_size) throws MetastoreException {
+    this.currType = Type.LAZY_FETCH_PARTITIONS;
+    this.msc = msc;
+    this.table = table;
+    this.batch_size = batch_size;
+    // (short) -1 is passed as the max argument; presumably means "no limit" - confirm against IMetaStoreClient
+    partitionNames = getPartitionNames(msc, table.getCatName(), table.getDbName(), table.getTableName(), (short) -1);
+  }
+  /**
+   * Lists partition names of a table through the given metastore client.
+   *
+   * @param msc metastore client to query
+   * @param catName catalog name
+   * @param dbName database name
+   * @param tblName table name
+   * @param max value forwarded unchanged as the last argument of listPartitionNames
+   * @return the list returned by the client
+   * @throws MetastoreException wrapping any exception thrown by the client call
+   */
+  public List<String> getPartitionNames(IMetaStoreClient msc, String catName, String dbName, String tblName, short max)
+    throws MetastoreException {
+    try {
+      return msc.listPartitionNames(catName, dbName, tblName, max);
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..901bf80
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,235 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.TimeValidator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ * Partition management task is primarily responsible for partition retention and discovery based on table properties.
+ *
+ * Partition Retention - If "partition.retention.period" table property is set with retention interval, when this
+ * metastore task runs periodically, it will drop partitions with age (creation time) greater than retention period.
+ * Dropping partitions after retention period will also delete the data in that partition.
+ *
+ * Partition Discovery - If "discover.partitions" table property is set, this metastore task monitors the table location
+ * for newly added partition directories and creates partition objects if they do not exist. Also, if a partition object
+ * exists and the corresponding directory does not exist under the table location, then the partition object will be dropped.
+ *
+ */
+public class PartitionManagementTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionManagementTask.class);
+  public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
+  public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
+  private static final Lock lock = new ReentrantLock();
+  // these are just for testing
+  private static int completedAttempts;
+  private static int skippedAttempts;
+  private Configuration conf;
+  /**
+   * Returns the PARTITION_MANAGEMENT_TASK_FREQUENCY setting read from this task's configuration.
+   *
+   * @param unit time unit passed to MetastoreConf.getTimeVar for the returned value
+   */
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, unit);
+  }
+  /**
+   * Stores a private copy of the given configuration.
+   *
+   * @param configuration configuration to copy; the caller's instance is not retained
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    // we modify conf in setupMsckConf(), so we make a copy
+    conf = new Configuration(configuration);
+  }
+  /** Returns this task's own configuration copy (the object created in setConf). */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  /**
+   * Runs one partition management pass.
+   * The pass is guarded by the static lock via tryLock(): if another pass holds the lock this
+   * invocation only increments skippedAttempts and returns. Otherwise it looks up tables matching
+   * the configured catalog / database pattern / table pattern / table types, keeps those whose
+   * "discover.partitions" property equals "true" (case-insensitive), and submits one MsckThread
+   * per candidate table to a fixed-size daemon thread pool, waiting for all of them to finish.
+   * Exceptions are logged and not propagated.
+   */
+  @Override
+  public void run() {
+    if (lock.tryLock()) {
+      skippedAttempts = 0;
+      String qualifiedTableName = null;
+      IMetaStoreClient msc = null;
+      try {
+        msc = new HiveMetaStoreClient(conf);
+        List<Table> candidateTables = new ArrayList<>();
+        String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME);
+        String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN);
+        String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN);
+        String tableTypes = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
+        Set<String> tableTypesSet = new HashSet<>();
+        List<String> tableTypesList;
+        // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
+        // specifying empty here is equivalent to disabling the partition discovery altogether as it scans no tables.
+        if (tableTypes.isEmpty()) {
+          tableTypesList = Lists.newArrayList("");
+        } else {
+          // normalize each comma-separated entry to a TableType enum name; the set removes duplicates
+          for (String type : tableTypes.split(",")) {
+            try {
+              tableTypesSet.add(TableType.valueOf(type.trim().toUpperCase()).name());
+            } catch (IllegalArgumentException e) {
+              // ignore
+              LOG.warn("Unknown table type: {}", type);
+            }
+          }
+          tableTypesList = Lists.newArrayList(tableTypesSet);
+        }
+        List<TableMeta> foundTableMetas = msc.getTableMeta(catalogName, dbPattern, tablePattern, tableTypesList);
+        // NOTE(review): the next two lines look like the argument list of a logging call whose opening
+        // (e.g. a LOG.xxx( prefix) is missing in this text rendering - confirm against the canonical commit.
+"Looking for tables using catalog: {} dbPattern: {} tablePattern: {} found: {}", catalogName,
+          dbPattern, tablePattern, foundTableMetas.size());
+        // fetch the full Table for each match and keep only those that opted in to discovery
+        for (TableMeta tableMeta : foundTableMetas) {
+          Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
+          if (table.getParameters() != null && table.getParameters().containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
+            table.getParameters().get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true")) {
+            candidateTables.add(table);
+          }
+        }
+        // nothing to do; the finally block below still closes the client and releases the lock
+        if (candidateTables.isEmpty()) {
+          return;
+        }
+        // TODO: Msck creates MetastoreClient (MSC) on its own. MSC creation is expensive. Sharing MSC also
+        // will not be safe unless synchronized MSC is used. Using synchronized MSC in multi-threaded context also
+        // defeats the purpose of thread pooled msck repair.
+        // NOTE(review): the getIntVar call below appears truncated in this text rendering (its second
+        // argument and closing ");" are missing) - confirm against the canonical commit.
+        int threadPoolSize = MetastoreConf.getIntVar(conf,
+        final ExecutorService executorService = Executors
+          .newFixedThreadPool(Math.min(candidateTables.size(), threadPoolSize),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PartitionDiscoveryTask-%d").build());
+        // one count per candidate table; each MsckThread counts down in its finally block
+        CountDownLatch countDownLatch = new CountDownLatch(candidateTables.size());
+        // NOTE(review): logging call opening missing in this rendering (see note above).
+"Found {} candidate tables for partition discovery", candidateTables.size());
+        setupMsckConf();
+        for (Table table : candidateTables) {
+          qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
+          long retentionSeconds = getRetentionPeriodInSeconds(table);
+          // NOTE(review): logging call opening missing in this rendering (see note above).
+"Running partition discovery for table {} retentionPeriod: {}s", qualifiedTableName,
+            retentionSeconds);
+          // this always runs in 'sync' mode where partitions can be added and dropped
+          MsckInfo msckInfo = new MsckInfo(table.getCatName(), table.getDbName(), table.getTableName(),
+            null, null, true, true, true, retentionSeconds);
+          executorService.submit(new MsckThread(msckInfo, conf, qualifiedTableName, countDownLatch));
+        }
+        // block until every submitted MsckThread has counted down, then tear the pool down
+        countDownLatch.await();
+        executorService.shutdownNow();
+      } catch (Exception e) {
+        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
+      } finally {
+        if (msc != null) {
+          msc.close();
+        }
+        lock.unlock();
+      }
+      // reached after normal completion and after a logged exception, but not after the early return above
+      completedAttempts++;
+    } else {
+      skippedAttempts++;
+      // NOTE(review): logging call opening missing in this rendering (see note above).
+"Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
+    }
+  }
+  /**
+   * Reads the "partition.retention.period" table property and converts it to seconds.
+   *
+   * @param table table whose parameters are inspected
+   * @return the retention period in seconds, or -1 when the property is absent, null/empty,
+   *         or rejected by the time validator
+   */
+  static long getRetentionPeriodInSeconds(final Table table) {
+    String retentionPeriod;
+    long retentionSeconds = -1;
+    if (table.getParameters() != null && table.getParameters().containsKey(PARTITION_RETENTION_PERIOD_TBLPROPERTY)) {
+      retentionPeriod = table.getParameters().get(PARTITION_RETENTION_PERIOD_TBLPROPERTY);
+      // a key mapped to null is treated like an empty value instead of failing with an NPE
+      if (retentionPeriod == null || retentionPeriod.isEmpty()) {
+        LOG.warn("'{}' table property is defined but empty. Skipping retention period..",
+          PARTITION_RETENTION_PERIOD_TBLPROPERTY);
+      } else {
+        try {
+          // validate first so a malformed value is reported as a warning rather than propagating
+          TimeValidator timeValidator = new TimeValidator(TimeUnit.SECONDS);
+          timeValidator.validate(retentionPeriod);
+          retentionSeconds = MetastoreConf.convertTimeStr(retentionPeriod, TimeUnit.SECONDS, TimeUnit.SECONDS);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("'{}' retentionPeriod value is invalid. Skipping retention period..", retentionPeriod);
+          // will return -1
+        }
+      }
+    }
+    return retentionSeconds;
+  }
+  /**
+   * Adjusts this task's configuration copy before it is handed to the MsckThread workers:
+   * sets MSCK_PATH_VALIDATION to "skip" and CONNECTION_POOLING_MAX_CONNECTIONS to 2.
+   */
+  private void setupMsckConf() {
+    // if invalid partition directory appears, we just skip and move on. We don't want partition management to throw
+    // when invalid path is encountered as these are background threads. We just want to skip and move on. Users will
+    // have to fix the invalid paths via external means.
+    conf.set(MetastoreConf.ConfVars.MSCK_PATH_VALIDATION.getVarname(), "skip");
+    // since msck runs in thread pool and each of them create their own metastore client, we don't want explosion of
+    // connections to metastore for embedded mode. Also we don't need too many db connections anyway.
+    conf.setInt(MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(), 2);
+  }
+  /**
+   * Worker submitted once per candidate table. It creates and initializes its own Msck instance,
+   * logs (does not rethrow) any exception, and always counts down the shared latch when done.
+   */
+  private static class MsckThread implements Runnable {
+    private MsckInfo msckInfo;
+    private Configuration conf;
+    private String qualifiedTableName;
+    private CountDownLatch countDownLatch;
+    MsckThread(MsckInfo msckInfo, Configuration conf, String qualifiedTableName, CountDownLatch countDownLatch) {
+      this.msckInfo = msckInfo;
+      this.conf = conf;
+      this.qualifiedTableName = qualifiedTableName;
+      this.countDownLatch = countDownLatch;
+    }
+    @Override
+    public void run() {
+      try {
+        Msck msck = new Msck( true, true);
+        msck.init(conf);
+        // NOTE(review): msckInfo is stored but never used in the visible code; presumably a call that
+        // runs the repair with it followed here and was lost in this text rendering - confirm.
+      } catch (Exception e) {
+        LOG.error("Exception while running partition discovery task for table: " + qualifiedTableName, e);
+      } finally {
+        // there is no recovery from exception, so we always count down and retry in next attempt
+        countDownLatch.countDown();
+      }
+    }
+  }
+  /**
+   * Test hook: returns the static skipped-attempt counter, which run() increments when the lock
+   * is unavailable and resets to 0 when it acquires the lock.
+   */
+  @VisibleForTesting
+  public static int getSkippedAttempts() {
+    return skippedAttempts;
+  }
+  /**
+   * Test hook: returns the static counter that run() increments after a pass that held the lock
+   * finishes without taking the early "no candidate tables" return.
+   */
+  @VisibleForTesting
+  public static int getCompletedAttempts() {
+    return completedAttempts;
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/
index f3b3866..363db35 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -58,10 +59,14 @@ import org.apache.commons.collections.ListUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -71,6 +76,8 @@ import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionListComposingSpec;
@@ -590,7 +597,7 @@ public class MetaStoreServerUtils {
   /** Duplicates AcidUtils; used in a couple places in metastore. */
   public static boolean isTransactionalTable(Map<String, String> params) {
     String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
-    return (transactionalProp != null && "true".equalsIgnoreCase(transactionalProp));
+    return "true".equalsIgnoreCase(transactionalProp);
@@ -1294,4 +1301,162 @@ public class MetaStoreServerUtils {
       return hashCode;
+  // Some util methods copied here from elsewhere so as to avoid a circular dependency with hive ql
+  /**
+   * Returns the table's storage-descriptor location as a Path.
+   *
+   * @param table table whose storage descriptor is read (the descriptor itself is not null-checked)
+   * @return the location as a Path, or null when the location string is null
+   */
+  public static Path getPath(Table table) {
+    final String tableLocation = table.getSd().getLocation();
+    return (tableLocation == null) ? null : new Path(tableLocation);
+  }
+  /**
+   * Lists the partitions of a table through the given metastore client.
+   *
+   * @param msc metastore client to query
+   * @param table table identified by its catalog, database and table name
+   * @return the list returned by msc.listPartitions
+   * @throws MetastoreException wrapping any exception thrown by the client call
+   */
+  public static List<Partition> getAllPartitionsOf(IMetaStoreClient msc, Table table) throws MetastoreException {
+    try {
+      // (short) -1 is passed as the max argument; presumably means "no limit" - confirm against IMetaStoreClient
+      return msc.listPartitions(table.getCatName(), table.getDbName(), table.getTableName(), (short)-1);
+    } catch (Exception e) {
+      throw new MetastoreException(e);
+    }
+  }
+  /**
+   * Tells whether the table has at least one partition column.
+   * getPartCols(table) replaces a null key list with an empty one and never returns null,
+   * so a single emptiness check is sufficient.
+   *
+   * @param table table to inspect
+   * @return true when the table declares one or more partition keys
+   */
+  public static boolean isPartitioned(Table table) {
+    return !getPartCols(table).isEmpty();
+  }
+  /**
+   * Returns the table's partition keys, never null.
+   * Side effect: when the table has no partition key list (null), a new empty list is
+   * installed on the table via setPartitionKeys and that list is returned.
+   *
+   * @param table table to read (and possibly update)
+   * @return the table's own partition key list
+   */
+  public static List<FieldSchema> getPartCols(Table table) {
+    List<FieldSchema> partKeys = table.getPartitionKeys();
+    if (partKeys == null) {
+      partKeys = new ArrayList<>();
+      table.setPartitionKeys(partKeys);
+    }
+    return partKeys;
+  }
+  /**
+   * Collects the names of the table's partition columns, in declaration order.
+   *
+   * @param table table whose partition keys are read via getPartCols
+   * @return a new mutable list of partition column names (empty when there are none)
+   */
+  public static List<String> getPartColNames(Table table) {
+    final List<FieldSchema> partitionKeys = getPartCols(table);
+    final List<String> names = new ArrayList<>(partitionKeys.size());
+    for (int idx = 0; idx < partitionKeys.size(); idx++) {
+      names.add(partitionKeys.get(idx).getName());
+    }
+    return names;
+  }
+  public static Path getDataLocation(Table table, Partition partition) {
+    if (isPartitioned(table)) {
+      if (partition.getSd() == null) {
+        return null;
+      } else {
+        return new Path(partition.getSd().getLocation());
+      }
+    } else {
+      if (table.getSd() == null) {
+        return null;
+      }
+      else {
+        return getPath(table);
+      }
+    }
+  }
+  /**
+   * Builds the partition name from the table's partition columns and the partition's values
+   * using Warehouse.makePartName.
+   *
+   * @param table table supplying the partition columns
+   * @param partition partition supplying the values
+   * @return the partition name produced by Warehouse.makePartName
+   * @throws RuntimeException wrapping the MetaException thrown by Warehouse.makePartName
+   */
+  public static String getPartitionName(Table table, Partition partition) {
+    try {
+      return Warehouse.makePartName(getPartCols(table), partition.getValues());
+    } catch (MetaException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  /**
+   * Returns the result of Warehouse.makeSpecFromValues for the table's partition columns
+   * and the partition's values.
+   *
+   * @param table table supplying the partition columns
+   * @param partition partition supplying the values
+   */
+  public static Map<String, String> getPartitionSpec(Table table, Partition partition) {
+    return Warehouse.makeSpecFromValues(getPartCols(table), partition.getValues());
+  }
+  /**
+   * Looks up a single partition by its spec.
+   * The values are taken from partSpec in the order of the table's partition columns
+   * (a missing column contributes a null value).
+   *
+   * @param msc metastore client to query
+   * @param tbl table owning the partition
+   * @param partSpec map from partition column name to value
+   * @return the partition, or null when the client reports NoSuchObjectException
+   * @throws MetastoreException wrapping any other exception thrown by the client call
+   */
+  public static Partition getPartition(IMetaStoreClient msc, Table tbl, Map<String, String> partSpec) throws MetastoreException {
+    final List<String> partValues = new ArrayList<String>();
+    for (FieldSchema partCol : getPartCols(tbl)) {
+      partValues.add(partSpec.get(partCol.getName()));
+    }
+    try {
+      return msc.getPartition(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), partValues);
+    } catch (NoSuchObjectException nsoe) {
+      // no partition exists for the given key/value pairs - thrift cannot return null,
+      // so the client signals "absent" with NoSuchObjectException; translate that to null
+      return null;
+    } catch (Exception e) {
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new MetastoreException(e);
+    }
+  }
+  /**
+   * Derives the partition name from a partition directory by walking up towards the table path.
+   * Every directory of the form key=value whose key is a partition column is prepended to the
+   * result. A directory name that does not split into exactly two parts around "=" stops the walk,
+   * and whatever has been collected so far (possibly null) is returned.
+   *
+   * @param tablePath
+   *          Path of the table.
+   * @param partitionPath
+   *          Path of the partition.
+   * @param partCols
+   *          Set of partition columns from table definition
+   * @return Partition name, for example partitiondate=2008-01-01
+   */
+  public static String getPartitionName(Path tablePath, Path partitionPath, Set<String> partCols) {
+    String partName = null;
+    Path dir = partitionPath;
+    LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols);
+    while (dir != null && !tablePath.equals(dir)) {
+      final String dirName = dir.getName();
+      // expected shape: partition=p_val
+      final String[] keyValue = dirName.split("=");
+      if (keyValue.length > 0) {
+        if (keyValue.length != 2) {
+          LOG.warn(dirName + " is not a valid partition name");
+          return partName;
+        }
+        // only directories named after a table partition column contribute
+        if (partCols.contains(keyValue[0])) {
+          partName = (partName == null) ? dirName : dirName + Path.SEPARATOR + partName;
+        }
+      }
+      dir = dir.getParent();
+      LOG.debug("currPath=" + dir);
+    }
+    return partName;
+  }
+  /**
+   * Builds a new (not yet persisted) Partition object for the given table and spec.
+   *
+   * @param tbl table the partition belongs to; supplies catalog/db/table names and partition columns
+   * @param partSpec map from partition column name to value; every partition column must map to
+   *          a non-empty value
+   * @param location partition location; may be null, in which case a null location is set
+   * @return the populated Partition
+   * @throws MetastoreException if a partition column is missing from partSpec or has an empty value
+   */
+  public static Partition createMetaPartitionObject(Table tbl, Map<String, String> partSpec, Path location)
+    throws MetastoreException {
+    // collect values in partition-column order, rejecting missing or empty ones
+    List<String> pvals = new ArrayList<String>();
+    for (FieldSchema field : getPartCols(tbl)) {
+      String val = partSpec.get(field.getName());
+      if (val == null || val.isEmpty()) {
+        throw new MetastoreException("partition spec is invalid; field "
+          + field.getName() + " does not exist or is empty");
+      }
+      pvals.add(val);
+    }
+    Partition tpart = new Partition();
+    tpart.setCatName(tbl.getCatName());
+    tpart.setDbName(tbl.getDbName());
+    tpart.setTableName(tbl.getTableName());
+    tpart.setValues(pvals);
+    // views get no storage descriptor; other tables get a deep copy of the table's with the location replaced
+    if (!MetaStoreUtils.isView(tbl)) {
+      tpart.setSd(tbl.getSd().deepCopy());
+      tpart.getSd().setLocation((location != null) ? location.toString() : null);
+    }
+    return tpart;
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/
new file mode 100644
index 0000000..22513b9
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/
@@ -0,0 +1,110 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class RetryUtilities {
+  /**
+   * Checked exception thrown by the retry helpers in this file when they give up
+   * (batch size decayed to zero or the maximum number of attempts was reached).
+   */
+  public static class RetryException extends Exception {
+    private static final long serialVersionUID = 1L;
+    /** Wraps the given exception as the cause. */
+    public RetryException(Exception ex) {
+      super(ex);
+    }
+    /** Creates an exception with only a message and no cause. */
+    public RetryException(String msg) {
+      super(msg);
+    }
+  }
+  /**
+   * Interface used to create a ExponentialBackOffRetry policy
+   *
+   * @param <T> type of the result produced by the work
+   */
+  public static interface ExponentialBackOffRetry<T> {
+    /**
+     * This method should be called by implementations of this ExponentialBackOffRetry policy
+     * It represents the actual work which needs to be done based on a given batch size
+     * @param batchSize The batch size for the work which needs to be executed
+     * @return the result of the work performed with the given batch size
+     * @throws Exception if the work fails for this batch size
+     */
+    public T execute(int batchSize) throws Exception;
+  }
+  /**
+   * This class is a base implementation of a simple exponential back retry policy. The batch size
+   * and decaying factor are provided with the constructor. It reduces the batch size by dividing
+   * it by the decaying factor every time there is an exception in the execute method.
+   */
+  public static abstract class ExponentiallyDecayingBatchWork<T>
+    implements ExponentialBackOffRetry<T> {
+    private int batchSize;
+    private final int decayingFactor;
+    private int maxRetries;
+    private static final Logger LOG = LoggerFactory.getLogger(ExponentiallyDecayingBatchWork.class);
+    public ExponentiallyDecayingBatchWork(int batchSize, int reducingFactor, int maxRetries) {
+      if (batchSize <= 0) {
+        throw new IllegalArgumentException(String.format(
+          "Invalid batch size %d provided. Batch size must be greater than 0", batchSize));
+      }
+      this.batchSize = batchSize;
+      if (reducingFactor <= 1) {
+        throw new IllegalArgumentException(String.format(
+          "Invalid decaying factor %d provided. Decaying factor must be greater than 1",
+          batchSize));
+      }
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException(String.format(
+          "Invalid number of maximum retries %d provided. It must be a non-negative integer value",
+          maxRetries));
+      }
+      //if maxRetries is 0 code retries until batch decays to zero
+      this.maxRetries = maxRetries;
+      this.decayingFactor = reducingFactor;
+    }
+    public T run() throws Exception {
+      int attempt = 0;
+      while (true) {
+        int size = getNextBatchSize();
+        if (size == 0) {
+          throw new RetryException("Batch size reduced to zero");
+        }
+        try {
+          return execute(size);
+        } catch (Exception ex) {
+          LOG.warn(String.format("Exception thrown while processing using a batch size %d", size),
+            ex);
+        } finally {
+          attempt++;
+          if (attempt == maxRetries) {
+            throw new RetryException(String.format("Maximum number of retry attempts %d exhausted", maxRetries));
+          }
+        }
+      }
+    }
+    private int getNextBatchSize() {
+      int ret = batchSize;
+      batchSize /= decayingFactor;
+      return ret;
+    }
+  }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
index f750ca2..377a550 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
@@ -482,7 +482,9 @@ public abstract class NonCatCallsWithCatalog {
-      expected.add(new TableMeta(dbName, tableNames[i],;
+      TableMeta tableMeta = new TableMeta(dbName, tableNames[i],;
+      tableMeta.setCatName(expectedCatalog());
+      expected.add(tableMeta);
     List<String> types = Collections.singletonList(;
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
index fc996c8..b3690ec 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
@@ -17,10 +17,10 @@
 package org.apache.hadoop.hive.metastore;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import org.apache.hadoop.hive.metastore.api.MetaException;
  * This tests calls with an older client, to make sure that if the client supplies no catalog
  * information the server still does the right thing.  I assumes the default catalog
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..059c166
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,581 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ *
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.client.builder.CatalogBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+public class TestPartitionManagement {
+  private IMetaStoreClient client;
+  private Configuration conf;
+  /**
+   * Creates a fresh metastore configuration, starts a metastore with it, applies the
+   * {@code TxnDbUtil} settings and connects {@link #client} to that configuration.
+   *
+   * @throws Exception if any of the setup calls fails
+   */
+  @Before
+  public void setUp() throws Exception {
+    final String proxyClassKey = MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS.getVarname();
+    final String multithreadedKey = MetastoreConf.ConfVars.MULTITHREADED.getVarname();
+    conf = MetastoreConf.newMetastoreConf();
+    conf.setClass(proxyClassKey, MsckPartitionExpressionProxy.class, PartitionExpressionProxy.class);
+    MetaStoreTestUtils.setConfForStandloneMode(conf);
+    conf.setBoolean(multithreadedKey, false);
+    // the metastore is started before the transaction tables are prepared
+    MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf);
+    TxnDbUtil.setConfValues(conf);
+    TxnDbUtil.prepDb(conf);
+    client = new HiveMetaStoreClient(conf);
+  }
+  @After
+  public void tearDown() throws Exception {
+    if (client != null) {
+      // Drop any left over catalogs
+      List<String> catalogs = client.getCatalogs();
+      for (String catName : catalogs) {
+        if (!catName.equalsIgnoreCase(DEFAULT_CATALOG_NAME)) {
+          // First drop any databases in catalog
+          List<String> databases = client.getAllDatabases(catName);
+          for (String db : databases) {
+            client.dropDatabase(catName, db, true, false, true);
+          }
+          client.dropCatalog(catName);
+        } else {
+          List<String> databases = client.getAllDatabases(catName);
+          for (String db : databases) {
+            if (!db.equalsIgnoreCase(Warehouse.DEFAULT_DATABASE_NAME)) {
+              client.dropDatabase(catName, db, true, false, true);
+            }
+          }
+        }
+      }
+    }
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      client = null;
+    }
+  }
+  private Map<String, Column> buildAllColumns() {
+    Map<String, Column> colMap = new HashMap<>(6);
+    Column[] cols = {new Column("b", "binary"), new Column("bo", "boolean"),
+      new Column("d", "date"), new Column("do", "double"), new Column("l", "bigint"),
+      new Column("s", "string")};
+    for (Column c : cols) {
+      colMap.put(c.colName, c);
+    }
+    return colMap;
+  }
+  /**
+   * Creates the catalog (unless it is the default catalog), the database (unless it is the default
+   * database), a table with the given columns and, for a partitioned table, one partition per entry
+   * of {@code partVals}. The client cache is flushed before returning.
+   *
+   * @param catName catalog to create the database in
+   * @param dbName database to create the table in
+   * @param tableName name of the table to create
+   * @param partKeys partition column names, or {@code null} for an unpartitioned table
+   * @param partKeyTypes partition column types, same length as {@code partKeys}
+   * @param partVals one entry per partition, each holding one value per partition key
+   * @param colMap data columns of the table
+   * @param isOrc when true, the input and output formats of the table are overridden
+   * @return one {@code key1=val1/key2=val2} name per created partition (values are not escaped),
+   *         or an empty list for an unpartitioned table
+   * @throws IllegalArgumentException if the partition arguments are inconsistent
+   * @throws TException if a metastore call fails
+   */
+  private List<String> createMetadata(String catName, String dbName, String tableName,
+    List<String> partKeys, List<String> partKeyTypes, List<List<String>> partVals,
+    Map<String, Column> colMap, boolean isOrc)
+    throws TException {
+    // validate first so that a bad argument does not leave half-created metadata behind
+    if (partKeys != null) {
+      if (partKeyTypes == null) {
+        throw new IllegalArgumentException("partKeyTypes cannot be null when partKeys is non-null");
+      }
+      if (partKeys.size() != partKeyTypes.size()) {
+        throw new IllegalArgumentException("partKeys and partKeyTypes size should be same");
+      }
+      if (partVals == null || partVals.isEmpty()) {
+        throw new IllegalArgumentException("partVals cannot be empty for partitioned table");
+      }
+      for (List<String> partVal : partVals) {
+        if (partVal == null || partVal.size() != partKeys.size()) {
+          throw new IllegalArgumentException("each partVals entry should have one value per partition key");
+        }
+      }
+    }
+    if (!DEFAULT_CATALOG_NAME.equals(catName)) {
+      Catalog cat = new CatalogBuilder()
+        .setName(catName)
+        .setLocation(MetaStoreTestUtils.getTestWarehouseDir(catName))
+        .build();
+      client.createCatalog(cat);
+    }
+    Database db;
+    if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+      DatabaseBuilder dbBuilder = new DatabaseBuilder()
+        .setName(dbName);
+      dbBuilder.setCatalogName(catName);
+      db = dbBuilder.create(client, conf);
+    } else {
+      db = client.getDatabase(DEFAULT_CATALOG_NAME, DEFAULT_DATABASE_NAME);
+    }
+    TableBuilder tb = new TableBuilder()
+      .inDb(db)
+      .setTableName(tableName);
+    if (isOrc) {
+      // NOTE(review): both format class names are empty in this copy of the patch; presumably the
+      // ORC input/output format classes were intended - confirm against the committed file
+      tb.setInputFormat("")
+        .setOutputFormat("");
+    }
+    for (Column col : colMap.values()) {
+      tb.addCol(col.colName, col.colType);
+    }
+    if (partKeys != null) {
+      for (int i = 0; i < partKeys.size(); i++) {
+        tb.addPartCol(partKeys.get(i), partKeyTypes.get(i));
+      }
+    }
+    Table table = tb.create(client, conf);
+    List<String> partNames = new ArrayList<>();
+    if (partKeys != null) {
+      for (List<String> partVal : partVals) {
+        new PartitionBuilder()
+          .inTable(table)
+          .setValues(partVal)
+          .addToTable(client, conf);
+        // each partVals entry is one partition: pair every key with the value at the same index
+        StringBuilder partName = new StringBuilder();
+        for (int i = 0; i < partKeys.size(); i++) {
+          if (i > 0) {
+            partName.append('/');
+          }
+          partName.append(partKeys.get(i)).append('=').append(partVal.get(i));
+        }
+        partNames.add(partName.toString());
+      }
+    }
+    client.flushCache();
+    return partNames;
+  }
+  /**
+   * Checks that directories added under the table location are not registered as partitions when
+   * the discovery table property is absent, and also when it is explicitly set to "false".
+   */
+  @Test
+  public void testPartitionDiscoveryDisabledByDefault() throws TException, IOException {
+    final String db = "db1";
+    final String tbl = "tbl1";
+    Map<String, Column> columns = buildAllColumns();
+    List<String> keys = Lists.newArrayList("state", "dt");
+    List<String> keyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> values = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, db, tbl, keys, keyTypes, values, columns, false);
+    Table table = client.getTable(db, tbl);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // add two partition directories on the file system only
+    URI tableUri = URI.create(table.getSd().getLocation());
+    Path tableDir = new Path(tableUri);
+    FileSystem fs = FileSystem.get(tableUri, conf);
+    fs.mkdirs(new Path(tableDir, "state=WA/dt=2018-12-01"));
+    fs.mkdirs(new Path(tableDir, "state=UT/dt=2018-12-02"));
+    assertEquals(5, fs.listStatus(tableDir).length);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // the discovery table property is not set, so the task must leave this table alone
+    runPartitionManagementTask(conf);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // an explicit "false" must behave the same way
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "false");
+    client.alter_table(db, tbl, table);
+    runPartitionManagementTask(conf);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+  }
+  /**
+   * Checks partition discovery on a table whose discovery property is "true": new directories are
+   * added as partitions, and after the table is switched to external, deleted directories are
+   * dropped again, but only while the external table type is part of the configured table types.
+   */
+  @Test
+  public void testPartitionDiscoveryEnabledBothTableTypes() throws TException, IOException {
+    String dbName = "db2";
+    String tableName = "tbl2";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    // table property is set to true, we expect 5 partitions
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(dbName, tableName, table);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+    // change table type to external, delete a partition directory and make sure partition discovery works
+    table.getParameters().put("EXTERNAL", "true");
+    table.setTableType(TableType.EXTERNAL_TABLE.name());
+    client.alter_table(dbName, tableName, table);
+    // deleting the parent removes the whole state=WA directory
+    boolean deleted = fs.delete(newPart1.getParent(), true);
+    assertTrue(deleted);
+    assertEquals(4, fs.listStatus(tablePath).length);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+    // remove external tables from partition discovery and expect no changes even after partition is deleted
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(),
+      TableType.MANAGED_TABLE.name());
+    deleted = fs.delete(newPart2.getParent(), true);
+    assertTrue(deleted);
+    assertEquals(3, fs.listStatus(tablePath).length);
+    // this doesn't remove the partition because the table is still external and we have removed the
+    // external table type from partition discovery
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+    // no table types specified, msck will not select any tables
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(), "");
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+    // only EXTERNAL table type, msck should drop a partition now
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES.getVarname(),
+      TableType.EXTERNAL_TABLE.name());
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+  }
+  /**
+   * Checks that a table in a non-default catalog is only processed once the partition management
+   * catalog name in the configuration is set to that catalog.
+   */
+  @Test
+  public void testPartitionDiscoveryNonDefaultCatalog() throws TException, IOException {
+    final String cat = "cat3";
+    final String db = "db3";
+    final String tbl = "tbl3";
+    Map<String, Column> columns = buildAllColumns();
+    List<String> keys = Lists.newArrayList("state", "dt");
+    List<String> keyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> values = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(cat, db, tbl, keys, keyTypes, values, columns, false);
+    Table table = client.getTable(cat, db, tbl);
+    assertEquals(3, client.listPartitions(cat, db, tbl, (short) -1).size());
+    // add two partition directories on the file system only
+    URI tableUri = URI.create(table.getSd().getLocation());
+    Path tableDir = new Path(tableUri);
+    FileSystem fs = FileSystem.get(tableUri, conf);
+    Path addedWa = new Path(tableDir, "state=WA/dt=2018-12-01");
+    Path addedUt = new Path(tableDir, "state=UT/dt=2018-12-02");
+    fs.mkdirs(addedWa);
+    fs.mkdirs(addedUt);
+    assertEquals(5, fs.listStatus(tableDir).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(cat, db, tbl, table);
+    // the catalog configured for the task is still the default one, not 'cat3', so nothing is fixed
+    runPartitionManagementTask(conf);
+    assertEquals(3, client.listPartitions(cat, db, tbl, (short) -1).size());
+    // once the task is pointed at 'cat3' the two new directories become partitions
+    conf.set(MetastoreConf.ConfVars.PARTITION_MANAGEMENT_CATALOG_NAME.getVarname(), cat);
+    runPartitionManagementTask(conf);
+    assertEquals(5, client.listPartitions(cat, db, tbl, (short) -1).size());
+  }
+  /**
+   * Checks that the configured database pattern decides whether the table's new partition
+   * directories are picked up by the partition management task.
+   */
+  @Test
+  public void testPartitionDiscoveryDBPattern() throws TException, IOException {
+    final String db = "db4";
+    final String tbl = "tbl4";
+    final String dbPatternKey = MetastoreConf.ConfVars.PARTITION_MANAGEMENT_DATABASE_PATTERN.getVarname();
+    Map<String, Column> columns = buildAllColumns();
+    List<String> keys = Lists.newArrayList("state", "dt");
+    List<String> keyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> values = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, db, tbl, keys, keyTypes, values, columns, false);
+    Table table = client.getTable(db, tbl);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // add two partition directories on the file system only
+    URI tableUri = URI.create(table.getSd().getLocation());
+    Path tableDir = new Path(tableUri);
+    FileSystem fs = FileSystem.get(tableUri, conf);
+    Path addedWa = new Path(tableDir, "state=WA/dt=2018-12-01");
+    Path addedUt = new Path(tableDir, "state=UT/dt=2018-12-02");
+    fs.mkdirs(addedWa);
+    fs.mkdirs(addedUt);
+    assertEquals(5, fs.listStatus(tableDir).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(db, tbl, table);
+    // the pattern matches no database, so the table keeps its 3 partitions
+    conf.set(dbPatternKey, "*dbfoo*");
+    runPartitionManagementTask(conf);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // the pattern matches db4, so all 5 partitions are now registered
+    conf.set(dbPatternKey, "*db4*");
+    runPartitionManagementTask(conf);
+    assertEquals(5, client.listPartitions(db, tbl, (short) -1).size());
+  }
+  /**
+   * Checks that the configured table pattern decides whether the table's new partition
+   * directories are picked up by the partition management task.
+   */
+  @Test
+  public void testPartitionDiscoveryTablePattern() throws TException, IOException {
+    final String db = "db5";
+    final String tbl = "tbl5";
+    final String tablePatternKey = MetastoreConf.ConfVars.PARTITION_MANAGEMENT_TABLE_PATTERN.getVarname();
+    Map<String, Column> columns = buildAllColumns();
+    List<String> keys = Lists.newArrayList("state", "dt");
+    List<String> keyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> values = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, db, tbl, keys, keyTypes, values, columns, false);
+    Table table = client.getTable(db, tbl);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // add two partition directories on the file system only
+    URI tableUri = URI.create(table.getSd().getLocation());
+    Path tableDir = new Path(tableUri);
+    FileSystem fs = FileSystem.get(tableUri, conf);
+    Path addedWa = new Path(tableDir, "state=WA/dt=2018-12-01");
+    Path addedUt = new Path(tableDir, "state=UT/dt=2018-12-02");
+    fs.mkdirs(addedWa);
+    fs.mkdirs(addedUt);
+    assertEquals(5, fs.listStatus(tableDir).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    client.alter_table(db, tbl, table);
+    // the pattern matches no table, so the table keeps its 3 partitions
+    conf.set(tablePatternKey, "*tblfoo*");
+    runPartitionManagementTask(conf);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // the pattern matches tbl5, so all 5 partitions are now registered
+    conf.set(tablePatternKey, "tbl5*");
+    runPartitionManagementTask(conf);
+    assertEquals(5, client.listPartitions(db, tbl, (short) -1).size());
+  }
+  /**
+   * Checks partition discovery on a transactional (insert-only) table, and that when three tasks
+   * are submitted concurrently exactly one completes while the other two attempts are skipped.
+   * <p>
+   * The thread pool used for the concurrent run is shut down in a {@code finally} block so that a
+   * failing assertion or task does not leave pool threads behind.
+   */
+  @Test
+  public void testPartitionDiscoveryTransactionalTable()
+    throws TException, IOException, InterruptedException, ExecutionException {
+    String dbName = "db6";
+    String tableName = "tbl6";
+    Map<String, Column> colMap = buildAllColumns();
+    List<String> partKeys = Lists.newArrayList("state", "dt");
+    List<String> partKeyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> partVals = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, true);
+    Table table = client.getTable(dbName, tableName);
+    List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(3, partitions.size());
+    String tableLocation = table.getSd().getLocation();
+    URI location = URI.create(tableLocation);
+    Path tablePath = new Path(location);
+    FileSystem fs = FileSystem.get(location, conf);
+    Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+    Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+    fs.mkdirs(newPart1);
+    fs.mkdirs(newPart2);
+    assertEquals(5, fs.listStatus(tablePath).length);
+    table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    table.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+      TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
+    client.alter_table(dbName, tableName, table);
+    runPartitionManagementTask(conf);
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(5, partitions.size());
+    // only one partition discovery task has run so far, so there are no skipped attempts
+    assertEquals(0, PartitionManagementTask.getSkippedAttempts());
+    // delete a partition from fs and submit 3 tasks at the same time, each of them trying to acquire an X lock
+    // on the same table; only one of them will run, the other attempts will be skipped
+    boolean deleted = fs.delete(newPart1.getParent(), true);
+    assertTrue(deleted);
+    assertEquals(4, fs.listStatus(tablePath).length);
+    // This simulates skipping a partition discovery attempt while a previous attempt is still incomplete
+    List<PartitionManagementTask> tasks = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      PartitionManagementTask partitionDiscoveryTask = new PartitionManagementTask();
+      partitionDiscoveryTask.setConf(conf);
+      tasks.add(partitionDiscoveryTask);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(3);
+    try {
+      int successBefore = PartitionManagementTask.getCompletedAttempts();
+      int skippedBefore = PartitionManagementTask.getSkippedAttempts();
+      List<Future<?>> futures = new ArrayList<>();
+      for (PartitionManagementTask task : tasks) {
+        futures.add(executorService.submit(task));
+      }
+      // wait for all three attempts before reading the counters
+      for (Future<?> future : futures) {
+        future.get();
+      }
+      int successAfter = PartitionManagementTask.getCompletedAttempts();
+      int skippedAfter = PartitionManagementTask.getSkippedAttempts();
+      assertEquals(1, successAfter - successBefore);
+      assertEquals(2, skippedAfter - skippedBefore);
+    } finally {
+      // release the pool threads even when a task or an assertion fails
+      executorService.shutdown();
+    }
+    partitions = client.listPartitions(dbName, tableName, (short) -1);
+    assertEquals(4, partitions.size());
+  }
+  /**
+   * Checks that with a retention period of "20000ms" set on the table, a task run 30 seconds after
+   * discovery leaves the table without partitions.
+   */
+  @Test
+  public void testPartitionRetention() throws TException, IOException, InterruptedException {
+    final String db = "db7";
+    final String tbl = "tbl7";
+    Map<String, Column> columns = buildAllColumns();
+    List<String> keys = Lists.newArrayList("state", "dt");
+    List<String> keyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> values = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, db, tbl, keys, keyTypes, values, columns, false);
+    Table table = client.getTable(db, tbl);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // add two partition directories on the file system only
+    URI tableUri = URI.create(table.getSd().getLocation());
+    Path tableDir = new Path(tableUri);
+    FileSystem fs = FileSystem.get(tableUri, conf);
+    Path addedWa = new Path(tableDir, "state=WA/dt=2018-12-01");
+    Path addedUt = new Path(tableDir, "state=UT/dt=2018-12-02");
+    fs.mkdirs(addedWa);
+    fs.mkdirs(addedUt);
+    assertEquals(5, fs.listStatus(tableDir).length);
+    Map<String, String> tableParams = table.getParameters();
+    tableParams.put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    tableParams.put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, "20000ms");
+    client.alter_table(db, tbl, table);
+    runPartitionManagementTask(conf);
+    assertEquals(5, client.listPartitions(db, tbl, (short) -1).size());
+    // wait 30s, longer than the 20s retention period, then expect every partition to be gone
+    Thread.sleep(30 * 1000);
+    runPartitionManagementTask(conf);
+    assertEquals(0, client.listPartitions(db, tbl, (short) -1).size());
+  }
+  /**
+   * Checks that a directory with an invalid partition path ("state=UT/dt=") is skipped while the
+   * valid new directory is still added, and that an empty retention period drops nothing.
+   */
+  @Test
+  public void testPartitionDiscoverySkipInvalidPath() throws TException, IOException, InterruptedException {
+    final String db = "db8";
+    final String tbl = "tbl8";
+    Map<String, Column> columns = buildAllColumns();
+    List<String> keys = Lists.newArrayList("state", "dt");
+    List<String> keyTypes = Lists.newArrayList("string", "date");
+    List<List<String>> values = Lists.newArrayList(
+      Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+      Lists.newArrayList("CA", "1986-04-28"),
+      Lists.newArrayList("MN", "2018-11-31"));
+    createMetadata(DEFAULT_CATALOG_NAME, db, tbl, keys, keyTypes, values, columns, false);
+    Table table = client.getTable(db, tbl);
+    assertEquals(3, client.listPartitions(db, tbl, (short) -1).size());
+    // one valid and one invalid (empty dt value) partition directory
+    URI tableUri = URI.create(table.getSd().getLocation());
+    Path tableDir = new Path(tableUri);
+    FileSystem fs = FileSystem.get(tableUri, conf);
+    Path validDir = new Path(tableDir, "state=WA/dt=2018-12-01");
+    Path invalidDir = new Path(tableDir, "state=UT/dt=");
+    fs.mkdirs(validDir);
+    fs.mkdirs(invalidDir);
+    assertEquals(5, fs.listStatus(tableDir).length);
+    Map<String, String> tableParams = table.getParameters();
+    tableParams.put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+    // an empty retention period basically means retention is disabled
+    tableParams.put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY, "");
+    client.alter_table(db, tbl, table);
+    // only the valid directory becomes a partition; the invalid path is skipped
+    runPartitionManagementTask(conf);
+    assertEquals(4, client.listPartitions(db, tbl, (short) -1).size());
+  }
+  /**
+   * Runs one partition management pass synchronously on the calling thread.
+   *
+   * @param conf configuration handed to the task before it runs
+   */
+  private void runPartitionManagementTask(Configuration conf) {
+    PartitionManagementTask task = new PartitionManagementTask();
+    task.setConf(conf);
+    // without this call the task is only configured and the tests that expect partitions to change
+    // after this helper can never pass; the task is also submitted to an ExecutorService elsewhere
+    // in this class, so it is presumably a Runnable
+    task.run();
+  }
+  /**
+   * Immutable name/type pair describing one table column used by the tests in this class.
+   */
+  private static class Column {
+    // both fields are assigned only in the constructor, so they are final
+    private final String colName;
+    private final String colType;
+    /**
+     * @param colName column name
+     * @param colType column type name, e.g. "string"
+     */
+    public Column(final String colName, final String colType) {
+      this.colName = colName;
+      this.colType = colType;
+    }
+  }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/
index 59daa52..7720aa2 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -123,6 +124,7 @@ public class TestGetTableMeta extends MetaStoreClientTest {
   private Table createTable(String dbName, String tableName, TableType type)
           throws Exception {
     TableBuilder builder = new TableBuilder()
+            .setCatName("hive")
             .addCol("id", "int")
@@ -153,6 +155,7 @@ public class TestGetTableMeta extends MetaStoreClientTest {
     TableMeta tableMeta = new TableMeta(dbName, tableName,;
+    tableMeta.setCatName("hive");
     return tableMeta;
@@ -160,7 +163,9 @@ public class TestGetTableMeta extends MetaStoreClientTest {
           throws Exception {
     Table table  = createTable(dbName, tableName, type);
-    return new TableMeta(dbName, tableName,;
+    TableMeta tableMeta = new TableMeta(dbName, tableName,;
+    tableMeta.setCatName("hive");
+    return tableMeta;
   private void assertTableMetas(int[] expected, List<TableMeta> actualTableMetas) {
@@ -301,7 +306,9 @@ public class TestGetTableMeta extends MetaStoreClientTest {
           .addCol("id", "int")
           .addCol("name", "string")
-      expected.add(new TableMeta(dbName, tableNames[i],;
+      TableMeta tableMeta = new TableMeta(dbName, tableNames[i],;
+      tableMeta.setCatName(catName);
+      expected.add(tableMeta);
     List<String> types = Collections.singletonList(;

[3/4] hive git commit: HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)

Posted by
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/ b/ql/src/test/org/apache/hadoop/hive/ql/exec/
index 9480d38..1ec4636 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hive.ql.exec;
+import static org.junit.Assert.assertEquals;
 import static;
 import java.util.ArrayList;
@@ -27,16 +28,22 @@ import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.CheckResult.PartitionResult;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Msck;
+import org.apache.hadoop.hive.metastore.PartitionDropOptions;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
-import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.RetryUtilities.RetryException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -47,57 +54,71 @@ import org.mockito.Mockito;
  * Unit test for function dropPartitionsInBatches in DDLTask.
- *
 public class TestMsckDropPartitionsInBatches {
   private static HiveConf hiveConf;
-  private static DDLTask ddlTask;
+  private static Msck msck;
+  private final String catName = "hive";
+  private final String dbName = "default";
   private final String tableName = "test_msck_batch";
-  private static Hive db;
+  private static IMetaStoreClient db;
   private List<String> repairOutput;
   private Table table;
-  public static void setupClass() throws HiveException {
+  public static void setupClass() throws Exception {
     hiveConf = new HiveConf(TestMsckCreatePartitionsInBatches.class);
     hiveConf.setIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE, 5);
-        "");
+      "");
-    db = Hive.get(hiveConf);
-    ddlTask = new DDLTask();
+    db = new HiveMetaStoreClient(hiveConf);
+    msck = new Msck( false, false);
+    msck.init(hiveConf);
   public void before() throws Exception {
-    createPartitionedTable("default", tableName);
-    table = db.getTable(tableName);
+    createPartitionedTable(catName, dbName, tableName);
+    table = db.getTable(catName, dbName, tableName);
     repairOutput = new ArrayList<String>();
   public void after() throws Exception {
-    cleanUpTableQuietly("default", tableName);
+    cleanUpTableQuietly(catName, dbName, tableName);
-  private Table createPartitionedTable(String dbName, String tableName) throws Exception {
+  private Table createPartitionedTable(String catName, String dbName, String tableName) throws Exception {
     try {
-      db.dropTable(dbName, tableName);
-      db.createTable(tableName, Arrays.asList("key", "value"), // Data columns.
-          Arrays.asList("city"), // Partition columns.
-          TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class);
-      return db.getTable(dbName, tableName);
+      db.dropTable(catName, dbName, tableName);
+      Table table = new Table();
+      table.setCatName(catName);
+      table.setDbName(dbName);
+      table.setTableName(tableName);
+      FieldSchema col1 = new FieldSchema("key", "string", "");
+      FieldSchema col2 = new FieldSchema("value", "int", "");
+      FieldSchema col3 = new FieldSchema("city", "string", "");
+      StorageDescriptor sd = new StorageDescriptor();
+      sd.setSerdeInfo(new SerDeInfo());
+      sd.setInputFormat(TextInputFormat.class.getCanonicalName());
+      sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
+      sd.setCols(Arrays.asList(col1, col2));
+      table.setPartitionKeys(Arrays.asList(col3));
+      table.setSd(sd);
+      db.createTable(table);
+      return db.getTable(catName, dbName, tableName);
     } catch (Exception exception) {
       fail("Unable to drop and create table " + StatsUtils
-          .getFullyQualifiedTableName(dbName, tableName) + " because " + StringUtils
-          .stringifyException(exception));
+        .getFullyQualifiedTableName(dbName, tableName) + " because " + StringUtils
+        .stringifyException(exception));
       throw exception;
-  private void cleanUpTableQuietly(String dbName, String tableName) {
+  private void cleanUpTableQuietly(String catName, String dbName, String tableName) {
     try {
-      db.dropTable(dbName, tableName, true, true, true);
+      db.dropTable(catName, dbName, tableName, true, true, true);
     } catch (Exception exception) {
       fail("Unexpected exception: " + StringUtils.stringifyException(exception));
@@ -142,9 +163,10 @@ public class TestMsckDropPartitionsInBatches {
   private final int noException = 1;
   private final int oneException = 2;
   private final int allException = 3;
   private void runDropPartitions(int partCount, int batchSize, int maxRetries, int exceptionStatus)
-      throws Exception {
-    Hive spyDb = Mockito.spy(db);
+    throws Exception {
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // create partCount dummy partitions
     Set<PartitionResult> partsNotInFs = dropPartsNotInFs(partCount);
@@ -163,13 +185,13 @@ public class TestMsckDropPartitionsInBatches {
     if (exceptionStatus == oneException) {
       // After one exception everything is expected to run
-      actualBatchSize = batchSize/2;
+      actualBatchSize = batchSize / 2;
     if (exceptionStatus != allException) {
-      expectedCallCount = partCount/actualBatchSize;
+      expectedCallCount = partCount / actualBatchSize;
-      if (expectedCallCount*actualBatchSize < partCount) {
+      if (expectedCallCount * actualBatchSize < partCount) {
         // partCount not equally divided into batches.  last batch size will be less than batch size
         lastBatchSize = partCount - (expectedCallCount * actualBatchSize);
@@ -182,9 +204,10 @@ public class TestMsckDropPartitionsInBatches {
         // only first call throws exception
-        Mockito.doThrow(HiveException.class).doCallRealMethod().doCallRealMethod().when(spyDb)
-            .dropPartitions(Mockito.eq(table), Mockito.any(List.class), Mockito.eq(false),
-                Mockito.eq(true));
+        Mockito.doThrow(MetastoreException.class).doCallRealMethod().doCallRealMethod().when(spyDb)
+          .dropPartitions(Mockito.eq(table.getCatName()), Mockito.eq(table.getDbName()),
+            Mockito.eq(table.getTableName()),
+            Mockito.any(List.class), Mockito.any(PartitionDropOptions.class));
       expectedBatchSizes = new int[expectedCallCount];
@@ -195,15 +218,15 @@ public class TestMsckDropPartitionsInBatches {
       // second batch to last but one batch will be actualBatchSize
       // actualBatchSize is same as batchSize when no exceptions are expected
       // actualBatchSize is half of batchSize when 1 exception is expected
-      for (int i = 1; i < expectedCallCount-1; i++) {
+      for (int i = 1; i < expectedCallCount - 1; i++) {
         expectedBatchSizes[i] = Integer.min(partCount, actualBatchSize);
-      expectedBatchSizes[expectedCallCount-1] = lastBatchSize;
+      expectedBatchSizes[expectedCallCount - 1] = lastBatchSize;
       // batch size from input and decaying factor of 2
-      ddlTask.dropPartitionsInBatches(spyDb, repairOutput, partsNotInFs, table, batchSize, 2,
-          maxRetries);
+      msck.dropPartitionsInBatches(spyDb, repairOutput, partsNotInFs, null, table, batchSize, 2,
+        maxRetries);
     } else {
       if (maxRetries == 0) {
         // Retries will be done till decaying factor reduces to 0.  Decaying Factor is 2.
@@ -219,35 +242,37 @@ public class TestMsckDropPartitionsInBatches {
         expectedBatchSizes[i] = Integer.min(partCount, actualBatchSize);
       // all calls fail
-      Mockito.doThrow(HiveException.class).when(spyDb)
-          .dropPartitions(Mockito.eq(table), Mockito.any(List.class), Mockito.eq(false),
-              Mockito.eq(true));
+      Mockito.doThrow(MetastoreException.class).when(spyDb)
+        .dropPartitions(Mockito.eq(table.getCatName()), Mockito.eq(table.getDbName()), Mockito.eq(table.getTableName()),
+          Mockito.any(List.class), Mockito.any(PartitionDropOptions.class));
       Exception ex = null;
       try {
-        ddlTask.dropPartitionsInBatches(spyDb, repairOutput, partsNotInFs, table, batchSize, 2,
-            maxRetries);
+        msck.dropPartitionsInBatches(spyDb, repairOutput, partsNotInFs, null, table, batchSize, 2,
+          maxRetries);
       } catch (Exception retryEx) {
         ex = retryEx;
       Assert.assertFalse("Exception was expected but was not thrown", ex == null);
-      Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException);
+      Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException);
     // there should be expectedCallCount calls to drop partitions with each batch size of
     // actualBatchSize
     ArgumentCaptor<List> argument = ArgumentCaptor.forClass(List.class);
     Mockito.verify(spyDb, Mockito.times(expectedCallCount))
-        .dropPartitions(Mockito.eq(table), argument.capture(), Mockito.eq(false), Mockito.eq(true));
+      .dropPartitions(Mockito.eq(table.getCatName()), Mockito.eq(table.getDbName()), Mockito.eq(table.getTableName()),
+        argument.capture(), Mockito.any(PartitionDropOptions.class));
     // confirm the batch sizes were as expected
     List<List> droppedParts = argument.getAllValues();
+    assertEquals(expectedCallCount, droppedParts.size());
     for (int i = 0; i < expectedCallCount; i++) {
-          String.format("Unexpected batch size in attempt %d.  Expected: %d.  Found: %d", i + 1,
-              expectedBatchSizes[i], droppedParts.get(i).size()),
-          expectedBatchSizes[i], droppedParts.get(i).size());
+        String.format("Unexpected batch size in attempt %d.  Expected: %d.  Found: %d", i + 1,
+          expectedBatchSizes[i], droppedParts.get(i).size()),
+        expectedBatchSizes[i], droppedParts.get(i).size());
@@ -301,7 +326,7 @@ public class TestMsckDropPartitionsInBatches {
    * Tests the number of calls to dropPartitions and the respective batch sizes when first call to
-   * dropPartitions throws HiveException. The batch size should be reduced once by the
+   * dropPartitions throws MetastoreException. The batch size should be reduced once by the
    * decayingFactor 2, iow after batch size is halved.
    * @throws Exception
@@ -313,7 +338,7 @@ public class TestMsckDropPartitionsInBatches {
    * Tests the retries exhausted case when Hive.DropPartitions method call always keep throwing
-   * HiveException. The batch sizes should exponentially decreased based on the decaying factor and
+   * MetastoreException. The batch sizes should be exponentially decreased based on the decaying factor and
    * ultimately give up when it reaches 0.
    * @throws Exception
@@ -325,6 +350,7 @@ public class TestMsckDropPartitionsInBatches {
    * Tests the maximum retry attempt is set to 2.
+   *
    * @throws Exception
@@ -334,6 +360,7 @@ public class TestMsckDropPartitionsInBatches {
    * Tests when max number of retries is set to 1.
+   *
    * @throws Exception
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/
index a2a0583..434d82a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/
@@ -17,7 +17,9 @@
 package org.apache.hadoop.hive.ql.metadata;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
@@ -29,11 +31,14 @@ import java.util.Map;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.CheckResult;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -52,9 +57,11 @@ import;
 public class TestHiveMetaStoreChecker {
   private Hive hive;
+  private IMetaStoreClient msc;
   private FileSystem fs;
   private HiveMetaStoreChecker checker = null;
+  private final String catName = "hive";
   private final String dbName = "testhivemetastorechecker_db";
   private final String tableName = "testhivemetastorechecker_table";
@@ -69,7 +76,8 @@ public class TestHiveMetaStoreChecker {
     hive = Hive.get();
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 15);
     hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "throw");
-    checker = new HiveMetaStoreChecker(hive);
+    msc = new HiveMetaStoreClient(hive.getConf());
+    checker = new HiveMetaStoreChecker(msc, hive.getConf());
     partCols = new ArrayList<FieldSchema>();
     partCols.add(new FieldSchema(partDateName, serdeConstants.STRING_TYPE_NAME, ""));
@@ -92,11 +100,9 @@ public class TestHiveMetaStoreChecker {
   private void dropDbTable()  {
     // cleanup
     try {
-      hive.dropTable(dbName, tableName, true, true);
-      hive.dropDatabase(dbName, true, true, true);
-    } catch (NoSuchObjectException e) {
-      // ignore
-    } catch (HiveException e) {
+      msc.dropTable(catName, dbName, tableName, true, true);
+      msc.dropDatabase(catName, dbName, true, true, true);
+    } catch (TException e) {
       // ignore
@@ -108,28 +114,28 @@ public class TestHiveMetaStoreChecker {
-  public void testTableCheck() throws HiveException, MetaException,
-      IOException, TException, AlreadyExistsException {
+  public void testTableCheck() throws HiveException, IOException, TException, MetastoreException {
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, null, null, result);
+    checker.checkMetastore(catName, dbName, null, null, result);
     // we haven't added anything so should return an all ok
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     // check table only, should not exist in ms
     result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(1, result.getTablesNotInMs().size());
     assertEquals(tableName, result.getTablesNotInMs().iterator().next());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     Database db = new Database();
+    db.setCatalogName(catName);
-    hive.createDatabase(db);
+    msc.createDatabase(db);
     Table table = new Table(dbName, tableName);
@@ -142,19 +148,19 @@ public class TestHiveMetaStoreChecker {
     // now we've got a table, check that it works
     // first check all (1) tables
     result = new CheckResult();
-    checker.checkMetastore(dbName, null, null, result);
+    checker.checkMetastore(catName, dbName, null, null, result);
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     // then let's check the one we know about
     result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     // remove the table folder
     fs = table.getPath().getFileSystem(hive.getConf());
@@ -162,12 +168,12 @@ public class TestHiveMetaStoreChecker {
     // now this shouldn't find the path on the fs
     result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
-    assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());;
+    checker.checkMetastore(catName, dbName, tableName, null, result);
+    assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(1, result.getTablesNotOnFs().size());
     assertEquals(tableName, result.getTablesNotOnFs().iterator().next());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     // put it back and one additional table
@@ -178,12 +184,12 @@ public class TestHiveMetaStoreChecker {
     // find the extra table
     result = new CheckResult();
-    checker.checkMetastore(dbName, null, null, result);
+    checker.checkMetastore(catName, dbName, null, null, result);
     assertEquals(1, result.getTablesNotInMs().size());
     assertEquals(fakeTable.getName(), Lists.newArrayList(result.getTablesNotInMs()).get(0));
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     // create a new external table
     hive.dropTable(dbName, tableName);
@@ -192,11 +198,11 @@ public class TestHiveMetaStoreChecker {
     // should return all ok
     result = new CheckResult();
-    checker.checkMetastore(dbName, null, null, result);
+    checker.checkMetastore(catName, dbName, null, null, result);
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
@@ -205,7 +211,7 @@ public class TestHiveMetaStoreChecker {
   public void testAdditionalPartitionDirs()
-      throws HiveException, AlreadyExistsException, IOException {
+    throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     Table table = createTestTable();
     List<Partition> partitions = hive.getPartitions(table);
     assertEquals(2, partitions.size());
@@ -216,16 +222,17 @@ public class TestHiveMetaStoreChecker {
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
     //fakePart path partition is added since the defined partition keys are valid
     assertEquals(1, result.getPartitionsNotInMs().size());
-  @Test(expected = HiveException.class)
-  public void testInvalidPartitionKeyName() throws HiveException, AlreadyExistsException, IOException {
+  @Test(expected = MetastoreException.class)
+  public void testInvalidPartitionKeyName()
+    throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     Table table = createTestTable();
     List<Partition> partitions = hive.getPartitions(table);
     assertEquals(2, partitions.size());
@@ -235,7 +242,7 @@ public class TestHiveMetaStoreChecker {
-    checker.checkMetastore(dbName, tableName, null, new CheckResult());
+    checker.checkMetastore(catName, dbName, tableName, null, new CheckResult());
@@ -244,9 +251,9 @@ public class TestHiveMetaStoreChecker {
   public void testSkipInvalidPartitionKeyName()
-      throws HiveException, AlreadyExistsException, IOException {
+    throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "skip");
-    checker = new HiveMetaStoreChecker(hive);
+    checker = new HiveMetaStoreChecker(msc, hive.getConf());
     Table table = createTestTable();
     List<Partition> partitions = hive.getPartitions(table);
     assertEquals(2, partitions.size());
@@ -258,18 +265,18 @@ public class TestHiveMetaStoreChecker {
     createPartitionsDirectoriesOnFS(table, 2);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
     // only 2 valid partitions should be added
     assertEquals(2, result.getPartitionsNotInMs().size());
-  private Table createTestTable() throws AlreadyExistsException, HiveException {
+  private Table createTestTable() throws HiveException, AlreadyExistsException {
     Database db = new Database();
-    hive.createDatabase(db);
+    hive.createDatabase(db, true);
     Table table = new Table(dbName, tableName);
@@ -289,17 +296,17 @@ public class TestHiveMetaStoreChecker {
-  public void testPartitionsCheck() throws HiveException, MetaException,
-      IOException, TException, AlreadyExistsException {
+  public void testPartitionsCheck() throws HiveException,
+    IOException, TException, MetastoreException {
     Table table = createTestTable();
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     // all is well
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     List<Partition> partitions = hive.getPartitions(table);
     assertEquals(2, partitions.size());
@@ -313,7 +320,7 @@ public class TestHiveMetaStoreChecker {
     fs.delete(partToRemovePath, true);
     result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     // missing one partition on fs
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
@@ -322,17 +329,17 @@ public class TestHiveMetaStoreChecker {
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     List<Map<String, String>> partsCopy = new ArrayList<Map<String, String>>();
     // check only the partition that exists, all should be well
     result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, partsCopy, result);
+    checker.checkMetastore(catName, dbName, tableName, partsCopy, result);
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs());
     // old test is moved to msck_repair_2.q
@@ -340,17 +347,17 @@ public class TestHiveMetaStoreChecker {
     hive.dropTable(dbName, tableName, true, true);
     result = new CheckResult();
-    checker.checkMetastore(dbName, null, null, result);
+    checker.checkMetastore(catName, dbName, null, null, result);
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotInMs()); //--0e
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotInMs()); //--0e
     System.err.println("Test completed - partition check");
-  public void testDataDeletion() throws HiveException, MetaException,
-      IOException, TException, AlreadyExistsException, NoSuchObjectException {
+  public void testDataDeletion() throws HiveException,
+    IOException, TException {
     Database db = new Database();
@@ -386,15 +393,15 @@ public class TestHiveMetaStoreChecker {
    * Test multi-threaded implementation of checker to find out missing partitions
-  public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException {
+  public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
     // add 10 partitions on the filesystem
     createPartitionsDirectoriesOnFS(testTable, 10);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String>emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String>emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String>emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult>emptySet(), result.getPartitionsNotOnFs());
     assertEquals(10, result.getPartitionsNotInMs().size());
@@ -403,17 +410,17 @@ public class TestHiveMetaStoreChecker {
   public void testSingleThreadedCheckMetastore()
-      throws HiveException, AlreadyExistsException, IOException {
+    throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     // set num of threads to 0 so that single-threaded checkMetastore is called
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 0);
     Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
     // add 10 partitions on the filesystem
     createPartitionsDirectoriesOnFS(testTable, 10);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
     assertEquals(10, result.getPartitionsNotInMs().size());
@@ -426,7 +433,7 @@ public class TestHiveMetaStoreChecker {
   public void testSingleThreadedDeeplyNestedTables()
-      throws HiveException, AlreadyExistsException, IOException {
+    throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     // set num of threads to 0 so that single-threaded checkMetastore is called
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 0);
     int poolSize = 2;
@@ -435,10 +442,10 @@ public class TestHiveMetaStoreChecker {
     // add 10 partitions on the filesystem
     createPartitionsDirectoriesOnFS(testTable, 10);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
     assertEquals(10, result.getPartitionsNotInMs().size());
@@ -451,7 +458,7 @@ public class TestHiveMetaStoreChecker {
   public void testDeeplyNestedPartitionedTables()
-      throws HiveException, AlreadyExistsException, IOException {
+    throws HiveException, AlreadyExistsException, IOException, MetastoreException {
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 2);
     int poolSize = 2;
     // create a deeply nested table which has more partition keys than the pool size
@@ -459,10 +466,10 @@ public class TestHiveMetaStoreChecker {
     // add 10 partitions on the filesystem
     createPartitionsDirectoriesOnFS(testTable, 10);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
     assertEquals(10, result.getPartitionsNotInMs().size());
@@ -487,20 +494,20 @@ public class TestHiveMetaStoreChecker {
     CheckResult result = new CheckResult();
     Exception exception = null;
     try {
-      checker.checkMetastore(dbName, tableName, null, result);
+      checker.checkMetastore(catName, dbName, tableName, null, result);
     } catch (Exception e) {
       exception = e;
-    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
+    assertTrue("Expected MetastoreException", exception!=null && exception instanceof MetastoreException);
     createFile(sb.toString(), "dummyFile");
     result = new CheckResult();
     exception = null;
     try {
-      checker.checkMetastore(dbName, tableName, null, result);
+      checker.checkMetastore(catName, dbName, tableName, null, result);
     } catch (Exception e) {
       exception = e;
-    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
+    assertTrue("Expected MetastoreException", exception!=null && exception instanceof MetastoreException);
@@ -511,14 +518,14 @@ public class TestHiveMetaStoreChecker {
    * @throws HiveException
    * @throws IOException
-  @Test(expected = HiveException.class)
+  @Test(expected = MetastoreException.class)
   public void testInvalidOrderForPartitionKeysOnFS()
-      throws AlreadyExistsException, HiveException, IOException {
+    throws AlreadyExistsException, HiveException, IOException, MetastoreException {
     Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
     // add 10 partitions on the filesystem
     createInvalidPartitionDirsOnFS(testTable, 10);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
@@ -527,19 +534,19 @@ public class TestHiveMetaStoreChecker {
   public void testSkipInvalidOrderForPartitionKeysOnFS()
-      throws AlreadyExistsException, HiveException, IOException {
+    throws AlreadyExistsException, HiveException, IOException, MetastoreException {
     hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "skip");
-    checker = new HiveMetaStoreChecker(hive);
+    checker = new HiveMetaStoreChecker(msc, hive.getConf());
     Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
     // add 2 invalid partition directories on the filesystem
     createInvalidPartitionDirsOnFS(testTable, 2);
     // add 2 valid partitions on the filesystem
     createPartitionsDirectoriesOnFS(testTable, 2);
     CheckResult result = new CheckResult();
-    checker.checkMetastore(dbName, tableName, null, result);
+    checker.checkMetastore(catName, dbName, tableName, null, result);
     assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
     assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
-    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
     // only 2 valid partitions should be added
     assertEquals(2, result.getPartitionsNotInMs().size());
@@ -565,20 +572,20 @@ public class TestHiveMetaStoreChecker {
     CheckResult result = new CheckResult();
     Exception exception = null;
     try {
-      checker.checkMetastore(dbName, tableName, null, result);
+      checker.checkMetastore(catName, dbName, tableName, null, result);
     } catch (Exception e) {
       exception = e;
-    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
+    assertTrue("Expected MetastoreException", exception!=null && exception instanceof MetastoreException);
     createFile(sb.toString(), "dummyFile");
     result = new CheckResult();
     exception = null;
     try {
-      checker.checkMetastore(dbName, tableName, null, result);
+      checker.checkMetastore(catName, dbName, tableName, null, result);
     } catch (Exception e) {
       exception = e;
-    assertTrue("Expected HiveException", exception!=null && exception instanceof HiveException);
+    assertTrue("Expected MetastoreException", exception!=null && exception instanceof MetastoreException);
    * Creates a test partitioned table with the required level of nested partitions and number of
@@ -597,7 +604,7 @@ public class TestHiveMetaStoreChecker {
       int valuesPerPartition) throws AlreadyExistsException, HiveException {
     Database db = new Database();
-    hive.createDatabase(db);
+    hive.createDatabase(db, true);
     Table table = new Table(dbName, tableName);
@@ -611,7 +618,7 @@ public class TestHiveMetaStoreChecker {
     // create table
-    hive.createTable(table);
+    hive.createTable(table, true);
     table = hive.getTable(dbName, tableName);
     if (valuesPerPartition == 0) {
       return table;
diff --git a/ql/src/test/queries/clientpositive/msck_repair_acid.q b/ql/src/test/queries/clientpositive/msck_repair_acid.q
new file mode 100644
index 0000000..369095d
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/msck_repair_acid.q
@@ -0,0 +1,34 @@
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+DROP TABLE IF EXISTS repairtable_n6;
+CREATE TABLE repairtable_n6(col STRING) PARTITIONED BY (p1 STRING, p2 STRING) STORED AS ORC tblproperties ("transactional"="true", "transactional_properties"="insert_only");
+MSCK TABLE repairtable_n6;
+show partitions repairtable_n6;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n6/p1=a/p2=b/;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n6/p1=c/p2=d/;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n6/p1=a/p2=b/datafile;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n6/p1=c/p2=d/datafile;
+EXPLAIN LOCKS MSCK REPAIR TABLE default.repairtable_n6;
+MSCK REPAIR TABLE default.repairtable_n6;
+show partitions default.repairtable_n6;
+set hive.mapred.mode=strict;
+dfs -rmr ${system:test.warehouse.dir}/repairtable_n6/p1=c;
+MSCK REPAIR TABLE default.repairtable_n6 DROP PARTITIONS;
+show partitions default.repairtable_n6;
+DROP TABLE default.repairtable_n6;
diff --git a/ql/src/test/queries/clientpositive/partition_discovery.q b/ql/src/test/queries/clientpositive/partition_discovery.q
new file mode 100644
index 0000000..2f0ff87
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/partition_discovery.q
@@ -0,0 +1,77 @@
+DROP TABLE IF EXISTS repairtable_n7;
+DROP TABLE IF EXISTS repairtable_n8;
+DROP TABLE IF EXISTS repairtable_n9;
+DROP TABLE IF EXISTS repairtable_n10;
+LOCATION '${system:test.warehouse.dir}/repairtable_n7';
+describe formatted repairtable_n7;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n7/p1=a/p2=b/;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n7/p1=c/p2=d/;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n7/p1=a/p2=b/datafile;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n7/p1=c/p2=d/datafile;
+MSCK REPAIR TABLE default.repairtable_n7;
+show partitions default.repairtable_n7;
+CREATE EXTERNAL TABLE repairtable_n8 LIKE repairtable_n7
+LOCATION '${system:test.warehouse.dir}/repairtable_n8';
+describe formatted repairtable_n8;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n8/p1=a/p2=b/;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n8/p1=c/p2=d/;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n8/p1=a/p2=b/datafile;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n8/p1=c/p2=d/datafile;
+MSCK REPAIR TABLE default.repairtable_n8;
+show partitions default.repairtable_n8;
+LOCATION '${system:test.warehouse.dir}/repairtable_n9' tblproperties ("partition.retention.period"="10s");
+describe formatted repairtable_n9;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n9/p1=a/p2=b/;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n9/p1=c/p2=d/;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n9/p1=a/p2=b/datafile;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n9/p1=c/p2=d/datafile;
+MSCK REPAIR TABLE default.repairtable_n9;
+show partitions default.repairtable_n9;
+!sleep 12;
+-- msck does not drop partitions, so this still should be no-op
+MSCK REPAIR TABLE default.repairtable_n9;
+show partitions default.repairtable_n9;
+-- this will drop old partitions
+MSCK REPAIR TABLE default.repairtable_n9 SYNC PARTITIONS;
+show partitions default.repairtable_n9;
+describe formatted repairtable_n10;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n10/p1=a/p2=b/;
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable_n10/p1=c/p2=d/;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n10/p1=a/p2=b/datafile;
+dfs -touchz ${system:test.warehouse.dir}/repairtable_n10/p1=c/p2=d/datafile;
+!sleep 12;
+MSCK REPAIR TABLE default.repairtable_n10;
+show partitions default.repairtable_n10;
+DROP TABLE default.repairtable_n7;
+DROP TABLE default.repairtable_n8;
+DROP TABLE default.repairtable_n9;
+DROP TABLE default.repairtable_n10;
diff --git a/ql/src/test/results/clientpositive/create_like.q.out b/ql/src/test/results/clientpositive/create_like.q.out
index f4a5ed5..6d4e14a 100644
--- a/ql/src/test/results/clientpositive/create_like.q.out
+++ b/ql/src/test/results/clientpositive/create_like.q.out
@@ -118,6 +118,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\"}}
 	EXTERNAL            	TRUE                
+	discover.partitions 	true                
 	numFiles            	0                   
 	numRows             	0                   
 	rawDataSize         	0           
diff --git a/ql/src/test/results/clientpositive/create_like_view.q.out b/ql/src/test/results/clientpositive/create_like_view.q.out
index 870f280..7e33e50 100644
--- a/ql/src/test/results/clientpositive/create_like_view.q.out
+++ b/ql/src/test/results/clientpositive/create_like_view.q.out
@@ -172,6 +172,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"a\":\"true\",\"b\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	0                   
 	numRows             	0                   
 	rawDataSize         	0           
diff --git a/ql/src/test/results/clientpositive/default_file_format.q.out b/ql/src/test/results/clientpositive/default_file_format.q.out
index 0adf5ae..beef419 100644
--- a/ql/src/test/results/clientpositive/default_file_format.q.out
+++ b/ql/src/test/results/clientpositive/default_file_format.q.out
@@ -172,6 +172,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 #### A masked pattern was here ####
 # Storage Information	 	 
@@ -236,6 +237,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	0                   
 	totalSize           	0                   
 #### A masked pattern was here ####
@@ -472,6 +474,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	0                   
 	totalSize           	0                   
 #### A masked pattern was here ####
@@ -538,6 +541,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	0                   
 	totalSize           	0                   
 #### A masked pattern was here ####
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
index 883994c..14522fb 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
@@ -355,6 +355,7 @@ STAGE PLANS:
                     columns __time,page,user,language,added,deleted
                     columns.types timestamp:string:string:string:int:int
+                    discover.partitions true
                     druid.datasource default.druid_kafka_test
                     druid.fieldNames language,user
                     druid.fieldTypes string,string
@@ -396,6 +397,7 @@ STAGE PLANS:
                       columns __time,page,user,language,added,deleted
                       columns.types timestamp:string:string:string:int:int
+                      discover.partitions true
                       druid.datasource default.druid_kafka_test
                       druid.fieldNames language,user
                       druid.fieldTypes string,string
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out b/ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
index 9c9af44..b07ed52 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
@@ -245,6 +245,7 @@ STAGE PLANS:
                     columns __time,cstring1,cstring2,cdouble,cfloat,ctinyint,csmallint,cint,cbigint,cboolean1,cboolean2
                     columns.types timestamp with local time zone:string:string:double:float:tinyint:smallint:int:bigint:boolean:boolean
+                    discover.partitions true
                     druid.datasource default.druid_table_alltypesorc
                     druid.fieldNames vc
                     druid.fieldTypes int
@@ -277,6 +278,7 @@ STAGE PLANS:
                       columns __time,cstring1,cstring2,cdouble,cfloat,ctinyint,csmallint,cint,cbigint,cboolean1,cboolean2
                       columns.types timestamp with local time zone:string:string:double:float:tinyint:smallint:int:bigint:boolean:boolean
+                      discover.partitions true
                       druid.datasource default.druid_table_alltypesorc
                       druid.fieldNames vc
                       druid.fieldTypes int
diff --git a/ql/src/test/results/clientpositive/druid_topn.q.out b/ql/src/test/results/clientpositive/druid_topn.q.out
index 179902a..755e977 100644
--- a/ql/src/test/results/clientpositive/druid_topn.q.out
+++ b/ql/src/test/results/clientpositive/druid_topn.q.out
@@ -42,6 +42,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"__time\":\"true\",\"added\":\"true\",\"anonymous\":\"true\",\"count\":\"true\",\"deleted\":\"true\",\"delta\":\"true\",\"language\":\"true\",\"namespace\":\"true\",\"newpage\":\"true\",\"page\":\"true\",\"robot\":\"true\",\"unpatrolled\":\"true\",\"user\":\"true\",\"variation\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	druid.datasource    	wikipedia           
 	numFiles            	0                   
 	numRows             	0           
diff --git a/ql/src/test/results/clientpositive/explain_locks.q.out b/ql/src/test/results/clientpositive/explain_locks.q.out
index ed7f1e8..3183533 100644
--- a/ql/src/test/results/clientpositive/explain_locks.q.out
+++ b/ql/src/test/results/clientpositive/explain_locks.q.out
@@ -2,6 +2,7 @@ PREHOOK: query: explain locks drop table test_explain_locks
 POSTHOOK: query: explain locks drop table test_explain_locks
 PREHOOK: query: explain locks create table test_explain_locks (a int, b int)
 PREHOOK: Output: database:default
diff --git a/ql/src/test/results/clientpositive/llap/external_table_purge.q.out b/ql/src/test/results/clientpositive/llap/external_table_purge.q.out
index 24c778e..4e2f6a3 100644
--- a/ql/src/test/results/clientpositive/llap/external_table_purge.q.out
+++ b/ql/src/test/results/clientpositive/llap/external_table_purge.q.out
@@ -119,6 +119,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 test.comment=Table should have data
@@ -168,6 +169,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 test.comment=Table should have data
@@ -451,6 +453,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: alter table etp_2 add partition (p1='part1')
@@ -520,6 +523,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: alter table etp_2 add partition (p1='part1')
diff --git a/ql/src/test/results/clientpositive/llap/mm_exim.q.out b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
index ee6cf06..868d107 100644
--- a/ql/src/test/results/clientpositive/llap/mm_exim.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
@@ -643,6 +643,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	3                   
 	numRows             	6                   
 	rawDataSize         	37          
diff --git a/ql/src/test/results/clientpositive/llap/strict_managed_tables2.q.out b/ql/src/test/results/clientpositive/llap/strict_managed_tables2.q.out
index f3b6152..348266c 100644
--- a/ql/src/test/results/clientpositive/llap/strict_managed_tables2.q.out
+++ b/ql/src/test/results/clientpositive/llap/strict_managed_tables2.q.out
@@ -49,6 +49,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: create table smt2_tab2 (c1 string, c2 string)
@@ -137,6 +138,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: create table smt2_tab5 (c1 string, c2 string)
diff --git a/ql/src/test/results/clientpositive/llap/table_nonprintable.q.out b/ql/src/test/results/clientpositive/llap/table_nonprintable.q.out
index 8221b8c..9dc8710 100644
--- a/ql/src/test/results/clientpositive/llap/table_nonprintable.q.out
+++ b/ql/src/test/results/clientpositive/llap/table_nonprintable.q.out
@@ -26,8 +26,8 @@ POSTHOOK: query: msck repair table table_external
 POSTHOOK: Output: default@table_external
 Partitions not in metastore:	table_external:day=¢Bar
-Repair: Cannot add partition table_external:day=Foo due to invalid characters in the name
 #### A masked pattern was here ####
+Repair: Cannot add partition table_external:day=Foo due to invalid characters in the name
 Found 2 items
 drwxr-xr-x   - ### USER ### ### GROUP ###          0 ### HDFS DATE ### hdfs://### HDFS PATH ###Foo
 drwxr-xr-x   - ### USER ### ### GROUP ###          0 ### HDFS DATE ### hdfs://### HDFS PATH ###¢Bar
diff --git a/ql/src/test/results/clientpositive/llap/whroot_external1.q.out b/ql/src/test/results/clientpositive/llap/whroot_external1.q.out
index cac158c..4333bf4 100644
--- a/ql/src/test/results/clientpositive/llap/whroot_external1.q.out
+++ b/ql/src/test/results/clientpositive/llap/whroot_external1.q.out
@@ -72,6 +72,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: insert into table wre1_ext1 select * from src where key < 5
@@ -157,6 +158,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: insert into table wre1_ext2 select * from src where key < 5
@@ -246,6 +248,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: insert into table wre1_db.wre1_ext3 select * from src where key < 5
@@ -331,6 +334,7 @@ LOCATION
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: insert into table wre1_db.wre1_ext4 select * from src where key < 5
@@ -413,6 +417,7 @@ OUTPUTFORMAT
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: insert into table wre1_ext5 select * from src where key < 5
@@ -495,6 +500,7 @@ OUTPUTFORMAT
   'hdfs://### HDFS PATH ###'
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: insert into table wre1_db.wre1_ext6 select * from src where key < 5
diff --git a/ql/src/test/results/clientpositive/msck_repair_acid.q.out b/ql/src/test/results/clientpositive/msck_repair_acid.q.out
new file mode 100644
index 0000000..902a4b7
--- /dev/null
+++ b/ql/src/test/results/clientpositive/msck_repair_acid.q.out
@@ -0,0 +1,88 @@
+PREHOOK: query: DROP TABLE IF EXISTS repairtable_n6
+POSTHOOK: query: DROP TABLE IF EXISTS repairtable_n6
+PREHOOK: query: CREATE TABLE repairtable_n6(col STRING) PARTITIONED BY (p1 STRING, p2 STRING) STORED AS ORC tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: query: CREATE TABLE repairtable_n6(col STRING) PARTITIONED BY (p1 STRING, p2 STRING) STORED AS ORC tblproperties ("transactional"="true", "transactional_properties"="insert_only")
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable_n6
+PREHOOK: query: EXPLAIN LOCKS MSCK TABLE repairtable_n6
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: Output: default@repairtable_n6
+default.repairtable_n6 -> SHARED_READ
+PREHOOK: query: MSCK TABLE repairtable_n6
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: query: MSCK TABLE repairtable_n6
+POSTHOOK: Output: default@repairtable_n6
+PREHOOK: query: show partitions repairtable_n6
+PREHOOK: Input: default@repairtable_n6
+POSTHOOK: query: show partitions repairtable_n6
+POSTHOOK: Input: default@repairtable_n6
+PREHOOK: query: EXPLAIN LOCKS MSCK REPAIR TABLE default.repairtable_n6
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: query: EXPLAIN LOCKS MSCK REPAIR TABLE default.repairtable_n6
+POSTHOOK: Output: default@repairtable_n6
+default.repairtable_n6 -> EXCLUSIVE
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n6
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n6
+POSTHOOK: Output: default@repairtable_n6
+Partitions not in metastore:	repairtable_n6:p1=a/p2=b	repairtable_n6:p1=c/p2=d
+#### A masked pattern was here ####
+PREHOOK: query: show partitions default.repairtable_n6
+PREHOOK: Input: default@repairtable_n6
+POSTHOOK: query: show partitions default.repairtable_n6
+POSTHOOK: Input: default@repairtable_n6
+#### A masked pattern was here ####
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: Output: default@repairtable_n6
+default.repairtable_n6 -> EXCLUSIVE
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n6 DROP PARTITIONS
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n6 DROP PARTITIONS
+POSTHOOK: Output: default@repairtable_n6
+Partitions missing from filesystem:	repairtable_n6:p1=c/p2=d
+Repair: Dropped partition from metastore hive.default.repairtable_n6:p1=c/p2=d
+PREHOOK: query: show partitions default.repairtable_n6
+PREHOOK: Input: default@repairtable_n6
+POSTHOOK: query: show partitions default.repairtable_n6
+POSTHOOK: Input: default@repairtable_n6
+PREHOOK: query: DROP TABLE default.repairtable_n6
+PREHOOK: Input: default@repairtable_n6
+PREHOOK: Output: default@repairtable_n6
+POSTHOOK: query: DROP TABLE default.repairtable_n6
+POSTHOOK: Input: default@repairtable_n6
+POSTHOOK: Output: default@repairtable_n6
diff --git a/ql/src/test/results/clientpositive/msck_repair_drop.q.out b/ql/src/test/results/clientpositive/msck_repair_drop.q.out
index 2456734..27b718c 100644
--- a/ql/src/test/results/clientpositive/msck_repair_drop.q.out
+++ b/ql/src/test/results/clientpositive/msck_repair_drop.q.out
@@ -58,16 +58,16 @@ POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 DROP PARTITIONS
 POSTHOOK: Output: default@repairtable_n1
 Partitions missing from filesystem:	repairtable_n1:p1=2/p2=21	repairtable_n1:p1=2/p2=210	repairtable_n1:p1=2/p2=22	repairtable_n1:p1=2/p2=23	repairtable_n1:p1=2/p2=24	repairtable_n1:p1=2/p2=25	repairtable_n1:p1=2/p2=26	repairtable_n1:p1=2/p2=27	repairtable_n1:p1=2/p2=28	repairtable_n1:p1=2/p2=29
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=21
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=210
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=22
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=23
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=24
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=25
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=26
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=27
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=28
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=29
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=21
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=210
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=22
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=23
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=24
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=25
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=26
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=27
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=28
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=29
 PREHOOK: query: show partitions default.repairtable_n1
 PREHOOK: Input: default@repairtable_n1
@@ -124,16 +124,16 @@ POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 DROP PARTITIONS
 POSTHOOK: Output: default@repairtable_n1
 Partitions missing from filesystem:	repairtable_n1:p1=2/p2=21	repairtable_n1:p1=2/p2=210	repairtable_n1:p1=2/p2=22	repairtable_n1:p1=2/p2=23	repairtable_n1:p1=2/p2=24	repairtable_n1:p1=2/p2=25	repairtable_n1:p1=2/p2=26	repairtable_n1:p1=2/p2=27	repairtable_n1:p1=2/p2=28	repairtable_n1:p1=2/p2=29
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=21
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=210
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=22
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=23
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=24
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=25
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=26
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=27
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=28
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=29
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=21
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=210
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=22
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=23
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=24
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=25
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=26
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=27
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=28
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=29
 PREHOOK: query: show partitions default.repairtable_n1
 PREHOOK: Input: default@repairtable_n1
@@ -190,16 +190,16 @@ POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 DROP PARTITIONS
 POSTHOOK: Output: default@repairtable_n1
 Partitions missing from filesystem:	repairtable_n1:p1=2/p2=21	repairtable_n1:p1=2/p2=210	repairtable_n1:p1=2/p2=22	repairtable_n1:p1=2/p2=23	repairtable_n1:p1=2/p2=24	repairtable_n1:p1=2/p2=25	repairtable_n1:p1=2/p2=26	repairtable_n1:p1=2/p2=27	repairtable_n1:p1=2/p2=28	repairtable_n1:p1=2/p2=29
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=21
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=210
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=22
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=23
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=24
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=25
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=26
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=27
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=28
-Repair: Dropped partition from metastore default.repairtable_n1:p1=2/p2=29
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=21
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=210
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=22
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=23
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=24
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=25
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=26
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=27
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=28
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=2/p2=29
 PREHOOK: query: show partitions default.repairtable_n1
 PREHOOK: Input: default@repairtable_n1
@@ -279,8 +279,8 @@ POSTHOOK: type: MSCK
 POSTHOOK: Output: default@repairtable_n1
 Partitions not in metastore:	repairtable_n1:p1=5/p2=51	repairtable_n1:p1=5/p2=52
 Partitions missing from filesystem:	repairtable_n1:p1=3/p2=31	repairtable_n1:p1=3/p2=32
-Repair: Dropped partition from metastore default.repairtable_n1:p1=3/p2=31
-Repair: Dropped partition from metastore default.repairtable_n1:p1=3/p2=32
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=3/p2=31
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=3/p2=32
 PREHOOK: query: show partitions default.repairtable_n1
 PREHOOK: Input: default@repairtable_n1
@@ -309,8 +309,8 @@ POSTHOOK: Output: default@repairtable_n1
 Partitions not in metastore:	repairtable_n1:p1=5/p2=51	repairtable_n1:p1=5/p2=52
 Partitions missing from filesystem:	repairtable_n1:p1=4/p2=41	repairtable_n1:p1=4/p2=42
 #### A masked pattern was here ####
-Repair: Dropped partition from metastore default.repairtable_n1:p1=4/p2=41
-Repair: Dropped partition from metastore default.repairtable_n1:p1=4/p2=42
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=4/p2=41
+Repair: Dropped partition from metastore hive.default.repairtable_n1:p1=4/p2=42
 PREHOOK: query: show partitions default.repairtable_n1
 PREHOOK: Input: default@repairtable_n1
diff --git a/ql/src/test/results/clientpositive/partition_discovery.q.out b/ql/src/test/results/clientpositive/partition_discovery.q.out
new file mode 100644
index 0000000..9075136
--- /dev/null
+++ b/ql/src/test/results/clientpositive/partition_discovery.q.out
@@ -0,0 +1,357 @@
+PREHOOK: query: DROP TABLE IF EXISTS repairtable_n7
+POSTHOOK: query: DROP TABLE IF EXISTS repairtable_n7
+PREHOOK: query: DROP TABLE IF EXISTS repairtable_n8
+POSTHOOK: query: DROP TABLE IF EXISTS repairtable_n8
+PREHOOK: query: DROP TABLE IF EXISTS repairtable_n9
+POSTHOOK: query: DROP TABLE IF EXISTS repairtable_n9
+PREHOOK: query: DROP TABLE IF EXISTS repairtable_n10
+POSTHOOK: query: DROP TABLE IF EXISTS repairtable_n10
+#### A masked pattern was here ####
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable_n7
+#### A masked pattern was here ####
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable_n7
+PREHOOK: query: describe formatted repairtable_n7
+PREHOOK: Input: default@repairtable_n7
+POSTHOOK: query: describe formatted repairtable_n7
+POSTHOOK: Input: default@repairtable_n7
+# col_name            	data_type           	comment             
+col                 	string              	                    
+# Partition Information	 	 
+# col_name            	data_type           	comment             
+p1                  	string              	                    
+p2                  	string              	                    
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE      	 
+Table Parameters:	 	 
+	EXTERNAL            	TRUE                
+	bucketing_version   	2                   
+	discover.partitions 	true                
+	numFiles            	0                   
+	numPartitions       	0                   
+	numRows             	0                   
+	rawDataSize         	0                   
+	totalSize           	0                   
+#### A masked pattern was here ####
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	 
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n7
+PREHOOK: Output: default@repairtable_n7
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n7
+POSTHOOK: Output: default@repairtable_n7
+Partitions not in metastore:	repairtable_n7:p1=a/p2=b	repairtable_n7:p1=c/p2=d
+#### A masked pattern was here ####
+PREHOOK: query: show partitions default.repairtable_n7
+PREHOOK: Input: default@repairtable_n7
+POSTHOOK: query: show partitions default.repairtable_n7
+POSTHOOK: Input: default@repairtable_n7
+PREHOOK: query: CREATE EXTERNAL TABLE repairtable_n8 LIKE repairtable_n7
+#### A masked pattern was here ####
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable_n8
+POSTHOOK: query: CREATE EXTERNAL TABLE repairtable_n8 LIKE repairtable_n7
+#### A masked pattern was here ####
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable_n8
+PREHOOK: query: describe formatted repairtable_n8
+PREHOOK: Input: default@repairtable_n8
+POSTHOOK: query: describe formatted repairtable_n8
+POSTHOOK: Input: default@repairtable_n8
+# col_name            	data_type           	comment             
+col                 	string              	                    
+# Partition Information	 	 
+# col_name            	data_type           	comment             
+p1                  	string              	                    
+p2                  	string              	                    
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE      	 
+Table Parameters:	 	 
+	EXTERNAL            	TRUE                
+	discover.partitions 	true                
+	numFiles            	0                   
+	numPartitions       	0                   
+	numRows             	0                   
+	rawDataSize         	0                   
+	totalSize           	0                   
+#### A masked pattern was here ####
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	 
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n8
+PREHOOK: Output: default@repairtable_n8
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n8
+POSTHOOK: Output: default@repairtable_n8
+Partitions not in metastore:	repairtable_n8:p1=a/p2=b	repairtable_n8:p1=c/p2=d
+#### A masked pattern was here ####
+PREHOOK: query: show partitions default.repairtable_n8
+PREHOOK: Input: default@repairtable_n8
+POSTHOOK: query: show partitions default.repairtable_n8
+POSTHOOK: Input: default@repairtable_n8
+#### A masked pattern was here ####
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable_n9
+#### A masked pattern was here ####
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable_n9
+PREHOOK: query: describe formatted repairtable_n9
+PREHOOK: Input: default@repairtable_n9
+POSTHOOK: query: describe formatted repairtable_n9
+POSTHOOK: Input: default@repairtable_n9
+# col_name            	data_type           	comment             
+col                 	string              	                    
+# Partition Information	 	 
+# col_name            	data_type           	comment             
+p1                  	string              	                    
+p2                  	string              	                    
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE      	 
+Table Parameters:	 	 
+	EXTERNAL            	TRUE                
+	bucketing_version   	2                   
+	discover.partitions 	true                
+	numFiles            	0                   
+	numPartitions       	0                   
+	numRows             	0                   
+	partition.retention.period	10s                 
+	rawDataSize         	0                   
+	totalSize           	0                   
+#### A masked pattern was here ####
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	 
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n9
+PREHOOK: Output: default@repairtable_n9
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n9
+POSTHOOK: Output: default@repairtable_n9
+Partitions not in metastore:	repairtable_n9:p1=a/p2=b	repairtable_n9:p1=c/p2=d
+#### A masked pattern was here ####
+PREHOOK: query: show partitions default.repairtable_n9
+PREHOOK: Input: default@repairtable_n9
+POSTHOOK: query: show partitions default.repairtable_n9
+POSTHOOK: Input: default@repairtable_n9
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n9
+PREHOOK: Output: default@repairtable_n9
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n9
+POSTHOOK: Output: default@repairtable_n9
+Expired partitions (retention period: 10s) :	repairtable_n9:p1=a/p2=b	repairtable_n9:p1=c/p2=d
+PREHOOK: query: show partitions default.repairtable_n9
+PREHOOK: Input: default@repairtable_n9
+POSTHOOK: query: show partitions default.repairtable_n9
+POSTHOOK: Input: default@repairtable_n9
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n9 SYNC PARTITIONS
+PREHOOK: Output: default@repairtable_n9
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n9 SYNC PARTITIONS
+POSTHOOK: Output: default@repairtable_n9
+Expired partitions (retention period: 10s) :	repairtable_n9:p1=a/p2=b	repairtable_n9:p1=c/p2=d
+Repair: Dropped partition from metastore hive.default.repairtable_n9:p1=a/p2=b
+Repair: Dropped partition from metastore hive.default.repairtable_n9:p1=c/p2=d
+PREHOOK: query: show partitions default.repairtable_n9
+PREHOOK: Input: default@repairtable_n9
+POSTHOOK: query: show partitions default.repairtable_n9
+POSTHOOK: Input: default@repairtable_n9
+PREHOOK: Input: default@repairtable_n9
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable_n10
+PREHOOK: Output: default@repairtable_n10
+POSTHOOK: Input: default@repairtable_n9
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable_n10
+PREHOOK: query: describe formatted repairtable_n10
+PREHOOK: Input: default@repairtable_n10
+POSTHOOK: query: describe formatted repairtable_n10
+POSTHOOK: Input: default@repairtable_n10
+# col_name            	data_type           	comment             
+col                 	string              	                    
+# Partition Information	 	 
+# col_name            	data_type           	comment             
+p1                  	string              	                    
+p2                  	string              	                    
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE      	 
+Table Parameters:	 	 
+	EXTERNAL            	TRUE                
+	bucketing_version   	2                   
+	discover.partitions 	true                
+	numFiles            	0                   
+	numPartitions       	0                   
+	numRows             	0                   
+	rawDataSize         	0                   
+	totalSize           	0                   
+#### A masked pattern was here ####
+# Storage Information	 	 
+SerDe Library:	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n10
+PREHOOK: Output: default@repairtable_n10
+POSTHOOK: query: MSCK REPAIR TABLE default.repairtable_n10
+POSTHOOK: Output: default@repairtable_n10
+Partitions not in metastore:	repairtable_n10:p1=a/p2=b	repairtable_n10:p1=c/p2=d
+#### A masked pattern was here ####
+PREHOOK: query: show partitions default.repairtable_n10
+PREHOOK: Input: default@repairtable_n10
+POSTHOOK: query: show partitions default.repairtable_n10
+POSTHOOK: Input: default@repairtable_n10
+PREHOOK: query: DROP TABLE default.repairtable_n7
+PREHOOK: Input: default@repairtable_n7
+PREHOOK: Output: default@repairtable_n7
+POSTHOOK: query: DROP TABLE default.repairtable_n7
+POSTHOOK: Input: default@repairtable_n7
+POSTHOOK: Output: default@repairtable_n7
+PREHOOK: query: DROP TABLE default.repairtable_n8
+PREHOOK: Input: default@repairtable_n8
+PREHOOK: Output: default@repairtable_n8
+POSTHOOK: query: DROP TABLE default.repairtable_n8
+POSTHOOK: Input: default@repairtable_n8
+POSTHOOK: Output: default@repairtable_n8
+PREHOOK: query: DROP TABLE default.repairtable_n9
+PREHOOK: Input: default@repairtable_n9
+PREHOOK: Output: default@repairtable_n9
+POSTHOOK: query: DROP TABLE default.repairtable_n9
+POSTHOOK: Input: default@repairtable_n9
+POSTHOOK: Output: default@repairtable_n9
+PREHOOK: query: DROP TABLE default.repairtable_n10
+PREHOOK: Input: default@repairtable_n10
+PREHOOK: Output: default@repairtable_n10
+POSTHOOK: query: DROP TABLE default.repairtable_n10
+POSTHOOK: Input: default@repairtable_n10
+POSTHOOK: Output: default@repairtable_n10

[4/4] hive git commit: HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)

Posted by
HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)


Branch: refs/heads/master
Commit: 64bea0354fba2947e4bc0318728f5419e5d763b9
Parents: 54bba9c
Author: Prasanth Jayachandran <>
Authored: Mon Oct 29 15:07:49 2018 -0700
Committer: Prasanth Jayachandran <>
Committed: Mon Oct 29 15:07:49 2018 -0700

 .../org/apache/hadoop/hive/conf/   |  12 +
 .../results/positive/external_table_ppd.q.out   |   1 +
 .../positive/hbase_binary_storage_queries.q.out |   2 +
 .../src/test/results/positive/hbase_ddl.q.out   |   2 +
 .../test/results/positive/hbase_queries.q.out   |   1 +
 .../src/test/results/positive/hbasestats.q.out  |   5 +
 .../hive/ql/txn/compactor/    |   1 -
 .../org/apache/hadoop/hive/ql/exec/ | 290 +--------
 .../apache/hadoop/hive/ql/exec/ |  40 +-
 .../hadoop/hive/ql/metadata/    | 142 -----
 .../hive/ql/metadata/  | 567 ------------------
 .../hive/ql/optimizer/       |  21 +-
 .../hive/ql/parse/      |   6 +-
 .../hadoop/hive/ql/plan/    |   6 +
 .../exec/ | 244 +++++---
 .../exec/   | 125 ++--
 .../ql/metadata/   | 187 +++---
 .../queries/clientpositive/msck_repair_acid.q   |  34 ++
 .../clientpositive/partition_discovery.q        |  77 +++
 .../results/clientpositive/create_like.q.out    |   1 +
 .../clientpositive/create_like_view.q.out       |   1 +
 .../clientpositive/default_file_format.q.out    |   4 +
 .../druid/druidkafkamini_basic.q.out            |   2 +
 .../druid/druidmini_expressions.q.out           |   2 +
 .../results/clientpositive/druid_topn.q.out     |   1 +
 .../results/clientpositive/explain_locks.q.out  |   1 +
 .../llap/external_table_purge.q.out             |   4 +
 .../results/clientpositive/llap/mm_exim.q.out   |   1 +
 .../llap/strict_managed_tables2.q.out           |   2 +
 .../llap/table_nonprintable.q.out               |   2 +-
 .../clientpositive/llap/whroot_external1.q.out  |   6 +
 .../clientpositive/msck_repair_acid.q.out       |  88 +++
 .../clientpositive/msck_repair_drop.q.out       |  68 +--
 .../clientpositive/partition_discovery.q.out    | 357 ++++++++++++
 .../rename_external_partition_location.q.out    |   2 +
 .../clientpositive/repl_2_exim_basic.q.out      |   2 +
 .../show_create_table_alter.q.out               |   5 +
 .../show_create_table_partitioned.q.out         |   1 +
 .../show_create_table_serde.q.out               |   1 +
 .../clientpositive/spark/stats_noscan_2.q.out   |   2 +
 .../results/clientpositive/stats_noscan_2.q.out |   2 +
 .../temp_table_display_colstats_tbllvl.q.out    |   5 +
 .../hadoop/hive/metastore/      | 153 +++++
 .../apache/hadoop/hive/metastore/ |   2 +-
 .../hive/metastore/api/  |  36 ++
 .../hive/metastore/conf/      |  59 +-
 .../hive/metastore/utils/    |  55 +-
 .../hive/metastore/    | 571 ++++++++++++++++++
 .../org/apache/hadoop/hive/metastore/  | 530 +++++++++++++++++
 .../apache/hadoop/hive/metastore/  | 125 ++++
 .../metastore/ |  64 ++
 .../hadoop/hive/metastore/      |   6 +-
 .../hive/metastore/       | 163 ++++++
 .../hive/metastore/ | 235 ++++++++
 .../metastore/utils/   | 167 +++++-
 .../hive/metastore/utils/    | 110 ++++
 .../hive/metastore/  |   4 +-
 .../hive/metastore/    |   4 +-
 .../hive/metastore/ | 581 +++++++++++++++++++
 .../hive/metastore/client/ |  11 +-
 60 files changed, 3891 insertions(+), 1308 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e226a1f..917aaeb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4415,17 +4415,29 @@ public class HiveConf extends Configuration {
       "Merge adjacent joins into a single n-way join"),
     HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),
       "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."),
+    /**
+     * @deprecated Use MetastoreConf.MSCK_PATH_VALIDATION
+     */
+    @Deprecated
     HIVE_MSCK_PATH_VALIDATION("hive.msck.path.validation", "throw",
         new StringSet("throw", "skip", "ignore"), "The approach msck should take with HDFS " +
        "directories that are partition-like but contain unsupported characters. 'throw' (an " +
        "exception) is the default; 'skip' will skip the invalid directories and still repair the" +
        " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"),
+    /**
+     * @deprecated Use MetastoreConf.MSCK_REPAIR_BATCH_SIZE
+     */
+    @Deprecated
     HIVE_MSCK_REPAIR_BATCH_SIZE(
         "hive.msck.repair.batch.size", 3000,
         "Batch size for the msck repair command. If the value is greater than zero,\n "
             + "it will execute batch wise with the configured batch size. In case of errors while\n"
             + "adding unknown partitions the batch size is automatically reduced by half in the subsequent\n"
             + "retry attempt. The default value is 3000 which means it will execute in the batches of 3000."),
+    /**
+     * @deprecated Use MetastoreConf.MSCK_REPAIR_BATCH_MAX_RETRIES
+     */
+    @Deprecated
     HIVE_MSCK_REPAIR_BATCH_MAX_RETRIES("hive.msck.repair.batch.max.retries", 4,
         "Maximum number of retries for the msck repair command when adding unknown partitions.\n "
         + "If the value is greater than zero it will retry adding unknown partitions until the maximum\n"
diff --git a/hbase-handler/src/test/results/positive/external_table_ppd.q.out b/hbase-handler/src/test/results/positive/external_table_ppd.q.out
index edcbe7e..22c8b70 100644
--- a/hbase-handler/src/test/results/positive/external_table_ppd.q.out
+++ b/hbase-handler/src/test/results/positive/external_table_ppd.q.out
@@ -60,6 +60,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true         	binary           	t_hive      
diff --git a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
index 1209c88..bf1a89d 100644
--- a/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
@@ -60,6 +60,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true         	binary           	t_hive              
@@ -242,6 +243,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"bigint_col\":\"true\",\"boolean_col\":\"true\",\"double_col\":\"true\",\"float_col\":\"true\",\"int_col\":\"true\",\"key\":\"true\",\"smallint_col\":\"true\",\"tinyint_col\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true             	t_hive              
 	numFiles            	0                   
 	numRows             	0           
diff --git a/hbase-handler/src/test/results/positive/hbase_ddl.q.out b/hbase-handler/src/test/results/positive/hbase_ddl.q.out
index ccd4148..fc40026 100644
--- a/hbase-handler/src/test/results/positive/hbase_ddl.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_ddl.q.out
@@ -119,6 +119,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true                
 	hbase.mapred.output.outputtable	kkk              	hbase_table_0       
@@ -168,6 +169,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true             	hbase_table_0       
 #### A masked pattern was here ####
diff --git a/hbase-handler/src/test/results/positive/hbase_queries.q.out b/hbase-handler/src/test/results/positive/hbase_queries.q.out
index eeb97f0..aea7e7e 100644
--- a/hbase-handler/src/test/results/positive/hbase_queries.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_queries.q.out
@@ -986,6 +986,7 @@ WITH SERDEPROPERTIES (
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE IF EXISTS hbase_table_9
diff --git a/hbase-handler/src/test/results/positive/hbasestats.q.out b/hbase-handler/src/test/results/positive/hbasestats.q.out
index 5a4aea9..5143522 100644
--- a/hbase-handler/src/test/results/positive/hbasestats.q.out
+++ b/hbase-handler/src/test/results/positive/hbasestats.q.out
@@ -42,6 +42,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"country\":\"true\",\"country_id\":\"true\",\"key\":\"true\",\"state\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true                
 	numFiles            	0                   
 	numRows             	0                   
@@ -136,6 +137,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true                
 #### A masked pattern was here ####
 	numFiles            	0                   
@@ -203,6 +205,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true                
 #### A masked pattern was here ####
 	numFiles            	0                   
@@ -262,6 +265,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true                
 #### A masked pattern was here ####
 	numFiles            	0                   
@@ -371,6 +375,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	external.table.purge	true                
 #### A masked pattern was here ####
 	numFiles            	0           
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index a9d7468..9648645 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1679,7 +1679,6 @@ public class TestCompactor {
     Assert.assertNotEquals("Unexpected default compression size", 2700,
     // Insert one more row - this should trigger to be reached for ttp2
     executeStatementOnDriver("insert into " + tblName1 + " values (6, 'f')", driver);
     executeStatementOnDriver("insert into " + tblName2 + " values (6, 'f')", driver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 807f159..6790a06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -74,7 +74,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Msck;
+import org.apache.hadoop.hive.metastore.MsckInfo;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
+import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -149,13 +152,13 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.CheckConstraint;
-import org.apache.hadoop.hive.ql.metadata.CheckResult;
+import org.apache.hadoop.hive.metastore.CheckResult;
 import org.apache.hadoop.hive.ql.metadata.DefaultConstraint;
 import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
-import org.apache.hadoop.hive.ql.metadata.HiveMetaStoreChecker;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -2125,279 +2128,22 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
    * @return Returns 0 when execution succeeds and above 0 if it fails.
   private int msck(Hive db, MsckDesc msckDesc) {
-    CheckResult result = new CheckResult();
-    List<String> repairOutput = new ArrayList<String>();
+    Msck msck;
     try {
-      HiveMetaStoreChecker checker = new HiveMetaStoreChecker(db);
+      msck = new Msck( false, false);
+      msck.init(db.getConf());
       String[] names = Utilities.getDbTableName(msckDesc.getTableName());
-      // checkMetastore call will fill in result with partitions that are present in filesystem
-      // and missing in metastore - accessed through getPartitionsNotInMs
-      // And partitions that are not present in filesystem and metadata exists in metastore -
-      // accessed through getPartitionNotOnFS
-      checker.checkMetastore(names[0], names[1], msckDesc.getPartSpecs(), result);
-      Set<CheckResult.PartitionResult> partsNotInMs = result.getPartitionsNotInMs();
-      Set<CheckResult.PartitionResult> partsNotInFs = result.getPartitionsNotOnFs();
-      if (msckDesc.isRepairPartitions()) {
-        // Repair metadata in HMS
-        Table table = db.getTable(msckDesc.getTableName());
-        int maxRetries = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_MAX_RETRIES);
-        int decayingFactor = 2;
-        if (msckDesc.isAddPartitions() && !partsNotInMs.isEmpty()) {
-          // MSCK called to add missing paritions into metastore and there are
-          // missing partitions.
-          int batchSize = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE);
-          if (batchSize == 0) {
-            //batching is not enabled. Try to add all the partitions in one call
-            batchSize = partsNotInMs.size();
-          }
-          AbstractList<String> vals = null;
-          String settingStr = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION);
-          boolean doValidate = !("ignore".equals(settingStr));
-          boolean doSkip = doValidate && "skip".equals(settingStr);
-          // The default setting is "throw"; assume doValidate && !doSkip means throw.
-          if (doValidate) {
-            // Validate that we can add partition without escaping. Escaping was originally intended
-            // to avoid creating invalid HDFS paths; however, if we escape the HDFS path (that we
-            // deem invalid but HDFS actually supports - it is possible to create HDFS paths with
-            // unprintable characters like ASCII 7), metastore will create another directory instead
-            // of the one we are trying to "repair" here.
-            Iterator<CheckResult.PartitionResult> iter = partsNotInMs.iterator();
-            while (iter.hasNext()) {
-              CheckResult.PartitionResult part = iter.next();
-              try {
-                vals = Warehouse.makeValsFromName(part.getPartitionName(), vals);
-              } catch (MetaException ex) {
-                throw new HiveException(ex);
-              }
-              for (String val : vals) {
-                String escapedPath = FileUtils.escapePathName(val);
-                assert escapedPath != null;
-                if (escapedPath.equals(val)) {
-                  continue;
-                }
-                String errorMsg = "Repair: Cannot add partition " + msckDesc.getTableName() + ':' +
-                    part.getPartitionName() + " due to invalid characters in the name";
-                if (doSkip) {
-                  repairOutput.add(errorMsg);
-                  iter.remove();
-                } else {
-                  throw new HiveException(errorMsg);
-                }
-              }
-            }
-          }
-          try {
-            createPartitionsInBatches(db, repairOutput, partsNotInMs, table, batchSize,
-                decayingFactor, maxRetries);
-          } catch (Exception e) {
-            throw new HiveException(e);
-          }
-        }
-        if (msckDesc.isDropPartitions() && !partsNotInFs.isEmpty()) {
-          // MSCK called to drop stale paritions from metastore and there are
-          // stale partitions.
-          int batchSize = conf.getIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE);
-          if (batchSize == 0) {
-            //batching is not enabled. Try to drop all the partitions in one call
-            batchSize = partsNotInFs.size();
-          }
-          try {
-            dropPartitionsInBatches(db, repairOutput, partsNotInFs, table, batchSize,
-                decayingFactor, maxRetries);
-          } catch (Exception e) {
-            throw new HiveException(e);
-          }
-        }
-      }
-    } catch (HiveException e) {
-      LOG.warn("Failed to run metacheck: ", e);
+      MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(), names[0],
+        names[1], msckDesc.getPartSpecs(), msckDesc.getResFile(),
+        msckDesc.isRepairPartitions(), msckDesc.isAddPartitions(), msckDesc.isDropPartitions(), -1);
+      return msck.repair(msckInfo);
+    } catch (MetaException e) {
+      LOG.error("Unable to create msck instance.", e);
       return 1;
-    } catch (IOException e) {
-      LOG.warn("Failed to run metacheck: ", e);
+    } catch (SemanticException e) {
+      LOG.error("Msck failed.", e);
       return 1;
-    } finally {
-      BufferedWriter resultOut = null;
-      try {
-        Path resFile = new Path(msckDesc.getResFile());
-        FileSystem fs = resFile.getFileSystem(conf);
-        resultOut = new BufferedWriter(new OutputStreamWriter(fs
-            .create(resFile)));
-        boolean firstWritten = false;
-        firstWritten |= writeMsckResult(result.getTablesNotInMs(),
-            "Tables not in metastore:", resultOut, firstWritten);
-        firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
-            "Tables missing on filesystem:", resultOut, firstWritten);
-        firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
-            "Partitions not in metastore:", resultOut, firstWritten);
-        firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
-            "Partitions missing from filesystem:", resultOut, firstWritten);
-        for (String rout : repairOutput) {
-          if (firstWritten) {
-            resultOut.write(terminator);
-          } else {
-            firstWritten = true;
-          }
-          resultOut.write(rout);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to save metacheck output: ", e);
-        return 1;
-      } finally {
-        if (resultOut != null) {
-          try {
-            resultOut.close();
-          } catch (IOException e) {
-            LOG.warn("Failed to close output file: ", e);
-            return 1;
-          }
-        }
-      }
-    }
-    return 0;
-  }
-  @VisibleForTesting
-  void createPartitionsInBatches(Hive db, List<String> repairOutput,
-      Set<CheckResult.PartitionResult> partsNotInMs, Table table, int batchSize, int decayingFactor, int maxRetries)
-      throws Exception {
-    String addMsgFormat = "Repair: Added partition to metastore "
-        + table.getTableName() + ":%s";
-    Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInMs);
-    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
-      @Override
-      public Void execute(int size) throws Exception {
-        while (!batchWork.isEmpty()) {
-          //get the current batch size
-          int currentBatchSize = size;
-          AddPartitionDesc apd =
-              new AddPartitionDesc(table.getDbName(), table.getTableName(), true);
-          //store the partitions temporarily until processed
-          List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize);
-          List<String> addMsgs = new ArrayList<>(currentBatchSize);
-          //add the number of partitions given by the current batchsize
-          for (CheckResult.PartitionResult part : batchWork) {
-            if (currentBatchSize == 0) {
-              break;
-            }
-            apd.addPartition(Warehouse.makeSpecFromName(part.getPartitionName()), null);
-            lastBatch.add(part);
-            addMsgs.add(String.format(addMsgFormat, part.getPartitionName()));
-            currentBatchSize--;
-          }
-          db.createPartitions(apd);
-          // if last batch is successful remove it from partsNotInMs
-          batchWork.removeAll(lastBatch);
-          repairOutput.addAll(addMsgs);
-        }
-        return null;
-      }
-    }.run();
-  }
-  // Drops partitions in batches.  partNotInFs is split into batches based on batchSize
-  // and dropped.  The dropping will be through RetryUtilities which will retry when there is a
-  // failure after reducing the batchSize by decayingFactor.  Retrying will cease when maxRetries
-  // limit is reached or batchSize reduces to 0, whichever comes earlier.
-  @VisibleForTesting
-  void dropPartitionsInBatches(Hive db, List<String> repairOutput,
-      Set<CheckResult.PartitionResult> partsNotInFs, Table table, int batchSize, int decayingFactor,
-      int maxRetries) throws Exception {
-    String dropMsgFormat =
-        "Repair: Dropped partition from metastore " + table.getFullyQualifiedName() + ":%s";
-    // Copy of partitions that will be split into batches
-    Set<CheckResult.PartitionResult> batchWork = new TreeSet<>(partsNotInFs);
-    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
-      @Override
-      public Void execute(int size) throws Exception {
-        while (!batchWork.isEmpty()) {
-          int currentBatchSize = size;
-          // to store the partitions that are currently being processed
-          List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize);
-          // drop messages for the dropped partitions
-          List<String> dropMsgs = new ArrayList<>(currentBatchSize);
-          // Partitions to be dropped
-          List<String> dropParts = new ArrayList<>(currentBatchSize);
-          for (CheckResult.PartitionResult part : batchWork) {
-            // This batch is full: break out of for loop to execute
-            if (currentBatchSize == 0) {
-              break;
-            }
-            dropParts.add(part.getPartitionName());
-            // Add the part to lastBatch to track the parition being dropped
-            lastBatch.add(part);
-            // Update messages
-            dropMsgs.add(String.format(dropMsgFormat, part.getPartitionName()));
-            // Decrement batch size.  When this gets to 0, the batch will be executed
-            currentBatchSize--;
-          }
-          // this call is deleting partitions that are already missing from filesystem
-          // so 3rd parameter (deleteData) is set to false
-          // msck is doing a clean up of hms.  if for some reason the partition is already
-          // deleted, then it is good.  So, the last parameter ifexists is set to true
-          db.dropPartitions(table, dropParts, false, true);
-          // if last batch is successful remove it from partsNotInFs
-          batchWork.removeAll(lastBatch);
-          repairOutput.addAll(dropMsgs);
-        }
-        return null;
-      }
-    }.run();
-  }
-  /**
-   * Write the result of msck to a writer.
-   *
-   * @param result
-   *          The result we're going to write
-   * @param msg
-   *          Message to write.
-   * @param out
-   *          Writer to write to
-   * @param wrote
-   *          if any previous call wrote data
-   * @return true if something was written
-   * @throws IOException
-   *           In case the writing fails
-   */
-  private boolean writeMsckResult(Set<? extends Object> result, String msg,
-      Writer out, boolean wrote) throws IOException {
-    if (!result.isEmpty()) {
-      if (wrote) {
-        out.write(terminator);
-      }
-      out.write(msg);
-      for (Object entry : result) {
-        out.write(separator);
-        out.write(entry.toString());
-      }
-      return true;
-    return false;
@@ -5011,6 +4757,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       if (crtTbl.isExternal()) {
         tbl.setProperty("EXTERNAL", "TRUE");
+        // partition discovery is on by default
+        tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
@@ -5109,6 +4857,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       if (crtTbl.isExternal()) {
         tbl.setProperty("EXTERNAL", "TRUE");
+        // partition discovery is on by default
+        tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 4cc5fa8..7c4efab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -356,32 +356,28 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
     if (jsonOutput) {
       out = null;
-    if (work.getParseContext() != null) {
-      List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
-      if (null != out) {
-        out.print("LOCK INFORMATION:\n");
-      }
-      List<ExplainLockDesc> locks = new ArrayList<>(lockComponents.size());
-      for (LockComponent component : lockComponents) {
-        ExplainLockDesc lockDesc = new ExplainLockDesc(component);
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
+    if (null != out) {
+      out.print("LOCK INFORMATION:\n");
+    }
+    List<ExplainLockDesc> locks = new ArrayList<>(lockComponents.size());
-        if (null != out) {
-          out.print(lockDesc.getFullName());
-          out.print(" -> ");
-          out.print(lockDesc.getLockType());
-          out.print('\n');
-        } else {
-          locks.add(lockDesc);
-        }
+    for (LockComponent component : lockComponents) {
+      ExplainLockDesc lockDesc = new ExplainLockDesc(component);
+      if (null != out) {
+        out.print(lockDesc.getFullName());
+        out.print(" -> ");
+        out.print(lockDesc.getLockType());
+        out.print('\n');
+      } else {
+        locks.add(lockDesc);
-      if (jsonOutput) {
-        jsonObject.put("LOCK INFORMATION:", locks);
-      }
-    } else {
-      System.err.println("No parse context!");
+    }
+    if (jsonOutput) {
+      jsonObject.put("LOCK INFORMATION:", locks);
     return jsonObject;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java
deleted file mode 100644
index 0b4240f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CheckResult.java
+++ /dev/null
@@ -1,142 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.metadata;
-import java.util.Set;
-import java.util.TreeSet;
- * Result class used by the HiveMetaStoreChecker.
- */
-public class CheckResult {
-  private Set<String> tablesNotOnFs = new TreeSet<String>();
-  private Set<String> tablesNotInMs = new TreeSet<String>();
-  private Set<PartitionResult> partitionsNotOnFs = new TreeSet<PartitionResult>();
-  private Set<PartitionResult> partitionsNotInMs = new TreeSet<PartitionResult>();
-  /**
-   * @return a list of tables not found on the filesystem.
-   */
-  public Set<String> getTablesNotOnFs() {
-    return tablesNotOnFs;
-  }
-  /**
-   * @param tablesNotOnFs
-   *          a list of tables not found on the filesystem.
-   */
-  public void setTablesNotOnFs(Set<String> tablesNotOnFs) {
-    this.tablesNotOnFs = tablesNotOnFs;
-  }
-  /**
-   * @return a list of tables not found in the metastore.
-   */
-  public Set<String> getTablesNotInMs() {
-    return tablesNotInMs;
-  }
-  /**
-   * @param tablesNotInMs
-   *          a list of tables not found in the metastore.
-   */
-  public void setTablesNotInMs(Set<String> tablesNotInMs) {
-    this.tablesNotInMs = tablesNotInMs;
-  }
-  /**
-   * @return a list of partitions not found on the fs
-   */
-  public Set<PartitionResult> getPartitionsNotOnFs() {
-    return partitionsNotOnFs;
-  }
-  /**
-   * @param partitionsNotOnFs
-   *          a list of partitions not found on the fs
-   */
-  public void setPartitionsNotOnFs(Set<PartitionResult> partitionsNotOnFs) {
-    this.partitionsNotOnFs = partitionsNotOnFs;
-  }
-  /**
-   * @return a list of partitions not found in the metastore
-   */
-  public Set<PartitionResult> getPartitionsNotInMs() {
-    return partitionsNotInMs;
-  }
-  /**
-   * @param partitionsNotInMs
-   *          a list of partitions not found in the metastore
-   */
-  public void setPartitionsNotInMs(Set<PartitionResult> partitionsNotInMs) {
-    this.partitionsNotInMs = partitionsNotInMs;
-  }
-  /**
-   * A basic description of a partition that is missing from either the fs or
-   * the ms.
-   */
-  public static class PartitionResult implements Comparable<PartitionResult> {
-    private String partitionName;
-    private String tableName;
-    /**
-     * @return name of partition
-     */
-    public String getPartitionName() {
-      return partitionName;
-    }
-    /**
-     * @param partitionName
-     *          name of partition
-     */
-    public void setPartitionName(String partitionName) {
-      this.partitionName = partitionName;
-    }
-    /**
-     * @return table name
-     */
-    public String getTableName() {
-      return tableName;
-    }
-    /**
-     * @param tableName
-     *          table name
-     */
-    public void setTableName(String tableName) {
-      this.tableName = tableName;
-    }
-    @Override
-    public String toString() {
-      return tableName + ":" + partitionName;
-    }
-    public int compareTo(PartitionResult o) {
-      int ret = tableName.compareTo(o.tableName);
-      return ret != 0 ? ret : partitionName.compareTo(o.partitionName);
-    }
-  }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
deleted file mode 100644
index 598bb2e..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ /dev/null
@@ -1,567 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.metadata;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.thrift.TException;
- * Verify that the information in the metastore matches what is on the
- * filesystem. Return a CheckResult object containing lists of missing and any
- * unexpected tables and partitions.
- */
-public class HiveMetaStoreChecker {
-  public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
-  public static final String CLASS_NAME = HiveMetaStoreChecker.class.getName();
-  private final Hive hive;
-  private final HiveConf conf;
-  public HiveMetaStoreChecker(Hive hive) {
-    super();
-    this.hive = hive;
-    conf = hive.getConf();
-  }
-  /**
-   * Check the metastore for inconsistencies, data missing in either the
-   * metastore or on the dfs.
-   *
-   * @param dbName
-   *          name of the database, if not specified the default will be used.
-   * @param tableName
-   *          Table we want to run the check for. If null we'll check all the
-   *          tables in the database.
-   * @param partitions
-   *          List of partition name value pairs, if null or empty check all
-   *          partitions
-   * @param result
-   *          Fill this with the results of the check
-   * @throws HiveException
-   *           Failed to get required information from the metastore.
-   * @throws IOException
-   *           Most likely filesystem related
-   */
-  public void checkMetastore(String dbName, String tableName,
-      List<? extends Map<String, String>> partitions, CheckResult result)
-      throws HiveException, IOException {
-    if (dbName == null || "".equalsIgnoreCase(dbName)) {
-      dbName = Warehouse.DEFAULT_DATABASE_NAME;
-    }
-    try {
-      if (tableName == null || "".equals(tableName)) {
-        // no table specified, check all tables and all partitions.
-        List<String> tables = hive.getTablesForDb(dbName, ".*");
-        for (String currentTableName : tables) {
-          checkTable(dbName, currentTableName, null, result);
-        }
-        findUnknownTables(dbName, tables, result);
-      } else if (partitions == null || partitions.isEmpty()) {
-        // only one table, let's check all partitions
-        checkTable(dbName, tableName, null, result);
-      } else {
-        // check the specified partitions
-        checkTable(dbName, tableName, partitions, result);
-      }
-      LOG.info("Number of partitionsNotInMs=" + result.getPartitionsNotInMs()
-              + ", partitionsNotOnFs=" + result.getPartitionsNotOnFs()
-              + ", tablesNotInMs=" + result.getTablesNotInMs()
-              + ", tablesNotOnFs=" + result.getTablesNotOnFs());
-    } catch (MetaException e) {
-      throw new HiveException(e);
-    } catch (TException e) {
-      throw new HiveException(e);
-    }
-  }
-  /**
-   * Check for table directories that aren't in the metastore.
-   *
-   * @param dbName
-   *          Name of the database
-   * @param tables
-   *          List of table names
-   * @param result
-   *          Add any found tables to this
-   * @throws HiveException
-   *           Failed to get required information from the metastore.
-   * @throws IOException
-   *           Most likely filesystem related
-   * @throws MetaException
-   *           Failed to get required information from the metastore.
-   * @throws NoSuchObjectException
-   *           Failed to get required information from the metastore.
-   * @throws TException
-   *           Thrift communication error.
-   */
-  void findUnknownTables(String dbName, List<String> tables, CheckResult result)
-      throws IOException, MetaException, TException, HiveException {
-    Set<Path> dbPaths = new HashSet<Path>();
-    Set<String> tableNames = new HashSet<String>(tables);
-    for (String tableName : tables) {
-      Table table = hive.getTable(dbName, tableName);
-      // hack, instead figure out a way to get the db paths
-      String isExternal = table.getParameters().get("EXTERNAL");
-      if (isExternal == null || !"TRUE".equalsIgnoreCase(isExternal)) {
-        dbPaths.add(table.getPath().getParent());
-      }
-    }
-    for (Path dbPath : dbPaths) {
-      FileSystem fs = dbPath.getFileSystem(conf);
-      FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
-      for (FileStatus status : statuses) {
-        if (status.isDir() && !tableNames.contains(status.getPath().getName())) {
-          result.getTablesNotInMs().add(status.getPath().getName());
-        }
-      }
-    }
-  }
-  /**
-   * Check the metastore for inconsistencies, data missing in either the
-   * metastore or on the dfs.
-   *
-   * @param dbName
-   *          Name of the database
-   * @param tableName
-   *          Name of the table
-   * @param partitions
-   *          Partitions to check, if null or empty get all the partitions.
-   * @param result
-   *          Result object
-   * @throws HiveException
-   *           Failed to get required information from the metastore.
-   * @throws IOException
-   *           Most likely filesystem related
-   * @throws MetaException
-   *           Failed to get required information from the metastore.
-   */
-  void checkTable(String dbName, String tableName,
-      List<? extends Map<String, String>> partitions, CheckResult result)
-      throws MetaException, IOException, HiveException {
-    Table table = null;
-    try {
-      table = hive.getTable(dbName, tableName);
-    } catch (HiveException e) {
-      result.getTablesNotInMs().add(tableName);
-      return;
-    }
-    PartitionIterable parts;
-    boolean findUnknownPartitions = true;
-    if (table.isPartitioned()) {
-      if (partitions == null || partitions.isEmpty()) {
-        String mode = HiveConf.getVar(conf, ConfVars.HIVEMAPREDMODE, (String) null);
-        if ("strict".equalsIgnoreCase(mode)) {
-          parts = new PartitionIterable(hive, table, null, conf.getIntVar(
-              HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX));
-        } else {
-          List<Partition> loadedPartitions = new ArrayList<>();
-          PerfLogger perfLogger = SessionState.getPerfLogger();
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
-          loadedPartitions.addAll(hive.getAllPartitionsOf(table));
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARTITION_RETRIEVING);
-          parts = new PartitionIterable(loadedPartitions);
-        }
-      } else {
-        // we're interested in specific partitions,
-        // don't check for any others
-        findUnknownPartitions = false;
-        List<Partition> loadedPartitions = new ArrayList<>();
-        for (Map<String, String> map : partitions) {
-          Partition part = hive.getPartition(table, map, false);
-          if (part == null) {
-            PartitionResult pr = new PartitionResult();
-            pr.setTableName(tableName);
-            pr.setPartitionName(Warehouse.makePartPath(map));
-            result.getPartitionsNotInMs().add(pr);
-          } else {
-            loadedPartitions.add(part);
-          }
-        }
-        parts = new PartitionIterable(loadedPartitions);
-      }
-    } else {
-      parts = new PartitionIterable(Collections.<Partition>emptyList());
-    }
-    checkTable(table, parts, findUnknownPartitions, result);
-  }
-  /**
-   * Check the metastore for inconsistencies, data missing in either the
-   * metastore or on the dfs.
-   *
-   * @param table
-   *          Table to check
-   * @param parts
-   *          Partitions to check
-   * @param result
-   *          Result object
-   * @param findUnknownPartitions
-   *          Should we try to find unknown partitions?
-   * @throws IOException
-   *           Could not get information from filesystem
-   * @throws HiveException
-   *           Could not create Partition object
-   */
-  void checkTable(Table table, PartitionIterable parts,
-      boolean findUnknownPartitions, CheckResult result) throws IOException,
-      HiveException {
-    Path tablePath = table.getPath();
-    FileSystem fs = tablePath.getFileSystem(conf);
-    if (!fs.exists(tablePath)) {
-      result.getTablesNotOnFs().add(table.getTableName());
-      return;
-    }
-    Set<Path> partPaths = new HashSet<Path>();
-    // check that the partition folders exist on disk
-    for (Partition partition : parts) {
-      if (partition == null) {
-        // most likely the user specified an invalid partition
-        continue;
-      }
-      Path partPath = partition.getDataLocation();
-      fs = partPath.getFileSystem(conf);
-      if (!fs.exists(partPath)) {
-        PartitionResult pr = new PartitionResult();
-        pr.setPartitionName(partition.getName());
-        pr.setTableName(partition.getTable().getTableName());
-        result.getPartitionsNotOnFs().add(pr);
-      }
-      for (int i = 0; i < partition.getSpec().size(); i++) {
-        Path qualifiedPath = partPath.makeQualified(fs);
-        StringInternUtils.internUriStringsInPath(qualifiedPath);
-        partPaths.add(qualifiedPath);
-        partPath = partPath.getParent();
-      }
-    }
-    if (findUnknownPartitions) {
-      findUnknownPartitions(table, partPaths, result);
-    }
-  }
-  /**
-   * Find partitions on the fs that are unknown to the metastore.
-   *
-   * Collects the partition directories found under the table path, discards the
-   * table path itself and every path in {@code partPaths}, and records each
-   * remaining directory for which a partition name can be derived in
-   * {@code result.getPartitionsNotInMs()}.
-   *
-   * @param table
-   *          Table where the partitions would be located
-   * @param partPaths
-   *          Paths of the partitions the ms knows about
-   * @param result
-   *          Result object; unknown partitions are added to its
-   *          partitions-not-in-metastore collection
-   * @throws IOException
-   *           Thrown if we fail at fetching listings from the fs.
-   * @throws HiveException
-   *           Propagated from the directory scan done by checkPartitionDirs.
-   */
-  void findUnknownPartitions(Table table, Set<Path> partPaths,
-      CheckResult result) throws IOException, HiveException {
-    Path tablePath = table.getPath();
-    // now check the table folder and see if we find anything
-    // that isn't in the metastore
-    Set<Path> allPartDirs = new HashSet<Path>();
-    checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(table.getPartColNames()));
-    // we don't want the table dir itself in the candidate set
-    allPartDirs.remove(tablePath);
-    // remove the partition paths the metastore already knows about
-    allPartDirs.removeAll(partPaths);
-    Set<String> partColNames = Sets.newHashSet(); // partition column names, used to filter path components
-    for(FieldSchema fSchema : table.getPartCols()) {
-      partColNames.add(fSchema.getName());
-    }
-    // we should now only have the unexpected folders left
-    for (Path partPath : allPartDirs) {
-      FileSystem fs = partPath.getFileSystem(conf);
-      String partitionName = getPartitionName(fs.makeQualified(tablePath),
-          partPath, partColNames);
-      LOG.debug("PartitionName: " + partitionName);
-      if (partitionName != null) { // null: no partition name could be derived from this path
-        PartitionResult pr = new PartitionResult();
-        pr.setPartitionName(partitionName);
-        pr.setTableName(table.getTableName());
-        result.getPartitionsNotInMs().add(pr);
-      }
-    }
-    LOG.debug("Number of partitions not in metastore : " + result.getPartitionsNotInMs().size());
-  }
-  /**
-   * Get the partition name from the path.
-   *
-   * Walks from {@code partitionPath} up towards {@code tablePath}. Each path
-   * component is split on "="; a component whose key is contained in
-   * {@code partCols} is prepended to the result, so the returned name lists the
-   * outermost directory first. A component that does not split into exactly
-   * two parts ends the walk with a warning, and whatever has been accumulated
-   * up to that point is returned.
-   *
-   * @param tablePath
-   *          Path of the table.
-   * @param partitionPath
-   *          Path of the partition.
-   * @param partCols
-   *          Set of partition columns from table definition
-   * @return Partition name, for example partitiondate=2008-01-01; null if no
-   *         path component had been accepted when the walk ended
-   */
-  static String getPartitionName(Path tablePath, Path partitionPath,
-      Set<String> partCols) {
-    String result = null;
-    Path currPath = partitionPath;
-    LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols);
-    while (currPath != null && !tablePath.equals(currPath)) {
-      // format: partition=p_val
-      // Add only when the key matches one of the table's partition column names
-      String[] parts = currPath.getName().split("=");
-      if (parts != null && parts.length > 0) {
-        if (parts.length != 2) {
-          LOG.warn(currPath.getName() + " is not a valid partition name");
-          return result; // may be null or a partial name built from deeper components
-        }
-        String partitionName = parts[0]; // the key part of key=value
-        if (partCols.contains(partitionName)) {
-          if (result == null) {
-            result = currPath.getName();
-          } else {
-            result = currPath.getName() + Path.SEPARATOR + result; // parent component goes in front
-          }
-        }
-      }
-      currPath = currPath.getParent();
-      LOG.debug("currPath=" + currPath);
-    }
-    return result;
-  }
-  /**
-   * Collect the partition directories below basePath, running the directory
-   * listings on an executor sized from METASTORE_FS_HANDLER_THREADS_COUNT.
-   *
-   * Assume that depth is 2, i.e., partition columns are a and b.
-   * ("throw exception" applies when path validation is configured as "throw";
-   * otherwise only a warning is logged.)
-   * tblPath/a=1  => throw exception
-   * tblPath/a=1/file => throw exception
-   * tblPath/a=1/b=2/file => return a=1/b=2
-   * tblPath/a=1/b=2/c=3 => return a=1/b=2
-   * tblPath/a=1/b=2/c=3/file => return a=1/b=2
-   *
-   * @param basePath
-   *          Start directory
-   * @param allDirs
-   *          This set will contain the leaf paths at the end.
-   * @param partColNames
-   *          Partition column names, indexed by directory depth; the size of
-   *          the list specifies how deep the search goes.
-   * @throws IOException
-   *           Thrown if we can't get lists from the fs.
-   * @throws HiveException
-   *           Thrown if the directory scan fails.
-   */
-  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, HiveException {
-    // Here we just reuse the THREAD_COUNT configuration for
-    // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance.
-    // The missing partitions discovered here are later added by the metastore using a
-    // threadpool of size METASTORE_FS_HANDLER_THREADS_COUNT. If we have a differently sized
-    // pool here, the smaller of the two pools becomes a bottleneck.
-    int poolSize = conf.getInt(ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT.varname, 15);
-    ExecutorService executor;
-    if (poolSize <= 1) {
-      LOG.debug("Using single-threaded version of MSCK-GetPaths");
-      executor = MoreExecutors.newDirectExecutorService();
-    } else {
-      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize);
-      ThreadFactory threadFactory =
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
-      executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
-    }
-    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
-    executor.shutdown(); // NOTE(review): not in a finally block, so it is skipped if the call above throws
-  }
-  private final class PathDepthInfoCallable implements Callable<Path> { // one task: list a single directory at a known depth
-    private final List<String> partColNames; // expected partition column name per depth
-    private final FileSystem fs;
-    private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths; // receives sub-directories to visit next
-    private final boolean throwException; // true: validation failures throw; false: they are only logged
-    private final PathDepthInfo pd; // the directory this task lists
-    private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
-        ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
-      this.partColNames = partColNames;
-      this.pd = pd;
-      this.fs = fs;
-      this.pendingPaths = basePaths;
-      this.throwException = "throw"
-      .equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION)); // read once per task
-    }
-    @Override
-    public Path call() throws Exception {
-      return processPathDepthInfo(pd);
-    }
-    private Path processPathDepthInfo(final PathDepthInfo pd)
-        throws IOException, HiveException, InterruptedException {
-      final Path currentPath = pd.p;
-      final int currentDepth = pd.depth;
-      FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
-      // An empty listing is only reported for a sub-directory (depth > 0): the table base path
-      // itself may legitimately be empty when no partition sub-directories have been created yet
-      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) {
-        // since maxDepth is not yet reached, we are missing partition
-        // columns in currentPath
-        logOrThrowExceptionWithMsg(
-            "MSCK is missing partition columns under " + currentPath.toString());
-      } else {
-        // found entries under currentPath; queue the ones that are valid partition directories
-        for (FileStatus fileStatus : fileStatuses) {
-          if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) {
-            // found a file at a depth which is less than the number of partition keys
-            logOrThrowExceptionWithMsg(
-                "MSCK finds a file rather than a directory when it searches for "
-                    + fileStatus.getPath().toString());
-          } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) {
-            // found a sub-directory at a depth less than the number of partition keys;
-            // validate that the partition directory name matches the corresponding
-            // partition colName at currentDepth
-            Path nextPath = fileStatus.getPath();
-            String[] parts = nextPath.getName().split("=");
-            if (parts.length != 2) {
-              logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
-            } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
-              logOrThrowExceptionWithMsg(
-                  "Unexpected partition key " + parts[0] + " found at " + nextPath);
-            } else {
-              // add sub-directory to the work queue since maxDepth is not yet reached
-              pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
-            }
-          }
-        }
-        if (currentDepth == partColNames.size()) { // full partition depth reached: this path is a result
-          return currentPath;
-        }
-      }
-      return null; // nothing to report for this directory
-    }
-    private void logOrThrowExceptionWithMsg(String msg) throws HiveException {
-      if(throwException) {
-        throw new HiveException(msg);
-      } else {
-        LOG.warn(msg);
-      }
-    }
-  }
-  private static class PathDepthInfo { // immutable pair of a path and its depth in the directory scan
-    private final Path p; // directory to list
-    private final int depth; // 0 for the start directory, +1 per level below it
-    PathDepthInfo(Path p, int depth) {
-      this.p = p;
-      this.depth = depth;
-    }
-  }
-  private void checkPartitionDirs(final ExecutorService executor,
-      final Path basePath, final Set<Path> result,
-      final FileSystem fs, final List<String> partColNames) throws HiveException {
-    try {
-      Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
-      ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
-      nextLevel.add(new PathDepthInfo(basePath, 0)); // the scan starts at basePath with depth 0
-      // Uses a level-parallel implementation of a BFS. Recursive DFS implementations
-      // have an issue where the number of threads can run out if the number of
-      // nested sub-directories is more than the pool size.
-      // Using a two-queue implementation is simpler than one queue, since with one queue we
-      // would have to add complex mechanisms to let the free worker threads know when new levels
-      // are discovered, using notify()/wait() mechanisms which can potentially lead to bugs if
-      // not done right.
-      while(!nextLevel.isEmpty()) {
-        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
-        // process each level in parallel: one task per directory of the current level
-        while(!nextLevel.isEmpty()) {
-          futures.add(
-              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
-        }
-        while(!futures.isEmpty()) { // wait for the whole level to finish before moving on
-          Path p = futures.poll().get();
-          if (p != null) {
-            result.add(p);
-          }
-        }
-        // update nextLevel with the sub-directories newly discovered by the tasks above
-        nextLevel = tempQueue;
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error(e.getMessage()); // NOTE(review): logs the message only; the stack trace is dropped
-      executor.shutdownNow();
-      throw new HiveException(e.getCause()); // NOTE(review): getCause() is null for InterruptedException, and the interrupt flag is not restored
-    }
-  }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/
index cff32d3..2131bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -1324,7 +1326,7 @@ public final class GenMapRedUtils {
       // update the FileSinkOperator to include partition columns
-      usePartitionColumns(fsInputDesc.getTableInfo().getProperties(), dpCtx.getDPColNames());
+      usePartitionColumns(fsInputDesc.getTableInfo().getProperties(), fsInputDesc.getTable(), dpCtx.getDPColNames());
     } else {
       // non-partitioned table
@@ -2090,6 +2092,23 @@ public final class GenMapRedUtils {
     return null;
+  static void usePartitionColumns(Properties properties, Table table, List<String> partColNames) {
+    if (properties.containsKey(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS)) {
+      usePartitionColumns(properties, partColNames);
+    } else {
+      List<FieldSchema> partCols = table.getPartCols();
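+      String partNames = partCols.stream().map(FieldSchema::getName).collect(java.util.stream.Collectors.joining("/"));
+      String partTypes = partCols.stream().map(FieldSchema::getType).collect(java.util.stream.Collectors.joining(":"));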
+      properties.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+        partNames);
+      properties.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES,
+        partTypes);
+    }
+  }
    * Uses only specified partition columns.
    * Provided properties should be pre-populated with partition column names and types.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ b/ql/src/java/org/apache/hadoop/hive/ql/parse/
index bba7d6c..6e7c78b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/
@@ -3841,7 +3841,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     Table tab = getTable(tableName);
     List<Map<String, String>> specs = getPartitionSpecs(tab, ast);
-    outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED));
+    if (repair && AcidUtils.isTransactionalTable(tab)) {
+      outputs.add(new WriteEntity(tab, WriteType.DDL_EXCLUSIVE));
+    } else {
+      outputs.add(new WriteEntity(tab, WriteEntity.WriteType.DDL_SHARED));
+    }
     MsckDesc checkDesc = new MsckDesc(tableName, specs, ctx.getResFile(),
         repair, addPartitions, dropPartitions);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ b/ql/src/java/org/apache/hadoop/hive/ql/plan/
index 27f677e..f00148b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -837,6 +838,11 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
     if (isExternal()) {
       tbl.setProperty("EXTERNAL", "TRUE");
+      // only add if the user has not explicitly set it (e.g. if the user explicitly disabled it, don't flip it)
+      if (tbl.getProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY) == null) {
+        // partition discovery is on by default if undefined
+        tbl.setProperty(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+      }
     // If the sorted columns is a superset of bucketed columns, store this fact.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/ b/ql/src/test/org/apache/hadoop/hive/ql/exec/
index ce2b186..3e45016 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/
@@ -17,6 +17,8 @@
 package org.apache.hadoop.hive.ql.exec;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import java.util.ArrayList;
@@ -27,17 +29,23 @@ import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.CheckResult.PartitionResult;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Msck;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
-import org.apache.hadoop.hive.ql.metadata.CheckResult.PartitionResult;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.RetryUtilities.RetryException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,42 +56,61 @@ import org.mockito.Mockito;
 public class TestMsckCreatePartitionsInBatches {
   private static HiveConf hiveConf;
-  private static DDLTask ddlTask;
+  private static Msck msck;
+  private final String catName = "hive";
+  private final String dbName = "default";
   private final String tableName = "test_msck_batch";
-  private static Hive db;
+  private static IMetaStoreClient db;
   private List<String> repairOutput;
   private Table table;
-  public static void setupClass() throws HiveException {
+  public static void setupClass() throws HiveException, MetaException {
     hiveConf = new HiveConf(TestMsckCreatePartitionsInBatches.class);
     hiveConf.setIntVar(ConfVars.HIVE_MSCK_REPAIR_BATCH_SIZE, 5);
-    db = Hive.get(hiveConf);
-    ddlTask = new DDLTask();
+    try {
+      db = new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+      throw new HiveException(e);
+    }
+    msck = new Msck( false, false);
+    msck.init(hiveConf);
   public void before() throws Exception {
-    createPartitionedTable("default", tableName);
-    table = db.getTable(tableName);
+    createPartitionedTable(catName, dbName, tableName);
+    table = db.getTable(catName, dbName, tableName);
     repairOutput = new ArrayList<String>();
   public void after() throws Exception {
-    cleanUpTableQuietly("default", tableName);
+    cleanUpTableQuietly(catName, dbName, tableName);
-  private Table createPartitionedTable(String dbName, String tableName) throws Exception {
+  private Table createPartitionedTable(String catName, String dbName, String tableName) throws Exception {
     try {
-      db.dropTable(dbName, tableName);
-      db.createTable(tableName, Arrays.asList("key", "value"), // Data columns.
-          Arrays.asList("city"), // Partition columns.
-          TextInputFormat.class, HiveIgnoreKeyTextOutputFormat.class);
-      return db.getTable(dbName, tableName);
+      db.dropTable(catName, dbName, tableName);
+      Table table = new Table();
+      table.setCatName(catName);
+      table.setDbName(dbName);
+      table.setTableName(tableName);
+      FieldSchema col1 = new FieldSchema("key", "string", "");
+      FieldSchema col2 = new FieldSchema("value", "int", "");
+      FieldSchema col3 = new FieldSchema("city", "string", "");
+      StorageDescriptor sd = new StorageDescriptor();
+      sd.setSerdeInfo(new SerDeInfo());
+      sd.setInputFormat(TextInputFormat.class.getCanonicalName());
+      sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
+      sd.setCols(Arrays.asList(col1, col2));
+      table.setPartitionKeys(Arrays.asList(col3));
+      table.setSd(sd);
+      db.createTable(table);
+      return db.getTable(catName, dbName, tableName);
     } catch (Exception exception) {
       fail("Unable to drop and create table " + StatsUtils.getFullyQualifiedTableName(dbName, tableName) + " because "
           + StringUtils.stringifyException(exception));
@@ -91,9 +118,9 @@ public class TestMsckCreatePartitionsInBatches {
-  private void cleanUpTableQuietly(String dbName, String tableName) {
+  private void cleanUpTableQuietly(String catName, String dbName, String tableName) {
     try {
-      db.dropTable(dbName, tableName, true, true, true);
+      db.dropTable(catName, dbName, tableName);
     } catch (Exception exception) {
       fail("Unexpected exception: " + StringUtils.stringifyException(exception));
@@ -119,19 +146,23 @@ public class TestMsckCreatePartitionsInBatches {
   public void testNumberOfCreatePartitionCalls() throws Exception {
     // create 10 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(10);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 5 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0);
     // there should be 2 calls to create partitions with each batch size of 5
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, Mockito.times(2)).createPartitions(argument.capture());
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
+    Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), needResultsArg.capture());
     // confirm the batch sizes were 5, 5 in the two calls to create partitions
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++),
-        5, apds.get(0).getPartitionCount());
+        5, apds.get(0).size());
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++),
-        5, apds.get(1).getPartitionCount());
+        5, apds.get(1).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -144,19 +175,23 @@ public class TestMsckCreatePartitionsInBatches {
   public void testUnevenNumberOfCreatePartitionCalls() throws Exception {
     // create 9 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(9);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 5 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 5, 2, 0);
     // there should be 2 calls to create partitions with batch sizes of 5, 4
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, Mockito.times(2)).createPartitions(argument.capture());
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
+    Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), needResultsArg.capture());
     // confirm the batch sizes were 5, 4 in the two calls to create partitions
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++),
-        5, apds.get(0).getPartitionCount());
+        5, apds.get(0).size());
     Assert.assertEquals(String.format("Unexpected batch size in retry attempt %d ", retryAttempt++),
-        4, apds.get(1).getPartitionCount());
+        4, apds.get(1).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -169,14 +204,20 @@ public class TestMsckCreatePartitionsInBatches {
   public void testEqualNumberOfPartitions() throws Exception {
     // create 13 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(13);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 13 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 13, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 13, 2, 0);
+    // there should be 1 call to create partitions with batch sizes of 13
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
     // there should be 1 call to create partitions with batch sizes of 13
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, Mockito.times(1)).createPartitions(argument.capture());
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), ifNotExistsArg.capture(),
+      needResultsArg.capture());
     Assert.assertEquals("Unexpected number of batch size", 13,
-        argument.getValue().getPartitionCount());
+        argParts.getValue().size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -189,15 +230,22 @@ public class TestMsckCreatePartitionsInBatches {
   public void testSmallNumberOfPartitions() throws Exception {
     // create 10 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(10);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // batch size of 20 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 20, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 20, 2, 0);
     // there should be 1 call to create partitions with batch sizes of 10
-    Mockito.verify(spyDb, Mockito.times(1)).createPartitions(Mockito.anyObject());
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb).createPartitions(argument.capture());
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(Mockito.anyObject(), Mockito.anyBoolean(),
+      Mockito.anyBoolean());
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
+    // there should be 1 call to create partitions with batch sizes of 10
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), ifNotExistsArg.capture(),
+      needResultsArg.capture());
     Assert.assertEquals("Unexpected number of batch size", 10,
-        argument.getValue().getPartitionCount());
+        argParts.getValue().size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -210,28 +258,34 @@ public class TestMsckCreatePartitionsInBatches {
   public void testBatchingWhenException() throws Exception {
     // create 23 dummy partitions
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(23);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
     // first call to createPartitions should throw exception
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(),
+        Mockito.anyBoolean());
     // test with a batch size of 30 and decaying factor of 2
-    ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0);
+    msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0);
     // confirm the batch sizes were 23, 15, 8 in the three calls to create partitions
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
     // there should be 3 calls to create partitions with batch sizes of 23, 15, 8
-    Mockito.verify(spyDb, Mockito.times(3)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    Mockito.verify(spyDb, Mockito.times(3)).add_partitions(argParts.capture(), ifNotExistsArg.capture(),
+      needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 23,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 15,
-        apds.get(1).getPartitionCount());
+        apds.get(1).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 8,
-        apds.get(2).getPartitionCount());
+        apds.get(2).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -244,38 +298,44 @@ public class TestMsckCreatePartitionsInBatches {
   public void testRetriesExhaustedBatchSize() throws Exception {
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(17);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), Mockito.anyBoolean());
     // batch size of 30 and decaying factor of 2
     Exception ex = null;
     try {
-      ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0);
+      msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 0);
     } catch (Exception retryEx) {
       ex = retryEx;
-    Assert.assertFalse("Exception was expected but was not thrown", ex == null);
-    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException);
+    assertFalse("Exception was expected but was not thrown", ex == null);
+    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException);
+    // there should be 5 calls to create partitions with batch sizes of 17, 15, 7, 3, 1
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
     // there should be 5 calls to create partitions with batch sizes of 17, 15, 7, 3, 1
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, Mockito.times(5)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    Mockito.verify(spyDb, Mockito.times(5)).add_partitions(argParts.capture(), ifNotExistsArg.capture(),
+      needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 17,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 15,
-        apds.get(1).getPartitionCount());
+        apds.get(1).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 7,
-        apds.get(2).getPartitionCount());
+        apds.get(2).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 3,
-        apds.get(3).getPartitionCount());
+        apds.get(3).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 1,
-        apds.get(4).getPartitionCount());
+        apds.get(4).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -285,28 +345,32 @@ public class TestMsckCreatePartitionsInBatches {
   public void testMaxRetriesReached() throws Exception {
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(17);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), Mockito.anyBoolean());
     // batch size of 30 and decaying factor of 2
     Exception ex = null;
     try {
-      ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 2);
+      msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 2);
     } catch (Exception retryEx) {
       ex = retryEx;
-    Assert.assertFalse("Exception was expected but was not thrown", ex == null);
-    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException);
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, Mockito.times(2)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    assertFalse("Exception was expected but was not thrown", ex == null);
+    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException);
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
+    Mockito.verify(spyDb, Mockito.times(2)).add_partitions(argParts.capture(), ifNotExistsArg.capture(), needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 17,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 15,
-        apds.get(1).getPartitionCount());
+        apds.get(1).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());
@@ -317,25 +381,31 @@ public class TestMsckCreatePartitionsInBatches {
   public void testOneMaxRetries() throws Exception {
     Set<PartitionResult> partsNotInMs = createPartsNotInMs(17);
-    Hive spyDb = Mockito.spy(db);
+    IMetaStoreClient spyDb = Mockito.spy(db);
-        .createPartitions(Mockito.any(AddPartitionDesc.class));
+      .add_partitions(Mockito.anyObject(), Mockito.anyBoolean(), Mockito.anyBoolean());
     // batch size of 30 and decaying factor of 2
     Exception ex = null;
     try {
-      ddlTask.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 1);
+      msck.createPartitionsInBatches(spyDb, repairOutput, partsNotInMs, table, 30, 2, 1);
     } catch (Exception retryEx) {
       ex = retryEx;
-    Assert.assertFalse("Exception was expected but was not thrown", ex == null);
-    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryException);
+    assertFalse("Exception was expected but was not thrown", ex == null);
+    Assert.assertTrue("Unexpected class of exception thrown", ex instanceof RetryUtilities.RetryException);
+    // with max retries of 1 there should be a single call to create partitions, with a batch size of 17
+    ArgumentCaptor<Boolean> ifNotExistsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<Boolean> needResultsArg = ArgumentCaptor.forClass(Boolean.class);
+    ArgumentCaptor<List<Partition>> argParts = ArgumentCaptor.forClass((Class) List.class);
     // with max retries of 1 there should be a single call to create partitions, with a batch size of 17
-    ArgumentCaptor<AddPartitionDesc> argument = ArgumentCaptor.forClass(AddPartitionDesc.class);
-    Mockito.verify(spyDb, Mockito.times(1)).createPartitions(argument.capture());
-    List<AddPartitionDesc> apds = argument.getAllValues();
+    Mockito.verify(spyDb, Mockito.times(1)).add_partitions(argParts.capture(), ifNotExistsArg.capture(),
+      needResultsArg.capture());
+    List<List<Partition>> apds = argParts.getAllValues();
     int retryAttempt = 1;
         String.format("Unexpected batch size in retry attempt %d ", retryAttempt++), 17,
-        apds.get(0).getPartitionCount());
+        apds.get(0).size());
+    assertTrue(ifNotExistsArg.getValue());
+    assertFalse(needResultsArg.getValue());

[2/4] hive git commit: HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)

Posted by
diff --git a/ql/src/test/results/clientpositive/rename_external_partition_location.q.out b/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
index 02cd814..d854887 100644
--- a/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
+++ b/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
@@ -103,6 +103,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numPartitions       	1                   
 	numRows             	10                  
@@ -266,6 +267,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numPartitions       	1                   
 	numRows             	10          
diff --git a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
index b2bcd51..40b6ad7 100644
--- a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
+++ b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
@@ -345,6 +345,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: select * from ext_t_imported
@@ -426,6 +427,7 @@ LOCATION
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: select * from ext_t_r_imported
diff --git a/ql/src/test/results/clientpositive/show_create_table_alter.q.out b/ql/src/test/results/clientpositive/show_create_table_alter.q.out
index 2c75c36..9d93ee9 100644
--- a/ql/src/test/results/clientpositive/show_create_table_alter.q.out
+++ b/ql/src/test/results/clientpositive/show_create_table_alter.q.out
@@ -32,6 +32,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('comment'='temporary table', 'EXTERNAL'='FALSE')
@@ -67,6 +68,7 @@ LOCATION
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('comment'='changed comment', 'EXTERNAL'='TRUE')
@@ -101,6 +103,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
@@ -135,6 +138,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('storage_handler'='org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler')
@@ -169,6 +173,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE tmp_showcrt1_n1
diff --git a/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out b/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
index e554a18..8a56bfc 100644
--- a/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
+++ b/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
@@ -32,6 +32,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE tmp_showcrt1_n2
diff --git a/ql/src/test/results/clientpositive/show_create_table_serde.q.out b/ql/src/test/results/clientpositive/show_create_table_serde.q.out
index 8b95c9b..a66c09a 100644
--- a/ql/src/test/results/clientpositive/show_create_table_serde.q.out
+++ b/ql/src/test/results/clientpositive/show_create_table_serde.q.out
@@ -174,6 +174,7 @@ LOCATION
 #### A masked pattern was here ####
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE tmp_showcrt1_n0
diff --git a/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out b/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
index 2d713a8..74f8b5a 100644
--- a/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
+++ b/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
@@ -49,6 +49,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	totalSize           	11                  
 #### A masked pattern was here ####
@@ -90,6 +91,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	6                   
 	rawDataSize         	6           
diff --git a/ql/src/test/results/clientpositive/stats_noscan_2.q.out b/ql/src/test/results/clientpositive/stats_noscan_2.q.out
index 182820f..6625219 100644
--- a/ql/src/test/results/clientpositive/stats_noscan_2.q.out
+++ b/ql/src/test/results/clientpositive/stats_noscan_2.q.out
@@ -49,6 +49,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	totalSize           	11                  
 #### A masked pattern was here ####
@@ -90,6 +91,7 @@ Table Parameters:
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	6                   
 	rawDataSize         	6           
diff --git a/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out b/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
index 2a442b4..065cd98 100644
--- a/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
+++ b/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
@@ -61,6 +61,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"adrevenue\":\"true\",\"avgtimeonsite\":\"true\",\"ccode\":\"true\",\"desturl\":\"true\",\"lcode\":\"true\",\"skeyword\":\"true\",\"sourceip\":\"true\",\"useragent\":\"true\",\"visitdate\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	0                   
 	numRows             	0                   
 	rawDataSize         	0                   
@@ -111,6 +112,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	0                   
 	rawDataSize         	0                   
@@ -267,6 +269,7 @@ STAGE PLANS:
               columns sourceip,desturl,visitdate,adrevenue,useragent,ccode,lcode,skeyword,avgtimeonsite
               columns.types string:string:string:float:string:string:string:string:int
+              discover.partitions true
               field.delim |
 #### A masked pattern was here ####
               name default.uservisits_web_text_none
@@ -289,6 +292,7 @@ STAGE PLANS:
                 columns sourceip,desturl,visitdate,adrevenue,useragent,ccode,lcode,skeyword,avgtimeonsite
                 columns.types string:string:string:float:string:string:string:string:int
+                discover.partitions true
                 field.delim |
 #### A masked pattern was here ####
                 name default.uservisits_web_text_none
@@ -381,6 +385,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"adRevenue\":\"true\",\"avgTimeOnSite\":\"true\",\"sourceIP\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	55                  
 	rawDataSize         	7005        
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..5287f47
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,153 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import java.util.Set;
+import java.util.TreeSet;
+ * Result class used by the HiveMetaStoreChecker.
+ */
+public class CheckResult {
+  // TreeSets keep every result set sorted, so qfile test output has a stable order
+  private Set<String> tablesNotOnFs = new TreeSet<String>();
+  private Set<String> tablesNotInMs = new TreeSet<String>();
+  private Set<PartitionResult> partitionsNotOnFs = new TreeSet<PartitionResult>();
+  private Set<PartitionResult> partitionsNotInMs = new TreeSet<PartitionResult>();
+  private Set<PartitionResult> expiredPartitions = new TreeSet<>(); // presumably partitions past their retention period - TODO confirm
+  /**
+   * @return the set of tables not found on the filesystem. This is the
+   *         internal set itself, not a copy, so changes made by the caller
+   *         are visible here.
+   */
+  public Set<String> getTablesNotOnFs() {
+    return tablesNotOnFs;
+  }
+  /**
+   * @param tablesNotOnFs
+   *          the set of tables not found on the filesystem. It is stored by
+   *          reference and replaces the current set; it is sorted only if
+   *          the supplied set is itself sorted.
+   */
+  public void setTablesNotOnFs(Set<String> tablesNotOnFs) {
+    this.tablesNotOnFs = tablesNotOnFs;
+  }
+  /**
+   * @return the set of tables not found in the metastore (the internal set,
+   *         not a copy).
+   */
+  public Set<String> getTablesNotInMs() {
+    return tablesNotInMs;
+  }
+  /**
+   * @param tablesNotInMs
+   *          the set of tables not found in the metastore; stored by
+   *          reference, replacing the current set.
+   */
+  public void setTablesNotInMs(Set<String> tablesNotInMs) {
+    this.tablesNotInMs = tablesNotInMs;
+  }
+  /**
+   * @return the set of partitions not found on the fs (the internal set,
+   *         not a copy).
+   */
+  public Set<PartitionResult> getPartitionsNotOnFs() {
+    return partitionsNotOnFs;
+  }
+  /**
+   * @param partitionsNotOnFs
+   *          the set of partitions not found on the fs; stored by reference,
+   *          replacing the current set.
+   */
+  public void setPartitionsNotOnFs(Set<PartitionResult> partitionsNotOnFs) {
+    this.partitionsNotOnFs = partitionsNotOnFs;
+  }
+  /**
+   * @return the set of partitions not found in the metastore (the internal
+   *         set, not a copy).
+   */
+  public Set<PartitionResult> getPartitionsNotInMs() {
+    return partitionsNotInMs;
+  }
+  /**
+   * @param partitionsNotInMs
+   *          the set of partitions not found in the metastore; stored by
+   *          reference, replacing the current set.
+   */
+  public void setPartitionsNotInMs(Set<PartitionResult> partitionsNotInMs) {
+    this.partitionsNotInMs = partitionsNotInMs;
+  }
+  public Set<PartitionResult> getExpiredPartitions() { // returns the internal expired-partition set, not a copy
+    return expiredPartitions;
+  }
+  public void setExpiredPartitions(
+    final Set<PartitionResult> expiredPartitions) {
+    this.expiredPartitions = expiredPartitions; // stored by reference; replaces the current set
+  }
+  /**
+   * A basic description of a partition that is missing from either the fs or
+   * the ms. CheckResult also uses this type for its expired-partition set.
+   *
+   * Instances sort by table name first and partition name second. Both names
+   * must be set before an instance is placed in a sorted set, because
+   * compareTo dereferences them without a null check. equals and hashCode
+   * are not overridden in this class.
+   */
+  public static class PartitionResult implements Comparable<PartitionResult> {
+    private String partitionName;
+    private String tableName;
+    /**
+     * @return name of partition
+     */
+    public String getPartitionName() {
+      return partitionName;
+    }
+    /**
+     * @param partitionName
+     *          name of partition
+     */
+    public void setPartitionName(String partitionName) {
+      this.partitionName = partitionName;
+    }
+    /**
+     * @return table name
+     */
+    public String getTableName() {
+      return tableName;
+    }
+    /**
+     * @param tableName
+     *          table name
+     */
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+    @Override
+    public String toString() {
+      return tableName + ":" + partitionName; // rendered as "table:partition"
+    }
+    public int compareTo(PartitionResult o) { // table name decides; partition name breaks ties
+      int ret = tableName.compareTo(o.tableName);
+      return ret != 0 ? ret : partitionName.compareTo(o.partitionName);
+    }
+  }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
index 294dfb7..ecd5996 100755
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/
@@ -409,7 +409,7 @@ public class Warehouse {
-  private static String escapePathName(String path) {
+  public static String escapePathName(String path) {
     return FileUtils.escapePathName(path);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/
new file mode 100644
index 0000000..ab89389
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/
@@ -0,0 +1,36 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.api;
+public class MetastoreException extends Exception { // checked exception: extends Exception directly
+  public MetastoreException() { // no detail message, no cause
+    super();
+  }
+  public MetastoreException(String message) { // detail message only
+    super(message);
+  }
+  public MetastoreException(Throwable cause) { // wraps an underlying cause
+    super(cause);
+  }
+  public MetastoreException(String message, Throwable cause) { // detail message plus underlying cause
+    super(message, cause);
+  }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/
index 1d64cce..f3a78bf 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/
@@ -74,6 +74,8 @@ public class MetastoreConf {
+    "org.apache.hadoop.hive.metastore.PartitionManagementTask";
   static final String EVENT_CLEANER_TASK_CLASS =
@@ -651,6 +653,58 @@ public class MetastoreConf {
     METRICS_REPORTERS("metastore.metrics.reporters", "metastore.metrics.reporters", "json,jmx",
         new StringSetValidator("json", "jmx", "console", "hadoop"),
         "A comma separated list of metrics reporters to start"),
+    MSCK_PATH_VALIDATION("msck.path.validation", "hive.msck.path.validation", "throw",
+      new StringSetValidator("throw", "skip", "ignore"), "The approach msck should take with HDFS " +
+      "directories that are partition-like but contain unsupported characters. 'throw' (an " +
+      "exception) is the default; 'skip' will skip the invalid directories and still repair the" +
+      " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"),
+      "", 3000,
+      "Batch size for the msck repair command. If the value is greater than zero,\n "
+        + "it will execute batch wise with the configured batch size. In case of errors while\n"
+        + "adding unknown partitions the batch size is automatically reduced by half in the subsequent\n"
+        + "retry attempt. The default value is 3000 which means it will execute in the batches of 3000."),
+      "Maximum number of retries for the msck repair command when adding unknown partitions.\n "
+        + "If the value is greater than zero it will retry adding unknown partitions until the maximum\n"
+        + "number of attempts is reached or batch size is reduced to 0, whichever is earlier.\n"
+        + "In each retry attempt it will reduce the batch size by a factor of 2 until it reaches zero.\n"
+        + "If the value is set to zero it will retry until the batch size becomes zero as described above."),
+      "", false,
+      "If 'partition.retention.period' table property is set, this flag determines whether MSCK REPAIR\n" +
+      "command should handle partition retention. If enabled, and if a specific partition's age exceeded\n" +
+      "retention period the partition will be dropped along with data"),
+    // Partition management task params
+      "",
+      300, TimeUnit.SECONDS, "Frequency at which timer task runs to do automatic partition management for tables\n" +
+      "with table property 'discover.partitions'='true'. Partition management include 2 pieces. One is partition\n" +
+      "discovery and other is partition retention period. When 'discover.partitions'='true' is set, partition\n" +
+      "management will look for partitions in table location and add partitions objects for it in metastore.\n" +
+      "Similarly if partition object exists in metastore and partition location does not exist, partition object\n" +
+      "will be dropped. The second piece in partition management is retention period. When 'discover.partitions'\n" +
+      "is set to true and if 'partition.retention.period' table property is defined, partitions that are older\n" +
+      "than the specified retention period will be automatically dropped from metastore along with the data."),
+      "Comma separated list of table types to use for partition management"),
+      "", 5,
+      "Partition management uses thread pool on to which tasks are submitted for discovering and retaining the\n" +
+      "partitions. This determines the size of the thread pool."),
+      "", "hive",
+      "Automatic partition management will look for tables under the specified catalog name"),
+      "", "*",
+      "Automatic partition management will look for tables using the specified database pattern"),
+      "", "*",
+      "Automatic partition management will look for tables using the specified table pattern"),
     MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true,
         "Set this to true if multiple threads access metastore through JDO concurrently."),
     MAX_OPEN_TXNS("", "", 100000,
@@ -799,7 +853,7 @@ public class MetastoreConf {
     TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
         "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
-            "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
+          "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
         "Comma separated list of tasks that will be started in separate threads.  These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode.  They must implement " + METASTORE_TASK_THREAD_CLASS),
@@ -808,7 +862,8 @@ public class MetastoreConf {
             ACID_WRITE_SET_SERVICE_CLASS + "," +
         "Command separated list of tasks that will be started in separate threads.  These will be" +
             " started only when the metastore is running as a separate service.  They must " +
             "implement " + METASTORE_TASK_THREAD_CLASS),
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/
index 8fb1fa7..1d89e12 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/
@@ -17,37 +17,6 @@
 package org.apache.hadoop.hive.metastore.utils;
-import org.apache.commons.beanutils.PropertyUtils;
-import org.apache.hadoop.hive.metastore.api.PartitionSpec;
-import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
-import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
-import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.ColumnType;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
 import java.beans.PropertyDescriptor;
@@ -69,6 +38,30 @@ import;
 import static java.util.regex.Pattern.compile;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 public class MetaStoreUtils {
   /** A fixed date format to be used for hive partition column values. */
   public static final ThreadLocal<DateFormat> PARTITION_DATE_FORMAT =
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..2df45f6
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getAllPartitionsOf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getDataLocation;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartColNames;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartCols;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartition;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionName;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionSpec;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Verify that the information in the metastore matches what is on the
+ * filesystem. Return a CheckResult object containing lists of missing and any
+ * unexpected tables and partitions.
+ */
+public class HiveMetaStoreChecker {
+  public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
+  private final IMetaStoreClient msc;
+  private final Configuration conf;
+  private final long partitionExpirySeconds;
+  private final Interner<Path> pathInterner = Interners.newStrongInterner();
+  /**
+   * Creates a checker with partition expiry detection turned off (the expiry
+   * period is set to -1, and expiry is only evaluated for positive values).
+   *
+   * @param msc
+   *          metastore client used for all metadata lookups
+   * @param conf
+   *          configuration used to resolve filesystems and metastore settings
+   */
+  public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf) {
+    this(msc, conf, -1);
+  }
+  /**
+   * Creates a checker.
+   *
+   * @param msc
+   *          metastore client used for all metadata lookups
+   * @param conf
+   *          configuration used to resolve filesystems and metastore settings
+   * @param partitionExpirySeconds
+   *          partition age in seconds above which a partition is reported as
+   *          expired; values that are not positive disable the expiry check
+   */
+  public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf, long partitionExpirySeconds) {
+    this.partitionExpirySeconds = partitionExpirySeconds;
+    this.conf = conf;
+    this.msc = msc;
+  }
+  /**
+   * @return the metastore client this checker was constructed with
+   */
+  public IMetaStoreClient getMsc() {
+    return this.msc;
+  }
+  /**
+   * Check the metastore for inconsistencies, data missing in either the
+   * metastore or on the dfs.
+   *
+   * @param catName
+   *          name of the catalog; handed to the metastore client unchanged.
+   * @param dbName
+   *          name of the database, if null or empty the default database
+   *          name is used.
+   * @param tableName
+   *          Table we want to run the check for. If null or empty we'll check
+   *          all the tables in the database and also look for table
+   *          directories that are unknown to the metastore.
+   * @param partitions
+   *          List of partition name value pairs, if null or empty check all
+   *          partitions
+   * @param result
+   *          Fill this with the results of the check
+   * @throws MetastoreException
+   *           Failed to get required information from the metastore; wraps
+   *           any TException raised by the client.
+   * @throws IOException
+   *           Most likely filesystem related
+   */
+  public void checkMetastore(String catName, String dbName, String tableName,
+      List<? extends Map<String, String>> partitions, CheckResult result)
+      throws MetastoreException, IOException {
+    if (dbName == null || dbName.isEmpty()) {
+      dbName = Warehouse.DEFAULT_DATABASE_NAME;
+    }
+    try {
+      if (tableName == null || tableName.isEmpty()) {
+        // no table specified, check all tables and all partitions.
+        List<String> tables = getMsc().getTables(catName, dbName, ".*");
+        for (String currentTableName : tables) {
+          checkTable(catName, dbName, currentTableName, null, result);
+        }
+        findUnknownTables(catName, dbName, tables, result);
+      } else if (partitions == null || partitions.isEmpty()) {
+        // only one table, let's check all partitions
+        checkTable(catName, dbName, tableName, null, result);
+      } else {
+        // check the specified partitions
+        checkTable(catName, dbName, tableName, partitions, result);
+      }
+      // Summarize everything the check collected; the placeholders render the
+      // same text the original string concatenation produced.
+      LOG.info("Number of partitionsNotInMs={}, partitionsNotOnFs={}, tablesNotInMs={}, tablesNotOnFs={}"
+              + ", expiredPartitions={}",
+          result.getPartitionsNotInMs(), result.getPartitionsNotOnFs(), result.getTablesNotInMs(),
+          result.getTablesNotOnFs(), result.getExpiredPartitions());
+    } catch (TException e) {
+      throw new MetastoreException(e);
+    }
+  }
+  /**
+   * Check for table directories that aren't in the metastore.
+   *
+   * The parent directory of every non-external table location is listed, and
+   * each sub-directory whose name is not one of {@code tables} is recorded in
+   * {@code result.getTablesNotInMs()}.
+   *
+   * @param catName
+   *          name of the catalog; handed to the metastore client unchanged.
+   * @param dbName
+   *          Name of the database
+   * @param tables
+   *          List of table names
+   * @param result
+   *          Add any found tables to this
+   * @throws IOException
+   *           Most likely filesystem related
+   * @throws MetaException
+   *           Failed to get required information from the metastore.
+   * @throws TException
+   *           Thrift communication error, or the table could not be fetched.
+   */
+  void findUnknownTables(String catName, String dbName, List<String> tables, CheckResult result)
+      throws IOException, MetaException, TException {
+    Set<Path> dbPaths = new HashSet<>();
+    Set<String> tableNames = new HashSet<>(tables);
+    for (String tableName : tables) {
+      Table table = getMsc().getTable(catName, dbName, tableName);
+      // hack, instead figure out a way to get the db paths
+      // The parameter map may be unset; a missing map is treated like a
+      // missing EXTERNAL flag, i.e. the table counts as non-external.
+      Map<String, String> tableParams = table.getParameters();
+      String isExternal = tableParams == null ? null : tableParams.get("EXTERNAL");
+      if (!"TRUE".equalsIgnoreCase(isExternal)) {
+        Path tablePath = getPath(table);
+        if (tablePath != null) {
+          dbPaths.add(tablePath.getParent());
+        }
+      }
+    }
+    for (Path dbPath : dbPaths) {
+      FileSystem fs = dbPath.getFileSystem(conf);
+      FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      for (FileStatus status : statuses) {
+        String dirName = status.getPath().getName();
+        if (status.isDirectory() && !tableNames.contains(dirName)) {
+          result.getTablesNotInMs().add(dirName);
+        }
+      }
+    }
+  }
+  /**
+   * Check the metastore for inconsistencies, data missing in either the
+   * metastore or on the dfs.
+   *
+   * @param catName
+   *          name of the catalog, if not specified default catalog will be used.
+   * @param dbName
+   *          Name of the database
+   * @param tableName
+   *          Name of the table
+   * @param partitions
+   *          Partitions to check, if null or empty get all the partitions.
+   * @param result
+   *          Result object
+   * @throws MetastoreException
+   *           Failed to get required information from the metastore.
+   * @throws IOException
+   *           Most likely filesystem related
+   * @throws MetaException
+   *           Failed to get required information from the metastore.
+   */
+  void checkTable(String catName, String dbName, String tableName,
+      List<? extends Map<String, String>> partitions, CheckResult result)
+      throws MetaException, IOException, MetastoreException {
+    Table table;
+    try {
+      table = getMsc().getTable(catName, dbName, tableName);
+    } catch (TException e) {
+      result.getTablesNotInMs().add(tableName);
+      return;
+    }
+    PartitionIterable parts;
+    boolean findUnknownPartitions = true;
+    if (isPartitioned(table)) {
+      if (partitions == null || partitions.isEmpty()) {
+        int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+        if (batchSize > 0) {
+          parts = new PartitionIterable(getMsc(), table, batchSize);
+        } else {
+          List<Partition> loadedPartitions = getAllPartitionsOf(getMsc(), table);
+          parts = new PartitionIterable(loadedPartitions);
+        }
+      } else {
+        // we're interested in specific partitions,
+        // don't check for any others
+        findUnknownPartitions = false;
+        List<Partition> loadedPartitions = new ArrayList<>();
+        for (Map<String, String> map : partitions) {
+          Partition part = getPartition(getMsc(), table, map);
+          if (part == null) {
+            CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+            pr.setTableName(tableName);
+            pr.setPartitionName(Warehouse.makePartPath(map));
+            result.getPartitionsNotInMs().add(pr);
+          } else {
+            loadedPartitions.add(part);
+          }
+        }
+        parts = new PartitionIterable(loadedPartitions);
+      }
+    } else {
+      parts = new PartitionIterable(Collections.<Partition>emptyList());
+    }
+    checkTable(table, parts, findUnknownPartitions, result);
+  }
+  /**
+   * Check the metastore for inconsistencies, data missing in either the
+   * metastore or on the dfs.
+   *
+   * @param table
+   *          Table to check
+   * @param parts
+   *          Partitions to check
+   * @param result
+   *          Result object
+   * @param findUnknownPartitions
+   *          Should we try to find unknown partitions?
+   * @throws IOException
+   *           Could not get information from filesystem
+   * @throws MetastoreException
+   *           Could not create Partition object
+   */
+  void checkTable(Table table, PartitionIterable parts,
+      boolean findUnknownPartitions, CheckResult result) throws IOException,
+    MetastoreException {
+    Path tablePath = getPath(table);
+    if (tablePath == null) {
+      return;
+    }
+    FileSystem fs = tablePath.getFileSystem(conf);
+    if (!fs.exists(tablePath)) {
+      result.getTablesNotOnFs().add(table.getTableName());
+      return;
+    }
+    Set<Path> partPaths = new HashSet<Path>();
+    // check that the partition folders exist on disk
+    for (Partition partition : parts) {
+      if (partition == null) {
+        // most likely the user specified an invalid partition
+        continue;
+      }
+      Path partPath = getDataLocation(table, partition);
+      if (partPath == null) {
+        continue;
+      }
+      fs = partPath.getFileSystem(conf);
+      if (!fs.exists(partPath)) {
+        CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+        pr.setPartitionName(getPartitionName(table, partition));
+        pr.setTableName(partition.getTableName());
+        result.getPartitionsNotOnFs().add(pr);
+      }
+      if (partitionExpirySeconds > 0) {
+        long currentEpochSecs =;
+        long createdTime = partition.getCreateTime();
+        long partitionAgeSeconds = currentEpochSecs - createdTime;
+        if (partitionAgeSeconds > partitionExpirySeconds) {
+          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+          pr.setPartitionName(getPartitionName(table, partition));
+          pr.setTableName(partition.getTableName());
+          result.getExpiredPartitions().add(pr);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+              partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+              partitionAgeSeconds, partitionExpirySeconds);
+          }
+        }
+      }
+      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+        Path qualifiedPath = partPath.makeQualified(fs);
+        pathInterner.intern(qualifiedPath);
+        partPaths.add(qualifiedPath);
+        partPath = partPath.getParent();
+      }
+    }
+    if (findUnknownPartitions) {
+      findUnknownPartitions(table, partPaths, result);
+    }
+  }
+  /**
+   * Find partitions on the fs that are unknown to the metastore.
+   *
+   * @param table
+   *          Table where the partitions would be located
+   * @param partPaths
+   *          Paths of the partitions the ms knows about
+   * @param result
+   *          Result object
+   * @throws IOException
+   *           Thrown if we fail at fetching listings from the fs.
+   * @throws MetastoreException
+   */
+  void findUnknownPartitions(Table table, Set<Path> partPaths,
+      CheckResult result) throws IOException, MetastoreException {
+    Path tablePath = getPath(table);
+    if (tablePath == null) {
+      return;
+    }
+    // now check the table folder and see if we find anything
+    // that isn't in the metastore
+    Set<Path> allPartDirs = new HashSet<Path>();
+    checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(getPartColNames(table)));
+    // don't want the table dir
+    allPartDirs.remove(tablePath);
+    // remove the partition paths we know about
+    allPartDirs.removeAll(partPaths);
+    Set<String> partColNames = Sets.newHashSet();
+    for(FieldSchema fSchema : getPartCols(table)) {
+      partColNames.add(fSchema.getName());
+    }
+    // we should now only have the unexpected folders left
+    for (Path partPath : allPartDirs) {
+      FileSystem fs = partPath.getFileSystem(conf);
+      String partitionName = getPartitionName(fs.makeQualified(tablePath),
+          partPath, partColNames);
+      LOG.debug("PartitionName: " + partitionName);
+      if (partitionName != null) {
+        CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+        pr.setPartitionName(partitionName);
+        pr.setTableName(table.getTableName());
+        result.getPartitionsNotInMs().add(pr);
+      }
+    }
+    LOG.debug("Number of partitions not in metastore : " + result.getPartitionsNotInMs().size());
+  }
+  /**
+   * Collects the partition directories found under {@code basePath}.
+   *
+   * Assume that depth is 2, i.e., partition columns are a and b
+   * tblPath/a=1  => throw exception
+   * tblPath/a=1/file => throw exception
+   * tblPath/a=1/b=2/file => return a=1/b=2
+   * tblPath/a=1/b=2/c=3 => return a=1/b=2
+   * tblPath/a=1/b=2/c=3/file => return a=1/b=2
+   * (an invalid layout is only logged, not thrown, unless
+   * MSCK_PATH_VALIDATION is set to "throw")
+   *
+   * @param basePath
+   *          Start directory
+   * @param allDirs
+   *          This set will contain the leaf paths at the end.
+   * @param partColNames
+   *          Partition column names
+   * @throws IOException
+   *           Thrown if we can't get lists from the fs.
+   * @throws MetastoreException
+   *           Thrown if the traversal fails or rejects a path.
+   */
+  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, MetastoreException {
+    // Here we just reuse the THREAD_COUNT configuration for
+    // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance
+    // The number of missing partitions discovered are later added by metastore using a
+    // threadpool of size METASTORE_FS_HANDLER_THREADS_COUNT. If we have different sized
+    // pool here the smaller sized pool of the two becomes a bottleneck
+    int poolSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.FS_HANDLER_THREADS_COUNT);
+    ExecutorService executor;
+    if (poolSize <= 1) {
+      LOG.debug("Using single-threaded version of MSCK-GetPaths");
+      executor = MoreExecutors.newDirectExecutorService();
+    } else {
+      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads {}", poolSize);
+      ThreadFactory threadFactory =
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
+      executor = Executors.newFixedThreadPool(poolSize, threadFactory);
+    }
+    try {
+      checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
+    } finally {
+      // always release the pool, even when the traversal or the filesystem
+      // lookup fails; shutdown() is harmless after a prior shutdownNow()
+      executor.shutdown();
+    }
+  }
+  /**
+   * Task that lists one directory of the partition tree. Valid sub-directories
+   * are queued for the next level; the directory itself is returned once its
+   * depth equals the number of partition columns.
+   */
+  private final class PathDepthInfoCallable implements Callable<Path> {
+    private final List<String> partColNames;
+    private final FileSystem fs;
+    private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
+    private final boolean throwException;
+    private final PathDepthInfo pd;
+    private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
+        ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
+      this.pd = pd;
+      this.fs = fs;
+      this.partColNames = partColNames;
+      this.pendingPaths = basePaths;
+      // validation problems are thrown only in "throw" mode, otherwise logged
+      final String validationMode = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.MSCK_PATH_VALIDATION);
+      this.throwException = "throw".equals(validationMode);
+    }
+    @Override
+    public Path call() throws Exception {
+      return processPathDepthInfo(pd);
+    }
+    /**
+     * Lists {@code pathInfo.p}, validates its children and queues the valid
+     * sub-directories.
+     *
+     * @return the listed path when it sits at full partition depth, otherwise null
+     */
+    private Path processPathDepthInfo(final PathDepthInfo pathInfo)
+        throws IOException, MetastoreException {
+      final Path dir = pathInfo.p;
+      final int depth = pathInfo.depth;
+      final int maxDepth = partColNames.size();
+      final boolean aboveLeafLevel = depth < maxDepth;
+      final FileStatus[] children = fs.listStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      // found no files under a sub-directory under table base path; it is possible that the table
+      // is empty and hence there are no partition sub-directories created under base path
+      if (children.length == 0 && depth > 0 && aboveLeafLevel) {
+        // since maxDepth is not yet reached, we are missing partition
+        // columns in currentPath
+        logOrThrowExceptionWithMsg(
+            "MSCK is missing partition columns under " + dir.toString());
+        return null;
+      }
+      // children only matter while we are above the leaf level
+      if (aboveLeafLevel) {
+        for (FileStatus child : children) {
+          if (!child.isDirectory()) {
+            // found a file at depth which is less than number of partition keys
+            logOrThrowExceptionWithMsg(
+                "MSCK finds a file rather than a directory when it searches for "
+                    + child.getPath().toString());
+            continue;
+          }
+          // found a sub-directory at a depth less than number of partition keys
+          // validate if the partition directory name matches with the corresponding
+          // partition colName at currentDepth
+          final Path childDir = child.getPath();
+          final String[] keyAndValue = childDir.getName().split("=");
+          if (keyAndValue.length != 2) {
+            logOrThrowExceptionWithMsg("Invalid partition name " + childDir);
+            continue;
+          }
+          if (!keyAndValue[0].equalsIgnoreCase(partColNames.get(depth))) {
+            logOrThrowExceptionWithMsg(
+                "Unexpected partition key " + keyAndValue[0] + " found at " + childDir);
+            continue;
+          }
+          // add sub-directory to the work queue if maxDepth is not yet reached
+          pendingPaths.add(new PathDepthInfo(childDir, depth + 1));
+        }
+      }
+      return depth == maxDepth ? dir : null;
+    }
+    /** Throws a MetastoreException in "throw" mode, otherwise logs a warning. */
+    private void logOrThrowExceptionWithMsg(String msg) throws MetastoreException {
+      if (!throwException) {
+        LOG.warn(msg);
+        return;
+      }
+      throw new MetastoreException(msg);
+    }
+  }
+  /** Pairs a directory with its depth below the table base path (the base path is depth 0). */
+  private static class PathDepthInfo {
+    private final Path p;
+    private final int depth;
+    PathDepthInfo(Path path, int level) {
+      p = path;
+      depth = level;
+    }
+  }
+  /**
+   * Walks the directory tree under {@code basePath} one level at a time,
+   * listing all directories of a level in parallel on {@code executor}.
+   *
+   * @param executor
+   *          runs the per-directory listing tasks; it is shut down immediately
+   *          when a task fails or the calling thread is interrupted
+   * @param basePath
+   *          table directory where the walk starts (depth 0)
+   * @param result
+   *          receives every non-null path returned by a listing task
+   * @param fs
+   *          filesystem used for the listings
+   * @param partColNames
+   *          Partition column names
+   * @throws MetastoreException
+   *           wraps the failure of a listing task, or the interruption of the
+   *           calling thread (whose interrupt flag is restored first)
+   */
+  private void checkPartitionDirs(final ExecutorService executor,
+      final Path basePath, final Set<Path> result,
+      final FileSystem fs, final List<String> partColNames) throws MetastoreException {
+    try {
+      Queue<Future<Path>> futures = new LinkedList<>();
+      ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
+      nextLevel.add(new PathDepthInfo(basePath, 0));
+      //Uses level parallel implementation of a bfs. Recursive DFS implementations
+      //have a issue where the number of threads can run out if the number of
+      //nested sub-directories is more than the pool size.
+      //Using a two queue implementation is simpler than one queue since then we will
+      //have to add the complex mechanisms to let the free worker threads know when new levels are
+      //discovered using notify()/wait() mechanisms which can potentially lead to bugs if
+      //not done right
+      while (!nextLevel.isEmpty()) {
+        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
+        //process each level in parallel
+        while (!nextLevel.isEmpty()) {
+          futures.add(
+              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
+        }
+        while (!futures.isEmpty()) {
+          Path p = futures.poll().get();
+          if (p != null) {
+            result.add(p);
+          }
+        }
+        //update the nextlevel with newly discovered sub-directories from the above
+        nextLevel = tempQueue;
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while checking partition directories under {}", basePath, e);
+      executor.shutdownNow();
+      // restore the interrupt flag so callers further up can react to it
+      Thread.currentThread().interrupt();
+      // an InterruptedException carries no cause, so wrap the exception itself
+      throw new MetastoreException(e);
+    } catch (ExecutionException e) {
+      LOG.error("Failed to check partition directories under {}", basePath, e);
+      executor.shutdownNow();
+      // surface the task's own failure; fall back to the wrapper if it has none
+      Throwable cause = e.getCause() != null ? e.getCause() : e;
+      throw new MetastoreException(cause);
+    }
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..b7ae1d8
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.ObjectPair;
+import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Msck repairs table metadata, specifically the partition information, so that it is in sync with the
+ * directories in the table location.
+ */
+public class Msck {
+  public static final Logger LOG = LoggerFactory.getLogger(Msck.class);
+  public static final int separator = 9; // tabCode
+  private static final int terminator = 10; // newLineCode
+  private boolean acquireLock;
+  private boolean deleteData;
+  private Configuration conf;
+  private IMetaStoreClient msc;
+  /**
+   * @param acquireLock
+   *          whether repair may take a lock on the table before changing partitions
+   * @param deleteData
+   *          stored for later use; how it is applied is not visible here
+   *          (presumably whether dropping partitions also deletes their data - TODO confirm)
+   */
+  public Msck(boolean acquireLock, boolean deleteData) {
+    this.deleteData = deleteData;
+    this.acquireLock = acquireLock;
+  }
+  /**
+   * @return the configuration currently held by this instance, or null if none has been set
+   */
+  public Configuration getConf() {
+    return this.conf;
+  }
+  /**
+   * Replaces the configuration held by this instance.
+   *
+   * @param conf
+   *          configuration to store; kept by reference, not copied
+   */
+  public void setConf(final Configuration conf) {
+    this.conf = conf;
+  }
+  /**
+   * Creates the metastore client on first use; later calls do nothing once a
+   * client exists.
+   *
+   * @param conf
+   *          base configuration; it is copied, so the caller's instance is not modified
+   * @throws MetaException
+   *           if the metastore client cannot be created
+   */
+  public void init(Configuration conf) throws MetaException {
+    if (msc != null) {
+      return;
+    }
+    // the only reason we are using new conf here is to override EXPRESSION_PROXY_CLASS
+    final Configuration msckConf = MetastoreConf.newMetastoreConf(new Configuration(conf));
+    final String proxyClassKey = MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS.getVarname();
+    msckConf.set(proxyClassKey, MsckPartitionExpressionProxy.class.getCanonicalName());
+    setConf(msckConf);
+    this.msc = new HiveMetaStoreClient(msckConf);
+  }
+  /**
+   * MetastoreCheck, see if the data in the metastore matches what is on the
+   * dfs. Current version checks for tables and partitions that are either
+   * missing on disk on in the metastore.
+   *
+   * @param msckInfo Information about the tables and partitions we want to check for.
+   * @return Returns 0 when execution succeeds and above 0 if it fails.
+   */
+  public int repair(MsckInfo msckInfo) {
+    CheckResult result = new CheckResult();
+    List<String> repairOutput = new ArrayList<>();
+    String qualifiedTableName = null;
+    boolean success = false;
+    long txnId = -1;
+    int ret = 0;
+    try {
+      Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName());
+      if (getConf().getBoolean(MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false)) {
+        msckInfo.setPartitionExpirySeconds(PartitionManagementTask.getRetentionPeriodInSeconds(table));
+"Retention period ({}s) for partition is enabled for MSCK REPAIR..", msckInfo.getPartitionExpirySeconds());
+      }
+      HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), msckInfo.getPartitionExpirySeconds());
+      // checkMetastore call will fill in result with partitions that are present in filesystem
+      // and missing in metastore - accessed through getPartitionsNotInMs
+      // And partitions that are not present in filesystem and metadata exists in metastore -
+      // accessed through getPartitionNotOnFS
+      checker.checkMetastore(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName(),
+        msckInfo.getPartSpecs(), result);
+      Set<CheckResult.PartitionResult> partsNotInMs = result.getPartitionsNotInMs();
+      Set<CheckResult.PartitionResult> partsNotInFs = result.getPartitionsNotOnFs();
+      Set<CheckResult.PartitionResult> expiredPartitions = result.getExpiredPartitions();
+      int totalPartsToFix = partsNotInMs.size() + partsNotInFs.size() + expiredPartitions.size();
+      // if nothing changed to partitions and if we are not repairing (add or drop) don't acquire for lock unnecessarily
+      boolean lockRequired = totalPartsToFix > 0 &&
+        msckInfo.isRepairPartitions() &&
+        (msckInfo.isAddPartitions() || msckInfo.isDropPartitions());
+"#partsNotInMs: {} #partsNotInFs: {} #expiredPartitions: {} lockRequired: {} (R: {} A: {} D: {})",
+        partsNotInMs.size(), partsNotInFs.size(), expiredPartitions.size(), lockRequired,
+        msckInfo.isRepairPartitions(), msckInfo.isAddPartitions(), msckInfo.isDropPartitions());
+      if (msckInfo.isRepairPartitions()) {
+        // Repair metadata in HMS
+        qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
+        long lockId;
+        if (acquireLock && lockRequired && table.getParameters() != null &&
+          MetaStoreServerUtils.isTransactionalTable(table.getParameters())) {
+          // Running MSCK from beeline/cli will make DDL task acquire X lock when repair is enabled, since we are directly
+          // invoking without SQL statement, we need to do the same and acquire X lock (repair is default)
+          LockRequest lockRequest = createLockRequest(msckInfo.getDbName(), msckInfo.getTableName());
+          txnId = lockRequest.getTxnid();
+          try {
+            LockResponse res = getMsc().lock(lockRequest);
+            if (res.getState() != LockState.ACQUIRED) {
+              throw new MetastoreException("Unable to acquire lock(X) on " + qualifiedTableName);
+            }
+            lockId = res.getLockid();
+          } catch (TException e) {
+            throw new MetastoreException("Unable to acquire lock(X) on " + qualifiedTableName, e);
+          }
+"Acquired lock(X) on {}. LockId: {}", qualifiedTableName, lockId);
+        }
+        int maxRetries = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_MAX_RETRIES);
+        int decayingFactor = 2;
+        if (msckInfo.isAddPartitions() && !partsNotInMs.isEmpty()) {
+          // MSCK called to add missing paritions into metastore and there are
+          // missing partitions.
+          int batchSize = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_SIZE);
+          if (batchSize == 0) {
+            //batching is not enabled. Try to add all the partitions in one call
+            batchSize = partsNotInMs.size();
+          }
+          AbstractList<String> vals = null;
+          String settingStr = MetastoreConf.getVar(getConf(), MetastoreConf.ConfVars.MSCK_PATH_VALIDATION);
+          boolean doValidate = !("ignore".equals(settingStr));
+          boolean doSkip = doValidate && "skip".equals(settingStr);
+          // The default setting is "throw"; assume doValidate && !doSkip means throw.
+          if (doValidate) {
+            // Validate that we can add partition without escaping. Escaping was originally intended
+            // to avoid creating invalid HDFS paths; however, if we escape the HDFS path (that we
+            // deem invalid but HDFS actually supports - it is possible to create HDFS paths with
+            // unprintable characters like ASCII 7), metastore will create another directory instead
+            // of the one we are trying to "repair" here.
+            Iterator<CheckResult.PartitionResult> iter = partsNotInMs.iterator();
+            while (iter.hasNext()) {
+              CheckResult.PartitionResult part =;
+              try {
+                vals = Warehouse.makeValsFromName(part.getPartitionName(), vals);
+              } catch (MetaException ex) {
+                throw new MetastoreException(ex);
+              }
+              for (String val : vals) {
+                String escapedPath = FileUtils.escapePathName(val);
+                assert escapedPath != null;
+                if (escapedPath.equals(val)) {
+                  continue;
+                }
+                String errorMsg = "Repair: Cannot add partition " + msckInfo.getTableName() + ':' +
+                  part.getPartitionName() + " due to invalid characters in the name";
+                if (doSkip) {
+                  repairOutput.add(errorMsg);
+                  iter.remove();
+                } else {
+                  throw new MetastoreException(errorMsg);
+                }
+              }
+            }
+          }
+          try {
+            createPartitionsInBatches(getMsc(), repairOutput, partsNotInMs, table, batchSize,
+              decayingFactor, maxRetries);
+          } catch (Exception e) {
+            throw new MetastoreException(e);
+          }
+        }
+        if (msckInfo.isDropPartitions() && (!partsNotInFs.isEmpty() || !expiredPartitions.isEmpty())) {
+          // MSCK called to drop stale paritions from metastore and there are
+          // stale partitions.
+          int batchSize = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_SIZE);
+          if (batchSize == 0) {
+            //batching is not enabled. Try to drop all the partitions in one call
+            batchSize = partsNotInFs.size() + expiredPartitions.size();
+          }
+          try {
+            dropPartitionsInBatches(getMsc(), repairOutput, partsNotInFs, expiredPartitions, table, batchSize,
+              decayingFactor, maxRetries);
+          } catch (Exception e) {
+            throw new MetastoreException(e);
+          }
+        }
+      }
+      success = true;
+    } catch (Exception e) {
+      LOG.warn("Failed to run metacheck: ", e);
+      success = false;
+      ret = 1;
+    } finally {
+      if (msckInfo.getResFile() != null) {
+        BufferedWriter resultOut = null;
+        try {
+          Path resFile = new Path(msckInfo.getResFile());
+          FileSystem fs = resFile.getFileSystem(getConf());
+          resultOut = new BufferedWriter(new OutputStreamWriter(fs.create(resFile)));
+          boolean firstWritten = false;
+          firstWritten |= writeMsckResult(result.getTablesNotInMs(),
+            "Tables not in metastore:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
+            "Tables missing on filesystem:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
+            "Partitions not in metastore:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
+            "Partitions missing from filesystem:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getExpiredPartitions(),
+            "Expired partitions (retention period: " + msckInfo.getPartitionExpirySeconds() + "s) :", resultOut, firstWritten);
+          // sorting to stabilize qfile output (msck_repair_drop.q)
+          Collections.sort(repairOutput);
+          for (String rout : repairOutput) {
+            if (firstWritten) {
+              resultOut.write(terminator);
+            } else {
+              firstWritten = true;
+            }
+            resultOut.write(rout);
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to save metacheck output: ", e);
+          ret = 1;
+        } finally {
+          if (resultOut != null) {
+            try {
+              resultOut.close();
+            } catch (IOException e) {
+              LOG.warn("Failed to close output file: ", e);
+              ret = 1;
+            }
+          }
+        }
+      }
+"Tables not in metastore: {}", result.getTablesNotInMs());
+"Tables missing on filesystem: {}", result.getTablesNotOnFs());
+"Partitions not in metastore: {}", result.getPartitionsNotInMs());
+"Partitions missing from filesystem: {}", result.getPartitionsNotOnFs());
+"Expired partitions: {}", result.getExpiredPartitions());
+      if (acquireLock && txnId > 0) {
+          if (success) {
+            try {
+    "txnId: {} succeeded. Committing..", txnId);
+              getMsc().commitTxn(txnId);
+            } catch (Exception e) {
+              LOG.warn("Error while committing txnId: {} for table: {}", txnId, qualifiedTableName, e);
+              ret = 1;
+            }
+          } else {
+            try {
+    "txnId: {} failed. Aborting..", txnId);
+              getMsc().abortTxns(Lists.newArrayList(txnId));
+            } catch (Exception e) {
+              LOG.warn("Error while aborting txnId: {} for table: {}", txnId, qualifiedTableName, e);
+              ret = 1;
+            }
+          }
+      }
+      if (getMsc() != null) {
+        getMsc().close();
+        msc = null;
+      }
+    }
+    return ret;
+  }
+  /**
+   * Opens a new transaction and builds an exclusive table lock request tied to it.
+   *
+   * <p>The user name comes from the UGI login user. If that lookup fails (the failure is logged and
+   * otherwise ignored), the {@code user.name} JVM system property is used instead.
+   *
+   * @param dbName    database of the table to lock
+   * @param tableName table to lock
+   * @return lock request carrying the newly opened transaction id and one exclusive lock component
+   * @throws TException propagated from the metastore client / builder calls
+   */
+  private LockRequest createLockRequest(final String dbName, final String tableName) throws TException {
+    UserGroupInformation loggedInUser = null;
+    String username;
+    try {
+      loggedInUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      // Best effort: fall through to the system property below.
+      LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
+    }
+    if (loggedInUser == null) {
+      // An empty property key makes System.getProperty throw IllegalArgumentException,
+      // so the key must be spelled out.
+      username = System.getProperty("user.name");
+    } else {
+      username = loggedInUser.getShortUserName();
+    }
+    long txnId = getMsc().openTxn(username);
+    // The current thread name identifies the agent that requested the lock.
+    String agentInfo = Thread.currentThread().getName();
+    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+    requestBuilder.setUser(username);
+    requestBuilder.setTransactionId(txnId);
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+      .setDbName(dbName)
+      .setTableName(tableName)
+      .setIsTransactional(true)
+      .setExclusive()
+      // WriteType is DDL_EXCLUSIVE for MSCK REPAIR so we need NO_TXN. Refer AcidUtils.makeLockComponents
+      .setOperationType(DataOperationType.NO_TXN);
+    requestBuilder.addLockComponent(lockCompBuilder.build());
+    LOG.info("Created lock(X) request with info - user: {} txnId: {} agentInfo: {} dbName: {} tableName: {}",
+      username, txnId, agentInfo, dbName, tableName);
+    return requestBuilder.build();
+  }
+  /**
+   * Returns the metastore client currently held in the {@code msc} field.
+   *
+   * @return the client, or {@code null} when the field has been cleared (the cleanup code in this
+   *         class closes the client and then sets the field to {@code null})
+   */
+  public IMetaStoreClient getMsc() {
+    return this.msc;
+  }
+  /**
+   * Adds the given partitions to the metastore in batches.
+   *
+   * <p>A private copy of {@code partsNotInMs} is worked off at most {@code size} partitions at a time,
+   * where {@code size} is the value handed to {@code execute} by
+   * {@link RetryUtilities.ExponentiallyDecayingBatchWork} (constructed from {@code batchSize},
+   * {@code decayingFactor} and {@code maxRetries}). A batch is removed from the pending copy only after
+   * {@code add_partitions} has returned, so a later {@code execute} call continues with what is left.
+   * {@code partsNotInMs} itself is never modified.
+   *
+   * <p>If the table location cannot be resolved, nothing is added: a warning is logged and the method
+   * returns normally.
+   *
+   * @param metastoreClient client used to add the partitions
+   * @param repairOutput    receives one "Repair: Added partition ..." message per added partition
+   * @param partsNotInMs    partitions to add
+   * @param table           table the partitions belong to
+   * @param batchSize       initial number of partitions per {@code add_partitions} call
+   * @param decayingFactor  passed through to the retry helper
+   * @param maxRetries      passed through to the retry helper
+   * @throws Exception propagated from the retry helper's {@code run()}
+   */
+  @VisibleForTesting
+  public void createPartitionsInBatches(final IMetaStoreClient metastoreClient, List<String> repairOutput,
+    Set<CheckResult.PartitionResult> partsNotInMs, Table table, int batchSize, int decayingFactor, int maxRetries)
+    throws Exception {
+    String addMsgFormat = "Repair: Added partition to metastore "
+      + table.getTableName() + ":%s";
+    // Copy of the partitions that is consumed batch by batch.
+    Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInMs);
+    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
+      @Override
+      public Void execute(int size) throws MetastoreException {
+        try {
+          while (!batchWork.isEmpty()) {
+            // The table location does not depend on the partition, so resolve it once per batch.
+            Path tablePath = MetaStoreServerUtils.getPath(table);
+            if (tablePath == null) {
+              // No partition location can be derived without a table location. Skipping partitions
+              // one by one would leave batchWork unchanged and this loop would never terminate.
+              LOG.warn("Unable to determine location of table {}. Skipping addition of {} partition(s).",
+                table.getTableName(), batchWork.size());
+              return null;
+            }
+            List<Partition> partsToAdd = new ArrayList<>();
+            //get the current batch size
+            int currentBatchSize = size;
+            //store the partitions temporarily until processed
+            List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize);
+            List<String> addMsgs = new ArrayList<>(currentBatchSize);
+            //add the number of partitions given by the current batchsize
+            for (CheckResult.PartitionResult part : batchWork) {
+              if (currentBatchSize == 0) {
+                break;
+              }
+              Map<String, String> partSpec = Warehouse.makeSpecFromName(part.getPartitionName());
+              Path location = new Path(tablePath, Warehouse.makePartPath(partSpec));
+              Partition partition = MetaStoreServerUtils.createMetaPartitionObject(table, partSpec, location);
+              partition.setWriteId(table.getWriteId());
+              partsToAdd.add(partition);
+              lastBatch.add(part);
+              addMsgs.add(String.format(addMsgFormat, part.getPartitionName()));
+              currentBatchSize--;
+            }
+            metastoreClient.add_partitions(partsToAdd, true, false);
+            // the batch was added successfully: remove it from the pending work and report it
+            batchWork.removeAll(lastBatch);
+            repairOutput.addAll(addMsgs);
+          }
+          return null;
+        } catch (TException e) {
+          throw new MetastoreException(e);
+        }
+      }
+    }.run();
+  }
+  /**
+   * Turns a partition spec into a filter string of the form {@code k1='v1' AND k2='v2'}.
+   *
+   * <p>Keys and values are both passed through {@code Warehouse.escapePathName}; clauses appear in
+   * the iteration order of {@code spec}.
+   *
+   * @param spec partition column name to partition value
+   * @return the filter string; empty when {@code spec} has no entries
+   * @throws MetaException if any value is {@code null} or empty
+   */
+  private static String makePartExpr(Map<String, String> spec)
+    throws MetaException {
+    StringBuilder filter = new StringBuilder();
+    for (Map.Entry<String, String> column : spec.entrySet()) {
+      String value = column.getValue();
+      if (value == null || value.isEmpty()) {
+        throw new MetaException("Partition spec is incorrect. " + spec);
+      }
+      // Anything already in the buffer means this is not the first clause.
+      if (filter.length() > 0) {
+        filter.append(" AND ");
+      }
+      filter.append(Warehouse.escapePathName(column.getKey()))
+        .append('=')
+        .append("'")
+        .append(Warehouse.escapePathName(value))
+        .append("'");
+    }
+    return filter.toString();
+  }
+  /**
+   * Drops partitions from the metastore in batches.
+   *
+   * <p>The union of {@code partsNotInFs} and {@code expiredPartitions} is copied and worked off at most
+   * {@code size} partitions at a time. The dropping goes through RetryUtilities, which retries after a
+   * failure with the batch size reduced by {@code decayingFactor}, until {@code maxRetries} is reached
+   * or the batch size reduces to 0, whichever comes earlier. A batch leaves the pending copy only
+   * after the drop call has returned; the two input sets are never modified.
+   *
+   * @param metastoreClient   client used to drop the partitions
+   * @param repairOutput      receives one "Repair: Dropped partition ..." message per dropped partition
+   * @param partsNotInFs      partitions to drop
+   * @param expiredPartitions further partitions to drop; may be {@code null}
+   * @param table             table the partitions belong to
+   * @param batchSize         initial number of partitions per drop call
+   * @param decayingFactor    passed through to the retry helper
+   * @param maxRetries        passed through to the retry helper
+   * @throws Exception propagated from the retry helper's {@code run()}
+   */
+  @VisibleForTesting
+  public void dropPartitionsInBatches(final IMetaStoreClient metastoreClient, List<String> repairOutput,
+    Set<CheckResult.PartitionResult> partsNotInFs, Set<CheckResult.PartitionResult> expiredPartitions,
+    Table table, int batchSize, int decayingFactor, int maxRetries) throws Exception {
+    final String dropMsgFormat =
+      "Repair: Dropped partition from metastore " + Warehouse.getCatalogQualifiedTableName(table) + ":%s";
+    // Pending work: everything that still has to be dropped.
+    final Set<CheckResult.PartitionResult> pending = new HashSet<>(partsNotInFs);
+    if (expiredPartitions != null) {
+      pending.addAll(expiredPartitions);
+    }
+    // Whether data is deleted follows the deleteData field; ifExists(true) is set on the options
+    // (per the original note, a partition that is already gone should not count as a failure).
+    final PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData).ifExists(true);
+    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
+      @Override
+      public Void execute(int size) throws MetastoreException {
+        try {
+          while (!pending.isEmpty()) {
+            // partitions handled by this batch, their names, and their report lines
+            List<CheckResult.PartitionResult> inFlight = new ArrayList<>(size);
+            List<String> reportLines = new ArrayList<>(size);
+            List<String> partNames = new ArrayList<>(size);
+            for (CheckResult.PartitionResult candidate : pending) {
+              // the batch is full once it holds `size` partitions
+              if (inFlight.size() == size) {
+                break;
+              }
+              String partName = candidate.getPartitionName();
+              partNames.add(partName);
+              inFlight.add(candidate);
+              reportLines.add(String.format(dropMsgFormat, candidate.getPartitionName()));
+            }
+            metastoreClient.dropPartitions(table.getCatName(), table.getDbName(), table.getTableName(),
+              getPartitionExpr(partNames), dropOptions);
+            // the drop call returned: retire the batch and publish its messages
+            pending.removeAll(inFlight);
+            repairOutput.addAll(reportLines);
+          }
+          return null;
+        } catch (TException e) {
+          throw new MetastoreException(e);
+        }
+      }
+
+      /** Pairs each partition name's position in {@code parts} with its UTF-8 encoded filter string. */
+      private List<ObjectPair<Integer, byte[]>> getPartitionExpr(final List<String> parts) throws MetaException {
+        List<ObjectPair<Integer, byte[]>> pairs = new ArrayList<>(parts.size());
+        for (String partName : parts) {
+          String partExpr = makePartExpr(Warehouse.makeSpecFromName(partName));
+          LOG.debug("Generated partExpr: {} for partName: {}", partExpr, partName);
+          // pairs.size() equals the position of partName within parts at this point
+          pairs.add(new ObjectPair<>(pairs.size(), partExpr.getBytes(StandardCharsets.UTF_8)));
+        }
+        return pairs;
+      }
+    }.run();
+  }
+  /**
+   * Writes one section of the msck report: the heading followed by every entry of the set.
+   *
+   * <p>Nothing is written for an empty set. Otherwise the section is preceded by the terminator when
+   * an earlier section was already written, and every entry is preceded by the separator.
+   *
+   * @param result entries of this section
+   * @param msg    heading of this section
+   * @param out    writer receiving the output
+   * @param wrote  whether any previous call wrote data
+   * @return true if this call wrote something, false if {@code result} was empty
+   * @throws IOException if writing to {@code out} fails
+   */
+  private boolean writeMsckResult(Set<?> result, String msg,
+    Writer out, boolean wrote) throws IOException {
+    if (result.isEmpty()) {
+      return false;
+    }
+    if (wrote) {
+      out.write(terminator);
+    }
+    out.write(msg);
+    for (Object item : result) {
+      out.write(separator);
+      out.write(item.toString());
+    }
+    return true;
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..81bcb56
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,125 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ *
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+ * Metadata related to Msck.
+ */
+public class MsckInfo { // mutable bean: every field below is set by the constructor and has a getter and a setter
+  private String catalogName; // presumably the catalog of the target table - confirm against callers
+  private String dbName; // presumably the database of the target table - confirm against callers
+  private String tableName; // table name; Msck uses it in its "Repair: Cannot add partition" message
+  private ArrayList<LinkedHashMap<String, String>> partSpecs; // list of ordered maps; NOTE(review): presumably column -> value specs, confirm
+  private String resFile; // path of the result file; Msck writes its report there when this is non-null
+  private boolean repairPartitions; // NOTE(review): consumer not visible here - presumably enables the repair phase
+  private boolean addPartitions; // Msck adds partitions missing from the metastore only when true
+  private boolean dropPartitions; // Msck drops stale / expired partitions only when true
+  private long partitionExpirySeconds; // reported by Msck as the retention period, in seconds
+  public MsckInfo(final String catalogName, final String dbName, final String tableName,
+    final ArrayList<LinkedHashMap<String, String>> partSpecs, final String resFile, final boolean repairPartitions,
+    final boolean addPartitions,
+    final boolean dropPartitions,
+    final long partitionExpirySeconds) { // stores every argument as given; no validation, no copying
+    this.catalogName = catalogName;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.partSpecs = partSpecs; // the caller's list is kept by reference
+    this.resFile = resFile;
+    this.repairPartitions = repairPartitions;
+    this.addPartitions = addPartitions;
+    this.dropPartitions = dropPartitions;
+    this.partitionExpirySeconds = partitionExpirySeconds;
+  }
+  public String getCatalogName() {
+    return catalogName;
+  }
+  public void setCatalogName(final String catalogName) {
+    this.catalogName = catalogName;
+  }
+  public String getDbName() {
+    return dbName;
+  }
+  public void setDbName(final String dbName) {
+    this.dbName = dbName;
+  }
+  public String getTableName() {
+    return tableName;
+  }
+  public void setTableName(final String tableName) {
+    this.tableName = tableName;
+  }
+  public ArrayList<LinkedHashMap<String, String>> getPartSpecs() {
+    return partSpecs; // returns the internal list itself, not a copy
+  }
+  public void setPartSpecs(final ArrayList<LinkedHashMap<String, String>> partSpecs) {
+    this.partSpecs = partSpecs; // kept by reference
+  }
+  public String getResFile() {
+    return resFile; // may be null; Msck skips writing the report file in that case
+  }
+  public void setResFile(final String resFile) {
+    this.resFile = resFile;
+  }
+  public boolean isRepairPartitions() {
+    return repairPartitions;
+  }
+  public void setRepairPartitions(final boolean repairPartitions) {
+    this.repairPartitions = repairPartitions;
+  }
+  public boolean isAddPartitions() {
+    return addPartitions;
+  }
+  public void setAddPartitions(final boolean addPartitions) {
+    this.addPartitions = addPartitions;
+  }
+  public boolean isDropPartitions() {
+    return dropPartitions;
+  }
+  public void setDropPartitions(final boolean dropPartitions) {
+    this.dropPartitions = dropPartitions;
+  }
+  public long getPartitionExpirySeconds() {
+    return partitionExpirySeconds;
+  }
+  public void setPartitionExpirySeconds(final long partitionExpirySeconds) {
+    this.partitionExpirySeconds = partitionExpirySeconds;
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
new file mode 100644
index 0000000..d842825
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.metastore;
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ *
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+// This is added as part of moving MSCK code from ql to standalone-metastore. There is a metastore API to drop
+// partitions by name but we cannot use it because msck typically will contain partition value (year=2014). We almost
+// never drop partition by name (year). So we need to construct expression filters, the current
+// PartitionExpressionProxy implementations (PartitionExpressionForMetastore and HCatClientHMSImpl.ExpressionBuilder)
+// all depend on ql code to build ExprNodeDesc for the partition expressions. It also depends on kryo for serializing
+// the expression objects to byte[]. For MSCK drop partition, we don't need complex expression generator. For now,
+// all we do is split the partition spec (year=2014/month=24) into filter expression year='2014' and month='24' and
+// rely on metastore database to deal with type conversions. Ideally, PartitionExpressionProxy default implementation
+// should use SearchArgument (storage-api) to construct the filter expression and not depend on ql, but the usecase
+// for msck is pretty simple and this specific implementation should suffice.
+public class MsckPartitionExpressionProxy implements PartitionExpressionProxy { // proxy whose "expression" is plain filter text
+  @Override
+  public String convertExprToFilter(final byte[] exprBytes, final String defaultPartitionName) throws MetaException {
+    return new String(exprBytes, StandardCharsets.UTF_8); // bytes are decoded as UTF-8 and returned as-is; defaultPartitionName is unused
+  }
+  @Override
+  public boolean filterPartitionsByExpr(List<FieldSchema> partColumns, byte[] expr, String
+    defaultPartitionName, List<String> partitionNames) throws MetaException {
+    return false; // performs no filtering and leaves partitionNames untouched; NOTE(review): meaning of false is defined by the interface - confirm
+  }
+  @Override
+  public FileMetadataExprType getMetadataType(String inputFormat) {
+    throw new UnsupportedOperationException(); // not implemented by this proxy
+  }
+  @Override
+  public FileFormatProxy getFileFormatProxy(FileMetadataExprType type) {
+    throw new UnsupportedOperationException(); // not implemented by this proxy
+  }
+  @Override
+  public SearchArgument createSarg(byte[] expr) {
+    throw new UnsupportedOperationException(); // not implemented by this proxy
+  }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
index 9c15804..0755483 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/
@@ -686,10 +686,9 @@ public class ObjectStore implements RawStore, Configurable {
       debugLog("rolling back transaction: no open transactions: " + openTrasactionCalls);
-    debugLog("Rollback transaction, isActive: " + currentTransaction.isActive());
+    debugLog("Rollback transaction, isActive: " + isActiveTransaction());
     try {
-      if (currentTransaction.isActive()
-          && transactionStatus != TXN_STATUS.ROLLBACK) {
+      if (isActiveTransaction() && transactionStatus != TXN_STATUS.ROLLBACK) {
     } finally {
@@ -1711,6 +1710,7 @@ public class ObjectStore implements RawStore, Configurable {
       for (MTable table : tables) {
         TableMeta metaData = new TableMeta(
             table.getDatabase().getName(), table.getTableName(), table.getTableType());
+        metaData.setCatName(catName);