Posted to commits@hive.apache.org by vi...@apache.org on 2019/01/22 19:08:37 UTC

hive git commit: HIVE-20776 : Run HMS filterHooks on server-side in addition to client-side (Na Li reviewed by Karthik, Sergio, Morio, Adam and Vihang Karajgaonkar)

Repository: hive
Updated Branches:
  refs/heads/master d1460174d -> dfd63d979


HIVE-20776 : Run HMS filterHooks on server-side in addition to client-side (Na Li reviewed by Karthik, Sergio, Morio, Adam and Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dfd63d97
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dfd63d97
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dfd63d97

Branch: refs/heads/master
Commit: dfd63d97902b359e1643e955a4d070ac983debd5
Parents: d146017
Author: Na Li <li...@cloudera.com>
Authored: Tue Jan 22 10:43:25 2019 -0800
Committer: Vihang Karajgaonkar <vi...@apache.org>
Committed: Tue Jan 22 11:08:02 2019 -0800

----------------------------------------------------------------------
 .../hive/metastore/HiveMetaStoreClient.java     | 141 ++++---
 .../hive/metastore/conf/MetastoreConf.java      |   8 +-
 .../hive/metastore/utils/FilterUtils.java       | 375 +++++++++++++++++++
 .../hadoop/hive/metastore/HiveMetaStore.java    | 168 ++++++++-
 .../hadoop/hive/metastore/TestFilterHooks.java  | 304 ++++++++++-----
 .../metastore/client/TestListPartitions.java    |   4 +-
 6 files changed, 849 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dfd63d97/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 19bd9ba..30edc56 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.CAT_NAME;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.parseDbName;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependCatalogToDbName;
 
 import java.io.IOException;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hive.metastore.hooks.URIResolverHook;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.utils.FilterUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.ObjectPair;
@@ -127,6 +130,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   private String tokenStrForm;
   private final boolean localMetaStore;
   private final MetaStoreFilterHook filterHook;
+  private final boolean isClientFilterEnabled;
   private final URIResolverHook uriResolverHook;
   private final int fileMetadataBatchSize;
 
@@ -164,6 +168,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     }
     version = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ? TEST_VERSION : VERSION;
     filterHook = loadFilterHooks();
+    isClientFilterEnabled = getIfClientFilterEnabled();
     uriResolverHook = loadUriResolverHook();
     fileMetadataBatchSize = MetastoreConf.getIntVar(
         conf, ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
@@ -276,6 +281,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     return null;
   }
 
+  private boolean getIfClientFilterEnabled() {
+    boolean isEnabled = MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED);
+    LOG.info("HMS client filtering is " + (isEnabled ? "enabled." : "disabled."));
+
+    return isEnabled;
+  }
+
   private void resolveUris() throws MetaException {
     String thriftUris = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS);
     String serviceDiscoveryMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_SERVICE_DISCOVERY_MODE);
@@ -731,13 +742,15 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public Catalog getCatalog(String catName) throws TException {
     GetCatalogResponse rsp = client.get_catalog(new GetCatalogRequest(catName));
-    return rsp == null ? null : filterHook.filterCatalog(rsp.getCatalog());
+    return rsp == null ?
+        null : FilterUtils.filterCatalogIfEnabled(isClientFilterEnabled, filterHook, rsp.getCatalog());
   }
 
   @Override
   public List<String> getCatalogs() throws TException {
     GetCatalogsResponse rsp = client.get_catalogs();
-    return rsp == null ? null : filterHook.filterCatalogs(rsp.getNames());
+    return rsp == null ?
+        null : FilterUtils.filterCatalogNamesIfEnabled(isClientFilterEnabled, filterHook, rsp.getNames());
   }
 
   @Override
@@ -808,7 +821,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     req.setCatName(part.isSetCatName() ? part.getCatName() : getDefaultCatalog(conf));
     req.setNeedResult(needResults);
     AddPartitionsResult result = client.add_partitions_req(req);
-    return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
+    return needResults ? FilterUtils.filterPartitionsIfEnabled(
+        isClientFilterEnabled, filterHook, result.getPartitions()) : null;
   }
 
   @Override
@@ -1619,8 +1633,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
   @Override
   public List<String> getDatabases(String catName, String databasePattern) throws TException {
-    return filterHook.filterDatabases(client.get_databases(prependCatalogToDbName(
-        catName, databasePattern, conf)));
+    List<String> databases = client.get_databases(prependCatalogToDbName(
+        catName, databasePattern, conf));
+    return FilterUtils.filterDbNamesIfEnabled(isClientFilterEnabled, filterHook, databases);
   }
 
   @Override
@@ -1630,7 +1645,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
   @Override
   public List<String> getAllDatabases(String catName) throws TException {
-    return filterHook.filterDatabases(client.get_databases(prependCatalogToDbName(catName, null, conf)));
+    List<String> databases = client.get_databases(prependCatalogToDbName(catName, null, conf));
+    return FilterUtils.filterDbNamesIfEnabled(isClientFilterEnabled, filterHook, databases);
   }
 
   @Override
@@ -1644,7 +1660,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
                                         int max_parts) throws TException {
     List<Partition> parts = client.get_partitions(prependCatalogToDbName(catName, db_name, conf),
         tbl_name, shrinkMaxtoShort(max_parts));
-    return deepCopyPartitions(filterHook.filterPartitions(parts));
+    return deepCopyPartitions(
+        FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, parts));
   }
 
   @Override
@@ -1655,8 +1672,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public PartitionSpecProxy listPartitionSpecs(String catName, String dbName, String tableName,
                                                int maxParts) throws TException {
-    return PartitionSpecProxy.Factory.get(filterHook.filterPartitionSpecs(
-        client.get_partitions_pspec(prependCatalogToDbName(catName, dbName, conf), tableName, maxParts)));
+    List<PartitionSpec> partitionSpecs =
+        client.get_partitions_pspec(prependCatalogToDbName(catName, dbName, conf), tableName, maxParts);
+    partitionSpecs = FilterUtils.filterPartitionSpecsIfEnabled(isClientFilterEnabled, filterHook, partitionSpecs);
+    return PartitionSpecProxy.Factory.get(partitionSpecs);
   }
 
   @Override
@@ -1670,7 +1689,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
                                         List<String> part_vals, int max_parts) throws TException {
     List<Partition> parts = client.get_partitions_ps(prependCatalogToDbName(catName, db_name, conf),
         tbl_name, part_vals, shrinkMaxtoShort(max_parts));
-    return deepCopyPartitions(filterHook.filterPartitions(parts));
+    return deepCopyPartitions(FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, parts));
   }
 
   @Override
@@ -1687,7 +1706,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
                                                     List<String> groupNames) throws TException {
     List<Partition> parts = client.get_partitions_with_auth(prependCatalogToDbName(catName,
         dbName, conf), tableName, shrinkMaxtoShort(maxParts), userName, groupNames);
-    return deepCopyPartitions(filterHook.filterPartitions(parts));
+    return deepCopyPartitions(FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, parts));
   }
 
   @Override
@@ -1706,7 +1725,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
       throws TException {
     List<Partition> parts = client.get_partitions_ps_with_auth(prependCatalogToDbName(catName,
         dbName, conf), tableName, partialPvals, shrinkMaxtoShort(maxParts), userName, groupNames);
-    return deepCopyPartitions(filterHook.filterPartitions(parts));
+    return deepCopyPartitions(FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, parts));
   }
 
   @Override
@@ -1720,7 +1739,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
                                                 String filter, int max_parts) throws TException {
     List<Partition> parts =client.get_partitions_by_filter(prependCatalogToDbName(
         catName, db_name, conf), tbl_name, filter, shrinkMaxtoShort(max_parts));
-    return deepCopyPartitions(filterHook.filterPartitions(parts));
+    return deepCopyPartitions(FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, parts));
   }
 
   @Override
@@ -1734,9 +1753,11 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   public PartitionSpecProxy listPartitionSpecsByFilter(String catName, String db_name,
                                                        String tbl_name, String filter,
                                                        int max_parts) throws TException {
-    return PartitionSpecProxy.Factory.get(filterHook.filterPartitionSpecs(
+    List<PartitionSpec> partitionSpecs =
         client.get_part_specs_by_filter(prependCatalogToDbName(catName, db_name, conf), tbl_name, filter,
-            max_parts)));
+        max_parts);
+    return PartitionSpecProxy.Factory.get(
+        FilterUtils.filterPartitionSpecsIfEnabled(isClientFilterEnabled, filterHook, partitionSpecs));
   }
 
   @Override
@@ -1772,7 +1793,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
       throw new IncompatibleMetastoreException(
           "Metastore doesn't support listPartitionsByExpr: " + te.getMessage());
     }
-    r.setPartitions(filterHook.filterPartitions(r.getPartitions()));
+
+    r.setPartitions(FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, r.getPartitions()));
     // TODO: in these methods, do we really need to deepcopy?
     deepCopyPartitions(r.getPartitions(), result);
     return !r.isSetHasUnknownPartitions() || r.isHasUnknownPartitions(); // Assume the worst.
@@ -1786,7 +1808,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public Database getDatabase(String catalogName, String databaseName) throws TException {
     Database d = client.get_database(prependCatalogToDbName(catalogName, databaseName, conf));
-    return deepCopy(filterHook.filterDatabase(d));
+    return deepCopy(FilterUtils.filterDbIfEnabled(isClientFilterEnabled, filterHook, d));
   }
 
   @Override
@@ -1799,7 +1821,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   public Partition getPartition(String catName, String dbName, String tblName,
                                 List<String> partVals) throws TException {
     Partition p = client.get_partition(prependCatalogToDbName(catName, dbName, conf), tblName, partVals);
-    return deepCopy(filterHook.filterPartition(p));
+    return deepCopy(FilterUtils.filterPartitionIfEnabled(isClientFilterEnabled, filterHook, p));
   }
 
   @Override
@@ -1811,9 +1833,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<Partition> getPartitionsByNames(String catName, String db_name, String tbl_name,
                                               List<String> part_names) throws TException {
+    checkDbAndTableFilters(catName, db_name, tbl_name);
     List<Partition> parts =
         client.get_partitions_by_names(prependCatalogToDbName(catName, db_name, conf), tbl_name, part_names);
-    return deepCopyPartitions(filterHook.filterPartitions(parts));
+    return deepCopyPartitions(FilterUtils.filterPartitionsIfEnabled(isClientFilterEnabled, filterHook, parts));
   }
 
   @Override
@@ -1822,6 +1845,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     if (!request.isSetCatName()) {
       request.setCatName(getDefaultCatalog(conf));
     }
+
+    String catName = request.getCatName();
+    String dbName = request.getDbName();
+    String tblName = request.getTblName();
+
+    checkDbAndTableFilters(catName, dbName, tblName);
     return client.get_partition_values(request);
   }
 
@@ -1839,7 +1868,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
                                             List<String> groupNames) throws TException {
     Partition p = client.get_partition_with_auth(prependCatalogToDbName(catName, dbName, conf), tableName,
         pvals, userName, groupNames);
-    return deepCopy(filterHook.filterPartition(p));
+    return deepCopy(FilterUtils.filterPartitionIfEnabled(isClientFilterEnabled, filterHook, p));
   }
 
   @Override
@@ -1853,7 +1882,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     req.setCatName(catName);
     req.setCapabilities(version);
     Table t = client.get_table_req(req).getTable();
-    return deepCopy(filterHook.filterTable(t));
+    return deepCopy(FilterUtils.filterTableIfEnabled(isClientFilterEnabled, filterHook, t));
   }
 
   @Override
@@ -1864,7 +1893,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     req.setCapabilities(version);
     req.setValidWriteIdList(validWriteIdList);
     Table t = client.get_table_req(req).getTable();
-    return deepCopy(filterHook.filterTable(t));
+    return deepCopy(FilterUtils.filterTableIfEnabled(isClientFilterEnabled, filterHook, t));
   }
 
   @Override
@@ -1881,7 +1910,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     req.setTblNames(tableNames);
     req.setCapabilities(version);
     List<Table> tabs = client.get_table_objects_by_name_req(req).getTables();
-    return deepCopyTables(filterHook.filterTables(tabs));
+    return deepCopyTables(FilterUtils.filterTablesIfEnabled(isClientFilterEnabled, filterHook, tabs));
   }
 
   @Override
@@ -1913,9 +1942,11 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<String> listTableNamesByFilter(String catName, String dbName, String filter,
                                              int maxTables) throws TException {
-    return filterHook.filterTableNames(catName, dbName,
+    List<String> tableNames =
         client.get_table_names_by_filter(prependCatalogToDbName(catName, dbName, conf), filter,
-            shrinkMaxtoShort(maxTables)));
+        shrinkMaxtoShort(maxTables));
+    return FilterUtils.filterTableNamesIfEnabled(
+        isClientFilterEnabled, filterHook, catName, dbName, tableNames);
   }
 
   /**
@@ -1943,8 +1974,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<String> getTables(String catName, String dbName, String tablePattern)
       throws TException {
-    return filterHook.filterTableNames(catName, dbName,
-        client.get_tables(prependCatalogToDbName(catName, dbName, conf), tablePattern));
+    List<String> tables = client.get_tables(prependCatalogToDbName(catName, dbName, conf), tablePattern);
+    return FilterUtils.filterTableNamesIfEnabled(isClientFilterEnabled, filterHook, catName, dbName, tables);
   }
 
   @Override
@@ -1960,9 +1991,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<String> getTables(String catName, String dbName, String tablePattern,
                                 TableType tableType) throws TException {
-    return filterHook.filterTableNames(catName, dbName,
+    List<String> tables =
         client.get_tables_by_type(prependCatalogToDbName(catName, dbName, conf), tablePattern,
-            tableType.toString()));
+        tableType.toString());
+    return FilterUtils.filterTableNamesIfEnabled(isClientFilterEnabled, filterHook, catName, dbName, tables);
   }
 
   @Override
@@ -1974,8 +2006,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   public List<String> getMaterializedViewsForRewriting(String catName, String dbname)
       throws MetaException {
     try {
-      return filterHook.filterTableNames(catName, dbname,
-          client.get_materialized_views_for_rewriting(prependCatalogToDbName(catName, dbname, conf)));
+      List<String> views =
+          client.get_materialized_views_for_rewriting(prependCatalogToDbName(catName, dbname, conf));
+      return FilterUtils.filterTableNamesIfEnabled(isClientFilterEnabled, filterHook, catName, dbname, views);
     } catch (Exception e) {
       MetaStoreUtils.logAndThrowMetaException(e);
     }
@@ -1996,8 +2029,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<TableMeta> getTableMeta(String catName, String dbPatterns, String tablePatterns,
                                       List<String> tableTypes) throws TException {
-    return filterHook.filterTableMetas(catName,dbPatterns,client.get_table_meta(prependCatalogToDbName(
-        catName, dbPatterns, conf), tablePatterns, tableTypes));
+    List<TableMeta> tableMetas = client.get_table_meta(prependCatalogToDbName(
+        catName, dbPatterns, conf), tablePatterns, tableTypes);
+    return FilterUtils.filterTableMetasIfEnabled(isClientFilterEnabled, filterHook, catName,dbPatterns, tableMetas);
   }
 
   @Override
@@ -2012,8 +2046,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
   @Override
   public List<String> getAllTables(String catName, String dbName) throws TException {
-    return filterHook.filterTableNames(catName, dbName, client.get_all_tables(
-        prependCatalogToDbName(catName, dbName, conf)));
+    List<String> tableNames = client.get_all_tables(
+        prependCatalogToDbName(catName, dbName, conf));
+    return FilterUtils.filterTableNamesIfEnabled(isClientFilterEnabled, filterHook, catName, dbName, tableNames);
   }
 
   @Override
@@ -2027,7 +2062,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
       GetTableRequest req = new GetTableRequest(dbName, tableName);
       req.setCatName(catName);
       req.setCapabilities(version);
-      return filterHook.filterTable(client.get_table_req(req).getTable()) != null;
+      Table table = client.get_table_req(req).getTable();
+      return FilterUtils.filterTableIfEnabled(isClientFilterEnabled, filterHook, table) != null;
     } catch (NoSuchObjectException e) {
       return false;
     }
@@ -2042,8 +2078,11 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<String> listPartitionNames(String catName, String dbName, String tableName,
                                          int maxParts) throws TException {
-    return filterHook.filterPartitionNames(catName, dbName, tableName,
-        client.get_partition_names(prependCatalogToDbName(catName, dbName, conf), tableName, shrinkMaxtoShort(maxParts)));
+    List<String> partNames =
+        client.get_partition_names(
+            prependCatalogToDbName(catName, dbName, conf), tableName, shrinkMaxtoShort(maxParts));
+    return FilterUtils.filterPartitionNamesIfEnabled(
+        isClientFilterEnabled, filterHook, catName, dbName, tableName, partNames);
   }
 
   @Override
@@ -2055,9 +2094,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   @Override
   public List<String> listPartitionNames(String catName, String db_name, String tbl_name,
                                          List<String> part_vals, int max_parts) throws TException {
-    return filterHook.filterPartitionNames(catName, db_name, tbl_name,
-        client.get_partition_names_ps(prependCatalogToDbName(catName, db_name, conf), tbl_name,
-            part_vals, shrinkMaxtoShort(max_parts)));
+    List<String> partNames = client.get_partition_names_ps(prependCatalogToDbName(catName, db_name, conf), tbl_name,
+        part_vals, shrinkMaxtoShort(max_parts));
+    return FilterUtils.filterPartitionNamesIfEnabled(
+        isClientFilterEnabled, filterHook, catName, db_name, tbl_name, partNames);
   }
 
   @Override
@@ -2384,7 +2424,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
       throws TException {
     Partition p = client.get_partition_by_name(prependCatalogToDbName(catName, dbName, conf), tblName,
         name);
-    return deepCopy(filterHook.filterPartition(p));
+    return deepCopy(FilterUtils.filterPartitionIfEnabled(isClientFilterEnabled, filterHook, p));
   }
 
   public Partition appendPartitionByName(String dbName, String tableName, String partName)
@@ -2419,6 +2459,23 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     return hookLoader.getHook(tbl);
   }
 
+  /**
+   * Check if the current user has access to a given database and table name. Throws
+   * NoSuchObjectException if the user has no access. When the db or table is filtered out, we
+   * don't even need to fetch the partitions. Therefore, this check ensures table-level security
+   * and can improve performance when filtering partitions.
+   * @param catName the catalog name
+   * @param dbName the database name
+   * @param tblName the table name contained in the database
+   * @throws NoSuchObjectException if the database or table is filtered out
+   */
+  private void checkDbAndTableFilters(final String catName, final String dbName, final String tblName)
+      throws NoSuchObjectException, MetaException {
+
+    FilterUtils.checkDbAndTableFilters(
+        isClientFilterEnabled, filterHook, catName, dbName, tblName);
+  }
+
   @Override
   public List<String> partitionNameToVals(String name) throws MetaException, TException {
     return client.partition_name_to_vals(name);

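The client-side changes above all follow one guard pattern: call the Thrift API, then pass the result through a FilterUtils helper that invokes the hook only when metastore.client.filter.enabled is true. A minimal, self-contained sketch of that pattern follows; the Hook interface here is a hypothetical stand-in for MetaStoreFilterHook, not the real API.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class FilterGuardSketch {
      // Hypothetical stand-in for MetaStoreFilterHook (one method for brevity).
      interface Hook {
        List<String> filterDatabases(List<String> dbNames);
      }

      // Mirrors FilterUtils.filterDbNamesIfEnabled: when filtering is disabled,
      // the hook is never invoked and the original list is returned untouched.
      static List<String> filterDbNamesIfEnabled(boolean enabled, Hook hook, List<String> names) {
        return enabled ? hook.filterDatabases(names) : names;
      }

      public static void main(String[] args) {
        Hook dropSecret = names -> names.stream()
            .filter(n -> !n.startsWith("secret")).collect(Collectors.toList());
        List<String> dbs = Arrays.asList("default", "secret_db");
        System.out.println(filterDbNamesIfEnabled(true, dropSecret, dbs));   // [default]
        System.out.println(filterDbNamesIfEnabled(false, dropSecret, dbs));  // [default, secret_db]
      }
    }
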
http://git-wip-us.apache.org/repos/asf/hive/blob/dfd63d97/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index be1f8c7..75f0c0a 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -216,7 +216,9 @@ public class MetastoreConf {
       ConfVars.AGGREGATE_STATS_CACHE_MAX_FULL,
       ConfVars.AGGREGATE_STATS_CACHE_CLEAN_UNTIL,
       ConfVars.DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES,
-      ConfVars.FILE_METADATA_THREADS
+      ConfVars.FILE_METADATA_THREADS,
+      ConfVars.METASTORE_CLIENT_FILTER_ENABLED,
+      ConfVars.METASTORE_SERVER_FILTER_ENABLED
   };
 
   /**
@@ -657,6 +659,10 @@ public class MetastoreConf {
             "metadata being exported to the current user's home directory on HDFS."),
     METASTORE_MAX_EVENT_RESPONSE("metastore.max.event.response", "hive.metastore.max.event.response", 1000000,
         "The parameter will decide the maximum number of events that HMS will respond."),
+    METASTORE_CLIENT_FILTER_ENABLED("metastore.client.filter.enabled", "hive.metastore.client.filter.enabled", true,
+        "Enable filtering of metadata read results at the HMS client. Default is true."),
+    METASTORE_SERVER_FILTER_ENABLED("metastore.server.filter.enabled", "hive.metastore.server.filter.enabled", false,
+        "Enable filtering of metadata read results at the HMS server. Default is false."),
     MOVE_EXPORTED_METADATA_TO_TRASH("metastore.metadata.move.exported.metadata.to.trash",
         "hive.metadata.move.exported.metadata.to.trash", true,
         "When used in conjunction with the org.apache.hadoop.hive.ql.parse.MetaDataExportListener pre event listener, \n" +

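Both new properties are read through MetastoreConf, the same way the patch reads them in HiveMetaStoreClient and HiveMetaStore. A short sketch, assuming the standalone-metastore artifacts are on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;

    public class FilterFlagsSketch {
      public static void main(String[] args) {
        Configuration conf = MetastoreConf.newMetastoreConf();
        // Client-side filtering defaults to true, server-side to false,
        // matching the ConfVars definitions added above.
        boolean clientFilter = MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED);
        boolean serverFilter = MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_ENABLED);
        System.out.println("client filter: " + clientFilter + ", server filter: " + serverFilter);
      }
    }

Both flags are also appended to the ConfVars array at the top of this diff, so they are tracked alongside the other metastore configuration variables.
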
http://git-wip-us.apache.org/repos/asf/hive/blob/dfd63d97/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FilterUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FilterUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FilterUtils.java
new file mode 100644
index 0000000..da70dbc
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/FilterUtils.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.utils;
+
+import java.util.Collections;
+import java.util.List;
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.CATALOG_DB_SEPARATOR;
+
+import org.apache.hadoop.hive.metastore.MetaStoreFilterHook;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+
+/**
+ * Utilities common to Filtering operations.
+ */
+public class FilterUtils {
+
+  /**
+   * Filter the DB if filtering is enabled. Otherwise, return the original DB object.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param db the database object from HMS metadata
+   * @return the original database object if the current user has access;
+   *         otherwise, throws NoSuchObjectException
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  public static Database filterDbIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook,
+      Database db) throws MetaException, NoSuchObjectException {
+
+    if (isFilterEnabled) {
+      Database filteredDb = filterHook.filterDatabase(db);
+
+      if (filteredDb == null) {
+        throw new NoSuchObjectException("DB " + db.getName() + " not found.");
+      }
+    }
+
+    return db;
+  }
+
+  /**
+   * Filter the list of databases if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param dbNames the list of database names to filter
+   * @return the list of database names the current user has access to if filtering is enabled;
+   *         otherwise, the original list
+   * @throws MetaException
+   */
+  public static List<String> filterDbNamesIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook,
+      List<String> dbNames) throws MetaException {
+
+    if (isFilterEnabled) {
+      return filterHook.filterDatabases(dbNames);
+    }
+
+    return dbNames;
+  }
+
+  /**
+   * Filter the list of table names if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param catName the catalog name of the tables
+   * @param dbName the database name of the tables
+   * @param tableNames the list of table names to filter
+   * @return the list of table names the current user has access to if filtering is enabled;
+   *         otherwise, the original list
+   * @throws MetaException
+   */
+  public static List<String> filterTableNamesIfEnabled(
+      boolean isFilterEnabled, MetaStoreFilterHook filterHook, String catName, String dbName,
+      List<String> tableNames) throws MetaException {
+
+    if (isFilterEnabled) {
+      return filterHook.filterTableNames(catName, dbName, tableNames);
+    }
+
+    return tableNames;
+  }
+
+  /**
+   * Filter the list of tables if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param tables the list of table objects to filter
+   * @return the list of tables the current user has access to if filtering is enabled;
+   *         otherwise, the original list
+   * @throws MetaException
+   */
+  public static List<Table> filterTablesIfEnabled(
+      boolean isFilterEnabled, MetaStoreFilterHook filterHook, List<Table> tables)
+      throws MetaException {
+
+    if (isFilterEnabled) {
+      return filterHook.filterTables(tables);
+    }
+
+    return tables;
+  }
+
+  /**
+   * Filter the table if filtering is enabled. Otherwise, return the original table object.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param table the table object from HMS metadata
+   * @return the table object if the user has access or filtering is disabled;
+   *         throws NoSuchObjectException if the user does not have access to this table
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  public static Table filterTableIfEnabled(
+      boolean isFilterEnabled, MetaStoreFilterHook filterHook, Table table)
+      throws MetaException, NoSuchObjectException {
+    if (isFilterEnabled) {
+      Table filteredTable = filterHook.filterTable(table);
+
+      if (filteredTable == null) {
+        throw new NoSuchObjectException("Table " + table.getDbName() + "." +
+            table.getTableName() + " not found.");
+      }
+    }
+
+    return table;
+  }
+
+  /**
+   * Filter the list of table metadata if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param catName the catalog name
+   * @param dbName the database name
+   * @param tableMetas the list of table metadata objects
+   * @return the list of table metadata the current user has access to if filtering is enabled;
+   *         otherwise, the original list
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  public static List<TableMeta> filterTableMetasIfEnabled(
+      boolean isFilterEnabled, MetaStoreFilterHook filterHook,
+      String catName, String dbName, List<TableMeta> tableMetas)
+      throws MetaException, NoSuchObjectException {
+    if (tableMetas == null || tableMetas.isEmpty()) {
+      return tableMetas;
+    }
+
+    if (isFilterEnabled) {
+      return filterHook.filterTableMetas(
+          catName, dbName, tableMetas);
+    }
+
+    return tableMetas;
+  }
+
+  /**
+   * Filter the partition if filtering is enabled. Otherwise, return the original object.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param p the partition object
+   * @return the partition object if the user has access or filtering is disabled;
+   *         otherwise, throws NoSuchObjectException
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  public static Partition filterPartitionIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook, Partition p) throws MetaException, NoSuchObjectException {
+
+    if (isFilterEnabled) {
+      Partition filteredPartition = filterHook.filterPartition(p);
+
+      if (filteredPartition == null) {
+        throw new NoSuchObjectException("Partition in " + p.getCatName() + CATALOG_DB_SEPARATOR + p.getDbName() + "." +
+            p.getTableName() + " not found.");
+      }
+    }
+
+    return p;
+  }
+
+  /**
+   * Filter the list of partitions if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param partitions the list of partitions
+   * @return the list of partitions the user has access to, or the original list if filtering is disabled
+   * @throws MetaException
+   */
+  public static List<Partition> filterPartitionsIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook, List<Partition> partitions) throws MetaException {
+
+    if (isFilterEnabled) {
+      return filterHook.filterPartitions(partitions);
+    }
+
+    return partitions;
+  }
+
+  /**
+   * Filter the list of partition names if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param catName the catalog name
+   * @param dbName the database name
+   * @param tableName the table name
+   * @param partitionNames the list of partition names
+   * @return the list of partition names the current user has access to if filtering is enabled;
+   *         otherwise, the original list
+   * @throws MetaException
+   */
+  public static List<String> filterPartitionNamesIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook,
+      final String catName, final String dbName,
+      final String tableName, List<String> partitionNames) throws MetaException {
+    if (isFilterEnabled) {
+      return filterHook.filterPartitionNames(catName, dbName, tableName, partitionNames);
+    }
+
+    return partitionNames;
+  }
+
+  /**
+   * Filter the list of PartitionSpec if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param partitionSpecs the list of PartitionSpec
+   * @return the list of PartitionSpec the current user has access to if filtering is enabled;
+   *         otherwise, the original list
+   * @throws MetaException
+   */
+  public static List<PartitionSpec> filterPartitionSpecsIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook,
+      List<PartitionSpec> partitionSpecs) throws MetaException {
+    if (isFilterEnabled) {
+      return filterHook.filterPartitionSpecs(partitionSpecs);
+    }
+
+    return partitionSpecs;
+  }
+
+  /**
+   * Filter the catalog if filtering is enabled. Otherwise, return the original object.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param catalog the catalog object
+   * @return the catalog object if the current user has access or filtering is disabled;
+   *         otherwise, throws NoSuchObjectException
+   * @throws MetaException
+   * @throws NoSuchObjectException
+   */
+  public static Catalog filterCatalogIfEnabled(
+      boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook,
+      Catalog catalog
+  ) throws MetaException, NoSuchObjectException {
+    if (isFilterEnabled) {
+      Catalog filteredCatalog = filterHook.filterCatalog(catalog);
+
+      if (filteredCatalog == null) {
+        throw new NoSuchObjectException("Catalog " + catalog.getName() + " not found.");
+      }
+    }
+
+    return catalog;
+  }
+
+  /**
+   * Filter the list of catalog names if filtering is enabled. Otherwise, return the original list.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param catalogNames the list of catalog names
+   * @return the list of catalog names the current user has access to, or the
+   *         original list if filtering is disabled
+   * @throws MetaException
+   */
+  public static List<String> filterCatalogNamesIfEnabled(
+      boolean isFilterEnabled, MetaStoreFilterHook filterHook,
+      List<String> catalogNames) throws MetaException {
+
+    if (isFilterEnabled) {
+      return filterHook.filterCatalogs(catalogNames);
+    }
+
+    return catalogNames;
+  }
+
+
+  /**
+   * Check if the current user has access to a given database and table name. Throws
+   * NoSuchObjectException if the user has no access. When the db or table is filtered out, we
+   * don't even need to fetch the partitions. Therefore, this check ensures table-level security
+   * and can improve performance when filtering partitions.
+   * @param isFilterEnabled true: filtering is enabled; false: filtering is disabled.
+   * @param filterHook the object that does filtering
+   * @param catName the catalog name
+   * @param dbName the database name
+   * @param tblName the table name contained in the database
+   * @throws NoSuchObjectException if the database or table is filtered out
+   */
+  public static void checkDbAndTableFilters(boolean isFilterEnabled,
+      MetaStoreFilterHook filterHook,
+      final String catName, final String dbName, final String tblName)
+      throws NoSuchObjectException, MetaException {
+
+    if (catName == null) {
+      throw new NullPointerException("catName is null");
+    }
+
+    if (isBlank(catName)) {
+      throw new NoSuchObjectException("catName is not valid");
+    }
+
+    if (dbName == null) {
+      throw new NullPointerException("dbName is null");
+    }
+
+    if (isBlank(dbName)) {
+      throw new NoSuchObjectException("dbName is not valid");
+    }
+
+    List<String> filteredDb = filterDbNamesIfEnabled(isFilterEnabled, filterHook,
+        Collections.singletonList(dbName));
+
+    if (filteredDb.isEmpty()) {
+      throw new NoSuchObjectException("Database " + dbName + " does not exist");
+    }
+
+    if (tblName == null) {
+      throw new NullPointerException("tblName is null");
+    }
+
+    if (isBlank(tblName)) {
+      throw new NoSuchObjectException("tblName is not valid");
+    }
+
+    List<String> filteredTable =
+        filterTableNamesIfEnabled(isFilterEnabled, filterHook,
+            catName, dbName, Collections.singletonList(tblName));
+    if (filteredTable.isEmpty()) {
+      throw new NoSuchObjectException("Table " + tblName + " does not exist");
+    }
+  }
+
+}

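FilterUtils only dispatches to whatever MetaStoreFilterHook is configured via metastore.filter.hook. Below is a hedged sketch of a custom hook these helpers would invoke; it extends DefaultMetaStoreFilterHookImpl (the no-op default named in this patch) and overrides a single method. The exact set of overridable methods depends on the MetaStoreFilterHook interface in your Hive version, and PrefixDenyFilterHook is an illustrative name only.

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class PrefixDenyFilterHook extends DefaultMetaStoreFilterHookImpl {
      public PrefixDenyFilterHook(Configuration conf) {
        // A public Configuration constructor is required: the server loads the
        // hook reflectively with getConstructor(Configuration.class).
        super(conf);
      }

      @Override
      public List<String> filterDatabases(List<String> dbNames) throws MetaException {
        // Hide any database whose name starts with "restricted_". A production
        // hook would consult an authorization service (e.g. Ranger or Sentry).
        return dbNames.stream()
            .filter(name -> !name.startsWith("restricted_"))
            .collect(Collectors.toList());
      }
    }

To use such a hook on the server side, set metastore.filter.hook to the class name and metastore.server.filter.enabled to true; as the server-side checks below enforce, filtering refuses to start with the default no-op hook.
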
http://git-wip-us.apache.org/repos/asf/hive/blob/dfd63d97/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index a7b2061..eb11e9f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
@@ -30,6 +31,8 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependCatal
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependNotNullCatToDbName;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -156,6 +159,7 @@ import org.apache.hadoop.hive.metastore.security.TUGIContainingTransport;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.FilterUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
@@ -506,6 +510,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     private List<TransactionalMetaStoreEventListener> transactionalListeners;
     private List<MetaStoreEndFunctionListener> endFunctionListeners;
     private List<MetaStoreInitListener> initListeners;
+    private MetaStoreFilterHook filterHook;
+    private boolean isServerFilterEnabled = false;
+
     private Pattern partitionValidationPattern;
     private final boolean isInTest;
 
@@ -616,6 +623,76 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
       expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
       fileMetadataManager = new FileMetadataManager(this.getMS(), conf);
+
+      isServerFilterEnabled = getIfServerFilterEnabled();
+      filterHook = isServerFilterEnabled ? loadFilterHooks() : null;
+    }
+
+    /**
+     * Filtering is actually enabled only when a filter hook is configured, the configured hook
+     * is not the default no-op implementation, and filtering is enabled in the configuration.
+     * @return true if server-side filtering is enabled
+     */
+    private boolean getIfServerFilterEnabled() throws MetaException {
+      boolean isEnabled = MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_ENABLED);
+
+      if (!isEnabled) {
+        LOG.info("HMS server filtering is disabled by configuration");
+        return false;
+      }
+
+      String filterHookClassName = MetastoreConf.getVar(conf, ConfVars.FILTER_HOOK);
+
+      if (isBlank(filterHookClassName)) {
+        throw new MetaException("HMS server filtering is enabled but no filter hook is configured");
+      }
+
+      if (filterHookClassName.trim().equalsIgnoreCase(DefaultMetaStoreFilterHookImpl.class.getName())) {
+        throw new MetaException("HMS server filtering is enabled but the filter hook is DefaultMetaStoreFilterHookImpl, which does no filtering");
+      }
+
+      LOG.info("HMS server filtering is enabled. The filter class is " + filterHookClassName);
+      return true;
+    }
+
+    private MetaStoreFilterHook loadFilterHooks() throws IllegalStateException {
+      String errorMsg = "Unable to load filter hook at HMS server. ";
+
+      String filterHookClassName = MetastoreConf.getVar(conf, ConfVars.FILTER_HOOK);
+      Preconditions.checkState(!isBlank(filterHookClassName));
+
+      try {
+        return (MetaStoreFilterHook)Class.forName(
+            filterHookClassName.trim(), true, JavaUtils.getClassLoader()).getConstructor(
+            Configuration.class).newInstance(conf);
+      } catch (Exception e) {
+        LOG.error(errorMsg, e);
+        throw new IllegalStateException(errorMsg + e.getMessage(), e);
+      }
+    }
+
+    /**
+     * Check if the user can access the table associated with the partition. If not, throw an
+     * exception so the user cannot access partitions associated with this table.
+     * We do not call the pre-event listener for authorization because it requires fetching the
+     * table object from the DB, which adds overhead. Instead, we call the filter hook to filter
+     * out the table if the user has no access. The filter hook only requires the table name, not
+     * the table object, which avoids a DB access while achieving the same purpose: checking
+     * whether the user can access the specified table.
+     *
+     * @param catName catalog name of the table
+     * @param dbName database name of the table
+     * @param tblName table name
+     * @throws NoSuchObjectException
+     * @throws MetaException
+     */
+    private void authorizeTableForPartitionMetadata(
+        final String catName, final String dbName, final String tblName)
+        throws NoSuchObjectException, MetaException {
+
+      FilterUtils.checkDbAndTableFilters(
+          isServerFilterEnabled, filterHook, catName, dbName, tblName);
     }
 
     private static String addPrefix(String s) {
@@ -1167,7 +1244,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ex = e;
         throw e;
       } finally {
-        endFunction("get_database", cat != null, ex);
+        endFunction("get_catalog", cat != null, ex);
       }
     }
 
@@ -1685,8 +1762,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       try {
         if (parsedDbNamed[DB_NAME] == null) {
           ret = getMS().getAllDatabases(parsedDbNamed[CAT_NAME]);
+          ret = FilterUtils.filterDbNamesIfEnabled(isServerFilterEnabled, filterHook, ret);
         } else {
           ret = getMS().getDatabases(parsedDbNamed[CAT_NAME], parsedDbNamed[DB_NAME]);
+          ret = FilterUtils.filterDbNamesIfEnabled(isServerFilterEnabled, filterHook, ret);
         }
       } catch (MetaException e) {
         ex = e;
@@ -1702,6 +1781,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
     @Override
     public List<String> get_all_databases() throws MetaException {
+      // get_databases filters results already. No need to filter here
       return get_databases(MetaStoreUtils.prependCatalogToDbName(null, null, conf));
     }
 
@@ -2909,7 +2989,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         NoSuchObjectException {
       String[] parsedDbName = parseDbName(dbname, conf);
       return getTableInternal(
-          parsedDbName[CAT_NAME], parsedDbName[DB_NAME], name, null, null);
+            parsedDbName[CAT_NAME], parsedDbName[DB_NAME], name, null, null);
     }
 
     @Override
@@ -2937,6 +3017,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           assertClientHasCapability(capabilities, ClientCapability.INSERT_ONLY_TABLES,
               "insert-only tables", "get_table_req");
         }
+
         firePreEvent(new PreReadTableEvent(t, this));
       } catch (MetaException | NoSuchObjectException e) {
         ex = e;
@@ -2957,6 +3038,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       try {
         t = getMS().getTableMeta(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblNames, tblTypes);
+        t = FilterUtils.filterTableMetasIfEnabled(isServerFilterEnabled, filterHook,
+            parsedDbName[CAT_NAME], parsedDbName[DB_NAME], t);
       } catch (Exception e) {
         ex = e;
         throw newMetaException(e);
@@ -3022,8 +3105,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @Override
     public GetTablesResult get_table_objects_by_name_req(GetTablesRequest req) throws TException {
       String catName = req.isSetCatName() ? req.getCatName() : getDefaultCatalog(conf);
-      return new GetTablesResult(getTableObjectsInternal(catName,
-          req.getDbName(), req.getTblNames(), req.getCapabilities()));
+      return new GetTablesResult(getTableObjectsInternal(catName, req.getDbName(),
+          req.getTblNames(), req.getCapabilities()));
     }
 
     private List<Table> getTableObjectsInternal(String catName, String dbName,
@@ -3076,6 +3159,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                 "insert-only tables", "get_table_req");
           }
         }
+
+        tables = FilterUtils.filterTablesIfEnabled(isServerFilterEnabled, filterHook, tables);
       } catch (MetaException | InvalidOperationException | UnknownDBException e) {
         ex = e;
         throw e;
@@ -3134,6 +3219,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw new InvalidOperationException(filter + " cannot apply null filter");
         }
         tables = getMS().listTableNamesByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], filter, maxTables);
+        tables = FilterUtils.filterTableNamesIfEnabled(
+            isServerFilterEnabled, filterHook, parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tables);
+
       } catch (MetaException | InvalidOperationException | UnknownDBException e) {
         ex = e;
         throw e;
@@ -4534,8 +4622,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Partition ret = null;
       Exception ex = null;
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
         fireReadTablePreEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
         ret = getMS().getPartition(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, part_vals);
+        ret = FilterUtils.filterPartitionIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (Exception e) {
         ex = e;
         throwMetaException(e);
@@ -4575,8 +4665,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Partition ret = null;
       Exception ex = null;
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
+
         ret = getMS().getPartitionWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
             tbl_name, part_vals, user_name, group_names);
+        ret = FilterUtils.filterPartitionIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (InvalidObjectException e) {
         ex = e;
         throw new NoSuchObjectException(e.getMessage());
@@ -4600,8 +4693,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       try {
         checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
             tbl_name, NO_FILTER_STRING, max_parts);
+
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
+
         ret = getMS().getPartitions(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
             max_parts);
+        ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (Exception e) {
         ex = e;
         throwMetaException(e);
@@ -4624,8 +4721,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       try {
         checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
             tblName, NO_FILTER_STRING, maxParts);
+
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName);
+
         ret = getMS().getPartitionsWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
             maxParts, userName, groupNames);
+        ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (InvalidObjectException e) {
         ex = e;
         throw new NoSuchObjectException(e.getMessage());
@@ -4761,8 +4862,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<String> ret = null;
       Exception ex = null;
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
         ret = getMS().listPartitionNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
             max_parts);
+        ret = FilterUtils.filterPartitionNamesIfEnabled(isServerFilterEnabled,
+            filterHook, parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, ret);
       } catch (MetaException e) {
         ex = e;
         throw e;
@@ -4776,17 +4880,26 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
 
     @Override
-    public PartitionValuesResponse get_partition_values(PartitionValuesRequest request) throws MetaException {
+    public PartitionValuesResponse get_partition_values(PartitionValuesRequest request)
+        throws MetaException {
       String catName = request.isSetCatName() ? request.getCatName() : getDefaultCatalog(conf);
       String dbName = request.getDbName();
       String tblName = request.getTblName();
-      // This is serious black magic, as the following 2 lines do nothing AFAICT but without them
-      // the subsequent call to listPartitionValues fails.
-      List<FieldSchema> partCols = new ArrayList<FieldSchema>();
-      partCols.add(request.getPartitionKeys().get(0));
-      return getMS().listPartitionValues(catName, dbName, tblName, request.getPartitionKeys(),
-          request.isApplyDistinct(), request.getFilter(), request.isAscending(),
-          request.getPartitionOrder(), request.getMaxParts());
+
+      try {
+        authorizeTableForPartitionMetadata(catName, dbName, tblName);
+
+        // This is serious black magic, as the following 2 lines do nothing AFAICT but without them
+        // the subsequent call to listPartitionValues fails.
+        List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+        partCols.add(request.getPartitionKeys().get(0));
+        return getMS().listPartitionValues(catName, dbName, tblName, request.getPartitionKeys(),
+            request.isApplyDistinct(), request.getFilter(), request.isAscending(),
+            request.getPartitionOrder(), request.getMaxParts());
+      } catch (NoSuchObjectException e) {
+        LOG.error(String.format("Unable to get partition for %s.%s.%s", catName, dbName, tblName), e);
+        throw new MetaException(e.getMessage());
+      }
     }
 
     @Deprecated
@@ -5109,6 +5222,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       String[] parsedDbName = parseDbName(dbname, conf);
       try {
         ret = getMS().getTables(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], pattern);
+        ret = FilterUtils.filterTableNamesIfEnabled(isServerFilterEnabled, filterHook,
+            parsedDbName[CAT_NAME], parsedDbName[DB_NAME], ret);
       } catch (MetaException e) {
         ex = e;
         throw e;
@@ -5174,6 +5289,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       String[] parsedDbName = parseDbName(dbname, conf);
       try {
         ret = getMS().getAllTables(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]);
+        ret = FilterUtils.filterTableNamesIfEnabled(isServerFilterEnabled, filterHook,
+            parsedDbName[CAT_NAME], parsedDbName[DB_NAME], ret);
       } catch (MetaException e) {
         ex = e;
         throw e;
@@ -5426,6 +5543,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         throw new NoSuchObjectException(e.getMessage());
       }
       Partition p = ms.getPartition(catName, db_name, tbl_name, partVals);
+      p = FilterUtils.filterPartitionIfEnabled(isServerFilterEnabled, filterHook, p);
 
       if (p == null) {
         throw new NoSuchObjectException(TableName.getQualified(catName, db_name, tbl_name)
@@ -5446,7 +5564,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       try {
         ret = get_partition_by_name_core(getMS(), parsedDbName[CAT_NAME],
-            parsedDbName[DB_NAME], tbl_name, part_name); } catch (Exception e) {
+            parsedDbName[DB_NAME], tbl_name, part_name);
+        ret = FilterUtils.filterPartitionIfEnabled(isServerFilterEnabled, filterHook, ret);
+      } catch (Exception e) {
         ex = e;
         rethrowException(e);
       } finally {
@@ -5549,9 +5669,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> ret = null;
       Exception ex = null;
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
         // Don't send the parsedDbName, as this method will parse itself.
         ret = get_partitions_ps_with_auth(db_name, tbl_name, part_vals,
             max_parts, null, null);
+        ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (Exception e) {
         ex = e;
         rethrowException(e);
@@ -5574,8 +5696,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> ret = null;
       Exception ex = null;
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
         ret = getMS().listPartitionsPsWithAuth(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
             tbl_name, part_vals, max_parts, userName, groupNames);
+        ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (InvalidObjectException e) {
         ex = e;
         throw new MetaException(e.getMessage());
@@ -5599,8 +5723,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<String> ret = null;
       Exception ex = null;
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name);
         ret = getMS().listPartitionNamesPs(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name,
             part_vals, max_parts);
+        ret = FilterUtils.filterPartitionNamesIfEnabled(isServerFilterEnabled,
+            filterHook, parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tbl_name, ret);
       } catch (Exception e) {
         ex = e;
         rethrowException(e);
@@ -6043,8 +6170,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       try {
         checkLimitNumberOfPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
             tblName, filter, maxParts);
+
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName);
+
         ret = getMS().getPartitionsByFilter(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
             filter, maxParts);
+        ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (Exception e) {
         ex = e;
         rethrowException(e);
@@ -6172,14 +6303,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                                                    final List<String> partNames) throws TException {
 
       String[] parsedDbName = parseDbName(dbName, conf);
-      startTableFunction("get_partitions_by_names", parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
-          tblName);
-      fireReadTablePreEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName);
       List<Partition> ret = null;
       Exception ex = null;
+
+      startTableFunction("get_partitions_by_names", parsedDbName[CAT_NAME], parsedDbName[DB_NAME],
+          tblName);
       try {
+        authorizeTableForPartitionMetadata(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName);
+
+        fireReadTablePreEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName);
+
         ret = getMS().getPartitionsByNames(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tblName,
             partNames);
+        ret = FilterUtils.filterPartitionsIfEnabled(isServerFilterEnabled, filterHook, ret);
       } catch (Exception e) {
         ex = e;
         rethrowException(e);

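For orientation, the hunks above all funnel through the new FilterUtils class, whose
body does not appear in this section. A minimal sketch of the pass-through pattern
its *IfEnabled helpers follow, with signatures matching the call sites above (the
bodies here are an assumption, not the committed code):

    // Sketch only: assumed shape of the FilterUtils helpers used above. Types come
    // from org.apache.hadoop.hive.metastore.api. When filtering is disabled, the
    // input is returned untouched, so the calls cost nothing unless server-side
    // filtering is turned on.
    public static List<Partition> filterPartitionsIfEnabled(
        boolean isFilterEnabled, MetaStoreFilterHook filterHook,
        List<Partition> partitions) throws MetaException {
      if (!isFilterEnabled || partitions == null) {
        return partitions;
      }
      return filterHook.filterPartitions(partitions);
    }

    public static List<String> filterPartitionNamesIfEnabled(
        boolean isFilterEnabled, MetaStoreFilterHook filterHook,
        String catName, String dbName, String tblName,
        List<String> partNames) throws MetaException {
      if (!isFilterEnabled || partNames == null) {
        return partNames;
      }
      return filterHook.filterPartitionNames(catName, dbName, tblName, partNames);
    }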
http://git-wip-us.apache.org/repos/asf/hive/blob/dfd63d97/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestFilterHooks.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestFilterHooks.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestFilterHooks.java
index 7dc69bc..6a0d0aa 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestFilterHooks.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestFilterHooks.java
@@ -15,47 +15,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.metastore;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+package org.apache.hadoop.hive.metastore;
 
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionSpec;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
-import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
 import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the filtering behavior at the HMS client and the HMS server. The
+ * configuration changes from test to test, so the HMS client and server are
+ * created anew for each test case.
+ */
 @Category(MetastoreUnitTest.class)
 public class TestFilterHooks {
-  private static final Logger LOG = LoggerFactory.getLogger(TestFilterHooks.class);
-
-  public static class DummyMetaStoreFilterHookImpl extends DefaultMetaStoreFilterHookImpl {
+  public static class DummyMetaStoreFilterHookImpl implements MetaStoreFilterHook {
     private static boolean blockResults = false;
 
     public DummyMetaStoreFilterHookImpl(Configuration conf) {
-      super(conf);
     }
 
     @Override
@@ -63,7 +64,7 @@ public class TestFilterHooks {
       if (blockResults) {
         return new ArrayList<>();
       }
-      return super.filterDatabases(dbList);
+      return dbList;
     }
 
     @Override
@@ -71,7 +72,7 @@ public class TestFilterHooks {
       if (blockResults) {
         throw new NoSuchObjectException("Blocked access");
       }
-      return super.filterDatabase(dataBase);
+      return dataBase;
     }
 
     @Override
@@ -80,7 +81,7 @@ public class TestFilterHooks {
       if (blockResults) {
         return new ArrayList<>();
       }
-      return super.filterTableNames(catName, dbName, tableList);
+      return tableList;
     }
 
     @Override
@@ -88,7 +89,7 @@ public class TestFilterHooks {
       if (blockResults) {
         throw new NoSuchObjectException("Blocked access");
       }
-      return super.filterTable(table);
+      return table;
     }
 
     @Override
@@ -96,7 +97,12 @@ public class TestFilterHooks {
       if (blockResults) {
         return new ArrayList<>();
       }
-      return super.filterTables(tableList);
+      return tableList;
+    }
+
+    @Override
+    public List<TableMeta> filterTableMetas(String catName, String dbName,
+        List<TableMeta> tableMetas) throws MetaException {
+      return tableMetas;
     }
 
     @Override
@@ -104,7 +110,7 @@ public class TestFilterHooks {
       if (blockResults) {
         return new ArrayList<>();
       }
-      return super.filterPartitions(partitionList);
+      return partitionList;
     }
 
     @Override
@@ -113,7 +119,7 @@ public class TestFilterHooks {
       if (blockResults) {
         return new ArrayList<>();
       }
-      return super.filterPartitionSpecs(partitionSpecList);
+      return partitionSpecList;
     }
 
     @Override
@@ -121,7 +127,7 @@ public class TestFilterHooks {
       if (blockResults) {
         throw new NoSuchObjectException("Blocked access");
       }
-      return super.filterPartition(partition);
+      return partition;
     }
 
     @Override
@@ -130,125 +136,243 @@ public class TestFilterHooks {
       if (blockResults) {
         return new ArrayList<>();
       }
-      return super.filterPartitionNames(catName, dbName, tblName, partitionNames);
+      return partitionNames;
     }
-
   }
 
-  private static final String DBNAME1 = "testdb1";
-  private static final String DBNAME2 = "testdb2";
+  protected static HiveMetaStoreClient client;
+  protected static Configuration conf;
+  protected static Warehouse warehouse;
+
+  private static final int DEFAULT_LIMIT_PARTITION_REQUEST = 100;
+
+  private static String DBNAME1 = "testdb1";
+  private static String DBNAME2 = "testdb2";
   private static final String TAB1 = "tab1";
   private static final String TAB2 = "tab2";
-  private static Configuration conf;
-  private static HiveMetaStoreClient msc;
+
+
+  protected HiveMetaStoreClient createClient(Configuration metaStoreConf) throws Exception {
+    try {
+      return new HiveMetaStoreClient(metaStoreConf);
+    } catch (Throwable e) {
+      System.err.println("Unable to open the metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw new Exception(e);
+    }
+  }
 
   @BeforeClass
   public static void setUp() throws Exception {
-    DummyMetaStoreFilterHookImpl.blockResults = false;
+    DummyMetaStoreFilterHookImpl.blockResults = true;
+  }
+
+  @Before
+  public void setUpForTest() throws Exception {
 
     conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setLongVar(conf, ConfVars.THRIFT_CONNECTION_RETRIES, 3);
     MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     MetastoreConf.setClass(conf, ConfVars.FILTER_HOOK, DummyMetaStoreFilterHookImpl.class,
         MetaStoreFilterHook.class);
+    MetastoreConf.setBoolVar(conf, ConfVars.METRICS_ENABLED, true);
+    conf.set("hive.key1", "value1");
+    conf.set("hive.key2", "http://www.example.com");
+    conf.set("hive.key3", "");
+    conf.set("hive.key4", "0");
+    conf.set("datanucleus.autoCreateTables", "false");
+    conf.set("hive.in.test", "true");
+
+    MetastoreConf.setLongVar(conf, ConfVars.BATCH_RETRIEVE_MAX, 2);
+    MetastoreConf.setLongVar(conf, ConfVars.LIMIT_PARTITION_REQUEST, DEFAULT_LIMIT_PARTITION_REQUEST);
+    MetastoreConf.setVar(conf, ConfVars.STORAGE_SCHEMA_READER_IMPL, "no.such.class");
     MetaStoreTestUtils.setConfForStandloneMode(conf);
-    MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf);
 
-    msc = new HiveMetaStoreClient(conf);
+    warehouse = new Warehouse(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  /**
+   * Creates the test environment (databases, tables and partitions) with a client
+   * built from the given configuration. Each test calls this after setting up its
+   * configuration.
+   * @throws Exception
+   */
+  protected void createEnv(Configuration conf) throws Exception {
+    client = createClient(conf);
 
-    msc.dropDatabase(DBNAME1, true, true, true);
-    msc.dropDatabase(DBNAME2, true, true, true);
+    client.dropDatabase(DBNAME1, true, true, true);
+    client.dropDatabase(DBNAME2, true, true, true);
     Database db1 = new DatabaseBuilder()
         .setName(DBNAME1)
         .setCatalogName(Warehouse.DEFAULT_CATALOG_NAME)
-        .create(msc, conf);
+        .create(client, conf);
     Database db2 = new DatabaseBuilder()
         .setName(DBNAME2)
         .setCatalogName(Warehouse.DEFAULT_CATALOG_NAME)
-        .create(msc, conf);
+        .create(client, conf);
     new TableBuilder()
         .setDbName(DBNAME1)
         .setTableName(TAB1)
         .addCol("id", "int")
         .addCol("name", "string")
-        .create(msc, conf);
+        .create(client, conf);
     Table tab2 = new TableBuilder()
         .setDbName(DBNAME1)
         .setTableName(TAB2)
         .addCol("id", "int")
         .addPartCol("name", "string")
-        .create(msc, conf);
+        .create(client, conf);
     new PartitionBuilder()
         .inTable(tab2)
         .addValue("value1")
-        .addToTable(msc, conf);
+        .addToTable(client, conf);
     new PartitionBuilder()
         .inTable(tab2)
         .addValue("value2")
-        .addToTable(msc, conf);
+        .addToTable(client, conf);
   }
 
-  @AfterClass
-  public static void tearDown() throws Exception {
-    msc.close();
+  /**
+   * By default, filtering is disabled at the HMS server. Disable HMS client-side
+   * filtering as well, so that the HMS server behavior can be observed on its own.
+   * @throws Exception
+   */
+  @Test
+  public void testHMSServerWithoutFilter() throws Exception {
+    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED, false);
+    DBNAME1 = "db_testHMSServerWithoutFilter_1";
+    DBNAME2 = "db_testHMSServerWithoutFilter_2";
+    createEnv(conf);
+
+    assertNotNull(client.getTable(DBNAME1, TAB1));
+    assertEquals(2, client.getTables(DBNAME1, "*").size());
+    assertEquals(2, client.getAllTables(DBNAME1).size());
+    assertEquals(1, client.getTables(DBNAME1, TAB2).size());
+    assertEquals(0, client.getAllTables(DBNAME2).size());
+
+    assertNotNull(client.getDatabase(DBNAME1));
+    assertEquals(2, client.getDatabases("*testHMSServerWithoutFilter*").size());
+    assertEquals(1, client.getDatabases(DBNAME1).size());
+
+    assertNotNull(client.getPartition(DBNAME1, TAB2, "name=value1"));
+    assertEquals(1, client.getPartitionsByNames(DBNAME1, TAB2, Lists.newArrayList("name=value1")).size());
   }
 
+  /**
+   * Enable HMS server-side filtering and disable HMS client-side filtering, so
+   * that the HMS server filtering behavior can be observed on its own.
+   * @throws Exception
+   */
   @Test
-  public void testDefaultFilter() throws Exception {
-    assertNotNull(msc.getTable(DBNAME1, TAB1));
-    assertEquals(2, msc.getTables(DBNAME1, "*").size());
-    assertEquals(2, msc.getAllTables(DBNAME1).size());
-    assertEquals(1, msc.getTables(DBNAME1, TAB2).size());
-    assertEquals(0, msc.getAllTables(DBNAME2).size());
-
-    assertNotNull(msc.getDatabase(DBNAME1));
-    assertEquals(3, msc.getDatabases("*").size());
-    assertEquals(3, msc.getAllDatabases().size());
-    assertEquals(1, msc.getDatabases(DBNAME1).size());
-
-    assertNotNull(msc.getPartition(DBNAME1, TAB2, "name=value1"));
-    assertEquals(1, msc.getPartitionsByNames(DBNAME1, TAB2, Lists.newArrayList("name=value1")).size());
+  public void testHMSServerWithFilter() throws Exception {
+    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED, false);
+    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_ENABLED, true);
+    DBNAME1 = "db_testHMSServerWithFilter_1";
+    DBNAME2 = "db_testHMSServerWithFilter_2";
+    createEnv(conf);
+
+    testFilterForDb(true);
+    testFilterForTables(true);
+    testFilterForPartition();
   }
 
+  /**
+   * Disable filtering at the HMS client. Since HMS server-side filtering is
+   * disabled by default, no filtering should occur on either side.
+   * @throws Exception
+   */
   @Test
-  public void testDummyFilterForTables() throws Exception {
-    DummyMetaStoreFilterHookImpl.blockResults = true;
-    try {
-      msc.getTable(DBNAME1, TAB1);
-      fail("getTable() should fail with blocking mode");
-    } catch (NoSuchObjectException e) {
-      // Excepted
-    }
-    assertEquals(0, msc.getTables(DBNAME1, "*").size());
-    assertEquals(0, msc.getAllTables(DBNAME1).size());
-    assertEquals(0, msc.getTables(DBNAME1, TAB2).size());
+  public void testHMSClientWithoutFilter() throws Exception {
+    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED, false);
+    DBNAME1 = "db_testHMSClientWithoutFilter_1";
+    DBNAME2 = "db_testHMSClientWithoutFilter_2";
+    createEnv(conf);
+
+    assertNotNull(client.getTable(DBNAME1, TAB1));
+    assertEquals(2, client.getTables(DBNAME1, "*").size());
+    assertEquals(2, client.getAllTables(DBNAME1).size());
+    assertEquals(1, client.getTables(DBNAME1, TAB2).size());
+    assertEquals(0, client.getAllTables(DBNAME2).size());
+
+    assertNotNull(client.getDatabase(DBNAME1));
+    assertEquals(2, client.getDatabases("*testHMSClientWithoutFilter*").size());
+    assertEquals(1, client.getDatabases(DBNAME1).size());
+
+    assertNotNull(client.getPartition(DBNAME1, TAB2, "name=value1"));
+    assertEquals(1, client.getPartitionsByNames(DBNAME1, TAB2, Lists.newArrayList("name=value1")).size());
   }
 
+  /**
+   * HMS client-side filtering is enabled by default. Disable HMS server-side
+   * filtering, so that the HMS client filtering behavior can be observed on its own.
+   * @throws Exception
+   */
   @Test
-  public void testDummyFilterForDb() throws Exception {
-    DummyMetaStoreFilterHookImpl.blockResults = true;
-    try {
-      assertNotNull(msc.getDatabase(DBNAME1));
-      fail("getDatabase() should fail with blocking mode");
-    } catch (NoSuchObjectException e) {
+  public void testHMSClientWithFilter() throws Exception {
+    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_ENABLED, false);
+    DBNAME1 = "db_testHMSClientWithFilter_1";
+    DBNAME2 = "db_testHMSClientWithFilter_2";
+    createEnv(conf);
+
+    testFilterForDb(false);
+    testFilterForTables(false);
+    testFilterForPartition();
+  }
+
+  protected void testFilterForDb(boolean filterAtServer) throws Exception {
+
+    // Skip this call when testing the filter hook at the HMS server, because the HMS
+    // server calls the authorization API for getDatabase() and does not call the filter hook.
+    if (!filterAtServer) {
+      try {
+        assertNotNull(client.getDatabase(DBNAME1));
+        fail("getDatabase() should fail with blocking mode");
+      } catch (NoSuchObjectException e) {
         // Excepted
+      }
     }
-    assertEquals(0, msc.getDatabases("*").size());
-    assertEquals(0, msc.getAllDatabases().size());
-    assertEquals(0, msc.getDatabases(DBNAME1).size());
+
+    assertEquals(0, client.getDatabases("*").size());
+    assertEquals(0, client.getAllDatabases().size());
+    assertEquals(0, client.getDatabases(DBNAME1).size());
   }
 
-  @Test
-  public void testDummyFilterForPartition() throws Exception {
-    DummyMetaStoreFilterHookImpl.blockResults = true;
+  protected void testFilterForTables(boolean filterAtServer) throws Exception {
+
+    // Skip this call when testing the filter hook at the HMS server, because the HMS
+    // server calls the authorization API for getTable() and does not call the filter hook.
+    if (!filterAtServer) {
+      try {
+        client.getTable(DBNAME1, TAB1);
+        fail("getTable() should fail with blocking mode");
+      } catch (NoSuchObjectException e) {
+        // Expected
+      }
+    }
+
+    assertEquals(0, client.getTables(DBNAME1, "*").size());
+    assertEquals(0, client.getAllTables(DBNAME1).size());
+    assertEquals(0, client.getTables(DBNAME1, TAB2).size());
+  }
+
+  protected void testFilterForPartition() throws Exception {
     try {
-      assertNotNull(msc.getPartition(DBNAME1, TAB2, "name=value1"));
+      assertNotNull(client.getPartition(DBNAME1, TAB2, "name=value1"));
       fail("getPartition() should fail with blocking mode");
     } catch (NoSuchObjectException e) {
       // Excepted
     }
-    assertEquals(0, msc.getPartitionsByNames(DBNAME1, TAB2,
-        Lists.newArrayList("name=value1")).size());
-  }
 
+    try {
+      client.getPartitionsByNames(DBNAME1, TAB2,
+          Lists.newArrayList("name=value1"));
+    } catch (NoSuchObjectException e) {
+      // Expected
+    }
+  }
 }

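Taken together, the tests above drive the new filtering matrix through two boolean
settings. A condensed usage sketch of the combinations they exercise (illustration
only, not part of the patch):

    // Client-side filtering only: the pre-existing behavior, enabled by default.
    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED, true);
    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_ENABLED, false);

    // Server-side filtering only: results are filtered before they leave the HMS,
    // so even a client with filtering disabled cannot see blocked objects.
    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CLIENT_FILTER_ENABLED, false);
    MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_SERVER_FILTER_ENABLED, true);

    // Neither side enabled: nothing is filtered, as testHMSServerWithoutFilter and
    // testHMSClientWithoutFilter verify.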
http://git-wip-us.apache.org/repos/asf/hive/blob/dfd63d97/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestListPartitions.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestListPartitions.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestListPartitions.java
index a338bd4..34ceb34 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestListPartitions.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestListPartitions.java
@@ -1287,7 +1287,7 @@ public class TestListPartitions extends MetaStoreClientTest {
     }
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionValuesNoDbName() throws Exception {
     createTable4PartColsParts(client);
     List<FieldSchema> partitionSchema = Lists.newArrayList(
@@ -1299,7 +1299,7 @@ public class TestListPartitions extends MetaStoreClientTest {
     client.listPartitionValues(request);
   }
 
-  @Test(expected = MetaException.class)
+  @Test(expected = NoSuchObjectException.class)
   public void testListPartitionValuesNoTblName() throws Exception {
     createTable4PartColsParts(client);
     List<FieldSchema> partitionSchema = Lists.newArrayList(