Posted to commits@impala.apache.org by la...@apache.org on 2021/04/08 12:01:56 UTC

[impala] 02/02: IMPALA-10613: Standup HMS thrift server in Catalog

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a7eae471b84f05816780093938bba50f4d78aef1
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Apr 29 16:59:23 2020 -0700

    IMPALA-10613: Standup HMS thrift server in Catalog
    
    This change adds the basic infrastructure to start the HMS server in
    Catalog. It introduces a new configuration (--start_hms_server) along with a
    config for the port (--hms_port) and starts an HMS thrift server in the
    CatalogServiceCatalog instance. Currently, all the HMS APIs are
    "pass-through" to the backing HMS service, except for the following 3 HMS
    APIs, which can be used to request a table and its partitions:
    
    1. get_table_req
    2. get_partitions_by_expr
    3. get_partitions_by_names
    
    Additionally, there is another flag (--enable_catalogd_hms_cache) which
    can be set to false to stop catalogd from serving the table and partition
    metadata for these APIs; they are then redirected to HMS as well. This
    contribution was done by Kishen Das.
    
    In case of get_partitions_by_expr we need the hive-exec jar to be
    present in the classpath since it needs to load the PartitionExpressionProxy
    to push down the partition predicates to the HMS database. In case of
    get_table_req if column statistics are requested, we return the
    table level statistics.
    
    Additionally, this patch adds a new configuration
    fallback_to_hms_on_errors for the catalog which is used to determine
    if the Catalog falls back to HMS service in case of errors while
    executing the API. This is useful for testing purposes.
    
    In order to expose the file-metadata for the tables and partitions,
    HMS API changes were made to add the filemetadata fields to table
    and partitions. In case of transactional tables, the file-metadata
    which is returned is consistent with the provided ValidWriteIdList
    in the API call.
    
    There are a few TODOs which will be done in follow up tasks:
    1. Add SASL support.
    2. Pin the hive_metastore.thrift in the code so that any changes to HMS APIs
    in the hive branch don't break Catalog's HMS service.
    
    Testing:
    1. Added a new end-to-end test which starts the HMS service in Catalog and runs
    some basic HMS APIs against it.
    2. Ran a modification of TestRemoteHiveMetastore in the Hive code base and
    confirmed most tests are working. There were some test failures but they are
    unrelated since the test assumes an empty warehouse whereas we run against the
    actual HMS service running in the mini-cluster.
    
    Change-Id: I1b306f91d63cb5137c178e8e72b6e8b578a907b5
    Reviewed-on: http://gerrit.cloudera.org:8080/17244
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
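
The interaction between --enable_catalogd_hms_cache and
--fallback_to_hms_on_errors described in the commit message above might be
sketched roughly as follows. This is a minimal, hypothetical illustration and
not the code in this patch: HmsRequestRouter, route() and the two Supplier
arguments are invented names, and the sketch only models the three APIs that
catalogd implements (everything else is always passed through to HMS).

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of the patch: it models only the flag semantics.
final class HmsRequestRouter {
  private final boolean enableCatalogdHmsCache;
  private final boolean fallbackToHmsOnErrors;

  HmsRequestRouter(boolean enableCatalogdHmsCache, boolean fallbackToHmsOnErrors) {
    this.enableCatalogdHmsCache = enableCatalogdHmsCache;
    this.fallbackToHmsOnErrors = fallbackToHmsOnErrors;
  }

  // fromCatalogd and fromHms stand in for "serve from the catalogd cache" and
  // "pass through to the backing HMS service" respectively.
  <T> T route(Supplier<T> fromCatalogd, Supplier<T> fromHms) {
    // Cache disabled: behave as a pure pass-through.
    if (!enableCatalogdHmsCache) return fromHms.get();
    try {
      return fromCatalogd.get();
    } catch (RuntimeException e) {
      // Without fallback the caller sees the catalogd error directly.
      if (!fallbackToHmsOnErrors) throw e;
      return fromHms.get();
    }
  }

  public static void main(String[] args) {
    Supplier<String> catalogd = () -> "served by catalogd";
    Supplier<String> hms = () -> "served by HMS";
    Supplier<String> failing = () -> {
      throw new IllegalStateException("catalogd error");
    };
    System.out.println(new HmsRequestRouter(true, true).route(catalogd, hms));
    System.out.println(new HmsRequestRouter(false, true).route(catalogd, hms));
    System.out.println(new HmsRequestRouter(true, true).route(failing, hms));
  }
}
```

With fallback disabled, the third call would rethrow the catalogd error
instead of returning the HMS result, which suits tests that need to see the
catalogd code path fail.
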
 be/src/catalog/catalog-server.cc                   |   10 +
 be/src/common/global-flags.cc                      |    5 +
 be/src/util/backend-gflag-util.cc                  |    8 +
 common/thrift/BackendGflags.thrift                 |    8 +
 common/thrift/CatalogService.thrift                |    5 +
 .../org/apache/impala/compat/MetastoreShim.java    |    2 +-
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |  592 +++++
 .../impala/catalog/CatalogServiceCatalog.java      |   44 +-
 .../GetPartialCatalogObjectRequestBuilder.java     |  147 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |    6 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |   19 +-
 .../main/java/org/apache/impala/catalog/Table.java |   11 +-
 .../catalog/metastore/CatalogHmsClientUtils.java   |  159 ++
 .../catalog/metastore/CatalogMetastoreServer.java  |  289 +++
 .../metastore/CatalogMetastoreServiceHandler.java  |  152 ++
 .../catalog/metastore/ICatalogMetastoreServer.java |   47 +
 .../catalog/metastore/MetastoreServiceHandler.java | 2736 ++++++++++++++++++++
 .../metastore/NoOpCatalogMetastoreServer.java      |   44 +
 .../org/apache/impala/service/BackendConfig.java   |   22 +
 .../metastore/CatalogHmsFileMetadataTest.java      |  165 ++
 .../metastore/EnableCatalogdHmsCacheFlagTest.java  |  126 +
 .../impala/testutil/CatalogServiceTestCatalog.java |   89 +-
 tests/common/impala_test_suite.py                  |   27 +
 tests/custom_cluster/test_metastore_service.py     |  707 +++++
 24 files changed, 5398 insertions(+), 22 deletions(-)
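
A comment in CatalogHmsAPIHelper.getPartitionsByExpr() below mentions that
String.format() can throw for partition names containing sequences such as
'%3A'. The standalone sketch below illustrates the underlying Java behaviour
only; PartitionNameFormatDemo and its two methods are hypothetical names and
do not appear in the patch. The exception is raised when the name is part of
the format string itself, not when it is passed as a '%s' argument.

```java
import java.util.IllegalFormatException;

// Hypothetical demo class, not part of the patch.
final class PartitionNameFormatDemo {
  // Unsafe: the partition name becomes part of the format string, so an
  // escaped ':' ("%3A") is parsed as a format specifier with no argument.
  static String unsafeMessage(String partName) {
    return String.format("Could not find partition " + partName);
  }

  // Safe: the partition name is an argument, so its '%' characters are
  // copied verbatim and never parsed.
  static String safeMessage(String partName) {
    return String.format("Could not find partition %s", partName);
  }

  public static void main(String[] args) {
    String partName = "ts=2021-04-08 12%3A01%3A56";
    System.out.println(safeMessage(partName));
    try {
      unsafeMessage(partName);
    } catch (IllegalFormatException e) {
      System.out.println("unsafe formatting failed: " + e.getClass().getSimpleName());
    }
  }
}
```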

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d354deb..f7e7b35 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -98,6 +98,16 @@ DEFINE_int32(max_wait_time_for_sync_ddl_s, 0, "Maximum time (in seconds) until "
      "coordinators might have applied the changes caused due to the ddl.");
 
 DECLARE_string(debug_actions);
+DEFINE_bool(start_hms_server, false, "When set to true catalog server starts a HMS "
+    "server at a port specified by hms_port flag");
+
+DEFINE_int32(hms_port, 5899, "If start_hms_server is set to true, this "
+    "configuration specifies the port number at which it is started.");
+
+DEFINE_bool(fallback_to_hms_on_errors, true, "This configuration is only used if "
+    "start_hms_server is true. This is used to determine if the Catalog should fallback "
+    "to the backing HMS service if there are errors while processing the HMS request");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 795f2e4..5a4ca3c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -346,6 +346,11 @@ DEFINE_bool(enable_incremental_metadata_updates, true,
     "propagated as a whole object in the statestore topic updates. Note that legacy "
     "coordinators can apply incremental or full table updates so don't need this flag.");
 
+DEFINE_bool(enable_catalogd_hms_cache, true,
+    "If true, response for the HMS APIs that are implemented in catalogd will be served "
+    "from catalogd. If this flag is false or a given API is not implemented in catalogd,"
+    " it will be redirected to HMS.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 54e4c39..062149f 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -91,6 +91,10 @@ DECLARE_int64(topic_update_tbl_max_wait_time_ms);
 DECLARE_int32(catalog_max_lock_skipped_topic_updates);
 DECLARE_string(scratch_dirs);
 DECLARE_int32(max_wait_time_for_sync_ddl_s);
+DECLARE_bool(start_hms_server);
+DECLARE_int32(hms_port);
+DECLARE_bool(fallback_to_hms_on_errors);
+DECLARE_bool(enable_catalogd_hms_cache);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -279,6 +283,10 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_saml2_ee_test_mode(FLAGS_saml2_ee_test_mode);
   cfg.__set_scratch_dirs(FLAGS_scratch_dirs);
   cfg.__set_max_wait_time_for_sync_ddl_s(FLAGS_max_wait_time_for_sync_ddl_s);
+  cfg.__set_start_hms_server(FLAGS_start_hms_server);
+  cfg.__set_hms_port(FLAGS_hms_port);
+  cfg.__set_fallback_to_hms_on_errors(FLAGS_fallback_to_hms_on_errors);
+  cfg.__set_enable_catalogd_hms_cache(FLAGS_enable_catalogd_hms_cache);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 7d75c69..be05ae1 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -195,4 +195,12 @@ struct TBackendGflags {
   85: required bool enable_row_filtering
 
   86: required i32 max_wait_time_for_sync_ddl_s
+
+  87: required bool start_hms_server
+
+  88: required i32 hms_port
+
+  89: required bool fallback_to_hms_on_errors
+
+  90: required bool enable_catalogd_hms_cache
 }
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index b314369..a743a23 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -370,6 +370,11 @@ struct TTableInfoSelector {
   // with the HMS table which it has and triggers a reload in case it doesn't match.
   // this field is only used when valid_write_ids is set, otherwise it is ignored
   10: optional i64 table_id = -1
+
+  // The response should contain the column statistics for all columns. If this is set
+  // to true, the list provided in want_stats_for_column_names will be ignored and
+  // stats for all columns will be returned.
+  11: optional bool want_stats_for_all_columns
 }
 
 // Returned information about a particular partition.
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index de7c586..523ca21 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -190,7 +190,7 @@ public class MetastoreShim {
    * Constant variable that stores engine value needed to store / access
    * Impala column statistics.
    */
-  protected static final String IMPALA_ENGINE = "impala";
+  public static final String IMPALA_ENGINE = "impala";
 
   /**
    * Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
new file mode 100644
index 0000000..697499b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FileMetadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.ObjectDictionary;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServiceHandler;
+import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.CatalogLookupStatus;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Util class which includes helper methods to fetch the information from
+ * Catalog in order to serve the HMS APIs.
+ */
+public class CatalogHmsAPIHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogHmsAPIHelper.class);
+  // we log at info level if the file-metadata load time on the fallback path took more
+  // than 100 msec.
+  //TODO change this to per directory level value based on empirical average value
+  //for various filesystems.
+  private static final long FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS = 100;
+
+  //TODO Make this individually configurable
+  private static final ExecutorService fallbackFdLoaderPool = Executors
+      .newFixedThreadPool(20,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("HMS-Filemetadata-loader-%d").build());
+
+  public static final String IMPALA_TNETWORK_ADDRESSES = "impala:TNetworkAddress";
+
+  /**
+   * Helper method to serve a get_table_req() API via the catalog. If isGetColumnStats()
+   * is true in the request, we check that the engine is Impala.
+   *
+   * @param catalog         The catalog instance which caches the table.
+   * @param getTableRequest The GetTableRequest object passed by the caller.
+   * @return GetTableResult for the given request.
+   */
+  public static GetTableResult getTableReq(CatalogServiceCatalog catalog,
+      String defaultCatalogName, GetTableRequest getTableRequest)
+      throws CatalogException, NoSuchObjectException, MetaException {
+    checkCatalogName(getTableRequest.getCatName(), defaultCatalogName);
+    checkCondition(getTableRequest.getDbName() != null,
+        "Database name is null");
+    checkCondition(getTableRequest.getTblName() != null,
+        "Table name is null");
+    String dbName = getTableRequest.getDbName();
+    String tblName = getTableRequest.getTblName();
+    TableName tableName = new TableName(dbName, tblName);
+    GetPartialCatalogObjectRequestBuilder reqBuilder =
+        new GetPartialCatalogObjectRequestBuilder()
+            .db(dbName)
+            .tbl(tblName);
+    // Make sure if getColumnStats() is true, the processor engine is set to Impala.
+    if (getTableRequest.isGetColumnStats()) {
+      checkCondition(getTableRequest.getEngine() != null, "Column stats are requested "
+          + "but engine is not set in the request.");
+      checkCondition(
+          MetastoreShim.IMPALA_ENGINE.equalsIgnoreCase(getTableRequest.getEngine()),
+          "Unsupported engine " + getTableRequest.getEngine()
+              + " while requesting column statistics of the table " + dbName + "."
+              + tblName);
+      reqBuilder.wantStatsForAllColums();
+    }
+    // if the client request has getFileMetadata flag set,
+    // we retrieve file descriptors too
+    if (getTableRequest.isGetFileMetadata()) {
+      reqBuilder.wantFiles();
+    }
+    if (getTableRequest.isSetValidWriteIdList()) {
+      reqBuilder.writeId(getTableRequest.getValidWriteIdList());
+    }
+    //TODO add table id in the request when client passes it. Also, looks like
+    // when HIVE-24662 is resolved, we may not need the table id in this request.
+    TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(
+        catalog, reqBuilder.build(), dbName, tblName, "processing get_table_req");
+    // As of writing this code, we know that the table which is returned in the response
+    // is a copy of table stored in the catalogd. Should this assumption change in the
+    // future, we must make sure that we take a deepCopy() of the table before modifying
+    // it later below.
+    Table retTable = response.table_info.hms_table;
+    checkCondition(
+        !AcidUtils.isTransactionalTable(retTable.getParameters()) || getTableRequest
+            .isSetValidWriteIdList(), "Table " + dbName + "." + tblName
+            + " is transactional but it was requested without providing"
+            + " validWriteIdList");
+    if (getTableRequest.isGetColumnStats()) {
+      List<ColumnStatisticsObj> columnStatisticsObjList =
+          response.table_info.column_stats;
+      checkCondition(columnStatisticsObjList != null,
+          "Catalog returned a null ColumnStatisticsObj for table %s", tableName);
+      // Set the table level column statistics for this table.
+      ColumnStatistics columnStatistics = MetastoreShim.createNewHiveColStats();
+      columnStatistics.setStatsDesc(new ColumnStatisticsDesc(true, dbName, tblName));
+      for (ColumnStatisticsObj colsStatsObj : columnStatisticsObjList) {
+        columnStatistics.addToStatsObj(colsStatsObj);
+      }
+      retTable.setColStats(columnStatistics);
+    }
+    if (getTableRequest.isGetFileMetadata()) {
+      // set the file-metadata in the response
+      checkCondition(response.table_info.partitions != null,
+          "File metadata was not returned by catalog");
+      checkCondition(response.table_info.partitions.size() == 1,
+          "Retrieving file-metadata for partitioned tables must use partition level"
+              + " fetch APIs");
+      FileMetadata fileMetadata = new FileMetadata();
+      for (THdfsFileDesc fd : response.table_info.partitions.get(0).file_descriptors) {
+        fileMetadata.addToData(fd.file_desc_data);
+      }
+      retTable.setFileMetadata(fileMetadata);
+      retTable.setDictionary(getSerializedNetworkAddress(
+          response.table_info.network_addresses));
+    }
+    return new GetTableResult(retTable);
+  }
+
+  /**
+   * Util method to issue the {@link CatalogServiceCatalog#getPartialCatalogObject} and
+   * parse the response to do some sanity checks.
+   */
+  private static TGetPartialCatalogObjectResponse getPartialCatalogObjResponse(
+      CatalogServiceCatalog catalog, TGetPartialCatalogObjectRequest request,
+      String dbName, String tblName, String mesg)
+      throws CatalogException, NoSuchObjectException {
+    TGetPartialCatalogObjectResponse response = catalog
+        .getPartialCatalogObject(request, mesg);
+    checkCondition(response != null, "Catalog returned a null response");
+    if (response.lookup_status == CatalogLookupStatus.DB_NOT_FOUND) {
+      throw new NoSuchObjectException("Database " + dbName + " not found");
+    }
+    if (response.lookup_status == CatalogLookupStatus.TABLE_NOT_FOUND) {
+      throw new NoSuchObjectException("Table " + dbName + "." + tblName + " not found");
+    }
+    checkCondition(response.lookup_status != CatalogLookupStatus.TABLE_NOT_LOADED,
+        "Could not load table %s.%s", dbName, tblName);
+    checkCondition(response.table_info.hms_table != null,
+        "Catalog returned a null table for %s.%s", dbName, tblName);
+    return response;
+  }
+
+  /**
+   * Helper method to get the partitions filtered by a Hive expression.
+   *
+   * @param catalog         The catalog instance which caches the table.
+   * @param defaultCatalog  Currently, custom catalog names are not supported.
+   *                        If the catalog is not null it must be same as defaultCatalog.
+   * @param request         The request object as received from the HMS client.
+   * @param expressionProxy The PartitionExpressionProxy instance which is used to
+   *                        deserialize the filter expression.
+   * @return PartitionsByExprResult for the given request.
+   * @throws CatalogException If there are any internal processing errors (eg. table does
+   *                          not exist in Catalog cache) within the context of Catalog.
+   * @throws MetaException    If the expression cannot be deserialized using the given
+   *                          PartitionExpressionProxy.
+   */
+  public static PartitionsByExprResult getPartitionsByExpr(CatalogServiceCatalog catalog,
+      String defaultCatalog, PartitionsByExprRequest request,
+      PartitionExpressionProxy expressionProxy)
+      throws CatalogException, NoSuchObjectException, MetaException {
+    checkCatalogName(request.getCatName(), defaultCatalog);
+    checkCondition(request.getDbName() != null, "Database name is null");
+    checkCondition(request.getTblName() != null, "Table name is null");
+    String dbName = request.getDbName();
+    String tblName = request.getTblName();
+    TableName tableName = new TableName(dbName, tblName);
+    // TODO we are currently fetching all the partitions metadata here since catalog
+    // loads the entire table. Once we have fine-grained loading, we should just get
+    // the partitionNames here and then get the partition metadata after the pruning
+    // is done.
+    GetPartialCatalogObjectRequestBuilder catalogReq =
+        new GetPartialCatalogObjectRequestBuilder()
+            .db(dbName)
+            .tbl(tblName)
+            .wantPartitions();
+    // set the validWriteIdList if available
+    if (request.isSetValidWriteIdList()) {
+      catalogReq.writeId(request.getValidWriteIdList());
+    }
+    // set the table id if available
+    if (request.isSetId()) {
+      catalogReq.tableId(request.getId());
+    }
+    TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
+        catalogReq.build(), dbName, tblName,
+        "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_EXPR);
+    checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
+        "%s is not a partitioned table", tableName);
+    // create a mapping of the Partition name to the Partition so that we can return the
+    // filtered partitions later.
+    Map<String, TPartialPartitionInfo> partitionNameToPartInfo = new HashMap<>();
+    for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+      partitionNameToPartInfo.put(partInfo.getName(), partInfo);
+    }
+    List<String> filteredPartNames = Lists.newArrayList(partitionNameToPartInfo.keySet());
+    Stopwatch st = Stopwatch.createStarted();
+    boolean hasUnknownPartitions = expressionProxy
+        .filterPartitionsByExpr(response.table_info.hms_table.getPartitionKeys(),
+            request.getExpr(), request.getDefaultPartitionName(), filteredPartNames);
+    LOG.info("{}/{} partitions were selected for table {} after expression evaluation."
+            + " Time taken: {} msec.", filteredPartNames.size(),
+        partitionNameToPartInfo.size(), tableName,
+        st.stop().elapsed(TimeUnit.MILLISECONDS));
+    List<Partition> filteredPartitions = Lists
+        .newArrayListWithCapacity(filteredPartNames.size());
+    // TODO add file-metadata to Partitions. This would require changes to the HMS API
+    // so that the request can pass a flag to send back file-metadata.
+    for (String partName : filteredPartNames) {
+      // Note that we are not using String.format arguments here since String.format()
+      // throws a java.util.MissingFormatArgumentException for special characters like
+      // '%3A' which could be present in the PartitionName.
+      checkCondition(partitionNameToPartInfo.containsKey(partName),
+          "Could not find partition id for partition name " + partName);
+      TPartialPartitionInfo partInfo = partitionNameToPartInfo.get(partName);
+      checkCondition(partInfo != null,
+          "Catalog did not return the partition " + partName + " for table " + tableName,
+          partName, tableName);
+      filteredPartitions.add(partitionNameToPartInfo.get(partName).getHms_partition());
+    }
+    // confirm if the number of partitions is equal to number of filtered ids
+    checkCondition(filteredPartNames.size() == filteredPartitions.size(),
+        "Unexpected number of partitions received. Expected %s got %s",
+        filteredPartNames.size(), filteredPartitions.size());
+    PartitionsByExprResult result = new PartitionsByExprResult();
+    result.setPartitions(filteredPartitions);
+    result.setHasUnknownPartitions(hasUnknownPartitions);
+    return result;
+  }
+
+  /**
+   * Throws a {@code CatalogException} with the given message string and arguments if the
+   * condition is False.
+   *
+   * @param condition throws CatalogException when the condition is False.
+   * @param msg       msg compatible with {@code String.format()} format.
+   * @param args      args for the {@code String.format()} to be used as the message in
+   *                  the thrown exception.
+   * @throws CatalogException if condition is False.
+   */
+  private static void checkCondition(boolean condition, String msg, Object... args)
+      throws CatalogException {
+    if (condition) return;
+    throw new CatalogException(String.format(msg, args));
+  }
+
+  /**
+   * Catalog service does not support {@link org.apache.hadoop.hive.metastore.api.Catalog}
+   * currently. This method validates the name of the given catalog with default Catalog
+   * and throws a {@link MetaException} if it doesn't match.
+   */
+  public static void checkCatalogName(String catalogName, String defaultCatalogName)
+      throws MetaException {
+    if (catalogName == null) return;
+    try {
+      checkCondition(defaultCatalogName != null, "Default catalog name is null");
+      checkCondition(defaultCatalogName.equalsIgnoreCase(catalogName),
+          "Catalog service does not support non-default catalogs. Expected %s got %s",
+          defaultCatalogName, catalogName);
+    } catch (CatalogException ex) {
+      LOG.error(ex.getMessage(), ex);
+      throw new MetaException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Helper method to return a list of Partitions filtered by names. Currently partition
+   * level column statistics cannot be requested and this method throws an exception if
+   * the request has get_col_stats parameter set. If the request has {@code
+   * getFileMetadata} set the returned response will include the file-metadata as well.
+   */
+  public static GetPartitionsByNamesResult getPartitionsByNames(
+      CatalogServiceCatalog catalog, Configuration serverConf,
+      GetPartitionsByNamesRequest request)
+      throws CatalogException, NoSuchObjectException, MetaException {
+    // in case of GetPartitionsByNamesRequest there is no explicit catalogName
+    // field and hence the catalog name is prepended to the database name.
+    // TODO this needs to be fixed in HMS APIs so that we have explicit catalog field.
+    String catAnddbName = request.getDb_name();
+    String tblName = request.getTbl_name();
+    checkCondition(!Strings.isNullOrEmpty(catAnddbName),
+        "Database name is empty or null");
+    checkCondition(!Strings.isNullOrEmpty(tblName), "Table name is empty or null");
+    String[] parsedCatDbName = MetaStoreUtils
+        .parseDbName(request.getDb_name(), serverConf);
+    checkCondition(parsedCatDbName.length == 2,
+        "Unexpected error during parsing the catalog and database name %s", catAnddbName);
+    checkCatalogName(parsedCatDbName[0], MetaStoreUtils.getDefaultCatalog(serverConf));
+    String dbName = parsedCatDbName[1];
+    //TODO partition granularity column statistics are not supported in catalogd currently
+    checkCondition(!request.isGet_col_stats(),
+        "Partition level column statistics are not supported in catalog");
+    GetPartialCatalogObjectRequestBuilder requestBuilder =
+        new GetPartialCatalogObjectRequestBuilder()
+            .db(dbName)
+            .tbl(tblName)
+            .wantPartitions();
+    // get the file metadata if the request has it set.
+    if (request.isGetFileMetadata()) {
+      requestBuilder.wantFiles();
+    }
+    if (request.isSetValidWriteIdList()) {
+      requestBuilder.writeId(request.getValidWriteIdList());
+    }
+    //TODO add table id in the request when client passes it. Also, looks like
+    // when HIVE-24662 is resolved, we may not need the table id in this request.
+    TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
+        requestBuilder.build(), dbName, tblName,
+        "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_NAMES);
+    checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
+        "%s.%s is not a partitioned table", dbName, tblName);
+    checkCondition(
+        !request.isGetFileMetadata() || response.table_info.network_addresses != null,
+        "Network addresses were not returned for %s.%s", dbName, tblName);
+    checkCondition(
+        !AcidUtils.isTransactionalTable(response.table_info.hms_table.getParameters())
+            || request.isSetValidWriteIdList(), "Table " + dbName + "." + tblName
+            + " is a transactional table but partitions were requested without "
+            + "providing validWriteIdList");
+    // collect the requested partition names into a set so that we can filter the
+    // partitions returned by the catalog below.
+    Set<String> requestedNames = new HashSet<>(request.getNames());
+    List<Partition> retPartitions = new ArrayList<>();
+    // filter by names
+    for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+      if (requestedNames.contains(partInfo.getName())) {
+        // as of writing this code, we know that the partInfo object has a copy of the
+        // partition which is stored in the catalogd instead of the reference to the
+        // actual partition object of the catalogd. Hence modifying it below is okay.
+        // Should this assumption change in the future, we must make a deepCopy of the
+        // partition object here to make sure that we don't modify the state of the
+        // partition in the catalogd.
+        Partition part = partInfo.getHms_partition();
+        if (request.isGetFileMetadata()) {
+          FileMetadata fileMetadata = new FileMetadata();
+          checkCondition(partInfo.file_descriptors != null,
+              "Catalog did not return file descriptors for partition %s of table %s.%s",
+              partInfo.getName(), dbName, tblName);
+          for (THdfsFileDesc fd : partInfo.file_descriptors) {
+            fileMetadata.addToData(fd.file_desc_data);
+          }
+          part.setFileMetadata(fileMetadata);
+        }
+        retPartitions.add(part);
+      }
+    }
+    GetPartitionsByNamesResult result = new GetPartitionsByNamesResult(retPartitions);
+    if (request.isGetFileMetadata()) {
+      result.setDictionary(getSerializedNetworkAddress(
+          response.table_info.network_addresses));
+    }
+    return result;
+  }
+
+  /**
+   * Util method to serialize a given list of {@link TNetworkAddress} into a {@link
+   * ObjectDictionary}.
+   */
+  public static ObjectDictionary getSerializedNetworkAddress(
+      List<TNetworkAddress> networkAddresses) throws CatalogException {
+    checkCondition(networkAddresses != null, "Network addresses is null");
+    // we assume that this method is only called when we want to return the file-metadata
+    // in the response, in which case there should always be a valid ObjectDictionary
+    // to return.
+    ObjectDictionary result = new ObjectDictionary(Maps.newHashMap());
+    if (networkAddresses.isEmpty()) return result;
+    List<ByteBuffer> serializedAddresses = new ArrayList<>();
+    TSerializer serializer =
+        new TSerializer(new TCompactProtocol.Factory());
+    for (TNetworkAddress networkAddress : networkAddresses) {
+      byte[] serializedNetAddress;
+      try {
+        serializedNetAddress = serializer.serialize(networkAddress);
+      } catch (TException tException) {
+        throw new CatalogException(
+            "Could not serialize network address " + networkAddress.hostname + ":"
+                + networkAddress.port, tException);
+      }
+      serializedAddresses.add(ByteBuffer.wrap(serializedNetAddress));
+    }
+    result.putToValues(IMPALA_TNETWORK_ADDRESSES, serializedAddresses);
+    return result;
+  }
+
+  /**
+   * This method computes the file-metadata directly from filesystem and sets it in the
+   * Table object of the provided GetTableResult. The file-metadata is loaded using a
+   * common static thread pool and is consistent with the given ValidTxnList and
+   * ValidWriteIdList.
+   *
+   * @throws MetaException in case there were errors while loading file-metadata.
+   */
+  public static void loadAndSetFileMetadataFromFs(@Nullable ValidTxnList validTxnList,
+      @Nullable ValidWriteIdList writeIdList, GetTableResult result)
+      throws MetaException {
+    try {
+      Stopwatch sw = Stopwatch.createStarted();
+      Table tbl = result.getTable();
+      checkCondition(tbl != null, "Table is null");
+      checkCondition(tbl.getSd() != null && tbl.getSd().getLocation() != null,
+          "Cannot get the location of table %s.%s", tbl.getDbName(), tbl.getTableName());
+      Path tblPath = new Path(tbl.getSd().getLocation());
+      // since this table doesn't exist in catalogd we compute the network addresses
+      // for the files which are being returned.
+      ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+      FileMetadataLoader fmLoader = new FileMetadataLoader(tblPath, true,
+          Collections.emptyList(), hostIndex, validTxnList, writeIdList);
+      boolean success = getFileMetadata(Arrays.asList(fmLoader));
+      checkCondition(success,
+          "Could not load file-metadata for table %s.%s. See catalogd log for details",
+          tbl.getDbName(), tbl.getTableName());
+      FileMetadata fileMetadata = new FileMetadata();
+      for (FileDescriptor fd : fmLoader.getLoadedFds()) {
+        fileMetadata.addToData(fd.toThrift().file_desc_data);
+      }
+      tbl.setFileMetadata(fileMetadata);
+      tbl.setDictionary(getSerializedNetworkAddress(hostIndex.getList()));
+      long timeTaken = sw.stop().elapsed(TimeUnit.MILLISECONDS);
+      if (timeTaken > FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS) {
+        LOG.warn("Loading the filemetadata for table {}.{} on the fallback path. Time "
+            + "taken: {} msec", tbl.getDbName(), tbl.getTableName(), timeTaken);
+      } else {
+        LOG.debug("Loading the filemetadata for table {}.{} on the fallback path. Time "
+            + "taken: {} msec", tbl.getDbName(), tbl.getTableName(), timeTaken);
+      }
+    } catch (Exception ex) {
+      LOG.error("Unexpected error when loading filemetadata", ex);
+      throw new MetaException(
+          "Could not load filemetadata. Cause " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Sets the file metadata for given partitions from the file-system. In case of
+   * transactional tables it uses the given ValidTxnList and ValidWriteIdList to compute
+   * the snapshot of the file-metadata.
+   */
+  public static void loadAndSetFileMetadataFromFs(@Nullable ValidTxnList txnList,
+      @Nullable ValidWriteIdList writeIdList,
+      GetPartitionsByNamesResult getPartsResult) throws MetaException {
+    if (getPartsResult.getPartitionsSize() == 0) return;
+    final String dbName = getPartsResult.getPartitions().get(0).getDbName();
+    final String tblName = getPartsResult.getPartitions().get(0).getTableName();
+    try {
+      Stopwatch sw = Stopwatch.createStarted();
+      Map<Partition, FileMetadataLoader> fileMdLoaders = new HashMap<>();
+      ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+      for (Partition part : getPartsResult.getPartitions()) {
+        checkCondition(part.getSd() != null && part.getSd().getLocation() != null,
+            "Could not get the location for partition %s of table %s.%s",
+            part.getValues(), part.getDbName(), part.getTableName());
+        Path partPath = new Path(part.getSd().getLocation());
+        fileMdLoaders.put(part,
+            new FileMetadataLoader(partPath, true, Collections.emptyList(), hostIndex,
+                txnList, writeIdList));
+      }
+      boolean success = getFileMetadata(fileMdLoaders.values());
+      checkCondition(success,
+          "Could not load file-metadata for %s partitions of table %s.%s. See "
+              + "catalogd log for details", getPartsResult.getPartitionsSize(), dbName,
+          tblName);
+      for (Entry<Partition, FileMetadataLoader> entry : fileMdLoaders.entrySet()) {
+        FileMetadata filemetadata = new FileMetadata();
+        for (FileDescriptor fd : entry.getValue().getLoadedFds()) {
+          filemetadata.addToData(fd.toThrift().file_desc_data);
+        }
+        entry.getKey().setFileMetadata(filemetadata);
+      }
+      getPartsResult.setDictionary(getSerializedNetworkAddress(hostIndex.getList()));
+      long timeTaken = sw.stop().elapsed(TimeUnit.MILLISECONDS);
+      if (timeTaken > FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS) {
+        LOG.warn("Loading the file metadata for {} partitions of table {}.{} on the "
+                + "fallback path. Time taken: {} msec",
+            getPartsResult.getPartitionsSize(), dbName, tblName, timeTaken);
+      } else {
+        LOG.debug("Loading the file metadata for {} partitions of table {}.{} on the "
+                + "fallback path. Time taken: {} msec",
+            getPartsResult.getPartitionsSize(), dbName, tblName, timeTaken);
+      }
+    } catch (CatalogException ex) {
+      LOG.error(
+          "Unexpected error when loading file-metadata for partitions of table {}.{}",
+          dbName, tblName, ex);
+      throw new MetaException("Could not load file metadata. Cause " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Loads the file-metadata in parallel using the common thread pool {@code
+   * fallbackFdLoaderPool}
+   *
+   * @param loaders The FileMetadataLoader objects corresponding to each path.
+   * @return false if there were errors, else returns true.
+   */
+  private static boolean getFileMetadata(Collection<FileMetadataLoader> loaders) {
+    List<Pair<Path, Future<Void>>> futures = new ArrayList<>(loaders.size());
+    for (FileMetadataLoader fmdLoader : loaders) {
+      futures.add(new Pair<>(fmdLoader.getPartDir(), fallbackFdLoaderPool.submit(() -> {
+        fmdLoader.load();
+        return null;
+      })));
+    }
+    int numberOfErrorsToLog = 100;
+    int errors = 0;
+    for (Pair<Path, Future<Void>> pair : futures) {
+      try {
+        pair.second.get();
+      } catch (InterruptedException | ExecutionException e) {
+        errors++;
+        if (errors <= numberOfErrorsToLog) {
+          LOG.error("Could not load file-metadata for path {}", pair.first, e);
+        }
+      }
+    }
+    if (errors > numberOfErrorsToLog) {
+      LOG.error("{} loading errors were not logged. Only logged the first {} errors",
+          errors - numberOfErrorsToLog, numberOfErrorsToLog);
+    }
+    return errors == 0;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 3703b43..67bdb26 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -54,6 +54,7 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.FeFsTable.Utils;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
@@ -63,6 +64,9 @@ import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogTableMetrics;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -273,6 +277,8 @@ public class CatalogServiceCatalog extends Catalog {
   // Manages the event processing from metastore for issuing invalidates on tables
   private ExternalEventsProcessor metastoreEventProcessor_;
 
+  private final ICatalogMetastoreServer catalogMetastoreServer_;
+
   /**
    * See the gflag definition in be/.../catalog-server.cc for details on these modes.
    */
@@ -349,6 +355,8 @@ public class CatalogServiceCatalog extends Catalog {
     Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
     // start polling for metastore events
     metastoreEventProcessor_.start();
+    catalogMetastoreServer_ = getCatalogMetastoreServer();
+    catalogMetastoreServer_.start();
   }
 
   /**
@@ -365,6 +373,20 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Returns an instance of CatalogMetastoreServer if start_hms_server configuration is
+   * true. Otherwise, returns a NoOpCatalogMetastoreServer.
+   */
+  @VisibleForTesting
+  protected ICatalogMetastoreServer getCatalogMetastoreServer() {
+    if (!BackendConfig.INSTANCE.startHmsServer()) {
+      return NoOpCatalogMetastoreServer.INSTANCE;
+    }
+    int portNumber = BackendConfig.INSTANCE.getHMSPort();
+    Preconditions.checkState(portNumber > 0, "Invalid port number for HMS service.");
+    return new CatalogMetastoreServer(this);
+  }
+
+  /**
    * Check whether the database is in blacklist
    */
   public boolean isBlacklistedDb(String dbName) {
@@ -3325,6 +3347,16 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public TGetPartialCatalogObjectResponse getPartialCatalogObject(
       TGetPartialCatalogObjectRequest req) throws CatalogException {
+    return getPartialCatalogObject(req, "needed by coordinator");
+  }
+
+  /**
+   * A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
+   * invocations.
+   */
+  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+      TGetPartialCatalogObjectRequest req, String reason) throws CatalogException {
+    Preconditions.checkNotNull(reason);
     try {
       if (!partialObjectFetchAccess_.tryAcquire(1,
           PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
@@ -3348,7 +3380,7 @@ public class CatalogServiceCatalog extends Catalog {
       try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
             "Get Partial Catalog Object - " +
             Catalog.toCatalogObjectKey(req.object_desc))) {
-        return doGetPartialCatalogObject(req);
+        return doGetPartialCatalogObject(req, reason);
       } finally {
         partialObjectFetchAccess_.release();
       }
@@ -3387,7 +3419,8 @@ public class CatalogServiceCatalog extends Catalog {
    * response's lookup_status.
    */
   private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
-      TGetPartialCatalogObjectRequest req) throws CatalogException {
+      TGetPartialCatalogObjectRequest req, String tableLoadReason)
+      throws CatalogException {
     TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
         "missing object_desc");
     switch (objectDesc.type) {
@@ -3423,7 +3456,7 @@ public class CatalogServiceCatalog extends Catalog {
         }
         table = getOrLoadTable(
             objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
-            "needed by coordinator", writeIdList, tableId);
+            tableLoadReason, writeIdList, tableId);
       } catch (DatabaseNotFoundException e) {
         return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
       }
@@ -3511,7 +3544,10 @@ public class CatalogServiceCatalog extends Catalog {
           .map(HdfsPartition.Builder::new)
           .collect(Collectors.toList());
       new ParallelFileMetadataLoader(
-          table, partBuilders, reqWriteIdList, validTxnList, null, logPrefix).load();
+          table.getFileSystem(), partBuilders, reqWriteIdList,
+          validTxnList, Utils.shouldRecursivelyListPartitions(table),
+          table.getHostIndex(), null, logPrefix)
+          .load();
       for (HdfsPartition.Builder builder : partBuilders) {
         // Let's retrieve the original partition instance from builder because this is
         // stored in the keys of 'partToPartialInfoMap'.
diff --git a/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java b/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java
new file mode 100644
index 0000000..c012eed
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+
+/**
+ * Simple Request builder class. Assumes all the metadata at higher granularity is
+ * required if a specific level is requested. For example, if files are requested,
+ * assumes that partition names and partitions are also requested.
+ */
+public class GetPartialCatalogObjectRequestBuilder {
+  private boolean wantFileMetadata_;
+  private boolean wantPartitionMeta_;
+  private boolean wantPartitionNames_;
+  private String tblName_, dbName_;
+  private boolean wantHmsTable_;
+  private boolean wantStatsForAllColumns_;
+  private long tableId = CatalogServiceCatalog.TABLE_ID_UNAVAILABLE;
+  private ValidWriteIdList writeIdList_;
+
+  /**
+   * Sets the database name for the request object.
+   */
+  public GetPartialCatalogObjectRequestBuilder db(String db) {
+    this.dbName_ = db;
+    return this;
+  }
+
+  /**
+   * Sets the table name for the request object.
+   */
+  public GetPartialCatalogObjectRequestBuilder tbl(String tbl) {
+    this.tblName_ = tbl;
+    return this;
+  }
+
+  /**
+   * Sets the request fields required for fetching partition names,
+   * HMS Partition object and the file-metadata of the partitions.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantFiles() {
+    wantFileMetadata_ = true;
+    wantPartitionMeta_ = true;
+    wantPartitionNames_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the request fields required for fetching partition names,
+   * HMS Partition objects. No file-metadata will be fetched.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantPartitions() {
+    wantPartitionNames_ = true;
+    wantPartitionMeta_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the request fields required for fetching partition names. No
+   * partition metadata or file-metadata will be fetched.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantPartitionNames() {
+    wantPartitionNames_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the request fields for fetching column statistics for all columns.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantStatsForAllColums() {
+    wantStatsForAllColumns_ = true;
+    return this;
+  }
+
+  GetPartialCatalogObjectRequestBuilder tableId(long id) {
+    this.tableId = id;
+    return this;
+  }
+
+  GetPartialCatalogObjectRequestBuilder writeId(String writeIdList) {
+    Preconditions.checkNotNull(writeIdList);
+    writeIdList_ = new ValidReaderWriteIdList();
+    writeIdList_.readFromString(writeIdList);
+    return this;
+  }
+
+  GetPartialCatalogObjectRequestBuilder writeId(
+      ValidWriteIdList validWriteIdList) {
+    writeIdList_ = Preconditions.checkNotNull(validWriteIdList);
+    return this;
+  }
+
+  /**
+   * Builds the {@link TGetPartialCatalogObjectRequest} object based on the fields
+   * set in this Builder.
+   */
+  public TGetPartialCatalogObjectRequest build() {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable(dbName_, tblName_);
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.table_id = tableId;
+    if (writeIdList_ != null) {
+      req.table_info_selector.valid_write_ids = MetastoreShim
+          .convertToTValidWriteIdList(writeIdList_);
+    }
+    if (wantPartitionNames_) {
+      req.table_info_selector.want_partition_names = true;
+    }
+    if (wantPartitionMeta_) {
+      req.table_info_selector.want_partition_metadata = true;
+    }
+    if (wantFileMetadata_) {
+      req.table_info_selector.want_partition_files = true;
+    }
+    if (wantStatsForAllColumns_) {
+      req.table_info_selector.want_stats_for_all_columns = true;
+    }
+    return req;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index f0ecdee..0539387 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -742,9 +742,9 @@ public class HdfsTable extends Table implements FeFsTable {
     // we'll throw an exception here and end up bailing out of whatever catalog operation
     // we're in the middle of. This could cause a partial metadata update -- eg we may
     // have refreshed the top-level table properties without refreshing the files.
-    ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader(
-        this, partBuilders, validWriteIds_, validTxnList, debugActions, logPrefix);
-    loader.load();
+    new ParallelFileMetadataLoader(getFileSystem(), partBuilders, validWriteIds_,
+        validTxnList, Utils.shouldRecursivelyListPartitions(this),
+        getHostIndex(), debugActions, logPrefix).load();
 
     // TODO(todd): would be good to log a summary of the loading process:
     // - how many block locations did we reuse/load individually/load via batch
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 3f8fdcc..098fdc8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -20,6 +20,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import javax.annotation.Nullable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -36,6 +38,8 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.apache.impala.util.ThreadNameAnnotator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,10 +73,11 @@ public class ParallelFileMetadataLoader {
   private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
   private final FileSystem fs_;
 
-  public ParallelFileMetadataLoader(HdfsTable table,
+  public ParallelFileMetadataLoader(FileSystem fs,
       Collection<HdfsPartition.Builder> partBuilders,
-      ValidWriteIdList writeIdList, ValidTxnList validTxnList, String debugAction,
-      String logPrefix) throws CatalogException {
+      ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
+      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix)
+      throws CatalogException {
     if (writeIdList != null || validTxnList != null) {
       // make sure that both either both writeIdList and validTxnList are set or both
       // of them are not.
@@ -91,8 +96,8 @@ public class ParallelFileMetadataLoader {
     for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
       FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
-          Utils.shouldRecursivelyListPartitions(table), oldFds, table.getHostIndex(),
-          validTxnList, writeIdList, e.getValue().get(0).getFileFormat());
+          isRecursive, oldFds, hostIndex, validTxnList, writeIdList,
+          e.getValue().get(0).getFileFormat());
       // If there is a cached partition mapped to this path, we recompute the block
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
@@ -103,7 +108,7 @@ public class ParallelFileMetadataLoader {
       loaders_.put(e.getKey(), loader);
     }
     this.logPrefix_ = logPrefix;
-    this.fs_ = table.getFileSystem();
+    this.fs_ = fs;
   }
 
   /**
@@ -153,7 +158,7 @@ public class ParallelFileMetadataLoader {
       List<Pair<FileMetadataLoader, Future<Void>>> futures =
           new ArrayList<>(loaders_.size());
       for (FileMetadataLoader loader : loaders_.values()) {
-        futures.add(new Pair<FileMetadataLoader, Future<Void>>(
+        futures.add(new Pair<>(
             loader, pool.submit(() -> { loader.load(); return null; })));
       }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 910bd49..3d8fa13 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -660,10 +660,13 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
       // is done while we continue to hold the table lock.
       resp.table_info.setHms_table(getMetaStoreTable().deepCopy());
     }
-    if (selector.want_stats_for_column_names != null) {
-      List<ColumnStatisticsObj> statsList = Lists.newArrayListWithCapacity(
-          selector.want_stats_for_column_names.size());
-      for (String colName: selector.want_stats_for_column_names) {
+    if (selector.want_stats_for_column_names != null ||
+        selector.want_stats_for_all_columns) {
+      List<String> colList = selector.want_stats_for_all_columns ? getColumnNames() :
+          selector.want_stats_for_column_names;
+      List<ColumnStatisticsObj> statsList =
+          Lists.newArrayListWithCapacity(colList.size());
+      for (String colName: colList) {
         Column col = getColumn(colName);
         if (col == null) continue;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
new file mode 100644
index 0000000..08e30ce
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.apache.impala.catalog.CatalogHmsAPIHelper.IMPALA_TNETWORK_ADDRESSES;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.FileMetadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.ObjectDictionary;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol.Factory;
+
+public class CatalogHmsClientUtils {
+
+  /**
+   * Util method to deserialize the {@link FileMetadata} in the
+   * {@link GetPartitionsByNamesResult} into a list of {@link FileDescriptor} objects.
+   * The returned Map contains a mapping of each partition to its List of
+   * FileDescriptors.
+   * @param getPartitionsResult The getPartitionsByNamesResult as returned by the HMS API
+   * {@code getPartitionsByNames}. If this param does not contain fileMetadata the method
+   *                            throws an exception.
+   * @param hostIndex The HostIndex to be used to clone the FileDescriptors before
+   *                  returning. This is useful to map the FileDescriptors to an existing
+   *                  HostIndex which is available for the table.
+   * @return Map of Partition to its FileDescriptors. Note that the list can be empty
+   * in case there are no files in a partition.
+   * @throws CatalogException If there are any deserialization errors.
+   */
+  public static Map<Partition, List<FileDescriptor>> extractFileDescriptors(
+      GetPartitionsByNamesResult getPartitionsResult, ListMap<TNetworkAddress> hostIndex)
+      throws CatalogException {
+    // if there are no partitions in the result, return early
+    if (getPartitionsResult.getPartitionsSize() == 0) return new HashMap<>(0);
+    Preconditions
+        .checkArgument(getPartitionsResult.isSetDictionary(), "Host info is unavailable");
+    List<TNetworkAddress> hostInfo = deserializeNetworkAddresses(
+        getPartitionsResult.getDictionary());
+    Map<Partition, List<FileDescriptor>> fds = Maps
+        .newHashMapWithExpectedSize(getPartitionsResult.getPartitionsSize());
+    for (Partition part : getPartitionsResult.getPartitions()) {
+      Preconditions.checkArgument(part.isSetFileMetadata(),
+          "Filemetadata is not set for partition with values " + part.getValues());
+      FileMetadata fileMetadata = part.getFileMetadata();
+      List<FileDescriptor> partitionFds = Lists
+          .newArrayListWithCapacity(fileMetadata.getDataSize());
+      if (fileMetadata.getData() != null) {
+        for (ByteBuffer data : fileMetadata.getData()) {
+          FileDescriptor fd = FileDescriptor.fromThrift(new THdfsFileDesc(data));
+          // Keep the clone returned by cloneWithNewHostIndex().
+          partitionFds.add(fd.cloneWithNewHostIndex(hostInfo, hostIndex));
+        }
+      }
+      fds.put(part, partitionFds);
+    }
+    return fds;
+  }
+
+  /**
+   * Util method to extract the FileDescriptors from the Table returned
+   * from HMS. The method expects that the result has FileMetadata set in the table
+   * along with the Host network addresses serialized in the table dictionary.
+   * @param tbl The table returned from HMS API getTableReq
+   * @param hostIndex The hostIndex of the table which is used to clone the returned
+   *                  FileDescriptors.
+   * @return List of FileDescriptors for the table.
+   * @throws CatalogException in case of deserialization errors.
+   */
+  public static List<FileDescriptor> extractFileDescriptors(Table tbl,
+      ListMap<TNetworkAddress> hostIndex) throws CatalogException {
+    String fullTblName =
+        tbl.getDbName() + "." + tbl.getTableName();
+    Preconditions.checkArgument(tbl.isSetDictionary(),
+        "Host info is not available in the table " + fullTblName);
+    List<TNetworkAddress> hostInfo = deserializeNetworkAddresses(
+        tbl.getDictionary());
+    Preconditions.checkArgument(tbl.isSetFileMetadata(),
+        "Filemetadata is not set for table " + fullTblName);
+    FileMetadata fileMetadata = tbl.getFileMetadata();
+    // it is possible that there are no files in the table.
+    if (fileMetadata.getData() == null) return Collections.emptyList();
+    List<FileDescriptor> tableFds = Lists
+        .newArrayListWithCapacity(fileMetadata.getDataSize());
+    for (ByteBuffer data : fileMetadata.getData()) {
+      FileDescriptor fd = FileDescriptor.fromThrift(new THdfsFileDesc(data));
+      // Keep the clone returned by cloneWithNewHostIndex().
+      tableFds.add(fd.cloneWithNewHostIndex(hostInfo, hostIndex));
+    }
+    return tableFds;
+  }
+
+  /**
+   * Util method to deserialize a given {@link ObjectDictionary} object into a list of
+   * {@link TNetworkAddress}. This is used to deserialize the output of
+   * {@link CatalogHmsAPIHelper#getSerializedNetworkAddress(List)}.
+   * @param dictionary
+   * @return list of deserialized TNetworkAddress
+   * @throws CatalogException in case of deserialization errors.
+   */
+  private static List<TNetworkAddress> deserializeNetworkAddresses(
+      ObjectDictionary dictionary) throws CatalogException {
+    if (dictionary == null) return null;
+    if (dictionary.getValuesSize() == 0) return Collections.emptyList();
+    if (!dictionary.getValues()
+        .containsKey(IMPALA_TNETWORK_ADDRESSES)) {
+      throw new CatalogException("Key " + IMPALA_TNETWORK_ADDRESSES + " not found");
+    }
+    List<ByteBuffer> serializedNetAddresses = dictionary.getValues()
+        .get(IMPALA_TNETWORK_ADDRESSES);
+    List<TNetworkAddress> networkAddresses = Lists
+        .newArrayListWithCapacity(serializedNetAddresses.size());
+    int index = 0;
+    TDeserializer deserializer = new TDeserializer(new Factory());
+    for (ByteBuffer serializedData : serializedNetAddresses) {
+      TNetworkAddress networkAddress = new TNetworkAddress();
+      try {
+        deserializer.deserialize(networkAddress, serializedData.array());
+        networkAddresses.add(networkAddress);
+        index++;
+      } catch (TException tException) {
+        throw new CatalogException(
+            "Could not deserialize network address at position " + index, tException);
+      }
+    }
+    return networkAddresses;
+  }
+}
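
Note on the deserialization loop above: it passes serializedData.array() to the
deserializer. ByteBuffer.array() returns the whole backing array and ignores the
buffer's position and limit, and it throws for read-only or direct buffers.
Whether the buffers seen here can ever have a non-zero offset is not established
by this patch, so the following is only a defensive sketch. The class and method
names are hypothetical and not part of the commit.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the patch.
class ByteBufferBytes {
  // Copies exactly the bytes between position and limit, leaving the
  // caller's buffer untouched.
  static byte[] remainingBytes(ByteBuffer buf) {
    // duplicate() shares the content but has its own position and limit.
    ByteBuffer view = buf.duplicate();
    byte[] out = new byte[view.remaining()];
    view.get(out);
    return out;
  }
}
```

With such a helper, the call would read
deserializer.deserialize(networkAddress, remainingBytes(serializedData)).
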
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
new file mode 100644
index 0000000..3414f4d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.service.BackendConfig;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CatalogMetastoreServer implements ThriftHiveMetastore interface. This is useful to
+ * expose HMS APIs via catalog server. Currently, most of the API implementations are
+ * "pass-through" to the Metastore server except for the following 3 which are mostly
+ * useful for getting table and partition level metadata during query planning.
+ * 1. get_table_req
+ * 2. get_partitions_by_expr
+ * 3. get_partitions_by_names_req
+ *
+ * This class mostly deals with the thrift server instantiation and its lifecycle
+ * management. The actual implementation of the HMS APIs is done in
+ * {@link CatalogMetastoreServiceHandler} class.
+ */
+public class CatalogMetastoreServer extends ThriftHiveMetastore implements
+    ICatalogMetastoreServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMetastoreServer.class);
+
+  // Maximum number of bytes to read from transport for variable length fields
+  // (strings, bytes). Also, used as a maximum number of elements to read for
+  // containers (maps, lists etc) fields.
+  private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
+
+  // Minimum number of thrift server threads (concurrent connections) which serve the
+  // clients. TODO: make this configurable.
+  private static final int MIN_SERVER_THREADS = 1;
+  // Maximum number of thrift server threads (concurrent connections) which serve the
+  // clients. A connection beyond this limit is blocked until a server thread
+  // becomes available. TODO: make this configurable.
+  private static final int MAX_SERVER_THREADS = 500;
+
+  private static final String ACTIVE_CONNECTIONS_METRIC = "metastore.active.connections";
+  private static final String RPC_DURATION_FORMAT_METRIC = "metastore.rpc.duration.%s";
+
+  // flag to indicate if the server is started or not
+  private final AtomicBoolean started_ = new AtomicBoolean(false);
+
+  // the server is started in a daemon thread so that instantiating this is not
+  // a blocking call.
+  private CompletableFuture<Void> serverHandle_;
+
+  // reference to the catalog Service catalog object
+  private final CatalogServiceCatalog catalog_;
+
+  // Metrics for this Metastore server. Also this instance is passed to the
+  // TServerEventHandler when the server is started so that RPC metrics can be registered.
+  private final Metrics metrics_ = new Metrics();
+
+  public CatalogMetastoreServer(CatalogServiceCatalog catalogServiceCatalog) {
+    Preconditions.checkNotNull(catalogServiceCatalog);
+    catalog_ = catalogServiceCatalog;
+  }
+
+  /**
+   * Simple RpcEventHandler which adds metrics for this Metastore server
+   */
+  private class RpcMetricsEventHandler implements TServerEventHandler {
+
+    @Override
+    public void preServe() {}
+
+    @Override
+    public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+      metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
+      return null;
+    }
+
+    @Override
+    public void deleteContext(ServerContext serverContext, TProtocol tProtocol,
+        TProtocol tProtocol1) {
+      metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
+    }
+
+    @Override
+    public void processContext(ServerContext serverContext, TTransport tTransport,
+        TTransport tTransport1) {
+    }
+  }
+
+  /**
+   * Simple wrapper InvocationHandler which registers the duration metrics for each method
+   * called on the Proxy instance. The method execution is delegated to the handler
+   * instance in the invoke method. Using such an invocation handler is much simpler than
+   * wrapping all the methods in the {@link CatalogMetastoreServiceHandler}. Additionally,
+   * this class also logs an error with the full trace in case the method invocation
+   * fails.
+   */
+  private class TimingInvocationHandler implements InvocationHandler {
+
+    private final CatalogMetastoreServiceHandler handler_;
+
+    TimingInvocationHandler(CatalogMetastoreServiceHandler handler) {
+      Preconditions.checkNotNull(handler);
+      handler_ = handler;
+    }
+
+    /**
+     * This method is called on every HMS API invocation. We invoke the method on the
+     * handler class with the given set of arguments. Additionally, this class is used to
+     * register the duration of such API calls.
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      Timer.Context context =
+          metrics_.getTimer(String.format(RPC_DURATION_FORMAT_METRIC, method.getName()))
+              .time();
+      try {
+        LOG.debug("Invoking HMS API: {}", method.getName());
+        return method.invoke(handler_, args);
+      } catch (Exception ex) {
+        Throwable unwrapped = unwrap(ex);
+        LOG.error("Received exception while executing " + method.getName() + " : ",
+            unwrapped);
+        throw unwrapped;
+      } finally {
+        context.stop();
+      }
+    }
+
+    /**
+     * Method#invoke throws an InvocationTargetException if the underlying method
+     * throws an exception. This method unwraps the underlying cause of such an exception
+     * and returns it if available.
+     */
+    private Throwable unwrap(Exception ex) {
+      if (ex instanceof InvocationTargetException) {
+        return ((InvocationTargetException) ex).getTargetException();
+      }
+      return ex;
+    }
+  }
+
+  @VisibleForTesting
+  protected int getPort() throws CatalogException {
+    return BackendConfig.INSTANCE.getHMSPort();
+  }
+
+  /**
+   * Starts the thrift server in a background thread on the configured port. Currently,
+   * only NOSASL mode is supported. TODO: Add SASL and SSL support (IMPALA-10638)
+   *
+   * @throws CatalogException
+   */
+  public synchronized void start() throws CatalogException {
+    final int portNumber = getPort();
+    Preconditions.checkState(portNumber > 0);
+    Preconditions.checkState(!started_.get(), "Metastore server is already started");
+    LOG.info("Starting the Metastore server at port number {}", portNumber);
+    CatalogMetastoreServiceHandler handler =
+        new CatalogMetastoreServiceHandler(catalog_, metrics_,
+            BackendConfig.INSTANCE.fallbackToHMSOnErrors());
+    // create a proxy class for the ThriftHiveMetastore.Iface and ICatalogMetastoreServer
+    // so that all the APIs can be invoked via a TimingInvocationHandler
+    ThriftHiveMetastore.Iface proxyCatalogHMSIFace =
+        (ThriftHiveMetastore.Iface) Proxy
+            .newProxyInstance(ThriftHiveMetastore.Iface.class.getClassLoader(),
+                new Class[]{ThriftHiveMetastore.Iface.class,
+                    ICatalogMetastoreServer.class},
+                new TimingInvocationHandler(handler));
+    //TODO Add Sasl support (IMPALA-10638)
+    final TProtocolFactory protocolFactory;
+    final TProtocolFactory inputProtoFactory;
+    //TODO add config for this (IMPALA-10639)
+    boolean useCompactProtocol = false;
+    if (useCompactProtocol) {
+      protocolFactory = new TCompactProtocol.Factory();
+      inputProtoFactory = new TCompactProtocol.Factory(MAX_MESSAGE_SIZE,
+          MAX_MESSAGE_SIZE);
+    } else {
+      protocolFactory = new TBinaryProtocol.Factory();
+      inputProtoFactory = new TBinaryProtocol.Factory(true, true, MAX_MESSAGE_SIZE,
+          MAX_MESSAGE_SIZE);
+    }
+
+    TProcessor processor;
+    try {
+      processor =
+          new ThriftHiveMetastore.Processor<>(proxyCatalogHMSIFace);
+    } catch (Exception e) {
+      throw new CatalogException("Unable to create processor for catalog metastore "
+          + "server", e);
+    }
+
+    //TODO add SSL support
+    boolean useSSL = false;
+    TServerSocket serverSocket;
+    try {
+      serverSocket =
+          new TServerSocketKeepAlive(
+              new TServerSocket(new InetSocketAddress(portNumber)));
+    } catch (TTransportException e) {
+      throw new CatalogException(
+          "Unable to create server socket at port number " + portNumber, e);
+    }
+
+    TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
+        .processor(processor)
+        .transportFactory(new TTransportFactory())
+        .protocolFactory(protocolFactory)
+        .inputProtocolFactory(inputProtoFactory)
+        .minWorkerThreads(MIN_SERVER_THREADS)
+        .maxWorkerThreads(MAX_SERVER_THREADS);
+
+    TServer tServer = new TThreadPoolServer(args);
+    TServerEventHandler rpcMetricsEventHandler = new RpcMetricsEventHandler();
+
+    tServer.setServerEventHandler(rpcMetricsEventHandler);
+    LOG.info("Starting the new metastore server on port [" + portNumber
+        + "]...");
+    LOG.info("minWorkerThreads = "
+        + MIN_SERVER_THREADS);
+    LOG.info("maxWorkerThreads = "
+        + MAX_SERVER_THREADS);
+    LOG.info("Enable SSL = " + useSSL);
+    serverHandle_ = CompletableFuture.runAsync(() -> tServer.serve());
+    started_.set(true);
+  }
+
+  /**
+   * Returns the RPC and connection metrics for this metastore server. //TODO hook this
+   * method to the Catalog's debug UI
+   */
+  @Override
+  public String getMetrics() {
+    return metrics_.toString();
+  }
+
+  /**
+   * Stops this CatalogMetastoreServer on a best-effort basis. Note that
+   * CompletableFuture#cancel does not interrupt the thread running the server.
+   * <p>
+   * TODO: currently this method is not used anywhere. We should hook this method to the
+   * shutdown process of catalogd.
+   */
+  public void stop() throws CatalogException {
+    serverHandle_.cancel(true);
+  }
+}
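
The TimingInvocationHandler in the file above wraps every HMS API call through a
java.lang.reflect.Proxy. The following standalone sketch shows the same technique
using only the JDK. GreeterApi, TimingProxySketch and the two maps are
hypothetical stand-ins for ThriftHiveMetastore.Iface and the Metrics timers; they
are not part of the patch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a service interface such as ThriftHiveMetastore.Iface.
interface GreeterApi {
  String greet(String name);
}

class TimingProxySketch {
  // Per-method call counts and total elapsed nanoseconds (stand-ins for timers).
  static final Map<String, AtomicLong> CALLS = new ConcurrentHashMap<>();
  static final Map<String, AtomicLong> NANOS = new ConcurrentHashMap<>();

  static class TimingHandler implements InvocationHandler {
    private final Object target;

    TimingHandler(Object target) { this.target = target; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      long start = System.nanoTime();
      try {
        // Delegate to the real implementation.
        return method.invoke(target, args);
      } catch (InvocationTargetException e) {
        // Rethrow what the target threw, not the reflective wrapper.
        throw e.getTargetException();
      } finally {
        // Record the call whether it succeeded or failed.
        long elapsed = System.nanoTime() - start;
        CALLS.computeIfAbsent(method.getName(), k -> new AtomicLong()).incrementAndGet();
        NANOS.computeIfAbsent(method.getName(), k -> new AtomicLong()).addAndGet(elapsed);
      }
    }
  }

  // Wraps the target so that every interface call goes through TimingHandler.
  static GreeterApi wrap(GreeterApi target) {
    return (GreeterApi) Proxy.newProxyInstance(
        GreeterApi.class.getClassLoader(),
        new Class<?>[] {GreeterApi.class},
        new TimingHandler(target));
  }
}
```

Callers use the wrapped object exactly like the original, for example
TimingProxySketch.wrap(name -> "hello " + name).greet("hms"). Exceptions thrown
by the target reach the caller with their original type.
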
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
new file mode 100644
index 0000000..be00d69
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetSchemaRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaResponse;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.service.BackendConfig;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the HMS APIs that are served by CatalogD
+ * and is exposed via {@link CatalogMetastoreServer}.
+ * HMS APIs that are redirected to HMS can be found in {@link MetastoreServiceHandler}.
+ *
+ */
+public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CatalogMetastoreServiceHandler.class);
+
+  public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+      boolean fallBackToHMSOnErrors) {
+    super(catalog, metrics, fallBackToHMSOnErrors);
+  }
+
+  @Override
+  public GetTableResult get_table_req(GetTableRequest getTableRequest)
+      throws MetaException, NoSuchObjectException, TException {
+
+    if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+      return super.get_table_req(getTableRequest);
+    }
+
+    try {
+      LOG.trace("Received get_table_req for {}. File metadata is {}",
+          getTableRequest.getTblName(), getTableRequest.isGetFileMetadata());
+      return CatalogHmsAPIHelper.getTableReq(catalog_, defaultCatalogName_,
+          getTableRequest);
+    } catch (Exception e) {
+      // we catch the CatalogException and fall-back to HMS
+      throwIfNoFallback(e, "get_table_req");
+    }
+    return super.get_table_req(getTableRequest);
+  }
+
+  /**
+   * This is the main API which is used by Hive to get the partitions. Hive pushes
+   * the pruning logic to HMS by sending over the expression which is used to
+   * filter the partitions during query compilation. The expression is specific to Hive
+   * and can be loaded at runtime only if the hive-exec jar is in the classpath. If the
+   * hive-exec jar is not present in the classpath, we fall back to HMS
+   * since Catalog has no way to deserialize the expression sent over by the client.
+   */
+  @Override
+  public PartitionsByExprResult get_partitions_by_expr(
+      PartitionsByExprRequest partitionsByExprRequest) throws TException {
+
+    if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+      return super.get_partitions_by_expr(partitionsByExprRequest);
+    }
+
+    try {
+      // expressionProxy_ is null if there were errors when loading the
+      // PartitionExpressionProxy.
+      if (expressionProxy_ != null) {
+        return CatalogHmsAPIHelper.getPartitionsByExpr(
+            catalog_, defaultCatalogName_, partitionsByExprRequest, expressionProxy_);
+      } else {
+        throw new CatalogException("PartitionExpressionProxy could not be initialized");
+      }
+    } catch (Exception e) {
+      // we catch the CatalogException and fall-back to HMS
+      throwIfNoFallback(e, GET_PARTITION_BY_EXPR);
+    }
+    String tblName =
+        partitionsByExprRequest.getDbName() + "." + partitionsByExprRequest.getTblName();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_EXPR, tblName));
+    return super.get_partitions_by_expr(partitionsByExprRequest);
+  }
+
+  /**
+   * HMS API to get the partitions filtered by a provided list of names. The request
+   * contains a list of partition names which the client is interested in. Catalog
+   * returns the partitions only for requested names. Additionally, this API also returns
+   * the file-metadata for the returned partitions if the request has
+   * {@code getFileMetadata} flag set. In case of errors, this API falls back to HMS if
+   * {@code fallBackToHMSOnErrors_} is set.
+   */
+  @Override
+  public GetPartitionsByNamesResult get_partitions_by_names_req(
+      GetPartitionsByNamesRequest getPartitionsByNamesRequest) throws TException {
+
+    if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+      return super.get_partitions_by_names_req(getPartitionsByNamesRequest);
+    }
+
+    try {
+      return CatalogHmsAPIHelper
+          .getPartitionsByNames(catalog_, serverConf_, getPartitionsByNamesRequest);
+    } catch (Exception ex) {
+      throwIfNoFallback(ex, GET_PARTITION_BY_NAMES);
+    }
+    String tblName =
+        getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
+            .getTbl_name();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+    return super.get_partitions_by_names_req(getPartitionsByNamesRequest);
+  }
+}
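
The three overridden APIs in the handler above share one control flow: bypass the
catalogd cache when it is disabled, otherwise try the cache and, on error, either
rethrow or fall back to the backing HMS. A minimal JDK-only sketch of that flow
follows. It assumes throwIfNoFallback (defined in MetastoreServiceHandler, not
shown in this part of the diff) rethrows when fallback is disabled. FallbackSketch
and its parameter names are hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical illustration of the cache-then-fallback flow; not part of the patch.
class FallbackSketch {
  static <T> T serve(boolean cacheEnabled, boolean fallbackOnErrors,
      Supplier<T> fromCache, Supplier<T> fromBackingStore) {
    // Cache disabled: behave as a pure pass-through.
    if (!cacheEnabled) return fromBackingStore.get();
    try {
      return fromCache.get();
    } catch (RuntimeException e) {
      // Assumed behavior of throwIfNoFallback: propagate unless fallback is on.
      if (!fallbackOnErrors) throw e;
    }
    // Fallback enabled and the cache path failed.
    return fromBackingStore.get();
  }
}
```

For example, serve(true, true, failingCacheLookup, hmsLookup) returns the HMS
result, while the same call with fallbackOnErrors set to false propagates the
cache error to the client.
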
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
new file mode 100644
index 0000000..e5aa7a9
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * This is the main Interface which a CatalogMetastore service should implement. It
+ * provides lifecycle and monitoring methods which are called from
+ * CatalogServiceCatalog to instantiate a Metastore service.
+ */
+public interface ICatalogMetastoreServer {
+
+  /**
+   * Starts the metastore service
+   * @throws CatalogException
+   */
+  void start() throws CatalogException;
+
+  /**
+   * Stop the metastore service.
+   * @throws CatalogException
+   */
+  void stop() throws CatalogException;
+
+  /**
+   * Returns the metrics for this Catalog Metastore service.
+   * @return Encoded String (e.g. JSON format) representation of all the metrics for this
+   * metastore service.
+   */
+  String getMetrics();
+}
\ No newline at end of file
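
The start/stop contract of ICatalogMetastoreServer can be illustrated with a
small JDK-only sketch. Because CompletableFuture.cancel does not interrupt the
running task, this sketch stops the serving loop through an explicit flag and
then waits for it to finish. LifecycleSketch and its members are hypothetical. A
real implementation would presumably ask the Thrift server itself to stop rather
than poll a flag.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical lifecycle illustration; not part of the patch.
class LifecycleSketch {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private volatile boolean stopRequested = false;
  private CompletableFuture<Void> handle;

  synchronized void start() {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("already started");
    }
    stopRequested = false;
    // Run the blocking serve loop off the caller's thread.
    handle = CompletableFuture.runAsync(this::serve);
  }

  // Stand-in for a blocking serve() call: loops until a stop is requested.
  private void serve() {
    while (!stopRequested) {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  synchronized void stop() {
    if (handle == null) return; // never started: nothing to do
    stopRequested = true;
    handle.join(); // wait for the serve loop to exit
    started.set(false);
  }

  boolean isRunning() {
    return handle != null && !handle.isDone();
  }
}
```

Calling stop() before start() is a no-op in this sketch, and a second start()
without an intervening stop() fails fast instead of binding a second server.
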
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
new file mode 100644
index 0000000..9a1b6ba
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -0,0 +1,2736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import com.facebook.fb303.fb_status;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddCheckConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddDefaultConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddNotNullConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddUniqueConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.AlterCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.AlterISchemaRequest;
+import org.apache.hadoop.hive.metastore.api.AlterPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AlterPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.AlterTableRequest;
+import org.apache.hadoop.hive.metastore.api.AlterTableResponse;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.CheckConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
+import org.apache.hadoop.hive.metastore.api.CmRecycleResponse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
+import org.apache.hadoop.hive.metastore.api.CreateCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DefaultConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.DefaultConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.DropCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.DropConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.ExtendedTableInfo;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FindSchemasByColsResp;
+import org.apache.hadoop.hive.metastore.api.FindSchemasByColsRqst;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.GetCatalogResponse;
+import org.apache.hadoop.hive.metastore.api.GetCatalogsResponse;
+import org.apache.hadoop.hive.metastore.api.GetDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
+import org.apache.hadoop.hive.metastore.api.GetReplicationMetricsRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GetRuntimeStatsRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaResponse;
+import org.apache.hadoop.hive.metastore.api.GetSerdeRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.GetTablesExtRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesResult;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.ISchema;
+import org.apache.hadoop.hive.metastore.api.ISchemaName;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MapSchemaVersionToSerdeRequest;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.OptionalCompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.RenamePartitionRequest;
+import org.apache.hadoop.hive.metastore.api.RenamePartitionResponse;
+import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
+import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.ScheduledQuery;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.metastore.api.SchemaVersion;
+import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsResponse;
+import org.apache.hadoop.hive.metastore.api.SetSchemaVersionStateRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
+import org.apache.hadoop.hive.metastore.api.TableStatsResult;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface;
+import org.apache.hadoop.hive.metastore.api.TruncateTableRequest;
+import org.apache.hadoop.hive.metastore.api.TruncateTableResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterPoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterPoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMAlterResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMAlterTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrDropTriggerToPoolMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrDropTriggerToPoolMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrUpdateMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrUpdateMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreatePoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreatePoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropPoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropPoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetActiveResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetActiveResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetAllResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetAllResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.util.AcidUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the HMS APIs that are redirected to the HMS server from CatalogD.
+ * APIs that should be served from CatalogD must be overridden in {@link
+ * CatalogMetastoreServer}.
+ * <p>
+ * Implementation Notes: Care should be taken to use the
+ * {@link IMetaStoreClient#getThriftClient()}
+ * method when forwarding an API call to the HMS service, since IMetaStoreClient itself
+ * modifies the arguments before sending the RPC to the HMS server. This can lead to
+ * unexpected side effects (e.g. processorCapabilities not matching the actual client).
+ */
+public abstract class MetastoreServiceHandler implements Iface {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MetastoreServiceHandler.class);
+  protected static final String NOT_IMPLEMENTED_UNSUPPORTED = "%s method not supported"
+      + " by Catalog metastore service.";
+  protected static final String METAEXCEPTION_MSG_FORMAT =
+      "Unexpected error occurred while"
+          + " executing %s. Cause: %s. See catalog logs for details.";
+  protected static final String HMS_FALLBACK_MSG_FORMAT = "Forwarding the request %s for "
+      + "table %s to the backing HiveMetastore service";
+
+  // constants used for logging error messages
+  public static final String GET_PARTITION_BY_EXPR = "get_partitions_by_expr";
+  public static final String GET_PARTITION_BY_NAMES = "get_partitions_by_names_req";
+  protected final CatalogServiceCatalog catalog_;
+  protected final boolean fallBackToHMSOnErrors_;
+  protected final Metrics metrics_;
+  // TODO handle session configuration
+  protected Configuration serverConf_;
+  protected PartitionExpressionProxy expressionProxy_;
+  protected final String defaultCatalogName_;
+
+  public MetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+      boolean fallBackToHMSOnErrors) {
+    catalog_ = Preconditions.checkNotNull(catalog);
+    metrics_ = Preconditions.checkNotNull(metrics);
+    fallBackToHMSOnErrors_ = fallBackToHMSOnErrors;
+    LOG.info("Fallback to hive metastore service on errors is {}",
+        fallBackToHMSOnErrors_);
+    // load the metastore configuration from the classpath
+    serverConf_ = Preconditions.checkNotNull(MetastoreConf.newMetastoreConf());
+    String className = MetastoreConf
+        .get(serverConf_, ConfVars.EXPRESSION_PROXY_CLASS.getVarname());
+    try {
+      Preconditions.checkNotNull(className);
+      LOG.info("Instantiating {}", className);
+      expressionProxy_ = PartFilterExprUtil.createExpressionProxy(serverConf_);
+      if (expressionProxy_ instanceof DefaultPartitionExpressionProxy) {
+        LOG.error("PartFilterExprUtil.createExpressionProxy returned"
+            + " DefaultPartitionExpressionProxy. Check if hive-exec"
+            + " jar is available in the classpath.");
+        expressionProxy_ = null;
+      }
+    } catch (Exception ex) {
+      LOG.error("Could not instantiate {}", className, ex);
+    }
+    defaultCatalogName_ =
+        MetaStoreUtils.getDefaultCatalog(serverConf_);
+  }
+
+  @Override
+  public String get_hms_api_version() throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_hms_api_version();
+    }
+  }
+
+  @Override
+  public String getMetaConf(String configKey) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().getMetaConf(configKey);
+    }
+  }
+
+  @Override
+  public void setMetaConf(String configKey, String configValue)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().setMetaConf(configKey, configValue);
+    }
+  }
+
+  @Override
+  public void create_catalog(CreateCatalogRequest createCatalogRequest)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_catalog(createCatalogRequest);
+    }
+  }
+
+  @Override
+  public void alter_catalog(AlterCatalogRequest alterCatalogRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().alter_catalog(alterCatalogRequest);
+    }
+  }
+
+  @Override
+  public GetCatalogResponse get_catalog(GetCatalogRequest getCatalogRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_catalog(getCatalogRequest);
+    }
+  }
+
+  @Override
+  public GetCatalogsResponse get_catalogs() throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_catalogs();
+    }
+  }
+
+  @Override
+  public void drop_catalog(DropCatalogRequest dropCatalogRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().drop_catalog(dropCatalogRequest);
+    }
+  }
+
+  @Override
+  public void create_database(Database database)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_database(database);
+    }
+  }
+
+  @Override
+  public Database get_database(String databaseName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_database(databaseName);
+    }
+  }
+
+  @Override
+  public Database get_database_req(GetDatabaseRequest getDatabaseRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_database_req(getDatabaseRequest);
+    }
+  }
+
+  @Override
+  public void drop_database(String databaseName, boolean deleteData,
+      boolean ignoreUnknownDb)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .drop_database(databaseName, deleteData, ignoreUnknownDb);
+    }
+  }
+
+  @Override
+  public List<String> get_databases(String pattern) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_databases(pattern);
+    }
+  }
+
+  @Override
+  public List<String> get_all_databases() throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_all_databases();
+    }
+  }
+
+  @Override
+  public void alter_database(String dbname, Database database)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().alter_database(dbname, database);
+    }
+  }
+
+  @Override
+  public Type get_type(String name) throws MetaException, NoSuchObjectException,
+      TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_type(name);
+    }
+  }
+
+  @Override
+  public boolean create_type(Type type)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().create_type(type);
+    }
+  }
+
+  @Override
+  public boolean drop_type(String type)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().drop_type(type);
+    }
+  }
+
+  @Override
+  public Map<String, Type> get_type_all(String s) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_type_all(s);
+    }
+  }
+
+  @Override
+  public List<FieldSchema> get_fields(String dbname, String tblname)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_fields(dbname, tblname);
+    }
+  }
+
+  @Override
+  public List<FieldSchema> get_fields_with_environment_context(String dbName,
+      String tblName, EnvironmentContext environmentContext)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_fields_with_environment_context(dbName, tblName, environmentContext);
+    }
+  }
+
+  @Override
+  public List<FieldSchema> get_schema(String dbname, String tblname)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_schema(dbname, tblname);
+    }
+  }
+
+  @Override
+  public List<FieldSchema> get_schema_with_environment_context(String dbname,
+      String tblname, EnvironmentContext environmentContext)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schema_with_environment_context(dbname, tblname, environmentContext);
+    }
+  }
+
+  @Override
+  public void create_table(Table table)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_table(table);
+    }
+  }
+
+  @Override
+  public void create_table_with_environment_context(Table table,
+      EnvironmentContext environmentContext)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .create_table_with_environment_context(table, environmentContext);
+    }
+  }
+
+  @Override
+  public void create_table_with_constraints(Table table,
+      List<SQLPrimaryKey> sqlPrimaryKeys,
+      List<SQLForeignKey> sqlForeignKeys, List<SQLUniqueConstraint> sqlUniqueConstraints,
+      List<SQLNotNullConstraint> sqlNotNullConstraints,
+      List<SQLDefaultConstraint> sqlDefaultConstraints,
+      List<SQLCheckConstraint> sqlCheckConstraints)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_table_with_constraints(table,
+          sqlPrimaryKeys, sqlForeignKeys, sqlUniqueConstraints, sqlNotNullConstraints,
+          sqlDefaultConstraints, sqlCheckConstraints);
+    }
+  }
+
+  @Override
+  public void create_table_req(CreateTableRequest createTableRequest)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_table_req(createTableRequest);
+    }
+  }
+
+  @Override
+  public void drop_constraint(DropConstraintRequest dropConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().drop_constraint(dropConstraintRequest);
+    }
+  }
+
+  @Override
+  public void add_primary_key(AddPrimaryKeyRequest addPrimaryKeyRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_primary_key(addPrimaryKeyRequest);
+    }
+  }
+
+  @Override
+  public void add_foreign_key(AddForeignKeyRequest addForeignKeyRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_foreign_key(addForeignKeyRequest);
+    }
+  }
+
+  @Override
+  public void add_unique_constraint(AddUniqueConstraintRequest addUniqueConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_unique_constraint(addUniqueConstraintRequest);
+    }
+  }
+
+  @Override
+  public void add_not_null_constraint(
+      AddNotNullConstraintRequest addNotNullConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_not_null_constraint(addNotNullConstraintRequest);
+    }
+  }
+
+  @Override
+  public void add_default_constraint(
+      AddDefaultConstraintRequest addDefaultConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_default_constraint(addDefaultConstraintRequest);
+    }
+  }
+
+  @Override
+  public void add_check_constraint(AddCheckConstraintRequest addCheckConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_check_constraint(addCheckConstraintRequest);
+    }
+  }
+
+  @Override
+  public void drop_table(String dbname, String tblname, boolean deleteData)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().drop_table(dbname, tblname, deleteData);
+    }
+  }
+
+  @Override
+  public void drop_table_with_environment_context(String dbname, String tblname,
+      boolean deleteData,
+      EnvironmentContext environmentContext)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .drop_table_with_environment_context(dbname, tblname, deleteData,
+              environmentContext);
+    }
+  }
+
+  @Override
+  public void truncate_table(String dbName, String tblName, List<String> partNames)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().truncate_table(dbName, tblName, partNames);
+    }
+  }
+
+  @Override
+  public TruncateTableResponse truncate_table_req(
+      TruncateTableRequest truncateTableRequest) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .truncate_table_req(truncateTableRequest);
+    }
+  }
+
+  @Override
+  public List<String> get_tables(String dbname, String tblName)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_tables(dbname, tblName);
+    }
+  }
+
+  @Override
+  public List<String> get_tables_by_type(String dbname, String tablePattern,
+      String tableType)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_tables_by_type(dbname,
+          tablePattern, tableType);
+    }
+  }
+
+  @Override
+  public List<Table> get_all_materialized_view_objects_for_rewriting()
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_all_materialized_view_objects_for_rewriting();
+    }
+  }
+
+  @Override
+  public List<String> get_materialized_views_for_rewriting(String dbName)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_materialized_views_for_rewriting(dbName);
+    }
+  }
+
+  @Override
+  public List<TableMeta> get_table_meta(String dbnamePattern, String tblNamePattern,
+      List<String> tableTypes)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_table_meta(dbnamePattern,
+          tblNamePattern, tableTypes);
+    }
+  }
+
+  @Override
+  public List<String> get_all_tables(String dbname) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_all_tables(dbname);
+    }
+  }
+
+  @Override
+  public Table get_table(String dbname, String tblname)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_table(dbname, tblname);
+    }
+  }
+
+  @Override
+  public List<Table> get_table_objects_by_name(String dbname, List<String> list)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_table_objects_by_name(dbname,
+          list);
+    }
+  }
+
+  @Override
+  public List<ExtendedTableInfo> get_tables_ext(GetTablesExtRequest getTablesExtRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_tables_ext(getTablesExtRequest);
+    }
+  }
+
+  /**
+   * This method gets the table from the HMS directly. Additionally, if the request has
+   * {@code getFileMetadata} set, it computes the file-metadata and returns it in the
+   * response. For transactional tables, it uses the ValidWriteIdList from the request and
+   * gets the current ValidTxnList to get the requested snapshot of the file-metadata for
+   * the table.
+   */
+  @Override
+  public GetTableResult get_table_req(GetTableRequest getTableRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    String tblName = getTableRequest.getDbName() + "." + getTableRequest.getTblName();
+    LOG.debug(String.format(HMS_FALLBACK_MSG_FORMAT, "get_table_req", tblName));
+    GetTableResult result;
+    ValidTxnList txnList = null;
+    ValidWriteIdList writeIdList = null;
+    String requestWriteIdList = getTableRequest.getValidWriteIdList();
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      result = client.getHiveClient().getThriftClient()
+          .get_table_req(getTableRequest);
+      Table tbl = result.getTable();
+      // return early if file-metadata is not requested
+      if (!getTableRequest.isGetFileMetadata()) {
+        LOG.trace("File metadata is not requested. Returning table {}",
+            tbl.getTableName());
+        return result;
+      }
+      // we need to get the current ValidTxnList to avoid returning
+      // file-metadata for in-progress compactions. If the request does not
+      // include ValidWriteIdList or if the table is not transactional we compute
+      // the file-metadata as seen on the file-system.
+      boolean isTransactional = tbl.getParameters() != null && AcidUtils
+          .isTransactionalTable(tbl.getParameters());
+      if (isTransactional && requestWriteIdList != null) {
+        txnList = MetastoreShim.getValidTxns(client.getHiveClient());
+        writeIdList = MetastoreShim
+            .getValidWriteIdListFromString(requestWriteIdList);
+      }
+    }
+    CatalogHmsAPIHelper.loadAndSetFileMetadataFromFs(txnList, writeIdList, result);
+    return result;
+  }
+
+  @Override
+  public GetTablesResult get_table_objects_by_name_req(GetTablesRequest getTablesRequest)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_table_objects_by_name_req(getTablesRequest);
+    }
+  }
+
+  @Override
+  public Materialization get_materialization_invalidation_info(
+      CreationMetadata creationMetadata, String validTxnList)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_materialization_invalidation_info(creationMetadata, validTxnList);
+    }
+  }
+
+  @Override
+  public void update_creation_metadata(String catName, String dbName, String tblName,
+      CreationMetadata creationMetadata)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().update_creation_metadata(catName,
+          dbName, tblName, creationMetadata);
+    }
+  }
+
+  @Override
+  public List<String> get_table_names_by_filter(String dbname, String tblname,
+      short maxParts)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_table_names_by_filter(dbname,
+          tblname, maxParts);
+    }
+  }
+
+  @Override
+  public void alter_table(String dbname, String tblName, Table newTable)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().alter_table(dbname, tblName, newTable);
+    }
+  }
+
+  @Override
+  public void alter_table_with_environment_context(String dbname, String tblName,
+      Table table,
+      EnvironmentContext environmentContext)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .alter_table_with_environment_context(dbname,
+              tblName, table, environmentContext);
+    }
+  }
+
+  @Override
+  public void alter_table_with_cascade(String dbname, String tblName, Table table,
+      boolean cascade)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().alter_table_with_cascade(dbname, tblName,
+          table, cascade);
+    }
+  }
+
+  @Override
+  public AlterTableResponse alter_table_req(AlterTableRequest alterTableRequest)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().alter_table_req(alterTableRequest);
+    }
+  }
+
+  @Override
+  public Partition add_partition(Partition partition)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().add_partition(partition);
+    }
+  }
+
+  @Override
+  public Partition add_partition_with_environment_context(Partition partition,
+      EnvironmentContext environmentContext)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .add_partition_with_environment_context(partition, environmentContext);
+    }
+  }
+
+  @Override
+  public int add_partitions(List<Partition> partitionList)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().add_partitions(partitionList);
+    }
+  }
+
+  @Override
+  public int add_partitions_pspec(List<PartitionSpec> list)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().add_partitions_pspec(list);
+    }
+  }
+
+  @Override
+  public Partition append_partition(String dbname, String tblName, List<String> partVals)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .append_partition(dbname, tblName, partVals);
+    }
+  }
+
+  @Override
+  public AddPartitionsResult add_partitions_req(AddPartitionsRequest addPartitionsRequest)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .add_partitions_req(addPartitionsRequest);
+    }
+  }
+
+  @Override
+  public Partition append_partition_with_environment_context(String dbname,
+      String tblname,
+      List<String> partVals, EnvironmentContext environmentContext)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .append_partition_with_environment_context(dbname, tblname, partVals,
+              environmentContext);
+    }
+  }
+
+  @Override
+  public Partition append_partition_by_name(String dbname, String tblname,
+      String partName)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .append_partition_by_name(dbname, tblname, partName);
+    }
+  }
+
+  @Override
+  public Partition append_partition_by_name_with_environment_context(String dbname,
+      String tblname, String partName, EnvironmentContext environmentContext)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .append_partition_by_name_with_environment_context(dbname, tblname, partName,
+              environmentContext);
+    }
+  }
+
+  @Override
+  public boolean drop_partition(String dbname, String tblname, List<String> partVals,
+      boolean deleteData)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_partition(dbname, tblname, partVals, deleteData);
+    }
+  }
+
+  @Override
+  public boolean drop_partition_with_environment_context(String dbname, String tblname,
+      List<String> partVals, boolean deleteData, EnvironmentContext environmentContext)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_partition_with_environment_context(dbname, tblname, partVals, deleteData,
+              environmentContext);
+    }
+  }
+
+  @Override
+  public boolean drop_partition_by_name(String dbname, String tblname, String partName,
+      boolean deleteData)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().drop_partition_by_name(dbname,
+          tblname, partName, deleteData);
+    }
+  }
+
+  @Override
+  public boolean drop_partition_by_name_with_environment_context(String dbName,
+      String tableName,
+      String partName, boolean deleteData, EnvironmentContext envContext)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_partition_by_name_with_environment_context(dbName, tableName, partName,
+              deleteData, envContext);
+    }
+  }
+
+  @Override
+  public DropPartitionsResult drop_partitions_req(
+      DropPartitionsRequest dropPartitionsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_partitions_req(dropPartitionsRequest);
+    }
+  }
+
+  @Override
+  public Partition get_partition(String dbName, String tblName, List<String> values)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_partition(dbName, tblName,
+          values);
+    }
+  }
+
+  @Override
+  public Partition exchange_partition(Map<String, String> partitionSpecMap,
+      String sourcedb, String sourceTbl,
+      String destDb, String destTbl)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .exchange_partition(partitionSpecMap, sourcedb, sourceTbl, destDb, destTbl);
+    }
+  }
+
+  @Override
+  public List<Partition> exchange_partitions(Map<String, String> partitionSpecs,
+      String sourceDb, String sourceTable, String destDb,
+      String destinationTableName)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().exchange_partitions(partitionSpecs,
+          sourceDb, sourceTable, destDb, destinationTableName);
+    }
+  }
+
+  @Override
+  public Partition get_partition_with_auth(String dbname, String tblName,
+      List<String> values,
+      String user, List<String> groups)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_partition_with_auth(dbname,
+          tblName, values, user,
+          groups);
+    }
+  }
+
+  @Override
+  public Partition get_partition_by_name(String dbName, String tblName,
+      String partitionName)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_partition_by_name(dbName,
+          tblName, partitionName);
+    }
+  }
+
+  @Override
+  public List<Partition> get_partitions(String dbName, String tblName, short maxLimit)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions(dbName, tblName, maxLimit);
+    }
+  }
+
+  @Override
+  public List<Partition> get_partitions_with_auth(String dbName, String tblName,
+      short maxParts, String username,
+      List<String> groups) throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_partitions_with_auth(dbName,
+          tblName, maxParts, username,
+          groups);
+    }
+  }
+
+  @Override
+  public List<PartitionSpec> get_partitions_pspec(String dbName, String tblName,
+      int maxParts)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_pspec(dbName, tblName, maxParts);
+    }
+  }
+
+  @Override
+  public GetPartitionsResponse get_partitions_with_specs(GetPartitionsRequest request)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_with_specs(request);
+    }
+  }
+
+  @Override
+  public List<String> get_partition_names(String dbName, String tblName, short maxParts)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_partition_names(dbName,
+          tblName, maxParts);
+    }
+  }
+
+  @Override
+  public PartitionValuesResponse get_partition_values(
+      PartitionValuesRequest partitionValuesRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partition_values(partitionValuesRequest);
+    }
+  }
+
+  @Override
+  public List<Partition> get_partitions_ps(String dbName, String tblName,
+      List<String> partValues,
+      short maxParts) throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_ps(dbName, tblName, partValues, maxParts);
+    }
+  }
+
+  @Override
+  public List<Partition> get_partitions_ps_with_auth(String dbName, String tblName,
+      List<String> partVals, short maxParts, String user, List<String> groups)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_ps_with_auth(dbName, tblName, partVals, maxParts, user,
+              groups);
+    }
+  }
+
+  @Override
+  public List<String> get_partition_names_ps(String dbName, String tblName,
+      List<String> partVals,
+      short maxParts) throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partition_names_ps(dbName, tblName, partVals, maxParts);
+    }
+  }
+
+  @Override
+  public List<Partition> get_partitions_by_filter(String dbName, String tblName,
+      String filter, short maxParts)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_by_filter(dbName, tblName,
+              filter, maxParts);
+    }
+  }
+
+  @Override
+  public List<PartitionSpec> get_part_specs_by_filter(String dbName, String tblName,
+      String filter,
+      int maxParts) throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_part_specs_by_filter(dbName, tblName, filter, maxParts);
+    }
+  }
+
+  @Override
+  public GetFieldsResponse get_fields_req(GetFieldsRequest req)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      List<FieldSchema> fields = client.getHiveClient().getThriftClient()
+          .get_fields_with_environment_context(MetaStoreUtils
+                  .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+              req.getTblName(), req.getEnvContext());
+      GetFieldsResponse res = new GetFieldsResponse();
+      res.setFields(fields);
+      return res;
+    }
+  }
+
+  @Override
+  public GetSchemaResponse get_schema_req(GetSchemaRequest req)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      // TODO Remove the usage of old API here once this API is ported to cdpd-master
+      List<FieldSchema> fields = client.getHiveClient().getThriftClient()
+          .get_schema_with_environment_context(MetaStoreUtils
+                  .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+              req.getTblName(), req.getEnvContext());
+      GetSchemaResponse res = new GetSchemaResponse();
+      res.setFields(fields);
+      return res;
+    }
+  }
+
+  @Override
+  public GetPartitionResponse get_partition_req(GetPartitionRequest req)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      Partition p =
+          client.getHiveClient().getThriftClient().get_partition(
+              MetaStoreUtils
+                  .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+              req.getTblName(), req.getPartVals());
+      GetPartitionResponse res = new GetPartitionResponse();
+      res.setPartition(p);
+      return res;
+    }
+  }
+
+  @Override
+  public PartitionsResponse get_partitions_req(PartitionsRequest req)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      List<Partition> partitions =
+          client.getHiveClient().getThriftClient().get_partitions(MetaStoreUtils
+                  .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+              req.getTblName(), req.getMaxParts());
+      PartitionsResponse res = new PartitionsResponse();
+      res.setPartitions(partitions);
+      return res;
+    }
+  }
+
+  @Override
+  public GetPartitionNamesPsResponse get_partition_names_ps_req(
+      GetPartitionNamesPsRequest req)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      List<String> names = client.getHiveClient().getThriftClient()
+          .get_partition_names_ps(MetaStoreUtils
+                  .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+              req.getTblName(), req.getPartValues(), req.getMaxParts());
+      GetPartitionNamesPsResponse res = new GetPartitionNamesPsResponse();
+      res.setNames(names);
+      return res;
+    }
+  }
+
+  @Override
+  public GetPartitionsPsWithAuthResponse get_partitions_ps_with_auth_req(
+      GetPartitionsPsWithAuthRequest req)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      List<Partition> partitions = client.getHiveClient().getThriftClient()
+          .get_partitions_ps_with_auth(MetaStoreUtils
+                  .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+              req.getTblName(), req.getPartVals(), req.getMaxParts(),
+              req.getUserName(), req.getGroupNames());
+      GetPartitionsPsWithAuthResponse res = new GetPartitionsPsWithAuthResponse();
+      res.setPartitions(partitions);
+      return res;
+    }
+  }
+
+  @Override
+  public PartitionsByExprResult get_partitions_by_expr(
+      PartitionsByExprRequest partitionsByExprRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_by_expr(partitionsByExprRequest);
+    }
+  }
+
+  @Override
+  public int get_num_partitions_by_filter(String dbName, String tblName, String filter)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_num_partitions_by_filter(dbName,
+          tblName, filter);
+    }
+  }
+
+  @Override
+  public List<Partition> get_partitions_by_names(String dbName, String tblName,
+      List<String> partitionNames)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_by_names(dbName, tblName,
+              partitionNames);
+    }
+  }
+
+  /**
+   * Util method to evaluate if the received exception needs to be thrown to the Client
+   * based on the server configuration.
+   *
+   * @param cause   The underlying exception received from Catalog.
+   * @param apiName The HMS API name which threw the given exception.
+   * @throws TException Wrapped exception with the cause in case the given Exception is
+   *                    not a TException. Else, throws the given TException.
+   */
+  protected void throwIfNoFallback(Exception cause, String apiName)
+      throws TException {
+    LOG.debug("Received exception while executing {}", apiName, cause);
+    if (fallBackToHMSOnErrors_) return;
+    if (cause instanceof TException) throw (TException) cause;
+    // If this is not a TException, wrap it in a MetaException.
+    throw new MetaException(
+        String.format(METAEXCEPTION_MSG_FORMAT, apiName, cause));
+  }
+
+  /**
+   * This method gets the partitions for the given list of names from HMS. Additionally,
+   * if the {@code getFileMetadata} flag is set in the request, it also computes the file
+   * metadata and sets it in the partitions which are returned.
+   *
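+   * @throws TException if the request to HMS fails, or if partitions of a
+   *                    transactional table are requested without a ValidWriteIdList.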
+   */
+  public GetPartitionsByNamesResult get_partitions_by_names_req(
+      GetPartitionsByNamesRequest getPartitionsByNamesRequest) throws TException {
+    String tblName =
+        getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
+            .getTbl_name();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+    boolean getFileMetadata = getPartitionsByNamesRequest.isGetFileMetadata();
+    GetPartitionsByNamesResult result;
+    ValidWriteIdList validWriteIdList = null;
+    ValidTxnList validTxnList = null;
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      result = client.getHiveClient().getThriftClient()
+          .get_partitions_by_names_req(getPartitionsByNamesRequest);
+      // If file metadata is not requested, return early.
+      if (!getFileMetadata) return result;
+      // We don't know whether the requested partitions belong to a transactional
+      // table, so we fetch the table from HMS to confirm.
+      // TODO: Maybe we could assume that if ValidWriteIdList is not set, the table is
+      // not transactional.
+      String[] parsedCatDbName = MetaStoreUtils
+          .parseDbName(getPartitionsByNamesRequest.getDb_name(), serverConf_);
+      Table tbl = client.getHiveClient().getTable(parsedCatDbName[0], parsedCatDbName[1],
+          getPartitionsByNamesRequest.getTbl_name(),
+          getPartitionsByNamesRequest.getValidWriteIdList());
+      boolean isTransactional = tbl.getParameters() != null && AcidUtils
+          .isTransactionalTable(tbl.getParameters());
+      if (isTransactional) {
+        if (getPartitionsByNamesRequest.getValidWriteIdList() == null) {
+          throw new MetaException(
+              "ValidWriteIdList is not set when requesting partitions for table " + tbl
+                  .getDbName() + "." + tbl.getTableName());
+        }
+        validWriteIdList = MetastoreShim
+            .getValidWriteIdListFromString(
+                getPartitionsByNamesRequest.getValidWriteIdList());
+        validTxnList = client.getHiveClient().getValidTxns();
+      }
+    }
+    CatalogHmsAPIHelper
+        .loadAndSetFileMetadataFromFs(validTxnList, validWriteIdList, result);
+    return result;
+  }
+
+  @Override
+  public void alter_partition(String dbName, String tblName, Partition partition)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .alter_partition(dbName, tblName, partition);
+    }
+  }
+
+  @Override
+  public void alter_partitions(String dbName, String tblName, List<Partition> partitions)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .alter_partitions(dbName, tblName, partitions);
+    }
+  }
+
+  @Override
+  public void alter_partitions_with_environment_context(String dbName, String tblName,
+      List<Partition> partitions, EnvironmentContext environmentContext)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .alter_partitions_with_environment_context(dbName, tblName, partitions,
+              environmentContext);
+    }
+  }
+
+  @Override
+  public AlterPartitionsResponse alter_partitions_req(
+      AlterPartitionsRequest alterPartitionsRequest)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .alter_partitions_req(alterPartitionsRequest);
+    }
+  }
+
+  @Override
+  public void alter_partition_with_environment_context(String dbName, String tblName,
+      Partition partition, EnvironmentContext environmentContext)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .alter_partition_with_environment_context(dbName, tblName, partition,
+              environmentContext);
+    }
+  }
+
+  @Override
+  public void rename_partition(String dbName, String tblName, List<String> partVals,
+      Partition partition) throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .rename_partition(dbName, tblName, partVals, partition);
+    }
+  }
+
+  @Override
+  public RenamePartitionResponse rename_partition_req(
+      RenamePartitionRequest renamePartitionRequest)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .rename_partition_req(renamePartitionRequest);
+    }
+  }
+
+  @Override
+  public boolean partition_name_has_valid_characters(List<String> partVals,
+      boolean throwException)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .partition_name_has_valid_characters(partVals, throwException);
+    }
+  }
+
+  @Override
+  public String get_config_value(String key, String defaultVal)
+      throws ConfigValSecurityException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_config_value(key, defaultVal);
+    }
+  }
+
+  @Override
+  public List<String> partition_name_to_vals(String name)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().partition_name_to_vals(name);
+    }
+  }
+
+  @Override
+  public Map<String, String> partition_name_to_spec(String name)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().partition_name_to_spec(name);
+    }
+  }
+
+  @Override
+  public void markPartitionForEvent(String dbName, String tblName,
+      Map<String, String> partVals, PartitionEventType partitionEventType)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .markPartitionForEvent(dbName, tblName, partVals, partitionEventType);
+    }
+  }
+
+  @Override
+  public boolean isPartitionMarkedForEvent(String dbName, String tblName,
+      Map<String, String> partVals, PartitionEventType partitionEventType)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .isPartitionMarkedForEvent(dbName, tblName, partVals, partitionEventType);
+    }
+  }
+
+  @Override
+  public PrimaryKeysResponse get_primary_keys(PrimaryKeysRequest primaryKeysRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_primary_keys(primaryKeysRequest);
+    }
+  }
+
+  @Override
+  public ForeignKeysResponse get_foreign_keys(ForeignKeysRequest foreignKeysRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_foreign_keys(foreignKeysRequest);
+    }
+  }
+
+  @Override
+  public UniqueConstraintsResponse get_unique_constraints(
+      UniqueConstraintsRequest uniqueConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_unique_constraints(uniqueConstraintsRequest);
+    }
+  }
+
+  @Override
+  public NotNullConstraintsResponse get_not_null_constraints(
+      NotNullConstraintsRequest notNullConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_not_null_constraints(notNullConstraintsRequest);
+    }
+  }
+
+  @Override
+  public DefaultConstraintsResponse get_default_constraints(
+      DefaultConstraintsRequest defaultConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_default_constraints(defaultConstraintsRequest);
+    }
+  }
+
+  @Override
+  public CheckConstraintsResponse get_check_constraints(
+      CheckConstraintsRequest checkConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_check_constraints(checkConstraintsRequest);
+    }
+  }
+
+  @Override
+  public boolean update_table_column_statistics(ColumnStatistics columnStatistics)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .update_table_column_statistics(columnStatistics);
+    }
+  }
+
+  @Override
+  public boolean update_partition_column_statistics(ColumnStatistics columnStatistics)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .update_partition_column_statistics(columnStatistics);
+    }
+  }
+
+  @Override
+  public SetPartitionsStatsResponse update_table_column_statistics_req(
+      SetPartitionsStatsRequest setPartitionsStatsRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .update_table_column_statistics_req(setPartitionsStatsRequest);
+    }
+  }
+
+  @Override
+  public SetPartitionsStatsResponse update_partition_column_statistics_req(
+      SetPartitionsStatsRequest setPartitionsStatsRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .update_partition_column_statistics_req(setPartitionsStatsRequest);
+    }
+  }
+
+  @Override
+  public ColumnStatistics get_table_column_statistics(String dbName, String tblName,
+      String colName)
+      throws NoSuchObjectException, MetaException, InvalidInputException,
+      InvalidObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_table_column_statistics(dbName, tblName, colName);
+    }
+  }
+
+  @Override
+  public ColumnStatistics get_partition_column_statistics(String dbName,
+      String tblName, String partName, String colName)
+      throws NoSuchObjectException, MetaException, InvalidInputException,
+      InvalidObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partition_column_statistics(dbName, tblName, partName, colName);
+    }
+  }
+
+  @Override
+  public TableStatsResult get_table_statistics_req(TableStatsRequest tableStatsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_table_statistics_req(tableStatsRequest);
+    }
+  }
+
+  @Override
+  public PartitionsStatsResult get_partitions_statistics_req(
+      PartitionsStatsRequest partitionsStatsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_partitions_statistics_req(partitionsStatsRequest);
+    }
+  }
+
+  @Override
+  public AggrStats get_aggr_stats_for(PartitionsStatsRequest partitionsStatsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_aggr_stats_for(partitionsStatsRequest);
+    }
+  }
+
+  @Override
+  public boolean set_aggr_stats_for(SetPartitionsStatsRequest setPartitionsStatsRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .set_aggr_stats_for(setPartitionsStatsRequest);
+    }
+  }
+
+  @Override
+  public boolean delete_partition_column_statistics(String dbName, String tblName,
+      String partName,
+      String colName, String engine)
+      throws NoSuchObjectException, MetaException, InvalidObjectException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .delete_partition_column_statistics(dbName, tblName, partName, colName,
+              engine);
+    }
+  }
+
+  @Override
+  public boolean delete_table_column_statistics(String dbName, String tblName,
+      String columnName, String engine)
+      throws NoSuchObjectException, MetaException, InvalidObjectException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .delete_table_column_statistics(dbName,
+              tblName, columnName, engine);
+    }
+  }
+
+  @Override
+  public void create_function(Function function)
+      throws AlreadyExistsException, InvalidObjectException, MetaException,
+      NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_function(function);
+    }
+  }
+
+  @Override
+  public void drop_function(String dbName, String funcName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().drop_function(dbName, funcName);
+    }
+  }
+
+  @Override
+  public void alter_function(String dbName, String funcName, Function newFunc)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().alter_function(dbName, funcName, newFunc);
+    }
+  }
+
+  @Override
+  public List<String> get_functions(String dbName, String pattern)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_functions(dbName, pattern);
+    }
+  }
+
+  @Override
+  public Function get_function(String dbName, String funcName)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_function(dbName, funcName);
+    }
+  }
+
+  @Override
+  public GetAllFunctionsResponse get_all_functions() throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_all_functions();
+    }
+  }
+
+  @Override
+  public boolean create_role(Role role) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().create_role(role);
+    }
+  }
+
+  @Override
+  public boolean drop_role(String s) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().drop_role(s);
+    }
+  }
+
+  @Override
+  public List<String> get_role_names() throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_role_names();
+    }
+  }
+
+  @Override
+  public boolean grant_role(String roleName, String userName, PrincipalType principalType,
+      String grantor,
+      PrincipalType grantorType, boolean grantOption) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .grant_role(roleName, userName, principalType,
+              grantor, grantorType, grantOption);
+    }
+  }
+
+  @Override
+  public boolean revoke_role(String s, String s1, PrincipalType principalType)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().revoke_role(s, s1, principalType);
+    }
+  }
+
+  @Override
+  public List<Role> list_roles(String s, PrincipalType principalType)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().list_roles(s, principalType);
+    }
+  }
+
+  @Override
+  public GrantRevokeRoleResponse grant_revoke_role(
+      GrantRevokeRoleRequest grantRevokeRoleRequest) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .grant_revoke_role(grantRevokeRoleRequest);
+    }
+  }
+
+  @Override
+  public GetPrincipalsInRoleResponse get_principals_in_role(
+      GetPrincipalsInRoleRequest getPrincipalsInRoleRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_principals_in_role(getPrincipalsInRoleRequest);
+    }
+  }
+
+  @Override
+  public GetRoleGrantsForPrincipalResponse get_role_grants_for_principal(
+      GetRoleGrantsForPrincipalRequest getRoleGrantsForPrincipalRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_role_grants_for_principal(getRoleGrantsForPrincipalRequest);
+    }
+  }
+
+  @Override
+  public PrincipalPrivilegeSet get_privilege_set(HiveObjectRef hiveObjectRef, String s,
+      List<String> list) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_privilege_set(hiveObjectRef,
+          s, list);
+    }
+  }
+
+  @Override
+  public List<HiveObjectPrivilege> list_privileges(String s, PrincipalType principalType,
+      HiveObjectRef hiveObjectRef) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().list_privileges(s, principalType,
+          hiveObjectRef);
+    }
+  }
+
+  @Override
+  public boolean grant_privileges(PrivilegeBag privilegeBag)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().grant_privileges(privilegeBag);
+    }
+  }
+
+  @Override
+  public boolean revoke_privileges(PrivilegeBag privilegeBag)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().revoke_privileges(privilegeBag);
+    }
+  }
+
+  @Override
+  public GrantRevokePrivilegeResponse grant_revoke_privileges(
+      GrantRevokePrivilegeRequest grantRevokePrivilegeRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .grant_revoke_privileges(grantRevokePrivilegeRequest);
+    }
+  }
+
+  @Override
+  public GrantRevokePrivilegeResponse refresh_privileges(HiveObjectRef hiveObjectRef,
+      String s, GrantRevokePrivilegeRequest grantRevokePrivilegeRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().refresh_privileges(hiveObjectRef, s,
+          grantRevokePrivilegeRequest);
+    }
+  }
+
+  @Override
+  public List<String> set_ugi(String userName, List<String> groupNames)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().set_ugi(userName, groupNames);
+    }
+  }
+
+  @Override
+  public String get_delegation_token(String s, String s1)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_delegation_token(s, s1);
+    }
+  }
+
+  @Override
+  public long renew_delegation_token(String s) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().renew_delegation_token(s);
+    }
+  }
+
+  @Override
+  public void cancel_delegation_token(String s) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().cancel_delegation_token(s);
+    }
+  }
+
+  @Override
+  public boolean add_token(String tokenIdentifier, String delegationToken)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .add_token(tokenIdentifier, delegationToken);
+    }
+  }
+
+  @Override
+  public boolean remove_token(String tokenIdentifier) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().remove_token(tokenIdentifier);
+    }
+  }
+
+  @Override
+  public String get_token(String s) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_token(s);
+    }
+  }
+
+  @Override
+  public List<String> get_all_token_identifiers() throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_all_token_identifiers();
+    }
+  }
+
+  @Override
+  public int add_master_key(String s) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().add_master_key(s);
+    }
+  }
+
+  @Override
+  public void update_master_key(int i, String s)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().update_master_key(i, s);
+    }
+  }
+
+  @Override
+  public boolean remove_master_key(int i) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().remove_master_key(i);
+    }
+  }
+
+  @Override
+  public List<String> get_master_keys() throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_master_keys();
+    }
+  }
+
+  @Override
+  public GetOpenTxnsResponse get_open_txns() throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_open_txns();
+    }
+  }
+
+  @Override
+  public GetOpenTxnsInfoResponse get_open_txns_info() throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_open_txns_info();
+    }
+  }
+
+  @Override
+  public OpenTxnsResponse open_txns(OpenTxnRequest openTxnRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().open_txns(openTxnRequest);
+    }
+  }
+
+  @Override
+  public void abort_txn(AbortTxnRequest abortTxnRequest)
+      throws NoSuchTxnException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().abort_txn(abortTxnRequest);
+    }
+  }
+
+  @Override
+  public void abort_txns(AbortTxnsRequest abortTxnsRequest)
+      throws NoSuchTxnException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().abort_txns(abortTxnsRequest);
+    }
+  }
+
+  @Override
+  public void commit_txn(CommitTxnRequest commitTxnRequest)
+      throws NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().commit_txn(commitTxnRequest);
+    }
+  }
+
+  @Override
+  public void repl_tbl_writeid_state(
+      ReplTblWriteIdStateRequest replTblWriteIdStateRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .repl_tbl_writeid_state(replTblWriteIdStateRequest);
+    }
+  }
+
+  @Override
+  public GetValidWriteIdsResponse get_valid_write_ids(
+      GetValidWriteIdsRequest getValidWriteIdsRequest)
+      throws NoSuchTxnException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_valid_write_ids(getValidWriteIdsRequest);
+    }
+  }
+
+  @Override
+  public AllocateTableWriteIdsResponse allocate_table_write_ids(
+      AllocateTableWriteIdsRequest allocateTableWriteIdsRequest)
+      throws NoSuchTxnException, TxnAbortedException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .allocate_table_write_ids(allocateTableWriteIdsRequest);
+    }
+  }
+
+  @Override
+  public LockResponse lock(LockRequest lockRequest)
+      throws NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().lock(lockRequest);
+    }
+  }
+
+  @Override
+  public LockResponse check_lock(CheckLockRequest checkLockRequest)
+      throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().check_lock(checkLockRequest);
+    }
+  }
+
+  @Override
+  public void unlock(UnlockRequest unlockRequest)
+      throws NoSuchLockException, TxnOpenException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().unlock(unlockRequest);
+    }
+  }
+
+  @Override
+  public ShowLocksResponse show_locks(ShowLocksRequest showLocksRequest)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().show_locks(showLocksRequest);
+    }
+  }
+
+  @Override
+  public void heartbeat(HeartbeatRequest heartbeatRequest)
+      throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().heartbeat(heartbeatRequest);
+    }
+  }
+
+  @Override
+  public HeartbeatTxnRangeResponse heartbeat_txn_range(
+      HeartbeatTxnRangeRequest heartbeatTxnRangeRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .heartbeat_txn_range(heartbeatTxnRangeRequest);
+    }
+  }
+
+  @Override
+  public void compact(CompactionRequest compactionRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().compact(compactionRequest);
+    }
+  }
+
+  @Override
+  public CompactionResponse compact2(CompactionRequest compactionRequest)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().compact2(compactionRequest);
+    }
+  }
+
+  @Override
+  public ShowCompactResponse show_compact(ShowCompactRequest showCompactRequest)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().show_compact(showCompactRequest);
+    }
+  }
+
+  @Override
+  public void add_dynamic_partitions(AddDynamicPartitions addDynamicPartitions)
+      throws NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_dynamic_partitions(addDynamicPartitions);
+    }
+  }
+
+  @Override
+  public OptionalCompactionInfoStruct find_next_compact(String s)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().find_next_compact(s);
+    }
+  }
+
+  @Override
+  public void update_compactor_state(CompactionInfoStruct compactionInfoStruct, long l)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .update_compactor_state(compactionInfoStruct, l);
+    }
+  }
+
+  @Override
+  public List<String> find_columns_with_stats(CompactionInfoStruct compactionInfoStruct)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .find_columns_with_stats(compactionInfoStruct);
+    }
+  }
+
+  @Override
+  public void mark_cleaned(CompactionInfoStruct compactionInfoStruct)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().mark_cleaned(compactionInfoStruct);
+    }
+  }
+
+  @Override
+  public void mark_compacted(CompactionInfoStruct compactionInfoStruct)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().mark_compacted(compactionInfoStruct);
+    }
+  }
+
+  @Override
+  public void mark_failed(CompactionInfoStruct compactionInfoStruct)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().mark_failed(compactionInfoStruct);
+    }
+  }
+
+  @Override
+  public MaxAllocatedTableWriteIdResponse get_max_allocated_table_write_id(
+      MaxAllocatedTableWriteIdRequest rqst)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_max_allocated_table_write_id(rqst);
+    }
+  }
+
+  @Override
+  public void seed_write_id(SeedTableWriteIdsRequest rqst)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().seed_write_id(rqst);
+    }
+  }
+
+  @Override
+  public void seed_txn_id(SeedTxnIdRequest rqst)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().seed_txn_id(rqst);
+    }
+  }
+
+  @Override
+  public void set_hadoop_jobid(String s, long l) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().set_hadoop_jobid(s, l);
+    }
+  }
+
+  @Override
+  public NotificationEventResponse get_next_notification(
+      NotificationEventRequest notificationEventRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_next_notification(notificationEventRequest);
+    }
+  }
+
+  @Override
+  public CurrentNotificationEventId get_current_notificationEventId() throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_current_notificationEventId();
+    }
+  }
+
+  @Override
+  public NotificationEventsCountResponse get_notification_events_count(
+      NotificationEventsCountRequest notificationEventsCountRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_notification_events_count(notificationEventsCountRequest);
+    }
+  }
+
+  @Override
+  public FireEventResponse fire_listener_event(FireEventRequest fireEventRequest)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .fire_listener_event(fireEventRequest);
+    }
+  }
+
+  @Override
+  public void flushCache() throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().flushCache();
+    }
+  }
+
+  @Override
+  public WriteNotificationLogResponse add_write_notification_log(
+      WriteNotificationLogRequest writeNotificationLogRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .add_write_notification_log(writeNotificationLogRequest);
+    }
+  }
+
+  @Override
+  public CmRecycleResponse cm_recycle(CmRecycleRequest cmRecycleRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().cm_recycle(cmRecycleRequest);
+    }
+  }
+
+  @Override
+  public GetFileMetadataByExprResult get_file_metadata_by_expr(
+      GetFileMetadataByExprRequest getFileMetadataByExprRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_file_metadata_by_expr(getFileMetadataByExprRequest);
+    }
+  }
+
+  @Override
+  public GetFileMetadataResult get_file_metadata(
+      GetFileMetadataRequest getFileMetadataRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_file_metadata(getFileMetadataRequest);
+    }
+  }
+
+  @Override
+  public PutFileMetadataResult put_file_metadata(
+      PutFileMetadataRequest putFileMetadataRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .put_file_metadata(putFileMetadataRequest);
+    }
+  }
+
+  @Override
+  public ClearFileMetadataResult clear_file_metadata(
+      ClearFileMetadataRequest clearFileMetadataRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .clear_file_metadata(clearFileMetadataRequest);
+    }
+  }
+
+  @Override
+  public CacheFileMetadataResult cache_file_metadata(
+      CacheFileMetadataRequest cacheFileMetadataRequest) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .cache_file_metadata(cacheFileMetadataRequest);
+    }
+  }
+
+  @Override
+  public String get_metastore_db_uuid() throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_metastore_db_uuid();
+    }
+  }
+
+  @Override
+  public WMCreateResourcePlanResponse create_resource_plan(
+      WMCreateResourcePlanRequest wmCreateResourcePlanRequest)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .create_resource_plan(wmCreateResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMGetResourcePlanResponse get_resource_plan(
+      WMGetResourcePlanRequest wmGetResourcePlanRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_resource_plan(wmGetResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMGetActiveResourcePlanResponse get_active_resource_plan(
+      WMGetActiveResourcePlanRequest wmGetActiveResourcePlanRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_active_resource_plan(wmGetActiveResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMGetAllResourcePlanResponse get_all_resource_plans(
+      WMGetAllResourcePlanRequest wmGetAllResourcePlanRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_all_resource_plans(wmGetAllResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMAlterResourcePlanResponse alter_resource_plan(
+      WMAlterResourcePlanRequest wmAlterResourcePlanRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .alter_resource_plan(wmAlterResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMValidateResourcePlanResponse validate_resource_plan(
+      WMValidateResourcePlanRequest wmValidateResourcePlanRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .validate_resource_plan(wmValidateResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMDropResourcePlanResponse drop_resource_plan(
+      WMDropResourcePlanRequest wmDropResourcePlanRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_resource_plan(wmDropResourcePlanRequest);
+    }
+  }
+
+  @Override
+  public WMCreateTriggerResponse create_wm_trigger(
+      WMCreateTriggerRequest wmCreateTriggerRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .create_wm_trigger(wmCreateTriggerRequest);
+    }
+  }
+
+  @Override
+  public WMAlterTriggerResponse alter_wm_trigger(
+      WMAlterTriggerRequest wmAlterTriggerRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .alter_wm_trigger(wmAlterTriggerRequest);
+    }
+  }
+
+  @Override
+  public WMDropTriggerResponse drop_wm_trigger(WMDropTriggerRequest wmDropTriggerRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_wm_trigger(wmDropTriggerRequest);
+    }
+  }
+
+  @Override
+  public WMGetTriggersForResourePlanResponse get_triggers_for_resourceplan(
+      WMGetTriggersForResourePlanRequest wmGetTriggersForResourePlanRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_triggers_for_resourceplan(wmGetTriggersForResourePlanRequest);
+    }
+  }
+
+  @Override
+  public WMCreatePoolResponse create_wm_pool(WMCreatePoolRequest wmCreatePoolRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .create_wm_pool(wmCreatePoolRequest);
+    }
+  }
+
+  @Override
+  public WMAlterPoolResponse alter_wm_pool(WMAlterPoolRequest wmAlterPoolRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().alter_wm_pool(wmAlterPoolRequest);
+    }
+  }
+
+  @Override
+  public WMDropPoolResponse drop_wm_pool(WMDropPoolRequest wmDropPoolRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().drop_wm_pool(wmDropPoolRequest);
+    }
+  }
+
+  @Override
+  public WMCreateOrUpdateMappingResponse create_or_update_wm_mapping(
+      WMCreateOrUpdateMappingRequest wmCreateOrUpdateMappingRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .create_or_update_wm_mapping(wmCreateOrUpdateMappingRequest);
+    }
+  }
+
+  @Override
+  public WMDropMappingResponse drop_wm_mapping(WMDropMappingRequest wmDropMappingRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .drop_wm_mapping(wmDropMappingRequest);
+    }
+  }
+
+  @Override
+  public WMCreateOrDropTriggerToPoolMappingResponse
+    create_or_drop_wm_trigger_to_pool_mapping(
+      WMCreateOrDropTriggerToPoolMappingRequest wmCreateOrDropTriggerToPoolMappingRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .create_or_drop_wm_trigger_to_pool_mapping(
+              wmCreateOrDropTriggerToPoolMappingRequest);
+    }
+  }
+
+  @Override
+  public void create_ischema(ISchema iSchema)
+      throws AlreadyExistsException, NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().create_ischema(iSchema);
+    }
+  }
+
+  @Override
+  public void alter_ischema(AlterISchemaRequest alterISchemaRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().alter_ischema(alterISchemaRequest);
+    }
+  }
+
+  @Override
+  public ISchema get_ischema(ISchemaName iSchemaName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_ischema(iSchemaName);
+    }
+  }
+
+  @Override
+  public void drop_ischema(ISchemaName iSchemaName)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().drop_ischema(iSchemaName);
+    }
+  }
+
+  @Override
+  public void add_schema_version(SchemaVersion schemaVersion)
+      throws AlreadyExistsException, NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_schema_version(schemaVersion);
+    }
+  }
+
+  @Override
+  public SchemaVersion get_schema_version(SchemaVersionDescriptor schemaVersionDescriptor)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schema_version(schemaVersionDescriptor);
+    }
+  }
+
+  @Override
+  public SchemaVersion get_schema_latest_version(ISchemaName iSchemaName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schema_latest_version(iSchemaName);
+    }
+  }
+
+  @Override
+  public List<SchemaVersion> get_schema_all_versions(ISchemaName iSchemaName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schema_all_versions(iSchemaName);
+    }
+  }
+
+  @Override
+  public void drop_schema_version(SchemaVersionDescriptor schemaVersionDescriptor)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .drop_schema_version(schemaVersionDescriptor);
+    }
+  }
+
+  @Override
+  public FindSchemasByColsResp get_schemas_by_cols(
+      FindSchemasByColsRqst findSchemasByColsRqst) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schemas_by_cols(findSchemasByColsRqst);
+    }
+  }
+
+  @Override
+  public void map_schema_version_to_serde(
+      MapSchemaVersionToSerdeRequest mapSchemaVersionToSerdeRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .map_schema_version_to_serde(mapSchemaVersionToSerdeRequest);
+    }
+  }
+
+  @Override
+  public void set_schema_version_state(
+      SetSchemaVersionStateRequest setSchemaVersionStateRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .set_schema_version_state(setSchemaVersionStateRequest);
+    }
+  }
+
+  @Override
+  public void add_serde(SerDeInfo serDeInfo)
+      throws AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_serde(serDeInfo);
+    }
+  }
+
+  @Override
+  public SerDeInfo get_serde(GetSerdeRequest getSerdeRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_serde(getSerdeRequest);
+    }
+  }
+
+  @Override
+  public LockResponse get_lock_materialization_rebuild(String dbName,
+      String tableName, long txnId) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_lock_materialization_rebuild(dbName, tableName, txnId);
+    }
+  }
+
+  @Override
+  public boolean heartbeat_lock_materialization_rebuild(String dbName,
+      String tableName, long txnId) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .heartbeat_lock_materialization_rebuild(dbName, tableName, txnId);
+    }
+  }
+
+  @Override
+  public void add_runtime_stats(RuntimeStat runtimeStat)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_runtime_stats(runtimeStat);
+    }
+  }
+
+  @Override
+  public List<RuntimeStat> get_runtime_stats(
+      GetRuntimeStatsRequest getRuntimeStatsRequest) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_runtime_stats(getRuntimeStatsRequest);
+    }
+  }
+
+  @Override
+  public ScheduledQueryPollResponse scheduled_query_poll(
+      ScheduledQueryPollRequest scheduledQueryPollRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .scheduled_query_poll(scheduledQueryPollRequest);
+    }
+  }
+
+  @Override
+  public void scheduled_query_maintenance(
+      ScheduledQueryMaintenanceRequest scheduledQueryMaintenanceRequest)
+      throws MetaException, NoSuchObjectException, AlreadyExistsException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .scheduled_query_maintenance(scheduledQueryMaintenanceRequest);
+    }
+  }
+
+  @Override
+  public void scheduled_query_progress(
+      ScheduledQueryProgressInfo scheduledQueryProgressInfo)
+      throws MetaException, InvalidOperationException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .scheduled_query_progress(scheduledQueryProgressInfo);
+    }
+  }
+
+  @Override
+  public ScheduledQuery get_scheduled_query(ScheduledQueryKey scheduledQueryKey)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_scheduled_query(scheduledQueryKey);
+    }
+  }
+
+  @Override
+  public void add_replication_metrics(ReplicationMetricList replicationMetricList)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_replication_metrics(replicationMetricList);
+    }
+  }
+
+  @Override
+  public ReplicationMetricList get_replication_metrics(
+      GetReplicationMetricsRequest getReplicationMetricsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_replication_metrics(getReplicationMetricsRequest);
+    }
+  }
+
+  @Override
+  public long get_latest_txnid_in_conflict(long txnId) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_latest_txnid_in_conflict(txnId);
+    }
+  }
+
+  @Override
+  public String getName() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getName"));
+  }
+
+  @Override
+  public String getVersion() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getVersion"));
+  }
+
+  @Override
+  public fb_status getStatus() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getStatus"));
+  }
+
+  @Override
+  public String getStatusDetails() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getStatusDetails"));
+  }
+
+  @Override
+  public Map<String, Long> getCounters() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getCounters"));
+  }
+
+  @Override
+  public long getCounter(String s) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getCounter"));
+  }
+
+  @Override
+  public void setOption(String s, String s1) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "setOption"));
+
+  }
+
+  @Override
+  public String getOption(String s) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getOption"));
+  }
+
+  @Override
+  public Map<String, String> getOptions() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getOptions"));
+  }
+
+  @Override
+  public String getCpuProfile(int i) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getCpuProfile"));
+  }
+
+  @Override
+  public long aliveSince() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "aliveSince"));
+  }
+
+  @Override
+  public void reinitialize() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "reinitialize"));
+
+  }
+
+  @Override
+  public void shutdown() throws TException {
+    // Nothing to do. Use this call for any session-specific clean-up.
+  }
+}
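The pass-through methods in the file above all share one shape: borrow a client in a try-with-resources block, forward the call unchanged, and let the block release the client whether the call returns or throws. A minimal, self-contained sketch of that shape follows. The names PassThroughSketch, Backend and PooledClient are hypothetical stand-ins for illustration, not Impala or Hive classes, and the open-client counter exists only to make the release observable.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PassThroughSketch {
  /** Hypothetical backing service; stands in for the HMS thrift client. */
  interface Backend {
    String getSerde(String name) throws Exception;
  }

  /** Hypothetical pooled client; close() models returning it to the pool. */
  static class PooledClient implements AutoCloseable {
    static final AtomicInteger openClients = new AtomicInteger();
    private final Backend backend;

    PooledClient(Backend backend) {
      this.backend = backend;
      openClients.incrementAndGet();
    }

    Backend getBackend() { return backend; }

    @Override
    public void close() { openClients.decrementAndGet(); }
  }

  private final Backend backend;

  PassThroughSketch(Backend backend) { this.backend = backend; }

  /** Forwards the call unchanged; the client is released even if the call throws. */
  public String getSerde(String name) throws Exception {
    try (PooledClient client = new PooledClient(backend)) {
      return client.getBackend().getSerde(name);
    }
  }

  public static void main(String[] args) throws Exception {
    PassThroughSketch server = new PassThroughSketch(name -> "serde:" + name);
    System.out.println(server.getSerde("lazy"));
    System.out.println("open clients: " + PooledClient.openClients.get());
  }
}
```

Because the release happens in the implicit finally of try-with-resources, a failing backend call cannot leak a client, which is presumably why every delegating method in the patch uses the same block.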
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
new file mode 100644
index 0000000..399ed09
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * No-op ICatalogMetastoreServer used when the metastore service is not configured.
+ */
+public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
+  public static final NoOpCatalogMetastoreServer INSTANCE =
+      new NoOpCatalogMetastoreServer();
+  private static final String EMPTY = "";
+
+  @Override
+  public void start() throws CatalogException {
+    // no-op
+  }
+
+  @Override
+  public void stop() throws CatalogException {
+    // no-op
+  }
+
+  @Override
+  public String getMetrics() {
+    return EMPTY;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 9513b9b..f9d3410 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.impala.analysis.SqlScanner;
 import org.apache.impala.thrift.TBackendGflags;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -280,4 +281,25 @@ public class BackendConfig {
   public int getMaxWaitTimeForSyncDdlSecs() {
     return backendCfg_.max_wait_time_for_sync_ddl_s;
   }
+
+  public boolean startHmsServer() {
+    return backendCfg_.start_hms_server;
+  }
+
+  public int getHMSPort() {
+    return backendCfg_.hms_port;
+  }
+
+  public boolean fallbackToHMSOnErrors() {
+    return backendCfg_.fallback_to_hms_on_errors;
+  }
+
+  @VisibleForTesting
+  public void setEnableCatalogdHMSCache(boolean flag) {
+    backendCfg_.enable_catalogd_hms_cache = flag;
+  }
+
+  public boolean enableCatalogdHMSCache() {
+    return backendCfg_.enable_catalogd_hms_cache;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java
new file mode 100644
index 0000000..caaff23
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileBlock;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.fb.FbFileBlock;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.CatalogServiceTestCatalog.CatalogServiceTestHMSCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CatalogHmsFileMetadataTest {
+  private static CatalogServiceCatalog catalog_;
+  private static HiveMetaStoreClient catalogHmsClient_;
+  private static final Configuration CONF = MetastoreConf.newMetastoreConf();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    catalog_ = CatalogServiceTestCatalog.createTestCatalogMetastoreServer();
+    MetastoreConf.setVar(CONF, ConfVars.THRIFT_URIS,
+        "thrift://localhost:" + ((CatalogServiceTestHMSCatalog) catalog_).getPort());
+    catalogHmsClient_ = new HiveMetaStoreClient(CONF);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    catalog_.close();
+  }
+
+  /**
+   * Fetches partitions of a table over the HMS API and verifies that the
+   * deserialized file-metadata in the response matches what we have in catalogd.
+   */
+  @Test
+  public void testFileMetadataForPartitions() throws Exception {
+    // get partitions from catalog directly
+    HdfsTable tbl = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsPartition hdfsPartition1 = tbl
+        .getPartitionsForNames(Arrays.asList("year=2009/month=1")).get(0);
+    HdfsPartition hdfsPartition2 = tbl
+        .getPartitionsForNames(Arrays.asList("year=2009/month=2")).get(0);
+
+    // test empty partitions result case.
+    GetPartitionsByNamesRequest request = new GetPartitionsByNamesRequest();
+    String dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    // no names are set so the result is expected to be empty
+    request.setNames(new ArrayList<>());
+    request.setGetFileMetadata(true);
+    GetPartitionsByNamesResult result = catalogHmsClient_.getPartitionsByNames(request);
+    assertTrue(result.getPartitions().isEmpty());
+    Map<Partition, List<FileDescriptor>> fds = CatalogHmsClientUtils
+        .extractFileDescriptors(result, tbl.getHostIndex());
+    assertTrue(fds.isEmpty());
+
+    // get the partitions over HMS API.
+    request = new GetPartitionsByNamesRequest();
+    dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    request.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+    request.setGetFileMetadata(true);
+    result = catalogHmsClient_.getPartitionsByNames(request);
+    for (Partition part : result.getPartitions()) {
+      assertNotNull(part.getFileMetadata());
+    }
+    assertNotNull(result.getDictionary());
+    fds = CatalogHmsClientUtils
+        .extractFileDescriptors(result, tbl.getHostIndex());
+    assertEquals(2, fds.size());
+    for (List<FileDescriptor> partFds : fds.values()) {
+      assertFalse(partFds.isEmpty());
+      assertEquals(1, partFds.size());
+    }
+    // make sure that the FileDescriptors from catalog and over HMS API are the same
+    // for the same hostIndex
+    assertFdsAreSame(hdfsPartition1.getFileDescriptors(),
+        fds.get(result.getPartitions().get(0)));
+    assertFdsAreSame(hdfsPartition2.getFileDescriptors(),
+        fds.get(result.getPartitions().get(1)));
+  }
+
+  public static void assertFdsAreSame(List<FileDescriptor> fdsFromCatalog,
+      List<FileDescriptor> fdsFromHMS) {
+    assertEquals(fdsFromCatalog.size(), fdsFromHMS.size());
+    for (int i = 0; i < fdsFromCatalog.size(); i++) {
+      FileDescriptor fdFromCatalog = fdsFromCatalog.get(i);
+      FileDescriptor fdFromHMS = fdsFromHMS.get(i);
+      assertEquals(fdFromCatalog.getRelativePath(), fdFromHMS.getRelativePath());
+      assertEquals(fdFromCatalog.getFileCompression(), fdFromHMS.getFileCompression());
+      assertEquals(fdFromCatalog.getFileLength(), fdFromHMS.getFileLength());
+      assertEquals(fdFromCatalog.getIsEc(), fdFromHMS.getIsEc());
+      assertEquals(fdFromCatalog.getModificationTime(), fdFromHMS.getModificationTime());
+      assertEquals(fdFromCatalog.getNumFileBlocks(), fdFromHMS.getNumFileBlocks());
+      for (int j = 0; j < fdFromCatalog.getNumFileBlocks(); j++) {
+        FbFileBlock blockFromCat = fdFromCatalog.getFbFileBlock(j);
+        FbFileBlock blockFromHMS = fdFromHMS.getFbFileBlock(j);
+        // Quick and dirty way to compare the relevant fields within the file blocks.
+        assertEquals(FileBlock.debugString(blockFromCat),
+            FileBlock.debugString(blockFromHMS));
+      }
+    }
+  }
+
+  /**
+   * Requests a table with file-metadata over the HMS API and verifies that the
+   * file-metadata returned is the same as what we have in catalogd.
+   */
+  @Test
+  public void testFileMetadataForTable() throws Exception {
+    Table tbl = catalogHmsClient_
+        .getTable(null, "functional", "zipcode_incomes", null, false, null, true);
+    assertNotNull(tbl.getFileMetadata());
+    HdfsTable catTbl = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "zipcode_incomes", "test", null);
+    HdfsPartition part = (HdfsPartition) Iterables.getOnlyElement(catTbl.getPartitions());
+    List<FileDescriptor> hmsTblFds = CatalogHmsClientUtils
+        .extractFileDescriptors(tbl, catTbl.getHostIndex());
+    assertFdsAreSame(part.getFileDescriptors(), hmsTblFds);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java
new file mode 100644
index 0000000..8bd138d
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.CatalogServiceTestCatalog.CatalogServiceTestHMSCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class EnableCatalogdHmsCacheFlagTest {
+  private static CatalogServiceCatalog catalog_;
+  private static HiveMetaStoreClient catalogHmsClient_;
+  private static final Configuration CONF = MetastoreConf.newMetastoreConf();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    catalog_ = CatalogServiceTestCatalog.createTestCatalogMetastoreServer();
+    MetastoreConf.setVar(CONF, ConfVars.THRIFT_URIS,
+        "thrift://localhost:" + ((CatalogServiceTestHMSCatalog) catalog_).getPort());
+    catalogHmsClient_ = new HiveMetaStoreClient(CONF);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    catalog_.close();
+  }
+
+  /**
+   * Loads partitions with the catalogd HMS cache enabled, then disables the cache and
+   * verifies that file-metadata fetched over the HMS API still matches catalogd.
+   */
+  @Test
+  public void testEnableCatalogdCachingFlag() throws Exception {
+
+    BackendConfig.INSTANCE.setEnableCatalogdHMSCache(true);
+
+    // get partitions from catalog directly
+    HdfsTable tbl = (HdfsTable) catalog_
+            .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsPartition hdfsPartition1 = tbl
+            .getPartitionsForNames(Arrays.asList("year=2009/month=1")).get(0);
+    HdfsPartition hdfsPartition2 = tbl
+            .getPartitionsForNames(Arrays.asList("year=2009/month=2")).get(0);
+
+    BackendConfig.INSTANCE.setEnableCatalogdHMSCache(false);
+
+    // test empty partitions result case.
+    GetPartitionsByNamesRequest request = new GetPartitionsByNamesRequest();
+    String dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    // no names are set so the result is expected to be empty
+    request.setNames(new ArrayList<>());
+    request.setGetFileMetadata(true);
+    GetPartitionsByNamesResult result = catalogHmsClient_.getPartitionsByNames(request);
+    assertTrue(result.getPartitions().isEmpty());
+    Map<Partition, List<FileDescriptor>> fds = CatalogHmsClientUtils
+            .extractFileDescriptors(result, tbl.getHostIndex());
+    assertTrue(fds.isEmpty());
+
+    // get the partitions over HMS API.
+    request = new GetPartitionsByNamesRequest();
+    dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    request.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+    request.setGetFileMetadata(true);
+    result = catalogHmsClient_.getPartitionsByNames(request);
+    for (Partition part : result.getPartitions()) {
+      assertNotNull(part.getFileMetadata());
+    }
+    assertNotNull(result.getDictionary());
+    fds = CatalogHmsClientUtils
+            .extractFileDescriptors(result, tbl.getHostIndex());
+    assertEquals(2, fds.size());
+    for (List<FileDescriptor> partFds : fds.values()) {
+      assertFalse(partFds.isEmpty());
+      assertEquals(1, partFds.size());
+    }
+
+    // make sure that the FileDescriptors from catalog and over HMS API are the same
+    // for the same hostIndex
+    CatalogHmsFileMetadataTest.assertFdsAreSame(hdfsPartition1.getFileDescriptors(),
+            fds.get(result.getPartitions().get(0)));
+    CatalogHmsFileMetadataTest.assertFdsAreSame(hdfsPartition2.getFileDescriptors(),
+            fds.get(result.getPartitions().get(1)));
+  }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 637a114..a942ee2 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -17,12 +17,15 @@
 
 package org.apache.impala.testutil;
 
+import java.net.ServerSocket;
 import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.NoopAuthorizationFactory;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.FeSupport;
@@ -54,19 +57,29 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     return createWithAuth(new NoopAuthorizationFactory());
   }
 
+  public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory) {
+    return createWithAuth(authzFactory, false);
+  }
+
   /**
    * Creates a catalog server that reads authorization policy metadata from the
    * authorization config.
    */
-  public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory) {
+  public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory,
+      boolean startCatalogHms) {
     FeSupport.loadLibrary();
     CatalogServiceCatalog cs;
     try {
       if (MetastoreShim.getMajorVersion() > 2) {
         MetastoreShim.setHiveClientCapabilities();
       }
-      cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
-          new MetaStoreClientPool(0, 0));
+      if (startCatalogHms) {
+        cs = new CatalogServiceTestHMSCatalog(false, 16, new TUniqueId(),
+            new MetaStoreClientPool(0, 0));
+      } else {
+        cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
+            new MetaStoreClientPool(0, 0));
+      }
       cs.setAuthzManager(authzFactory.newAuthorizationManager(cs));
       cs.reset();
     } catch (ImpalaException e) {
@@ -95,6 +108,76 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     return cs;
   }
 
+  private static class CatalogTestMetastoreServer extends CatalogMetastoreServer {
+    private final int port;
+    public CatalogTestMetastoreServer(
+        CatalogServiceCatalog catalogServiceCatalog) throws ImpalaException {
+      super(catalogServiceCatalog);
+      try {
+        port = getRandomPort();
+      } catch (Exception e) {
+        throw new CatalogException(e.getMessage());
+      }
+    }
+
+    @Override
+    public int getPort() {
+      return port;
+    }
+
+    private static int getRandomPort() throws Exception {
+      // Bind to port 0 so that the OS assigns a free port, retrying a few times if
+      // the bind fails.
+      for (int i = 0; i < 5; i++) {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+          return serverSocket.getLocalPort();
+        } catch (java.io.IOException e) {
+          // Could not bind or close the socket; try again.
+        }
+      }
+      throw new Exception("Could not find a free port");
+    }
+  }
+
+  public static class CatalogServiceTestHMSCatalog extends CatalogServiceTestCatalog {
+    private CatalogTestMetastoreServer metastoreServer;
+    public CatalogServiceTestHMSCatalog(boolean loadInBackground, int numLoadingThreads,
+        TUniqueId catalogServiceId,
+        MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
+      super(loadInBackground, numLoadingThreads, catalogServiceId, metaStoreClientPool);
+    }
+
+    @Override
+    protected CatalogMetastoreServer getCatalogMetastoreServer() {
+      synchronized (this) {
+        if (metastoreServer != null) return metastoreServer;
+        try {
+          metastoreServer = new CatalogTestMetastoreServer(this);
+        } catch (ImpalaException e) {
+          return null;
+        }
+        return metastoreServer;
+      }
+    }
+
+    public int getPort() { return metastoreServer.getPort(); }
+
+    @Override
+    public void close() {
+      super.close();
+      try {
+        metastoreServer.stop();
+      } catch (CatalogException e) {
+        // ignored
+      }
+    }
+  }
+
+  public static CatalogServiceCatalog createTestCatalogMetastoreServer()
+      throws ImpalaException {
+    return createWithAuth(new NoopAuthorizationFactory(), true);
+  }
+
   @Override
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
 }
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 039485c..19c5d23 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -30,6 +30,7 @@ import requests
 import socket
 import subprocess
 import time
+import string
 from functools import wraps
 from getpass import getuser
 from random import choice
@@ -151,6 +152,24 @@ class ImpalaTestSuite(BaseTestSuite):
     # to work with the HS2 client can add HS2 in addition to or instead of beeswax.
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax'))
 
+  @staticmethod
+  def create_hive_client(port):
+    """
+    Creates an HMS client for an externally running metastore service at the
+    provided port.
+    """
+    trans_type = 'buffered'
+    if pytest.config.option.use_kerberos:
+      trans_type = 'kerberos'
+    hive_transport = create_transport(
+      host=pytest.config.option.metastore_server.split(':')[0],
+      port=port,
+      service=pytest.config.option.hive_service_name,
+      transport_type=trans_type)
+    protocol = TBinaryProtocol.TBinaryProtocol(hive_transport)
+    hive_client = ThriftHiveMetastore.Client(protocol)
+    hive_transport.open()
+    return hive_client, hive_transport
+
   @classmethod
   def setup_class(cls):
     """Setup section that runs before each test suite"""
@@ -1191,3 +1210,11 @@ class ImpalaTestSuite(BaseTestSuite):
         LOG.info("Expected log lines could not be found, sleeping before retrying: %s",
             str(e))
         time.sleep(1)
+
+  @staticmethod
+  def get_random_name(prefix='', length=5):
+    """
+    Returns a random name that can be used to create a unique database or table.
+    """
+    assert length > 0
+    return prefix + ''.join(choice(string.ascii_lowercase) for i in range(length))
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
new file mode 100644
index 0000000..4d2b8d8
--- /dev/null
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -0,0 +1,707 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from hive_metastore.ttypes import Database
+from hive_metastore.ttypes import FieldSchema
+from hive_metastore.ttypes import GetTableRequest
+from hive_metastore.ttypes import GetPartitionsByNamesRequest
+from hive_metastore.ttypes import Table
+from hive_metastore.ttypes import StorageDescriptor
+from hive_metastore.ttypes import SerDeInfo
+
+from tests.util.event_processor_utils import EventProcessorUtils
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestMetastoreService(CustomClusterTestSuite):
+    """
+    Tests for the Catalog Metastore service. Each test in this class should
+    start a hms_server using the catalogd flag --start_hms_server=true
+    """
+
+    part_tbl = ImpalaTestSuite.get_random_name("test_metastore_part_tbl")
+    unpart_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_tbl")
+    acid_part_tbl = ImpalaTestSuite.get_random_name("test_metastore_acid_part_tbl")
+    unpart_acid_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_acid_tbl")
+    default_unknowntbl = ImpalaTestSuite.get_random_name("test_metastore_default_tbl")
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899"
+    )
+    def test_passthrough_apis(self):
+        """
+        This test exercises some of the Catalog HMS APIs which are directly
+        passed through to the backing HMS service. This is by no means an
+        exhaustive set but is merely used as a sanity check to make sure that a
+        hive_client can connect to the Catalog's metastore service and is
+        able to execute calls to the backing HMS service.
+        """
+        catalog_hms_client = None
+        db_name = ImpalaTestSuite.get_random_name("test_passthrough_apis_db")
+        try:
+            catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+            assert catalog_hms_client is not None
+            # get_databases
+            databases = catalog_hms_client.get_all_databases()
+            assert databases is not None
+            assert len(databases) > 0
+            assert "functional" in databases
+            # get_database
+            database = catalog_hms_client.get_database("functional")
+            assert database is not None
+            assert "functional" == database.name
+            # get_tables
+            tables = catalog_hms_client.get_tables("functional", "*")
+            assert tables is not None
+            assert len(tables) > 0
+            assert "alltypes" in tables
+            # get table
+            table = catalog_hms_client.get_table("functional", "alltypes")
+            assert table is not None
+            assert "alltypes" == table.tableName
+            assert table.sd is not None
+            # get partitions
+            partitions = catalog_hms_client.get_partitions("functional", "alltypes", -1)
+            assert partitions is not None
+            assert len(partitions) > 0
+            # get partition names
+            part_names = catalog_hms_client.get_partition_names("functional", "alltypes",
+                                                                -1)
+            assert part_names is not None
+            assert len(part_names) > 0
+            assert "year=2009/month=1" in part_names
+            # notification APIs
+            event_id = EventProcessorUtils.get_current_notification_id(catalog_hms_client)
+            assert event_id is not None
+            assert event_id > 0
+            # DDLs
+            catalog_hms_client.create_database(self.__get_test_database(db_name))
+            database = catalog_hms_client.get_database(db_name)
+            assert database is not None
+            assert db_name == database.name
+            tbl_name = ImpalaTestSuite.get_random_name(
+                "test_passthrough_apis_tbl")
+            cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+            part_cols = [["part", "string", "part col"]]
+            catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name,
+                cols, part_cols))
+            table = catalog_hms_client.get_table(db_name, tbl_name)
+            assert table is not None
+            assert tbl_name == table.tableName
+            self.__compare_cols(cols, table.sd.cols)
+            self.__compare_cols(part_cols, table.partitionKeys)
+        finally:
+            if catalog_hms_client is not None:
+                catalog_hms_client.drop_database(db_name, True, True)
+                catalog_hms_client.shutdown()
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899"
+    )
+    def test_get_table_req_with_fallback(self):
+      """
+      Test the get_table_req APIs with fallback to HMS enabled. These calls
+      succeed even if catalog throws exceptions since we fall back to HMS.
+      """
+      catalog_hms_client = None
+      db_name = ImpalaTestSuite.get_random_name(
+          "test_get_table_req_with_fallback_db")
+      try:
+        catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_hms_client is not None
+
+        # Test simple get_table_req without stats.
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = "functional"
+        get_table_request.tblName = "alltypes"
+        get_table_request.getFileMetadata = True
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "alltypes"
+        # Request did not ask for stats, verify colStats are not populated.
+        assert get_table_response.table.colStats is None
+        # assert that fileMetadata is in the response
+        self.__assert_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+
+        # Test get_table_request with stats and engine set to Impala.
+        get_table_request.getColumnStats = True
+        get_table_request.engine = "impala"
+        get_table_request.getFileMetadata = False
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "alltypes"
+        assert get_table_response.table.colStats is not None
+        # Verify the column stats objects are populated for
+        # non-clustering columns.
+        assert len(get_table_response.table.colStats.statsObj) > 0
+        # file-metadata is not requested and hence should not be set;
+        # assert that fileMetadata is absent from the response
+        self.__assert_no_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+
+        # Create table in Hive and test this is properly falling back to HMS.
+        # Create table via Hive.
+        catalog_hms_client.create_database(self.__get_test_database(db_name))
+        database = catalog_hms_client.get_database(db_name)
+        assert database is not None
+        assert db_name == database.name
+        tbl_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_with_fallback_tbl")
+        cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+        catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
+
+        get_table_request.dbName = db_name
+        get_table_request.tblName = tbl_name
+        get_table_request.getColumnStats = True
+        get_table_request.getFileMetadata = True
+
+        # Engine is not specified, this should throw an exception even if
+        # fallback_to_hms_on_errors is true.
+        expected_exception = "isGetColumnStats() is true in the request but " \
+            "engine is not specified."
+        try:
+          catalog_hms_client.get_table_req(get_table_request)
+        except Exception as e:
+          if expected_exception is not None:
+            assert expected_exception in str(e)
+
+        # Set engine and request impala, this should fallback to HMS since
+        # the table is not in catalog cache. File metadata should be set even if the
+        # request is served via HMS fallback.
+        get_table_request.engine = "Impala"
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == db_name
+        assert get_table_response.table.tableName == tbl_name
+        assert get_table_response.table.fileMetadata is not None
+        # The table does not have any data so we expect the fileMetadata.data to be None
+        assert get_table_response.table.fileMetadata.data is None
+        # even if there are no files, the object dictionary should be non-null and empty
+        assert get_table_response.table.dictionary is not None
+        assert len(get_table_response.table.dictionary.values) == 0
+      finally:
+        if catalog_hms_client is not None:
+            catalog_hms_client.shutdown()
+        if self.__get_database_no_throw(db_name) is not None:
+          self.hive_client.drop_database(db_name, True, True)
+
+    def __get_database_no_throw(self, db_name):
+      try:
+        return self.hive_client.get_database(db_name)
+      except:
+        return None
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899 "
+                      "--fallback_to_hms_on_errors=false"
+    )
+    def test_get_table_req_without_fallback(self):
+      """
+      Test the get_table_req APIs with fallback to HMS disabled. These calls
+      throw exceptions since we do not fall back to HMS if the db/table is not
+      in Catalog cache.
+      """
+      catalog_hms_client = None
+      db_name = ImpalaTestSuite.get_random_name(
+          "test_get_table_req_without_fallback_db")
+      new_db_name = None
+      try:
+        catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_hms_client is not None
+
+        # Create table via Hive.
+        self.hive_client.create_database(self.__get_test_database(db_name))
+        database = self.hive_client.get_database(db_name)
+        assert database is not None
+        assert db_name == database.name
+        tbl_name = ImpalaTestSuite.get_random_name("test_get_table_req_tbl")
+        cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+        self.hive_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
+
+        # Test get_table_req with column stats requested but no engine set.
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = db_name
+        get_table_request.tblName = tbl_name
+        get_table_request.getColumnStats = True
+        # Engine is not set, this should throw an exception without falling
+        # back to HMS.
+        expected_exception_str = "Column stats are requested " \
+            "but engine is not set in the request."
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+        # Verify DB not found exception is thrown. Engine does not matter,
+        # we only return Impala table level column stats but this field is
+        # required to be consistent with Hive's implementation.
+        get_table_request.engine = "Impala"
+        expected_exception_str = "Database " + db_name + " not found"
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+
+        # Create database in Impala
+        new_db_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_without_fallback_db")
+        query = "create database " + new_db_name
+        self.execute_query_expect_success(self.client, query)
+        new_tbl_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_without_fallback_tbl")
+        new_cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+        # DDLs currently only pass-through to HMS so this call will not create a table
+        # in catalogd.
+        catalog_hms_client.create_table(self.__get_test_tbl(new_db_name, new_tbl_name,
+            new_cols))
+        # Verify table not found exception is thrown
+        get_table_request.dbName = new_db_name
+        get_table_request.tblName = new_tbl_name
+        expected_exception_str = "Table " + new_db_name + "." + new_tbl_name \
+                                 + " not found"
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+
+        # Create a table in Impala and verify that the get_table_req gets the
+        # table and stats for non-clustering columns immediately.
+        impala_tbl_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_without_fallback_impala_tbl")
+        query = "create table " + new_db_name + "." + impala_tbl_name + \
+            "(id int) partitioned by (year int)"
+        self.execute_query_expect_success(self.client, query)
+        get_table_request.dbName = new_db_name
+        get_table_request.tblName = impala_tbl_name
+        get_table_request.getColumnStats = True
+        # Engine does not matter, we only return Impala table level column
+        # stats but this field is required to be consistent with Hive's
+        # implementation.
+        get_table_request.engine = "impala"
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == new_db_name
+        assert get_table_response.table.tableName == impala_tbl_name
+        assert get_table_response.table.colStats is not None
+        assert get_table_response.table.colStats.statsObj is not None
+        # Verify there is a ColumnStatisticsObj only for non-clustering columns
+        # (id in this case).
+        assert len(get_table_response.table.colStats.statsObj) == 1
+        assert get_table_response.table.colStats.statsObj[0].colName == "id"
+
+        # Verify a non-default catalog in the request throws an exception.
+        get_table_request.catName = "testCatalog"
+        expected_exception_str = "Catalog service does not support " \
+            "non-default catalogs"
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+        # fetch file-metadata of a non-partitioned table
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = "functional"
+        get_table_request.tblName = "tinytable"
+        get_table_request.getFileMetadata = True
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "tinytable"
+        self.__assert_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+
+        # fetch file-metadata along with stats
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = "functional"
+        get_table_request.tblName = "tinytable"
+        get_table_request.getFileMetadata = True
+        get_table_request.getColumnStats = True
+        get_table_request.engine = "impala"
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "tinytable"
+        self.__assert_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+        assert get_table_response.table.colStats is not None
+        assert get_table_response.table.colStats.statsObj is not None
+      finally:
+        if catalog_hms_client is not None:
+          catalog_hms_client.shutdown()
+        if self.__get_database_no_throw(db_name) is not None:
+          self.hive_client.drop_database(db_name, True, True)
+        if self.__get_database_no_throw(new_db_name) is not None:
+          self.hive_client.drop_database(new_db_name, True, True)
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899 "
+                      "--fallback_to_hms_on_errors=false"
+    )
+    def test_get_partitions_by_names(self):
+      catalog_hms_client = None
+      try:
+        catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_hms_client is not None
+        valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
+           "year=2009/month=4"]
+        self.__run_partitions_by_names_tests(catalog_hms_client, "functional_parquet",
+          "alltypestiny", valid_part_names)
+      finally:
+        if catalog_hms_client is not None:
+          catalog_hms_client.shutdown()
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899 "
+                      "--fallback_to_hms_on_errors=true"
+    )
+    def test_fallback_get_partitions_by_names(self, unique_database):
+      """
+      Test makes sure that the fall-back path is working in case of errors in catalogd's
+      implementation of get_partitions_by_names_req.
+      """
+      catalog_client = None
+      try:
+        catalog_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_client is not None
+        assert self.hive_client is not None
+        self.__create_test_tbls_from_hive(unique_database)
+        valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
+                            "year=2009/month=4"]
+        self.__run_partitions_by_names_tests(catalog_client, unique_database,
+          TestMetastoreService.part_tbl, valid_part_names, True)
+      finally:
+        if catalog_client is not None:
+          catalog_client.shutdown()
+
+    def __create_test_tbls_from_hive(self, db_name):
+      """Util method to create test tables from hive. It creates a partitioned and an
+      unpartitioned external table in the given database, and one table in the default
+      database. ACID tables are not created yet (see the TODO below)."""
+      # create a partitioned table
+      # Creating a table from hive and inserting into it takes very long. Hence we create
+      # an external table pointing to an existing table location
+      tbl_location = self.hive_client.get_table("functional", "alltypessmall").sd.location
+      self.run_stmt_in_hive(
+        "create external table {0}.{1} like functional.alltypessmall location '{2}'"
+          .format(db_name, TestMetastoreService.part_tbl, tbl_location))
+      self.run_stmt_in_hive(
+        "msck repair table {0}.{1}".format(db_name, TestMetastoreService.part_tbl))
+      # TODO: create an ACID partitioned table
+      tbl_location = self.hive_client.get_table("functional", "tinytable").sd.location
+      self.run_stmt_in_hive(
+        "create external table {0}.{1} like functional.tinytable location '{2}'"
+          .format(db_name, TestMetastoreService.unpart_tbl, tbl_location))
+      self.run_stmt_in_hive(
+        "create table default.{0} (c1 int)"
+          .format(TestMetastoreService.default_unknowntbl))
+
+    @classmethod
+    def __run_get_table_tests(cls, catalog_hms_client, db_name, tbl_name,
+                              fallback_expected=False):
+      """
+      Issues get_table_req for various cases and validates the response.
+      """
+      # Test simple get_table_req without stats.
+      get_table_request = GetTableRequest()
+      get_table_request.dbName = db_name
+      get_table_request.tblName = tbl_name
+      get_table_response = catalog_hms_client.get_table_req(get_table_request)
+      assert get_table_response.table.dbName == db_name
+      assert get_table_response.table.tableName == tbl_name
+      # Request did not ask for stats, verify colStats are not populated.
+      assert get_table_response.table.colStats is None
+
+      # Test get_table_request with stats and engine set to Impala.
+      get_table_request.getColumnStats = True
+      get_table_request.engine = "impala"
+      get_table_response = catalog_hms_client.get_table_req(get_table_request)
+      assert get_table_response.table.dbName == db_name
+      assert get_table_response.table.tableName == tbl_name
+      assert get_table_response.table.colStats is not None
+      # Verify the column stats objects are populated for
+      # non-clustering columns.
+      assert len(get_table_response.table.colStats.statsObj) > 0
+
+      # We only return Impala table level column stats but engine field is
+      # required to be consistent with Hive's implementation.
+      get_table_request.engine = "hive"
+      get_table_response = catalog_hms_client.get_table_req(get_table_request)
+      assert get_table_response.table.dbName == db_name
+      assert get_table_response.table.tableName == tbl_name
+      assert get_table_response.table.colStats is not None
+      # Verify the column stats objects are populated for
+      # non-clustering columns.
+      assert len(get_table_response.table.colStats.statsObj) > 0
+
+      # requests for tables which are unknown to catalogd should fall back to HMS
+      # if fallback_expected is True
+      get_table_request.dbName = TestMetastoreService.test_db_name
+      get_table_request.tblName = TestMetastoreService.unpart_tbl
+      get_table_request.getColumnStats = True
+
+      # Engine is not specified, this should throw an exception even if
+      # fallback_to_hms_on_errors is true.
+      expected_exception = "isGetColumnStats() is true in the request but " \
+                           "engine is not specified."
+      try:
+        catalog_hms_client.get_table_req(get_table_request)
+      except Exception as e:
+        if expected_exception is not None:
+          assert expected_exception in str(e)
+
+      # Set engine and request impala, this should fallback to HMS since
+      # the table is not in catalog cache.
+      get_table_request.engine = "Impala"
+      if fallback_expected:
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == db_name
+        assert get_table_response.table.tableName == tbl_name
+      else:
+        expected_exception = None
+        try:
+          catalog_hms_client.get_table_req(get_table_request)
+        except Exception as e:
+          expected_exception = e
+        assert expected_exception is not None
+
+    def __run_partitions_by_names_tests(self, catalog_hms_client, db_name, tbl_name,
+                                        valid_part_names, expect_fallback=False):
+      """This method runs all the test cases for fetching partitions. The method
+      expects that the partition keys are year and month for the given table and
+      that there are more than 3 partitions in the table."""
+      # Test get_partitions_by_names_req
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      # must have more than 3 partitions to test the filtering below
+      assert len(valid_part_names) > 3
+      part_names = valid_part_names[:3]
+      part_name_format = "year={0}/month={1}"
+      get_parts_req.names = part_names
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 3
+      self.__validate_partitions(part_names, response, part_name_format)
+      # request partitions with file-metadata
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = part_names
+      get_parts_req.getFileMetadata = True
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 3
+      self.__validate_partitions(part_names, response, part_name_format,
+                                 True)
+      # request contains unavailable partitions
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      part_names = [x for x in valid_part_names]
+      part_names[0] = "year=2009/month=13"
+      get_parts_req.names = part_names
+      get_parts_req.getFileMetadata = True
+      assert get_parts_req.names is not None
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 3
+      # remove the bad partname for comparison below
+      part_names.pop(0)
+      self.__validate_partitions(part_names, response, part_name_format, True)
+      # request contains empty partition names
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = []
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 0
+      # Negative test cases
+      # invalid partition keys
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = ["invalid-key=1"]
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      # in this case we replicate what HMS does which is to return 0 partitions
+      assert len(response.partitions) == 0
+      # empty table name
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = ""
+      get_parts_req.names = []
+      if expect_fallback:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "NoSuchObjectException")
+      else:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                 "Table name is empty or null")
+      # empty db name
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = ""
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = []
+      if expect_fallback:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "NoSuchObjectException")
+      else:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "Database name is empty or null")
+      # table does not exist
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = "table-does-not-exist"
+      get_parts_req.names = []
+      if expect_fallback:
+        # TODO HMS actually throws an InvalidObjectException, but the HMS API
+        # signature doesn't declare it.
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "Internal error")
+      else:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+          "Table {0}.table-does-not-exist not found".format(
+            db_name))
+
+    @classmethod
+    def __validate_partitions(cls, part_names, get_parts_by_names_result, name_format,
+                              expect_files=False):
+      """
+      Validates that the partitions in the given result correspond exactly to the given
+      list of partition names, and that file-metadata is present only if expect_files.
+      """
+      test_part_names = list(part_names)
+      if expect_files:
+        assert get_parts_by_names_result.dictionary is not None
+        assert len(get_parts_by_names_result.dictionary.values) > 0
+      else:
+        assert get_parts_by_names_result.dictionary is None
+      partitions = get_parts_by_names_result.partitions
+      for part in partitions:
+        assert part is not None
+        # we create the part name here since the partition object only has the values
+        assert len(part.values) == 2
+        name = name_format.format(part.values[0], part.values[1])
+        assert name in test_part_names
+        if expect_files:
+          assert part.fileMetadata is not None
+          assert part.fileMetadata.data is not None
+        else:
+          assert part.fileMetadata is None
+        test_part_names.remove(name)
+      assert len(test_part_names) == 0
+
+    def __assert_filemd(self, filemetadata, obj_dict):
+      """
+      Util method which asserts that the given file-metadata is valid.
+      """
+      assert filemetadata is not None
+      assert filemetadata.data is not None
+      assert obj_dict is not None
+      assert len(obj_dict.values) > 0
+
+    def __assert_no_filemd(self, filemetadata, obj_dict):
+      """
+      Util method which asserts that the given file-metadata is absent. Used to verify
+      that the HMS response objects do not contain the file-metadata.
+      """
+      assert filemetadata is None
+      assert obj_dict is None
+
+    @classmethod
+    def __call_get_table_req_expect_exception(cls, client,
+        get_table_request, expected_exception_str=None):
+      exception_received = None
+      try:
+        client.get_table_req(get_table_request)
+      except Exception as e:
+        exception_received = e
+        if expected_exception_str is not None:
+          assert expected_exception_str in str(e)
+      assert exception_received is not None
+
+    @classmethod
+    def __get_parts_by_names_expect_exception(cls, catalog_hms_client,
+        request, expected_exception_str=None):
+      """
+      Calls get_partitions_by_names_req on the HMS client and expects it to fail
+      with an exception whose message contains expected_exception_str, if provided.
+      """
+      exception = None
+      try:
+        catalog_hms_client.get_partitions_by_names_req(request)
+      except Exception as e:
+        exception = e
+        if expected_exception_str is not None:
+          assert expected_exception_str in str(e)
+      assert exception is not None
+
+    def __compare_cols(self, cols, fieldSchemas):
+        """
+        Compares the given list of fieldSchemas with the expected cols
+        """
+        assert len(cols) == len(fieldSchemas)
+        for i in range(len(cols)):
+            assert cols[i][0] == fieldSchemas[i].name
+            assert cols[i][1] == fieldSchemas[i].type
+            assert cols[i][2] == fieldSchemas[i].comment
+
+    def __get_test_database(self, db_name,
+        description="test_db_for_metastore_service"):
+        db = Database()
+        db.name = db_name
+        db.description = description
+        return db
+
+    def __get_test_tbl(self, db_name, tbl_name, cols, part_cols=None):
+        tbl = Table()
+        tbl.dbName = db_name
+        tbl.tableName = tbl_name
+        sd = StorageDescriptor()
+        tbl.sd = sd
+        sd.cols = self.__create_field_schemas(cols)
+        sd.inputFormat = \
+          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+        sd.outputFormat = \
+          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+        if part_cols is not None:
+            tbl.partitionKeys = self.__create_field_schemas(part_cols)
+        serde = SerDeInfo()
+        serde.name = ""
+        serde.serializationLib = \
+          "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+        tbl.sd.serdeInfo = serde
+        return tbl
+
+    def __create_field_schemas(self, cols):
+        fieldSchemas = []
+        for col in cols:
+            f = FieldSchema()
+            f.name = col[0]
+            f.type = col[1]
+            f.comment = col[2]
+            fieldSchemas.append(f)
+        return fieldSchemas