Posted to commits@hive.apache.org by pr...@apache.org on 2018/10/29 22:08:02 UTC

[2/4] hive git commit: HIVE-20707: Automatic partition management (Prasanth Jayachandran reviewed by Jason Dere)

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/rename_external_partition_location.q.out b/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
index 02cd814..d854887 100644
--- a/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
+++ b/ql/src/test/results/clientpositive/rename_external_partition_location.q.out
@@ -103,6 +103,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numPartitions       	1                   
 	numRows             	10                  
@@ -266,6 +267,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numPartitions       	1                   
 	numRows             	10                  

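The expected-output changes above (and in the q.out files that follow) show that tables in these tests now carry the new 'discover.partitions'='true' table property. As a rough, hypothetical sketch of setting the property by hand through the metastore client (the table and database names are made up; the patch itself manages the property automatically):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    public class EnablePartitionDiscovery {
      public static void main(String[] args) throws Exception {
        Configuration conf = MetastoreConf.newMetastoreConf();
        IMetaStoreClient msc = new HiveMetaStoreClient(conf);
        // hypothetical partitioned external table in the default database
        Table t = msc.getTable("default", "ex_table");
        t.getParameters().put("discover.partitions", "true");
        msc.alter_table("default", "ex_table", t);
        msc.close();
      }
    }
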
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
index b2bcd51..40b6ad7 100644
--- a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
+++ b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
@@ -345,6 +345,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: select * from ext_t_imported
 PREHOOK: type: QUERY
@@ -426,6 +427,7 @@ LOCATION
 TBLPROPERTIES (
   'EXTERNAL'='FALSE', 
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
   'repl.last.id'='0', 
 #### A masked pattern was here ####
 PREHOOK: query: select * from ext_t_r_imported

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/show_create_table_alter.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/show_create_table_alter.q.out b/ql/src/test/results/clientpositive/show_create_table_alter.q.out
index 2c75c36..9d93ee9 100644
--- a/ql/src/test/results/clientpositive/show_create_table_alter.q.out
+++ b/ql/src/test/results/clientpositive/show_create_table_alter.q.out
@@ -32,6 +32,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('comment'='temporary table', 'EXTERNAL'='FALSE')
 PREHOOK: type: ALTERTABLE_PROPERTIES
@@ -67,6 +68,7 @@ LOCATION
 TBLPROPERTIES (
   'EXTERNAL'='FALSE', 
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('comment'='changed comment', 'EXTERNAL'='TRUE')
 PREHOOK: type: ALTERTABLE_PROPERTIES
@@ -101,6 +103,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('SORTBUCKETCOLSPREFIX'='FALSE')
 PREHOOK: type: ALTERTABLE_PROPERTIES
@@ -135,6 +138,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: ALTER TABLE tmp_showcrt1_n1 SET TBLPROPERTIES ('storage_handler'='org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler')
 PREHOOK: type: ALTERTABLE_PROPERTIES
@@ -169,6 +173,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE tmp_showcrt1_n1
 PREHOOK: type: DROPTABLE

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out b/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
index e554a18..8a56bfc 100644
--- a/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
+++ b/ql/src/test/results/clientpositive/show_create_table_partitioned.q.out
@@ -32,6 +32,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE tmp_showcrt1_n2
 PREHOOK: type: DROPTABLE

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/show_create_table_serde.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/show_create_table_serde.q.out b/ql/src/test/results/clientpositive/show_create_table_serde.q.out
index 8b95c9b..a66c09a 100644
--- a/ql/src/test/results/clientpositive/show_create_table_serde.q.out
+++ b/ql/src/test/results/clientpositive/show_create_table_serde.q.out
@@ -174,6 +174,7 @@ LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
   'bucketing_version'='2', 
+  'discover.partitions'='true', 
 #### A masked pattern was here ####
 PREHOOK: query: DROP TABLE tmp_showcrt1_n0
 PREHOOK: type: DROPTABLE

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out b/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
index 2d713a8..74f8b5a 100644
--- a/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
+++ b/ql/src/test/results/clientpositive/spark/stats_noscan_2.q.out
@@ -49,6 +49,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	totalSize           	11                  
 #### A masked pattern was here ####
@@ -90,6 +91,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	6                   
 	rawDataSize         	6                   

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/stats_noscan_2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/stats_noscan_2.q.out b/ql/src/test/results/clientpositive/stats_noscan_2.q.out
index 182820f..6625219 100644
--- a/ql/src/test/results/clientpositive/stats_noscan_2.q.out
+++ b/ql/src/test/results/clientpositive/stats_noscan_2.q.out
@@ -49,6 +49,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	totalSize           	11                  
 #### A masked pattern was here ####
@@ -90,6 +91,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\"}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	6                   
 	rawDataSize         	6                   

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out b/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
index 2a442b4..065cd98 100644
--- a/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
+++ b/ql/src/test/results/clientpositive/temp_table_display_colstats_tbllvl.q.out
@@ -61,6 +61,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"adrevenue\":\"true\",\"avgtimeonsite\":\"true\",\"ccode\":\"true\",\"desturl\":\"true\",\"lcode\":\"true\",\"skeyword\":\"true\",\"sourceip\":\"true\",\"useragent\":\"true\",\"visitdate\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	0                   
 	numRows             	0                   
 	rawDataSize         	0                   
@@ -111,6 +112,7 @@ Table Type:         	EXTERNAL_TABLE
 Table Parameters:	 	 
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	0                   
 	rawDataSize         	0                   
@@ -267,6 +269,7 @@ STAGE PLANS:
               columns sourceip,desturl,visitdate,adrevenue,useragent,ccode,lcode,skeyword,avgtimeonsite
               columns.comments 
               columns.types string:string:string:float:string:string:string:string:int
+              discover.partitions true
               field.delim |
 #### A masked pattern was here ####
               name default.uservisits_web_text_none
@@ -289,6 +292,7 @@ STAGE PLANS:
                 columns sourceip,desturl,visitdate,adrevenue,useragent,ccode,lcode,skeyword,avgtimeonsite
                 columns.comments 
                 columns.types string:string:string:float:string:string:string:string:int
+                discover.partitions true
                 field.delim |
 #### A masked pattern was here ####
                 name default.uservisits_web_text_none
@@ -381,6 +385,7 @@ Table Parameters:
 	COLUMN_STATS_ACCURATE	{\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"adRevenue\":\"true\",\"avgTimeOnSite\":\"true\",\"sourceIP\":\"true\"}}
 	EXTERNAL            	TRUE                
 	bucketing_version   	2                   
+	discover.partitions 	true                
 	numFiles            	1                   
 	numRows             	55                  
 	rawDataSize         	7005                

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
new file mode 100644
index 0000000..5287f47
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/CheckResult.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Result class used by the HiveMetaStoreChecker.
+ */
+public class CheckResult {
+
+  // tree sets to preserve ordering in qfile tests
+  private Set<String> tablesNotOnFs = new TreeSet<String>();
+  private Set<String> tablesNotInMs = new TreeSet<String>();
+  private Set<PartitionResult> partitionsNotOnFs = new TreeSet<PartitionResult>();
+  private Set<PartitionResult> partitionsNotInMs = new TreeSet<PartitionResult>();
+  private Set<PartitionResult> expiredPartitions = new TreeSet<>();
+
+  /**
+   * @return a list of tables not found on the filesystem.
+   */
+  public Set<String> getTablesNotOnFs() {
+    return tablesNotOnFs;
+  }
+
+  /**
+   * @param tablesNotOnFs
+   *          a list of tables not found on the filesystem.
+   */
+  public void setTablesNotOnFs(Set<String> tablesNotOnFs) {
+    this.tablesNotOnFs = tablesNotOnFs;
+  }
+
+  /**
+   * @return a list of tables not found in the metastore.
+   */
+  public Set<String> getTablesNotInMs() {
+    return tablesNotInMs;
+  }
+
+  /**
+   * @param tablesNotInMs
+   *          a list of tables not found in the metastore.
+   */
+  public void setTablesNotInMs(Set<String> tablesNotInMs) {
+    this.tablesNotInMs = tablesNotInMs;
+  }
+
+  /**
+   * @return a list of partitions not found on the fs
+   */
+  public Set<PartitionResult> getPartitionsNotOnFs() {
+    return partitionsNotOnFs;
+  }
+
+  /**
+   * @param partitionsNotOnFs
+   *          a list of partitions not found on the fs
+   */
+  public void setPartitionsNotOnFs(Set<PartitionResult> partitionsNotOnFs) {
+    this.partitionsNotOnFs = partitionsNotOnFs;
+  }
+
+  /**
+   * @return a list of partitions not found in the metastore
+   */
+  public Set<PartitionResult> getPartitionsNotInMs() {
+    return partitionsNotInMs;
+  }
+
+  /**
+   * @param partitionsNotInMs
+   *          a list of partitions not found in the metastore
+   */
+  public void setPartitionsNotInMs(Set<PartitionResult> partitionsNotInMs) {
+    this.partitionsNotInMs = partitionsNotInMs;
+  }
+
+  public Set<PartitionResult> getExpiredPartitions() {
+    return expiredPartitions;
+  }
+
+  public void setExpiredPartitions(
+    final Set<PartitionResult> expiredPartitions) {
+    this.expiredPartitions = expiredPartitions;
+  }
+
+  /**
+   * A basic description of a partition that is missing from either the fs or
+   * the ms.
+   */
+  public static class PartitionResult implements Comparable<PartitionResult> {
+    private String partitionName;
+    private String tableName;
+
+    /**
+     * @return name of partition
+     */
+    public String getPartitionName() {
+      return partitionName;
+    }
+
+    /**
+     * @param partitionName
+     *          name of partition
+     */
+    public void setPartitionName(String partitionName) {
+      this.partitionName = partitionName;
+    }
+
+    /**
+     * @return table name
+     */
+    public String getTableName() {
+      return tableName;
+    }
+
+    /**
+     * @param tableName
+     *          table name
+     */
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    @Override
+    public String toString() {
+      return tableName + ":" + partitionName;
+    }
+
+    public int compareTo(PartitionResult o) {
+      int ret = tableName.compareTo(o.tableName);
+      return ret != 0 ? ret : partitionName.compareTo(o.partitionName);
+    }
+  }
+
+}

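For illustration only, a minimal sketch of how CheckResult accumulates findings (the table and partition names are made up); the TreeSet-backed fields keep results sorted, which is what the qfile tests rely on:

    import org.apache.hadoop.hive.metastore.CheckResult;

    public class CheckResultDemo {
      public static void main(String[] args) {
        CheckResult result = new CheckResult();
        CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
        pr.setTableName("web_logs");          // hypothetical table
        pr.setPartitionName("ds=2018-10-29"); // hypothetical partition
        result.getPartitionsNotInMs().add(pr);
        // prints [web_logs:ds=2018-10-29] via PartitionResult.toString()
        System.out.println(result.getPartitionsNotInMs());
      }
    }
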
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 294dfb7..ecd5996 100755
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -409,7 +409,7 @@ public class Warehouse {
     }
   }
 
-  private static String escapePathName(String path) {
+  public static String escapePathName(String path) {
     return FileUtils.escapePathName(path);
   }
 

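escapePathName is made public here so that callers outside Warehouse can escape partition path components; it delegates to FileUtils.escapePathName. A small sketch with a made-up input value:

    import org.apache.hadoop.hive.metastore.Warehouse;

    public class EscapeDemo {
      public static void main(String[] args) {
        // special characters are percent-encoded; ':' is expected
        // to become %3A under Hive's path escaping rules
        System.out.println(Warehouse.escapePathName("event:click"));
      }
    }
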
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java
new file mode 100644
index 0000000..ab89389
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/api/MetastoreException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.api;
+
+public class MetastoreException extends Exception {
+  public MetastoreException() {
+    super();
+  }
+
+  public MetastoreException(String message) {
+    super(message);
+  }
+
+  public MetastoreException(Throwable cause) {
+    super(cause);
+  }
+
+  public MetastoreException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

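MetastoreException provides the standard exception-chaining constructors; elsewhere in this patch (e.g. HiveMetaStoreChecker.checkMetastore) it wraps Thrift errors. A minimal sketch of that pattern:

    import org.apache.hadoop.hive.metastore.api.MetastoreException;
    import org.apache.thrift.TException;

    public class WrapDemo {
      static void call() throws MetastoreException {
        try {
          // stand-in for a real metastore client call
          throw new TException("simulated thrift failure");
        } catch (TException e) {
          throw new MetastoreException(e); // preserves the original cause
        }
      }
    }
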
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 1d64cce..f3a78bf 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -74,6 +74,8 @@ public class MetastoreConf {
   @VisibleForTesting
   static final String RUNTIME_STATS_CLEANER_TASK_CLASS =
       "org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask";
+  static final String PARTITION_MANAGEMENT_TASK_CLASS =
+    "org.apache.hadoop.hive.metastore.PartitionManagementTask";
   @VisibleForTesting
   static final String EVENT_CLEANER_TASK_CLASS =
       "org.apache.hadoop.hive.metastore.events.EventCleanerTask";
@@ -651,6 +653,58 @@ public class MetastoreConf {
     METRICS_REPORTERS("metastore.metrics.reporters", "metastore.metrics.reporters", "json,jmx",
         new StringSetValidator("json", "jmx", "console", "hadoop"),
         "A comma separated list of metrics reporters to start"),
+    MSCK_PATH_VALIDATION("msck.path.validation", "hive.msck.path.validation", "throw",
+      new StringSetValidator("throw", "skip", "ignore"), "The approach msck should take with HDFS " +
+      "directories that are partition-like but contain unsupported characters. 'throw' (an " +
+      "exception) is the default; 'skip' will skip the invalid directories and still repair the" +
+      " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many cases)"),
+    MSCK_REPAIR_BATCH_SIZE("msck.repair.batch.size",
+      "hive.msck.repair.batch.size", 3000,
+      "Batch size for the msck repair command. If the value is greater than zero,\n "
+        + "it will execute batch wise with the configured batch size. In case of errors while\n"
+        + "adding unknown partitions the batch size is automatically reduced by half in the subsequent\n"
+        + "retry attempt. The default value is 3000 which means it will execute in the batches of 3000."),
+    MSCK_REPAIR_BATCH_MAX_RETRIES("msck.repair.batch.max.retries", "hive.msck.repair.batch.max.retries", 4,
+      "Maximum number of retries for the msck repair command when adding unknown partitions.\n "
+        + "If the value is greater than zero it will retry adding unknown partitions until the maximum\n"
+        + "number of attempts is reached or batch size is reduced to 0, whichever is earlier.\n"
+        + "In each retry attempt it will reduce the batch size by a factor of 2 until it reaches zero.\n"
+        + "If the value is set to zero it will retry until the batch size becomes zero as described above."),
+    MSCK_REPAIR_ENABLE_PARTITION_RETENTION("msck.repair.enable.partition.retention",
+      "msck.repair.enable.partition.retention", false,
+      "If 'partition.retention.period' table property is set, this flag determines whether MSCK REPAIR\n" +
+      "command should handle partition retention. If enabled, and if a specific partition's age exceeded\n" +
+      "retention period the partition will be dropped along with data"),
+
+
+    // Partition management task params
+    PARTITION_MANAGEMENT_TASK_FREQUENCY("metastore.partition.management.task.frequency",
+      "metastore.partition.management.task.frequency",
+      300, TimeUnit.SECONDS, "Frequency at which the timer task runs to do automatic partition management for tables\n" +
+      "with table property 'discover.partitions'='true'. Partition management includes two pieces: one is partition\n" +
+      "discovery and the other is the partition retention period. When 'discover.partitions'='true' is set, partition\n" +
+      "management will look for partitions in the table location and add partition objects for them in the metastore.\n" +
+      "Similarly, if a partition object exists in the metastore but the partition location does not exist, the partition\n" +
+      "object will be dropped. The second piece of partition management is the retention period. When 'discover.partitions'\n" +
+      "is set to true and the 'partition.retention.period' table property is defined, partitions that are older\n" +
+      "than the specified retention period will be automatically dropped from metastore along with the data."),
+    PARTITION_MANAGEMENT_TABLE_TYPES("metastore.partition.management.table.types",
+      "metastore.partition.management.table.types", "MANAGED_TABLE,EXTERNAL_TABLE",
+      "Comma separated list of table types to use for partition management"),
+    PARTITION_MANAGEMENT_TASK_THREAD_POOL_SIZE("metastore.partition.management.task.thread.pool.size",
+      "metastore.partition.management.task.thread.pool.size", 5,
+      "Partition management uses thread pool on to which tasks are submitted for discovering and retaining the\n" +
+      "partitions. This determines the size of the thread pool."),
+    PARTITION_MANAGEMENT_CATALOG_NAME("metastore.partition.management.catalog.name",
+      "metastore.partition.management.catalog.name", "hive",
+      "Automatic partition management will look for tables under the specified catalog name"),
+    PARTITION_MANAGEMENT_DATABASE_PATTERN("metastore.partition.management.database.pattern",
+      "metastore.partition.management.database.pattern", "*",
+      "Automatic partition management will look for tables using the specified database pattern"),
+    PARTITION_MANAGEMENT_TABLE_PATTERN("metastore.partition.management.table.pattern",
+      "metastore.partition.management.table.pattern", "*",
+      "Automatic partition management will look for tables using the specified table pattern"),
+
     MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true,
         "Set this to true if multiple threads access metastore through JDO concurrently."),
     MAX_OPEN_TXNS("metastore.max.open.txns", "hive.max.open.txns", 100000,
@@ -799,7 +853,7 @@ public class MetastoreConf {
     TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
         EVENT_CLEANER_TASK_CLASS + "," + RUNTIME_STATS_CLEANER_TASK_CLASS + "," +
         "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
-            "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
+          "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
         "Comma separated list of tasks that will be started in separate threads.  These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode.  They must implement " + METASTORE_TASK_THREAD_CLASS),
@@ -808,7 +862,8 @@ public class MetastoreConf {
             ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," +
             ACID_COMPACTION_HISTORY_SERVICE_CLASS + "," +
             ACID_WRITE_SET_SERVICE_CLASS + "," +
-            MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS,
+            MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," +
+            PARTITION_MANAGEMENT_TASK_CLASS,
         "Command separated list of tasks that will be started in separate threads.  These will be" +
             " started only when the metastore is running as a separate service.  They must " +
             "implement " + METASTORE_TASK_THREAD_CLASS),

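A sketch of reading the new partition-management settings through MetastoreConf's typed accessors (the values noted in comments are the defaults declared above):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;

    public class PartitionMgmtConfDemo {
      public static void main(String[] args) {
        Configuration conf = MetastoreConf.newMetastoreConf();
        // 300 by default, i.e. the background task runs every 5 minutes
        long freqSecs = MetastoreConf.getTimeVar(conf,
            ConfVars.PARTITION_MANAGEMENT_TASK_FREQUENCY, TimeUnit.SECONDS);
        // "MANAGED_TABLE,EXTERNAL_TABLE" by default
        String tableTypes = MetastoreConf.getVar(conf, ConfVars.PARTITION_MANAGEMENT_TABLE_TYPES);
        // "throw" by default: msck raises an exception on invalid partition-like directories
        String pathValidation = MetastoreConf.getVar(conf, ConfVars.MSCK_PATH_VALIDATION);
        System.out.println(freqSecs + " " + tableTypes + " " + pathValidation);
      }
    }
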
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 8fb1fa7..1d89e12 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -17,37 +17,6 @@
  */
 package org.apache.hadoop.hive.metastore.utils;
 
-import org.apache.commons.beanutils.PropertyUtils;
-import org.apache.hadoop.hive.metastore.api.PartitionSpec;
-import org.apache.hadoop.hive.metastore.api.PartitionSpecWithSharedSD;
-import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
-import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy;
-
-import com.google.common.base.Joiner;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.ColumnType;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
 import java.beans.PropertyDescriptor;
 import java.io.File;
 import java.net.URL;
@@ -69,6 +38,30 @@ import java.util.stream.Collectors;
 
 import static java.util.regex.Pattern.compile;
 
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
 public class MetaStoreUtils {
   /** A fixed date format to be used for hive partition column values. */
   public static final ThreadLocal<DateFormat> PARTITION_DATE_FORMAT =

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
new file mode 100644
index 0000000..2df45f6
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getAllPartitionsOf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getDataLocation;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartColNames;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartCols;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartition;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionName;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionSpec;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Verify that the information in the metastore matches what is on the
+ * filesystem. Return a CheckResult object containing lists of missing and any
+ * unexpected tables and partitions.
+ */
+public class HiveMetaStoreChecker {
+
+  public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
+
+  private final IMetaStoreClient msc;
+  private final Configuration conf;
+  private final long partitionExpirySeconds;
+  private final Interner<Path> pathInterner = Interners.newStrongInterner();
+
+  public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf) {
+    this(msc, conf, -1);
+  }
+
+  public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf, long partitionExpirySeconds) {
+    super();
+    this.msc = msc;
+    this.conf = conf;
+    this.partitionExpirySeconds = partitionExpirySeconds;
+  }
+
+  public IMetaStoreClient getMsc() {
+    return msc;
+  }
+
+  /**
+   * Check the metastore for inconsistencies: data missing in either the
+   * metastore or on the dfs.
+   *
+   * @param catName
+   *          name of the catalog, if not specified default catalog will be used.
+   * @param dbName
+   *          name of the database, if not specified the default will be used.
+   * @param tableName
+   *          Table we want to run the check for. If null we'll check all the
+   *          tables in the database.
+   * @param partitions
+   *          List of partition name value pairs, if null or empty check all
+   *          partitions
+   * @param result
+   *          Fill this with the results of the check
+   * @throws MetastoreException
+   *           Failed to get required information from the metastore.
+   * @throws IOException
+   *           Most likely filesystem related
+   */
+  public void checkMetastore(String catName, String dbName, String tableName,
+      List<? extends Map<String, String>> partitions, CheckResult result)
+      throws MetastoreException, IOException {
+
+    if (dbName == null || "".equalsIgnoreCase(dbName)) {
+      dbName = Warehouse.DEFAULT_DATABASE_NAME;
+    }
+
+    try {
+      if (tableName == null || "".equals(tableName)) {
+        // no table specified, check all tables and all partitions.
+        List<String> tables = getMsc().getTables(catName, dbName, ".*");
+        for (String currentTableName : tables) {
+          checkTable(catName, dbName, currentTableName, null, result);
+        }
+
+        findUnknownTables(catName, dbName, tables, result);
+      } else if (partitions == null || partitions.isEmpty()) {
+        // only one table, let's check all partitions
+        checkTable(catName, dbName, tableName, null, result);
+      } else {
+        // check the specified partitions
+        checkTable(catName, dbName, tableName, partitions, result);
+      }
+      LOG.info("Number of partitionsNotInMs=" + result.getPartitionsNotInMs()
+              + ", partitionsNotOnFs=" + result.getPartitionsNotOnFs()
+              + ", tablesNotInMs=" + result.getTablesNotInMs()
+              + ", tablesNotOnFs=" + result.getTablesNotOnFs()
+              + ", expiredPartitions=" + result.getExpiredPartitions());
+    } catch (TException e) {
+      throw new MetastoreException(e);
+    }
+  }
+
+  /**
+   * Check for table directories that aren't in the metastore.
+   *
+   * @param catName
+   *          name of the catalog, if not specified default catalog will be used.
+   * @param dbName
+   *          Name of the database
+   * @param tables
+   *          List of table names
+   * @param result
+   *          Add any found tables to this
+   * @throws IOException
+   *           Most likely filesystem related
+   * @throws MetaException
+   *           Failed to get required information from the metastore.
+   * @throws NoSuchObjectException
+   *           Failed to get required information from the metastore.
+   * @throws TException
+   *           Thrift communication error.
+   */
+  void findUnknownTables(String catName, String dbName, List<String> tables, CheckResult result)
+      throws IOException, MetaException, TException {
+
+    Set<Path> dbPaths = new HashSet<Path>();
+    Set<String> tableNames = new HashSet<String>(tables);
+
+    for (String tableName : tables) {
+      Table table = getMsc().getTable(catName, dbName, tableName);
+      // hack, instead figure out a way to get the db paths
+      String isExternal = table.getParameters().get("EXTERNAL");
+      if (!"TRUE".equalsIgnoreCase(isExternal)) {
+        Path tablePath = getPath(table);
+        if (tablePath != null) {
+          dbPaths.add(tablePath.getParent());
+        }
+      }
+    }
+
+    for (Path dbPath : dbPaths) {
+      FileSystem fs = dbPath.getFileSystem(conf);
+      FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      for (FileStatus status : statuses) {
+
+        if (status.isDir() && !tableNames.contains(status.getPath().getName())) {
+
+          result.getTablesNotInMs().add(status.getPath().getName());
+        }
+      }
+    }
+  }
+
+  /**
+   * Check the metastore for inconsistencies: data missing in either the
+   * metastore or on the dfs.
+   *
+   * @param catName
+   *          name of the catalog, if not specified default catalog will be used.
+   * @param dbName
+   *          Name of the database
+   * @param tableName
+   *          Name of the table
+   * @param partitions
+   *          Partitions to check, if null or empty get all the partitions.
+   * @param result
+   *          Result object
+   * @throws MetastoreException
+   *           Failed to get required information from the metastore.
+   * @throws IOException
+   *           Most likely filesystem related
+   * @throws MetaException
+   *           Failed to get required information from the metastore.
+   */
+  void checkTable(String catName, String dbName, String tableName,
+      List<? extends Map<String, String>> partitions, CheckResult result)
+      throws MetaException, IOException, MetastoreException {
+
+    Table table;
+
+    try {
+      table = getMsc().getTable(catName, dbName, tableName);
+    } catch (TException e) {
+      result.getTablesNotInMs().add(tableName);
+      return;
+    }
+
+    PartitionIterable parts;
+    boolean findUnknownPartitions = true;
+
+    if (isPartitioned(table)) {
+      if (partitions == null || partitions.isEmpty()) {
+        int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+        if (batchSize > 0) {
+          parts = new PartitionIterable(getMsc(), table, batchSize);
+        } else {
+          List<Partition> loadedPartitions = getAllPartitionsOf(getMsc(), table);
+          parts = new PartitionIterable(loadedPartitions);
+        }
+      } else {
+        // we're interested in specific partitions,
+        // don't check for any others
+        findUnknownPartitions = false;
+        List<Partition> loadedPartitions = new ArrayList<>();
+        for (Map<String, String> map : partitions) {
+          Partition part = getPartition(getMsc(), table, map);
+          if (part == null) {
+            CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+            pr.setTableName(tableName);
+            pr.setPartitionName(Warehouse.makePartPath(map));
+            result.getPartitionsNotInMs().add(pr);
+          } else {
+            loadedPartitions.add(part);
+          }
+        }
+        parts = new PartitionIterable(loadedPartitions);
+      }
+    } else {
+      parts = new PartitionIterable(Collections.<Partition>emptyList());
+    }
+
+    checkTable(table, parts, findUnknownPartitions, result);
+  }
+
+  /**
+   * Check the metastore for inconsistencies: data missing in either the
+   * metastore or on the dfs.
+   *
+   * @param table
+   *          Table to check
+   * @param parts
+   *          Partitions to check
+   * @param result
+   *          Result object
+   * @param findUnknownPartitions
+   *          Should we try to find unknown partitions?
+   * @throws IOException
+   *           Could not get information from filesystem
+   * @throws MetastoreException
+   *           Could not create Partition object
+   */
+  void checkTable(Table table, PartitionIterable parts,
+      boolean findUnknownPartitions, CheckResult result) throws IOException,
+    MetastoreException {
+
+    Path tablePath = getPath(table);
+    if (tablePath == null) {
+      return;
+    }
+    FileSystem fs = tablePath.getFileSystem(conf);
+    if (!fs.exists(tablePath)) {
+      result.getTablesNotOnFs().add(table.getTableName());
+      return;
+    }
+
+    Set<Path> partPaths = new HashSet<Path>();
+
+    // check that the partition folders exist on disk
+    for (Partition partition : parts) {
+      if (partition == null) {
+        // most likely the user specified an invalid partition
+        continue;
+      }
+      Path partPath = getDataLocation(table, partition);
+      if (partPath == null) {
+        continue;
+      }
+      fs = partPath.getFileSystem(conf);
+      if (!fs.exists(partPath)) {
+        CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+        pr.setPartitionName(getPartitionName(table, partition));
+        pr.setTableName(partition.getTableName());
+        result.getPartitionsNotOnFs().add(pr);
+      }
+
+      if (partitionExpirySeconds > 0) {
+        long currentEpochSecs = Instant.now().getEpochSecond();
+        long createdTime = partition.getCreateTime();
+        long partitionAgeSeconds = currentEpochSecs - createdTime;
+        if (partitionAgeSeconds > partitionExpirySeconds) {
+          CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+          pr.setPartitionName(getPartitionName(table, partition));
+          pr.setTableName(partition.getTableName());
+          result.getExpiredPartitions().add(pr);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
+              partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
+              partitionAgeSeconds, partitionExpirySeconds);
+          }
+        }
+      }
+
+      for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
+        Path qualifiedPath = partPath.makeQualified(fs);
+        pathInterner.intern(qualifiedPath);
+        partPaths.add(qualifiedPath);
+        partPath = partPath.getParent();
+      }
+    }
+
+    if (findUnknownPartitions) {
+      findUnknownPartitions(table, partPaths, result);
+    }
+  }
+
+  /**
+   * Find partitions on the fs that are unknown to the metastore.
+   *
+   * @param table
+   *          Table where the partitions would be located
+   * @param partPaths
+   *          Paths of the partitions the ms knows about
+   * @param result
+   *          Result object
+   * @throws IOException
+   *           Thrown if we fail at fetching listings from the fs.
+   * @throws MetastoreException
+   */
+  void findUnknownPartitions(Table table, Set<Path> partPaths,
+      CheckResult result) throws IOException, MetastoreException {
+
+    Path tablePath = getPath(table);
+    if (tablePath == null) {
+      return;
+    }
+    // now check the table folder and see if we find anything
+    // that isn't in the metastore
+    Set<Path> allPartDirs = new HashSet<Path>();
+    checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(getPartColNames(table)));
+    // don't want the table dir
+    allPartDirs.remove(tablePath);
+
+    // remove the partition paths we know about
+    allPartDirs.removeAll(partPaths);
+
+    Set<String> partColNames = Sets.newHashSet();
+    for(FieldSchema fSchema : getPartCols(table)) {
+      partColNames.add(fSchema.getName());
+    }
+
+    // we should now only have the unexpected folders left
+    for (Path partPath : allPartDirs) {
+      FileSystem fs = partPath.getFileSystem(conf);
+      String partitionName = getPartitionName(fs.makeQualified(tablePath),
+          partPath, partColNames);
+      LOG.debug("PartitionName: " + partitionName);
+
+      if (partitionName != null) {
+        CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
+        pr.setPartitionName(partitionName);
+        pr.setTableName(table.getTableName());
+
+        result.getPartitionsNotInMs().add(pr);
+      }
+    }
+    LOG.debug("Number of partitions not in metastore : " + result.getPartitionsNotInMs().size());
+  }
+
+  /**
+   * Assume that depth is 2, i.e., partition columns are a and b
+   * tblPath/a=1  => throw exception
+   * tblPath/a=1/file => throw exception
+   * tblPath/a=1/b=2/file => return a=1/b=2
+   * tblPath/a=1/b=2/c=3 => return a=1/b=2
+   * tblPath/a=1/b=2/c=3/file => return a=1/b=2
+   *
+   * @param basePath
+   *          Start directory
+   * @param allDirs
+   *          This set will contain the leaf paths at the end.
+   * @param partColNames
+   *          Partition column names
+   * @throws IOException
+   *           Thrown if we can't get lists from the fs.
+   * @throws MetastoreException
+   */
+
+  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, MetastoreException {
+    // Here we just reuse the THREAD_COUNT configuration of
+    // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance.
+    // The missing partitions discovered here are later added by the metastore using a
+    // thread pool of size METASTORE_FS_HANDLER_THREADS_COUNT. If we used a differently
+    // sized pool here, the smaller of the two pools would become a bottleneck.
+    int poolSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.FS_HANDLER_THREADS_COUNT);
+
+    ExecutorService executor;
+    if (poolSize <= 1) {
+      LOG.debug("Using single-threaded version of MSCK-GetPaths");
+      executor = MoreExecutors.newDirectExecutorService();
+    } else {
+      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize);
+      ThreadFactory threadFactory =
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
+      executor = Executors.newFixedThreadPool(poolSize, threadFactory);
+    }
+    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
+
+    executor.shutdown();
+  }
+
+  private final class PathDepthInfoCallable implements Callable<Path> {
+    private final List<String> partColNames;
+    private final FileSystem fs;
+    private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
+    private final boolean throwException;
+    private final PathDepthInfo pd;
+
+    private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
+        ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
+      this.partColNames = partColNames;
+      this.pd = pd;
+      this.fs = fs;
+      this.pendingPaths = basePaths;
+      this.throwException = "throw".equals(MetastoreConf.getVar(conf, MetastoreConf.ConfVars.MSCK_PATH_VALIDATION));
+    }
+
+    @Override
+    public Path call() throws Exception {
+      return processPathDepthInfo(pd);
+    }
+
+    private Path processPathDepthInfo(final PathDepthInfo pd)
+        throws IOException, MetastoreException {
+      final Path currentPath = pd.p;
+      final int currentDepth = pd.depth;
+      FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      // found no files under a sub-directory under table base path; it is possible that the table
+      // is empty and hence there are no partition sub-directories created under base path
+      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) {
+        // since maxDepth is not yet reached, we are missing partition
+        // columns in currentPath
+        logOrThrowExceptionWithMsg(
+            "MSCK is missing partition columns under " + currentPath.toString());
+      } else {
+        // found files under currentPath add them to the queue if it is a directory
+        for (FileStatus fileStatus : fileStatuses) {
+          if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) {
+            // found a file at depth which is less than number of partition keys
+            logOrThrowExceptionWithMsg(
+                "MSCK finds a file rather than a directory when it searches for "
+                    + fileStatus.getPath().toString());
+          } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) {
+            // found a sub-directory at a depth less than number of partition keys
+            // validate if the partition directory name matches with the corresponding
+            // partition colName at currentDepth
+            Path nextPath = fileStatus.getPath();
+            String[] parts = nextPath.getName().split("=");
+            if (parts.length != 2) {
+              logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
+            } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
+              logOrThrowExceptionWithMsg(
+                  "Unexpected partition key " + parts[0] + " found at " + nextPath);
+            } else {
+              // add sub-directory to the work queue if maxDepth is not yet reached
+              pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
+            }
+          }
+        }
+        if (currentDepth == partColNames.size()) {
+          return currentPath;
+        }
+      }
+      return null;
+    }
+
+    private void logOrThrowExceptionWithMsg(String msg) throws MetastoreException {
+      if(throwException) {
+        throw new MetastoreException(msg);
+      } else {
+        LOG.warn(msg);
+      }
+    }
+  }
+
+  private static class PathDepthInfo {
+    private final Path p;
+    private final int depth;
+    PathDepthInfo(Path p, int depth) {
+      this.p = p;
+      this.depth = depth;
+    }
+  }
+
+  private void checkPartitionDirs(final ExecutorService executor,
+      final Path basePath, final Set<Path> result,
+      final FileSystem fs, final List<String> partColNames) throws MetastoreException {
+    try {
+      Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
+      ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
+      nextLevel.add(new PathDepthInfo(basePath, 0));
+      //Uses a level-parallel implementation of BFS. Recursive DFS implementations
+      //have an issue where the number of threads can run out if the number of
+      //nested sub-directories is more than the pool size.
+      //Using two queues is simpler than one queue, since with a single queue we would
+      //have to add complex mechanisms to let the free worker threads know when new levels
+      //are discovered, using notify()/wait() mechanisms that can potentially lead to bugs
+      //if not done right
+      while(!nextLevel.isEmpty()) {
+        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
+        //process each level in parallel
+        while(!nextLevel.isEmpty()) {
+          futures.add(
+              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
+        }
+        while(!futures.isEmpty()) {
+          Path p = futures.poll().get();
+          if (p != null) {
+            result.add(p);
+          }
+        }
+        //update the nextlevel with newly discovered sub-directories from the above
+        nextLevel = tempQueue;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error(e.getMessage());
+      executor.shutdownNow();
+      throw new MetastoreException(e.getCause());
+    }
+  }
+}

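A hedged usage sketch for the checker (assumes a reachable metastore and the default 'hive' catalog; not part of the patch). Passing a null table name checks every table in the database, per checkMetastore above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.CheckResult;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    public class CheckerDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = MetastoreConf.newMetastoreConf();
        IMetaStoreClient msc = new HiveMetaStoreClient(conf);
        // the two-arg constructor disables partition-expiry checks (expiry = -1)
        HiveMetaStoreChecker checker = new HiveMetaStoreChecker(msc, conf);
        CheckResult result = new CheckResult();
        checker.checkMetastore("hive", "default", null, null, result);
        System.out.println("not in metastore: " + result.getPartitionsNotInMs());
        System.out.println("not on filesystem: " + result.getPartitionsNotOnFs());
        msc.close();
      }
    }
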
http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
new file mode 100644
index 0000000..b7ae1d8
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/Msck.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetastoreException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.ObjectPair;
+import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+/**
+ * Msck repairs table metadata specifically related to partition information to be in-sync with directories in table
+ * location.
+ */
+public class Msck {
+  public static final Logger LOG = LoggerFactory.getLogger(Msck.class);
+  public static final int separator = 9; // ASCII code of '\t'
+  private static final int terminator = 10; // ASCII code of '\n'
+  private boolean acquireLock;
+  private boolean deleteData;
+
+  private Configuration conf;
+  private IMetaStoreClient msc;
+
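+  /**
+   * @param acquireLock whether to open a transaction and take an exclusive (X) lock on
+   *                    transactional tables before repairing partitions
+   * @param deleteData  whether data is also deleted for partitions dropped during repair
+   */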
+  public Msck(boolean acquireLock, boolean deleteData) {
+    this.acquireLock = acquireLock;
+    this.deleteData = deleteData;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  public void init(Configuration conf) throws MetaException {
+    if (msc == null) {
+      // the only reason we are using new conf here is to override EXPRESSION_PROXY_CLASS
+      Configuration metastoreConf = MetastoreConf.newMetastoreConf(new Configuration(conf));
+      metastoreConf.set(MetastoreConf.ConfVars.EXPRESSION_PROXY_CLASS.getVarname(),
+        MsckPartitionExpressionProxy.class.getCanonicalName());
+      setConf(metastoreConf);
+      this.msc = new HiveMetaStoreClient(metastoreConf);
+    }
+  }
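+
+  // Illustrative usage (a sketch; the MsckInfo argument values below are assumptions):
+  //   Msck msck = new Msck(false, false);
+  //   msck.init(conf);
+  //   int ret = msck.repair(new MsckInfo("hive", "mydb", "mytable", null, null, true, true, true, -1));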
+
+  /**
+   * Metastore check: see if the data in the metastore matches what is on the
+   * dfs. The current version checks for tables and partitions that are
+   * missing either on disk or in the metastore.
+   *
+   * @param msckInfo Information about the tables and partitions we want to check for.
+   * @return 0 when execution succeeds, a value above 0 when it fails.
+   */
+  public int repair(MsckInfo msckInfo) {
+    CheckResult result = new CheckResult();
+    List<String> repairOutput = new ArrayList<>();
+    String qualifiedTableName = null;
+    boolean success = false;
+    long txnId = -1;
+    int ret = 0;
+    try {
+      Table table = getMsc().getTable(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName());
+      if (getConf().getBoolean(MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION.getHiveName(), false)) {
+        msckInfo.setPartitionExpirySeconds(PartitionManagementTask.getRetentionPeriodInSeconds(table));
+        LOG.info("Retention period ({}s) for partition is enabled for MSCK REPAIR..", msckInfo.getPartitionExpirySeconds());
+      }
+      HiveMetaStoreChecker checker = new HiveMetaStoreChecker(getMsc(), getConf(), msckInfo.getPartitionExpirySeconds());
+      // the checkMetastore call fills result with partitions that are present in the filesystem
+      // but missing from the metastore (accessed through getPartitionsNotInMs), and with
+      // partitions whose metadata exists in the metastore but which are not present in the
+      // filesystem (accessed through getPartitionsNotOnFs)
+      checker.checkMetastore(msckInfo.getCatalogName(), msckInfo.getDbName(), msckInfo.getTableName(),
+        msckInfo.getPartSpecs(), result);
+      Set<CheckResult.PartitionResult> partsNotInMs = result.getPartitionsNotInMs();
+      Set<CheckResult.PartitionResult> partsNotInFs = result.getPartitionsNotOnFs();
+      Set<CheckResult.PartitionResult> expiredPartitions = result.getExpiredPartitions();
+      int totalPartsToFix = partsNotInMs.size() + partsNotInFs.size() + expiredPartitions.size();
+      // if no partitions changed and we are not repairing (add or drop), don't acquire the lock unnecessarily
+      boolean lockRequired = totalPartsToFix > 0 &&
+        msckInfo.isRepairPartitions() &&
+        (msckInfo.isAddPartitions() || msckInfo.isDropPartitions());
+      LOG.info("#partsNotInMs: {} #partsNotInFs: {} #expiredPartitions: {} lockRequired: {} (R: {} A: {} D: {})",
+        partsNotInMs.size(), partsNotInFs.size(), expiredPartitions.size(), lockRequired,
+        msckInfo.isRepairPartitions(), msckInfo.isAddPartitions(), msckInfo.isDropPartitions());
+
+      if (msckInfo.isRepairPartitions()) {
+        // Repair metadata in HMS
+        qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table);
+        long lockId;
+        if (acquireLock && lockRequired && table.getParameters() != null &&
+          MetaStoreServerUtils.isTransactionalTable(table.getParameters())) {
+          // Running MSCK from beeline/cli makes the DDL task acquire an X lock when repair is enabled.
+          // Since we invoke msck.repair() directly, without a SQL statement, we do the same and
+          // acquire the X lock here (repair is the default).
+          LockRequest lockRequest = createLockRequest(msckInfo.getDbName(), msckInfo.getTableName());
+          txnId = lockRequest.getTxnid();
+          try {
+            LockResponse res = getMsc().lock(lockRequest);
+            if (res.getState() != LockState.ACQUIRED) {
+              throw new MetastoreException("Unable to acquire lock(X) on " + qualifiedTableName);
+            }
+            lockId = res.getLockid();
+          } catch (TException e) {
+            throw new MetastoreException("Unable to acquire lock(X) on " + qualifiedTableName, e);
+          }
+          LOG.info("Acquired lock(X) on {}. LockId: {}", qualifiedTableName, lockId);
+        }
+        int maxRetries = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_MAX_RETRIES);
+        int decayingFactor = 2;
+
+        if (msckInfo.isAddPartitions() && !partsNotInMs.isEmpty()) {
+          // MSCK was called to add missing partitions into the metastore and there are
+          // missing partitions.
+
+          int batchSize = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_SIZE);
+          if (batchSize == 0) {
+            //batching is not enabled. Try to add all the partitions in one call
+            batchSize = partsNotInMs.size();
+          }
+
+          AbstractList<String> vals = null;
+          String settingStr = MetastoreConf.getVar(getConf(), MetastoreConf.ConfVars.MSCK_PATH_VALIDATION);
+          boolean doValidate = !("ignore".equals(settingStr));
+          boolean doSkip = doValidate && "skip".equals(settingStr);
+          // The default setting is "throw"; assume doValidate && !doSkip means throw.
+          if (doValidate) {
+            // Validate that we can add partition without escaping. Escaping was originally intended
+            // to avoid creating invalid HDFS paths; however, if we escape the HDFS path (that we
+            // deem invalid but HDFS actually supports - it is possible to create HDFS paths with
+            // unprintable characters like ASCII 7), metastore will create another directory instead
+            // of the one we are trying to "repair" here.
+            Iterator<CheckResult.PartitionResult> iter = partsNotInMs.iterator();
+            while (iter.hasNext()) {
+              CheckResult.PartitionResult part = iter.next();
+              try {
+                vals = Warehouse.makeValsFromName(part.getPartitionName(), vals);
+              } catch (MetaException ex) {
+                throw new MetastoreException(ex);
+              }
+              for (String val : vals) {
+                String escapedPath = FileUtils.escapePathName(val);
+                assert escapedPath != null;
+                if (escapedPath.equals(val)) {
+                  continue;
+                }
+                String errorMsg = "Repair: Cannot add partition " + msckInfo.getTableName() + ':' +
+                  part.getPartitionName() + " due to invalid characters in the name";
+                if (doSkip) {
+                  repairOutput.add(errorMsg);
+                  iter.remove();
+                } else {
+                  throw new MetastoreException(errorMsg);
+                }
+              }
+            }
+          }
+          try {
+            createPartitionsInBatches(getMsc(), repairOutput, partsNotInMs, table, batchSize,
+              decayingFactor, maxRetries);
+          } catch (Exception e) {
+            throw new MetastoreException(e);
+          }
+        }
+
+        if (msckInfo.isDropPartitions() && (!partsNotInFs.isEmpty() || !expiredPartitions.isEmpty())) {
+          // MSCK was called to drop stale partitions from the metastore and there are
+          // stale partitions.
+
+          int batchSize = MetastoreConf.getIntVar(getConf(), MetastoreConf.ConfVars.MSCK_REPAIR_BATCH_SIZE);
+          if (batchSize == 0) {
+            //batching is not enabled. Try to drop all the partitions in one call
+            batchSize = partsNotInFs.size() + expiredPartitions.size();
+          }
+
+          try {
+            dropPartitionsInBatches(getMsc(), repairOutput, partsNotInFs, expiredPartitions, table, batchSize,
+              decayingFactor, maxRetries);
+          } catch (Exception e) {
+            throw new MetastoreException(e);
+          }
+        }
+      }
+      success = true;
+    } catch (Exception e) {
+      LOG.warn("Failed to run metacheck: ", e);
+      success = false;
+      ret = 1;
+    } finally {
+      if (msckInfo.getResFile() != null) {
+        BufferedWriter resultOut = null;
+        try {
+          Path resFile = new Path(msckInfo.getResFile());
+          FileSystem fs = resFile.getFileSystem(getConf());
+          resultOut = new BufferedWriter(new OutputStreamWriter(fs.create(resFile), StandardCharsets.UTF_8));
+
+          boolean firstWritten = false;
+          firstWritten |= writeMsckResult(result.getTablesNotInMs(),
+            "Tables not in metastore:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getTablesNotOnFs(),
+            "Tables missing on filesystem:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getPartitionsNotInMs(),
+            "Partitions not in metastore:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getPartitionsNotOnFs(),
+            "Partitions missing from filesystem:", resultOut, firstWritten);
+          firstWritten |= writeMsckResult(result.getExpiredPartitions(),
+            "Expired partitions (retention period: " + msckInfo.getPartitionExpirySeconds() + "s) :", resultOut, firstWritten);
+          // sorting to stabilize qfile output (msck_repair_drop.q)
+          Collections.sort(repairOutput);
+          for (String rout : repairOutput) {
+            if (firstWritten) {
+              resultOut.write(terminator);
+            } else {
+              firstWritten = true;
+            }
+            resultOut.write(rout);
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to save metacheck output: ", e);
+          ret = 1;
+        } finally {
+          if (resultOut != null) {
+            try {
+              resultOut.close();
+            } catch (IOException e) {
+              LOG.warn("Failed to close output file: ", e);
+              ret = 1;
+            }
+          }
+        }
+      }
+
+      LOG.info("Tables not in metastore: {}", result.getTablesNotInMs());
+      LOG.info("Tables missing on filesystem: {}", result.getTablesNotOnFs());
+      LOG.info("Partitions not in metastore: {}", result.getPartitionsNotInMs());
+      LOG.info("Partitions missing from filesystem: {}", result.getPartitionsNotOnFs());
+      LOG.info("Expired partitions: {}", result.getExpiredPartitions());
+      if (acquireLock && txnId > 0) {
+          if (success) {
+            try {
+              LOG.info("txnId: {} succeeded. Committing..", txnId);
+              getMsc().commitTxn(txnId);
+            } catch (Exception e) {
+              LOG.warn("Error while committing txnId: {} for table: {}", txnId, qualifiedTableName, e);
+              ret = 1;
+            }
+          } else {
+            try {
+              LOG.info("txnId: {} failed. Aborting..", txnId);
+              getMsc().abortTxns(Lists.newArrayList(txnId));
+            } catch (Exception e) {
+              LOG.warn("Error while aborting txnId: {} for table: {}", txnId, qualifiedTableName, e);
+              ret = 1;
+            }
+          }
+      }
+      if (getMsc() != null) {
+        getMsc().close();
+        msc = null;
+      }
+    }
+
+    return ret;
+  }
+
+  private LockRequest createLockRequest(final String dbName, final String tableName) throws TException {
+    UserGroupInformation loggedInUser = null;
+    String username;
+    try {
+      loggedInUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      LOG.warn("Unable to get logged in user via UGI. err: {}", e.getMessage());
+    }
+    if (loggedInUser == null) {
+      username = System.getProperty("user.name");
+    } else {
+      username = loggedInUser.getShortUserName();
+    }
+    long txnId = getMsc().openTxn(username);
+    String agentInfo = Thread.currentThread().getName();
+    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+    requestBuilder.setUser(username);
+    requestBuilder.setTransactionId(txnId);
+
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+      .setDbName(dbName)
+      .setTableName(tableName)
+      .setIsTransactional(true)
+      .setExclusive()
+      // WriteType is DDL_EXCLUSIVE for MSCK REPAIR so we need NO_TXN. Refer AcidUtils.makeLockComponents
+      .setOperationType(DataOperationType.NO_TXN);
+    requestBuilder.addLockComponent(lockCompBuilder.build());
+
+    LOG.info("Created lock(X) request with info - user: {} txnId: {} agentInfo: {} dbName: {} tableName: {}",
+      username, txnId, agentInfo, dbName, tableName);
+    return requestBuilder.build();
+  }
+
+  public IMetaStoreClient getMsc() {
+    return msc;
+  }
+
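+  /**
+   * Adds the missing partitions to the metastore in batches. Uses
+   * RetryUtilities.ExponentiallyDecayingBatchWork, so a failed batch is retried with its size
+   * reduced by decayingFactor until it succeeds, maxRetries is exhausted or the size reaches 0.
+   */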
+  @VisibleForTesting
+  public void createPartitionsInBatches(final IMetaStoreClient metastoreClient, List<String> repairOutput,
+    Set<CheckResult.PartitionResult> partsNotInMs, Table table, int batchSize, int decayingFactor, int maxRetries)
+    throws Exception {
+    String addMsgFormat = "Repair: Added partition to metastore "
+      + table.getTableName() + ":%s";
+    Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInMs);
+    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
+      @Override
+      public Void execute(int size) throws MetastoreException {
+        try {
+          while (!batchWork.isEmpty()) {
+            List<Partition> partsToAdd = new ArrayList<>();
+            //get the current batch size
+            int currentBatchSize = size;
+            //store the partitions temporarily until processed
+            List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize);
+            List<String> addMsgs = new ArrayList<>(currentBatchSize);
+            //add the number of partitions given by the current batchsize
+            for (CheckResult.PartitionResult part : batchWork) {
+              if (currentBatchSize == 0) {
+                break;
+              }
+              Path tablePath = MetaStoreServerUtils.getPath(table);
+              if (tablePath == null) {
+                // a null table location would make this loop spin forever (the batch never
+                // shrinks), so fail fast instead of silently skipping
+                throw new MetastoreException("Table " + table.getTableName() + " does not have a location");
+              }
+              Map<String, String> partSpec = Warehouse.makeSpecFromName(part.getPartitionName());
+              Path location = new Path(tablePath, Warehouse.makePartPath(partSpec));
+              Partition partition = MetaStoreServerUtils.createMetaPartitionObject(table, partSpec, location);
+              partition.setWriteId(table.getWriteId());
+              partsToAdd.add(partition);
+              lastBatch.add(part);
+              addMsgs.add(String.format(addMsgFormat, part.getPartitionName()));
+              currentBatchSize--;
+            }
+            metastoreClient.add_partitions(partsToAdd, true, false);
+            // if last batch is successful remove it from partsNotInMs
+            batchWork.removeAll(lastBatch);
+            repairOutput.addAll(addMsgs);
+          }
+          return null;
+        } catch (TException e) {
+          throw new MetastoreException(e);
+        }
+      }
+    }.run();
+  }
+
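+  /**
+   * Builds an equality filter expression from a partition spec; for example, the spec
+   * {year=2014, month=24} becomes year='2014' AND month='24' (keys and values are path-escaped).
+   */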
+  private static String makePartExpr(Map<String, String> spec)
+    throws MetaException {
+    StringBuilder suffixBuf = new StringBuilder();
+    int i = 0;
+    for (Map.Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() == null || e.getValue().length() == 0) {
+        throw new MetaException("Partition spec is incorrect. " + spec);
+      }
+      if (i > 0) {
+        suffixBuf.append(" AND ");
+      }
+      suffixBuf.append(Warehouse.escapePathName(e.getKey()));
+      suffixBuf.append('=');
+      suffixBuf.append("'").append(Warehouse.escapePathName(e.getValue())).append("'");
+      i++;
+    }
+    return suffixBuf.toString();
+  }
+
+  // Drops partitions in batches. partsNotInFs is split into batches based on batchSize
+  // and dropped. The dropping goes through RetryUtilities, which retries after a failure
+  // with the batch size reduced by decayingFactor. Retrying ceases when the maxRetries
+  // limit is reached or the batch size reduces to 0, whichever comes first.
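+  // For example (illustrative numbers only): with batchSize=100, decayingFactor=2 and enough
+  // retries allowed, a repeatedly failing drop is attempted with sizes 100, 50, 25, 12, ...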
+  @VisibleForTesting
+  public void dropPartitionsInBatches(final IMetaStoreClient metastoreClient, List<String> repairOutput,
+    Set<CheckResult.PartitionResult> partsNotInFs, Set<CheckResult.PartitionResult> expiredPartitions,
+    Table table, int batchSize, int decayingFactor, int maxRetries) throws Exception {
+    String dropMsgFormat =
+      "Repair: Dropped partition from metastore " + Warehouse.getCatalogQualifiedTableName(table) + ":%s";
+    // Copy of partitions that will be split into batches
+    Set<CheckResult.PartitionResult> batchWork = new HashSet<>(partsNotInFs);
+    if (expiredPartitions != null && !expiredPartitions.isEmpty()) {
+      batchWork.addAll(expiredPartitions);
+    }
+    PartitionDropOptions dropOptions = new PartitionDropOptions().deleteData(deleteData).ifExists(true);
+    new RetryUtilities.ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
+      @Override
+      public Void execute(int size) throws MetastoreException {
+        try {
+          while (!batchWork.isEmpty()) {
+            int currentBatchSize = size;
+
+            // to store the partitions that are currently being processed
+            List<CheckResult.PartitionResult> lastBatch = new ArrayList<>(currentBatchSize);
+
+            // drop messages for the dropped partitions
+            List<String> dropMsgs = new ArrayList<>(currentBatchSize);
+
+            // Partitions to be dropped
+            List<String> dropParts = new ArrayList<>(currentBatchSize);
+
+            for (CheckResult.PartitionResult part : batchWork) {
+              // This batch is full: break out of for loop to execute
+              if (currentBatchSize == 0) {
+                break;
+              }
+
+              dropParts.add(part.getPartitionName());
+
+              // Add the part to lastBatch to track the partition being dropped
+              lastBatch.add(part);
+
+              // Update messages
+              dropMsgs.add(String.format(dropMsgFormat, part.getPartitionName()));
+
+              // Decrement batch size.  When this gets to 0, the batch will be executed
+              currentBatchSize--;
+            }
+
+            // partitions missing from the filesystem have no data left to delete; whether data
+            // is deleted at all is governed by the deleteData flag in dropOptions.
+            // msck is doing a clean up of hms: if for some reason a partition is already
+            // dropped, that is fine, so ifExists is set to true in dropOptions
+            List<ObjectPair<Integer, byte[]>> partExprs = getPartitionExpr(dropParts);
+            metastoreClient.dropPartitions(table.getCatName(), table.getDbName(), table.getTableName(), partExprs, dropOptions);
+
+            // if last batch is successful remove it from partsNotInFs
+            batchWork.removeAll(lastBatch);
+            repairOutput.addAll(dropMsgs);
+          }
+          return null;
+        } catch (TException e) {
+          throw new MetastoreException(e);
+        }
+      }
+
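+      /**
+       * Builds one (index, UTF-8 filter bytes) pair per partition name; the bytes are decoded
+       * back into the filter string by MsckPartitionExpressionProxy.convertExprToFilter.
+       */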
+      private List<ObjectPair<Integer, byte[]>> getPartitionExpr(final List<String> parts) throws MetaException {
+        List<ObjectPair<Integer, byte[]>> expr = new ArrayList<>(parts.size());
+        for (int i = 0; i < parts.size(); i++) {
+          String partName = parts.get(i);
+          Map<String, String> partSpec = Warehouse.makeSpecFromName(partName);
+          String partExpr = makePartExpr(partSpec);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Generated partExpr: {} for partName: {}", partExpr, partName);
+          }
+          expr.add(new ObjectPair<>(i, partExpr.getBytes(StandardCharsets.UTF_8)));
+        }
+        return expr;
+      }
+    }.run();
+  }
+
+  /**
+   * Write the result of msck to a writer.
+   *
+   * @param result The result we're going to write
+   * @param msg    Message to write.
+   * @param out    Writer to write to
+   * @param wrote  if any previous call wrote data
+   * @return true if something was written
+   * @throws IOException In case the writing fails
+   */
+  private boolean writeMsckResult(Set<?> result, String msg,
+    Writer out, boolean wrote) throws IOException {
+
+    if (!result.isEmpty()) {
+      if (wrote) {
+        out.write(terminator);
+      }
+
+      out.write(msg);
+      for (Object entry : result) {
+        out.write(separator);
+        out.write(entry.toString());
+      }
+      return true;
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
new file mode 100644
index 0000000..81bcb56
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckInfo.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+
+/**
+ * Metadata about an Msck invocation: the table to check and how it should be repaired.
+ */
+public class MsckInfo {
+
+  private String catalogName;
+  private String dbName;
+  private String tableName;
+  private ArrayList<LinkedHashMap<String, String>> partSpecs;
+  private String resFile;
+  private boolean repairPartitions;
+  private boolean addPartitions;
+  private boolean dropPartitions;
+  private long partitionExpirySeconds;
+
+  public MsckInfo(final String catalogName, final String dbName, final String tableName,
+    final ArrayList<LinkedHashMap<String, String>> partSpecs, final String resFile, final boolean repairPartitions,
+    final boolean addPartitions,
+    final boolean dropPartitions,
+    final long partitionExpirySeconds) {
+    this.catalogName = catalogName;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.partSpecs = partSpecs;
+    this.resFile = resFile;
+    this.repairPartitions = repairPartitions;
+    this.addPartitions = addPartitions;
+    this.dropPartitions = dropPartitions;
+    this.partitionExpirySeconds = partitionExpirySeconds;
+  }
+
+  public String getCatalogName() {
+    return catalogName;
+  }
+
+  public void setCatalogName(final String catalogName) {
+    this.catalogName = catalogName;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public void setDbName(final String dbName) {
+    this.dbName = dbName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(final String tableName) {
+    this.tableName = tableName;
+  }
+
+  public ArrayList<LinkedHashMap<String, String>> getPartSpecs() {
+    return partSpecs;
+  }
+
+  public void setPartSpecs(final ArrayList<LinkedHashMap<String, String>> partSpecs) {
+    this.partSpecs = partSpecs;
+  }
+
+  public String getResFile() {
+    return resFile;
+  }
+
+  public void setResFile(final String resFile) {
+    this.resFile = resFile;
+  }
+
+  public boolean isRepairPartitions() {
+    return repairPartitions;
+  }
+
+  public void setRepairPartitions(final boolean repairPartitions) {
+    this.repairPartitions = repairPartitions;
+  }
+
+  public boolean isAddPartitions() {
+    return addPartitions;
+  }
+
+  public void setAddPartitions(final boolean addPartitions) {
+    this.addPartitions = addPartitions;
+  }
+
+  public boolean isDropPartitions() {
+    return dropPartitions;
+  }
+
+  public void setDropPartitions(final boolean dropPartitions) {
+    this.dropPartitions = dropPartitions;
+  }
+
+  public long getPartitionExpirySeconds() {
+    return partitionExpirySeconds;
+  }
+
+  public void setPartitionExpirySeconds(final long partitionExpirySeconds) {
+    this.partitionExpirySeconds = partitionExpirySeconds;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java
new file mode 100644
index 0000000..d842825
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MsckPartitionExpressionProxy.java
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.metastore;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+
+// This was added as part of moving the MSCK code from ql to standalone-metastore. There is a metastore API to drop
+// partitions by name, but we cannot use it because msck typically works with partition values (year=2014); we almost
+// never drop a partition by name alone (year). So we need to construct expression filters. The current
+// PartitionExpressionProxy implementations (PartitionExpressionForMetastore and HCatClientHMSImpl.ExpressionBuilder)
+// all depend on ql code to build an ExprNodeDesc for the partition expressions, and also on kryo for serializing
+// the expression objects to byte[]. For MSCK drop partition we don't need a complex expression generator: for now,
+// all we do is split the partition spec (year=2014/month=24) into the filter expression year='2014' AND month='24'
+// and rely on the metastore database to deal with type conversions. Ideally the default PartitionExpressionProxy
+// implementation should use SearchArgument (storage-api) to construct the filter expression and not depend on ql,
+// but the use case for msck is pretty simple and this specific implementation should suffice.
+public class MsckPartitionExpressionProxy implements PartitionExpressionProxy {
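+  // The "expression" bytes here are just the UTF-8 encoded filter string built by
+  // Msck.makePartExpr (e.g. year='2014' AND month='24'), so decoding them is all that is needed.
+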
+  @Override
+  public String convertExprToFilter(final byte[] exprBytes, final String defaultPartitionName) throws MetaException {
+    return new String(exprBytes, StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public boolean filterPartitionsByExpr(List<FieldSchema> partColumns, byte[] expr, String
+    defaultPartitionName, List<String> partitionNames) throws MetaException {
+    return false;
+  }
+
+  @Override
+  public FileMetadataExprType getMetadataType(String inputFormat) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FileFormatProxy getFileFormatProxy(FileMetadataExprType type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SearchArgument createSarg(byte[] expr) {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/64bea035/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9c15804..0755483 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -686,10 +686,9 @@ public class ObjectStore implements RawStore, Configurable {
       debugLog("rolling back transaction: no open transactions: " + openTrasactionCalls);
       return;
     }
-    debugLog("Rollback transaction, isActive: " + currentTransaction.isActive());
+    debugLog("Rollback transaction, isActive: " + isActiveTransaction());
     try {
-      if (currentTransaction.isActive()
-          && transactionStatus != TXN_STATUS.ROLLBACK) {
+      if (isActiveTransaction() && transactionStatus != TXN_STATUS.ROLLBACK) {
         currentTransaction.rollback();
       }
     } finally {
@@ -1711,6 +1710,7 @@ public class ObjectStore implements RawStore, Configurable {
       for (MTable table : tables) {
         TableMeta metaData = new TableMeta(
             table.getDatabase().getName(), table.getTableName(), table.getTableType());
+        metaData.setCatName(catName);
         metaData.setComments(table.getParameters().get("comment"));
         metas.add(metaData);
       }