You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/05/19 01:36:52 UTC

[impala] 02/05: Revert "Revert "IMPALA-10613: Standup HMS thrift server in Catalog""

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5c85bf5c54fb1e0dffbe01b3f70c289de3700a66
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed May 12 07:30:48 2021 -0700

    Revert "Revert "IMPALA-10613: Standup HMS thrift server in Catalog""
    
    This reverts commit 829d1a6ab4643b07877fb410971b67f1b1d1b045.
    
    Additionally, this patch has a couple of addenda which are related
    to the original change:
    1. Bug fix for the original reverted commit, which used
    isSetGetFileMetadata instead of isGetFileMetadata
    (see https://gerrit.cloudera.org/#/c/17330/)
    2. Fix for intermittent failures in CatalogHmsFileMetadataTest
    caused by a limitation of the catalogd's HMS client, which requires
    "hive.metastore.execute.setugi" to be set to false.
    
    Change-Id: Icbe93f3ae4efd585d4b0092a9ac7081b0b2c1c44
    Reviewed-on: http://gerrit.cloudera.org:8080/17429
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Aman Sinha <am...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |   10 +
 be/src/common/global-flags.cc                      |    5 +
 be/src/util/backend-gflag-util.cc                  |    8 +
 common/thrift/BackendGflags.thrift                 |    8 +
 common/thrift/CatalogService.thrift                |    5 +
 .../org/apache/impala/compat/MetastoreShim.java    |    2 +-
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |  592 +++++
 .../impala/catalog/CatalogServiceCatalog.java      |   44 +-
 .../GetPartialCatalogObjectRequestBuilder.java     |  147 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |    6 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |   19 +-
 .../main/java/org/apache/impala/catalog/Table.java |   11 +-
 .../catalog/metastore/CatalogHmsClientUtils.java   |  159 ++
 .../catalog/metastore/CatalogMetastoreServer.java  |  289 +++
 .../metastore/CatalogMetastoreServiceHandler.java  |  152 ++
 .../catalog/metastore/ICatalogMetastoreServer.java |   47 +
 .../catalog/metastore/MetastoreServiceHandler.java | 2736 ++++++++++++++++++++
 .../metastore/NoOpCatalogMetastoreServer.java      |   44 +
 .../org/apache/impala/service/BackendConfig.java   |   22 +
 .../metastore/CatalogHmsFileMetadataTest.java      |  169 ++
 .../metastore/EnableCatalogdHmsCacheFlagTest.java  |  130 +
 .../impala/testutil/CatalogServiceTestCatalog.java |   89 +-
 tests/common/impala_test_suite.py                  |   27 +
 tests/custom_cluster/test_metastore_service.py     |  707 +++++
 24 files changed, 5406 insertions(+), 22 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d354deb..f7e7b35 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -98,6 +98,16 @@ DEFINE_int32(max_wait_time_for_sync_ddl_s, 0, "Maximum time (in seconds) until "
      "coordinators might have applied the changes caused due to the ddl.");
 
 DECLARE_string(debug_actions);
+DEFINE_bool(start_hms_server, false, "When set to true catalog server starts a HMS "
+    "server at a port specified by hms_port flag");
+
+DEFINE_int32(hms_port, 5899, "If start_hms_server is set to true, this "
+    "configuration specifies the port number at which it is started.");
+
+DEFINE_bool(fallback_to_hms_on_errors, true, "This configuration is only used if "
+    "start_hms_server is true. This is used to determine if the Catalog should fallback "
+    "to the backing HMS service if there are errors while processing the HMS request");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 795f2e4..5a4ca3c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -346,6 +346,11 @@ DEFINE_bool(enable_incremental_metadata_updates, true,
     "propagated as a whole object in the statestore topic updates. Note that legacy "
     "coordinators can apply incremental or full table updates so don't need this flag.");
 
+DEFINE_bool(enable_catalogd_hms_cache, true,
+    "If true, response for the HMS APIs that are implemented in catalogd will be served "
+    "from catalogd. If this flag is false or a given API is not implemented in catalogd,"
+    " it will be redirected to HMS.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index a218c06..a15b633 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -91,6 +91,10 @@ DECLARE_int64(topic_update_tbl_max_wait_time_ms);
 DECLARE_int32(catalog_max_lock_skipped_topic_updates);
 DECLARE_string(scratch_dirs);
 DECLARE_int32(max_wait_time_for_sync_ddl_s);
+DECLARE_bool(start_hms_server);
+DECLARE_int32(hms_port);
+DECLARE_bool(fallback_to_hms_on_errors);
+DECLARE_bool(enable_catalogd_hms_cache);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -285,6 +289,10 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_scratch_dirs(FLAGS_scratch_dirs);
   cfg.__set_max_wait_time_for_sync_ddl_s(FLAGS_max_wait_time_for_sync_ddl_s);
   cfg.__set_allow_ordinals_in_having(FLAGS_allow_ordinals_in_having);
+  cfg.__set_start_hms_server(FLAGS_start_hms_server);
+  cfg.__set_hms_port(FLAGS_hms_port);
+  cfg.__set_fallback_to_hms_on_errors(FLAGS_fallback_to_hms_on_errors);
+  cfg.__set_enable_catalogd_hms_cache(FLAGS_enable_catalogd_hms_cache);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 2d25efe..c483469 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -197,4 +197,12 @@ struct TBackendGflags {
   86: required i32 max_wait_time_for_sync_ddl_s
 
   87: required bool allow_ordinals_in_having
+
+  88: required bool start_hms_server
+
+  89: required i32 hms_port
+
+  90: required bool fallback_to_hms_on_errors
+
+  91: required bool enable_catalogd_hms_cache
 }
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 852c0ad..d70e4b6 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -375,6 +375,11 @@ struct TTableInfoSelector {
   // with the HMS table which it has and triggers a reload in case it doesn't match.
   // this field is only used when valid_write_ids is set, otherwise it is ignored
   10: optional i64 table_id = -1
+
+  // The response should contain the column statistics for all columns. If this is set
+  // to true, the list provided in want_stats_for_column_names will be ignored and
+  // stats for all columns will be returned.
+  11: optional bool want_stats_for_all_columns
 }
 
 // Returned information about a particular partition.
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index de7c586..523ca21 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -190,7 +190,7 @@ public class MetastoreShim {
    * Constant variable that stores engine value needed to store / access
    * Impala column statistics.
    */
-  protected static final String IMPALA_ENGINE = "impala";
+  public static final String IMPALA_ENGINE = "impala";
 
   /**
    * Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
new file mode 100644
index 0000000..ddde537
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FileMetadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.ObjectDictionary;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServiceHandler;
+import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.CatalogLookupStatus;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Util class which includes helper methods to fetch the information from
+ * Catalog in order to serve the HMS APIs.
+ */
+public class CatalogHmsAPIHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogHmsAPIHelper.class);
+  // Threshold in milliseconds for the file-metadata load time on the fallback path.
+  // Loads which take longer than this are logged at warn level.
+  //TODO change this to per directory level value based on empirical average value
+  //for various filesystems.
+  private static final long FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS = 100;
+
+  // Fixed size pool of 20 daemon threads named "HMS-Filemetadata-loader-<n>".
+  // NOTE(review): presumably used to load the file-metadata on the fallback path; the
+  // usage is outside of this view -- confirm.
+  //TODO Make this individually configurable
+  private static final ExecutorService fallbackFdLoaderPool = Executors
+      .newFixedThreadPool(20,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("HMS-Filemetadata-loader-%d").build());
+
+  // Key under which the serialized TNetworkAddress values are stored in the
+  // ObjectDictionary returned by getSerializedNetworkAddress().
+  public static final String IMPALA_TNETWORK_ADDRESSES = "impala:TNetworkAddress";
+
+  /**
+   * Helper method to serve a get_table_req() API via the catalog. If isGetColumnStats()
+   * is true in the request, we check that the engine is Impala. If isGetFileMetadata()
+   * is true in the request, the file-metadata and the serialized network addresses are
+   * set in the returned table. This is only supported when the catalog returns exactly
+   * one partition for the table.
+   *
+   * @param catalog            The catalog instance which caches the table.
+   * @param defaultCatalogName Custom catalog names are not supported. If the catalog
+   *                           name of the request is not null it must be same as this
+   *                           name.
+   * @param getTableRequest    The GetTableRequest object passed by the caller.
+   * @return GetTableResult for the given request.
+   * @throws CatalogException      If the request is not valid or if the response from
+   *                               the catalog fails one of the sanity checks.
+   * @throws NoSuchObjectException If the database or the table is not found in catalog.
+   * @throws MetaException         If the request asks for a non-default catalog.
+   */
+  public static GetTableResult getTableReq(CatalogServiceCatalog catalog,
+      String defaultCatalogName, GetTableRequest getTableRequest)
+      throws CatalogException, NoSuchObjectException, MetaException {
+    checkCatalogName(getTableRequest.getCatName(), defaultCatalogName);
+    checkCondition(getTableRequest.getDbName() != null,
+        "Database name is null");
+    checkCondition(getTableRequest.getTblName() != null,
+        "Table name is null");
+    String dbName = getTableRequest.getDbName();
+    String tblName = getTableRequest.getTblName();
+    TableName tableName = new TableName(dbName, tblName);
+    GetPartialCatalogObjectRequestBuilder reqBuilder =
+        new GetPartialCatalogObjectRequestBuilder()
+            .db(dbName)
+            .tbl(tblName);
+    // Make sure if getColumnStats() is true, the processor engine is set to Impala.
+    if (getTableRequest.isGetColumnStats()) {
+      checkCondition(getTableRequest.getEngine() != null, "Column stats are requested "
+          + "but engine is not set in the request.");
+      // The engine, database and table names are provided by the client. They are
+      // passed as format arguments instead of being concatenated into the format string
+      // so that a '%' character in them cannot break String.format().
+      checkCondition(
+          MetastoreShim.IMPALA_ENGINE.equalsIgnoreCase(getTableRequest.getEngine()),
+          "Unsupported engine %s while requesting column statistics of the table %s.%s",
+          getTableRequest.getEngine(), dbName, tblName);
+      reqBuilder.wantStatsForAllColums();
+    }
+    // if the client request has getFileMetadata flag set,
+    // we retrieve file descriptors too
+    if (getTableRequest.isGetFileMetadata()) {
+      reqBuilder.wantFiles();
+    }
+    if (getTableRequest.isSetValidWriteIdList()) {
+      reqBuilder.writeId(getTableRequest.getValidWriteIdList());
+    }
+    //TODO add table id in the request when client passes it. Also, looks like
+    // when HIVE-24662 is resolved, we may not need the table id in this request.
+    TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(
+        catalog, reqBuilder.build(), dbName, tblName, "processing get_table_req");
+    // As of writing this code, we know that the table which is returned in the response
+    // is a copy of table stored in the catalogd. Should this assumption change in the
+    // future, we must make sure that we take a deepCopy() of the table before modifying
+    // it later below.
+    Table retTable = response.table_info.hms_table;
+    checkCondition(
+        !AcidUtils.isTransactionalTable(retTable.getParameters()) || getTableRequest
+            .isSetValidWriteIdList(),
+        "Table %s.%s is transactional but it was requested without providing"
+            + " validWriteIdList", dbName, tblName);
+    if (getTableRequest.isGetColumnStats()) {
+      List<ColumnStatisticsObj> columnStatisticsObjList =
+          response.table_info.column_stats;
+      checkCondition(columnStatisticsObjList != null,
+          "Catalog returned a null ColumnStatisticsObj for table %s", tableName);
+      // Set the table level column statistics for this table.
+      ColumnStatistics columnStatistics = MetastoreShim.createNewHiveColStats();
+      columnStatistics.setStatsDesc(new ColumnStatisticsDesc(true, dbName, tblName));
+      for (ColumnStatisticsObj colsStatsObj : columnStatisticsObjList) {
+        columnStatistics.addToStatsObj(colsStatsObj);
+      }
+      retTable.setColStats(columnStatistics);
+    }
+    if (getTableRequest.isGetFileMetadata()) {
+      // set the file-metadata in the response
+      checkCondition(response.table_info.partitions != null,
+          "File metadata was not returned by catalog");
+      checkCondition(response.table_info.partitions.size() == 1,
+          "Retrieving file-metadata for partitioned tables must use partition level"
+              + " fetch APIs");
+      TPartialPartitionInfo partInfo = response.table_info.partitions.get(0);
+      // Same sanity check as the one done by the partition level API so that a missing
+      // list is reported as a CatalogException instead of a NullPointerException.
+      checkCondition(partInfo.file_descriptors != null,
+          "Catalog did not return file descriptors for table %s.%s", dbName, tblName);
+      FileMetadata fileMetadata = new FileMetadata();
+      for (THdfsFileDesc fd : partInfo.file_descriptors) {
+        fileMetadata.addToData(fd.file_desc_data);
+      }
+      retTable.setFileMetadata(fileMetadata);
+      retTable.setDictionary(getSerializedNetworkAddress(
+          response.table_info.network_addresses));
+    }
+    return new GetTableResult(retTable);
+  }
+
+  /**
+   * Issues the {@link CatalogServiceCatalog#getPartialCatalogObject} call for the given
+   * request and sanity checks the response before it is handed back to the caller.
+   *
+   * @throws NoSuchObjectException If the lookup status of the response says that the
+   *                               database or the table was not found.
+   * @throws CatalogException      If the response is null, the table could not be
+   *                               loaded or the response does not have a HMS table.
+   */
+  private static TGetPartialCatalogObjectResponse getPartialCatalogObjResponse(
+      CatalogServiceCatalog catalog, TGetPartialCatalogObjectRequest request,
+      String dbName, String tblName, String mesg)
+      throws CatalogException, NoSuchObjectException {
+    TGetPartialCatalogObjectResponse resp =
+        catalog.getPartialCatalogObject(request, mesg);
+    checkCondition(resp != null, "Catalog returned a null response");
+    CatalogLookupStatus status = resp.lookup_status;
+    if (status == CatalogLookupStatus.DB_NOT_FOUND) {
+      throw new NoSuchObjectException("Database " + dbName + " not found");
+    } else if (status == CatalogLookupStatus.TABLE_NOT_FOUND) {
+      throw new NoSuchObjectException("Table " + dbName + "." + tblName + " not found");
+    }
+    boolean isLoaded = status != CatalogLookupStatus.TABLE_NOT_LOADED;
+    checkCondition(isLoaded, "Could not load table %s.%s", dbName, tblName);
+    Table hmsTable = resp.table_info.hms_table;
+    checkCondition(hmsTable != null,
+        "Catalog returned a null table for %s.%s", dbName, tblName);
+    return resp;
+  }
+
+  /**
+   * Helper method to get the partitions filtered by a Hive expression.
+   *
+   * @param catalog         The catalog instance which caches the table.
+   * @param defaultCatalog  Currently, custom catalog names are not supported.
+   *                        If the catalog is not null it must be same as defaultCatalog.
+   * @param request         The request object as received from the HMS client.
+   * @param expressionProxy The PartitionExpressionProxy instance which is used to
+   *                        deserialize the filter expression.
+   * @return PartitionsByExprResult for the given request.
+   * @throws CatalogException If there are any internal processing errors (eg. table does
+   *                          not exist in Catalog cache) within the context of Catalog.
+   * @throws MetaException    If the expression cannot be deserialized using the given
+   *                          PartitionExpressionProxy.
+   */
+  public static PartitionsByExprResult getPartitionsByExpr(CatalogServiceCatalog catalog,
+      String defaultCatalog, PartitionsByExprRequest request,
+      PartitionExpressionProxy expressionProxy)
+      throws CatalogException, NoSuchObjectException, MetaException {
+    checkCatalogName(request.getCatName(), defaultCatalog);
+    checkCondition(request.getDbName() != null, "Database name is null");
+    checkCondition(request.getTblName() != null, "Table name is null");
+    String dbName = request.getDbName();
+    String tblName = request.getTblName();
+    TableName tableName = new TableName(dbName, tblName);
+    // TODO we are currently fetching all the partitions metadata here since catalog
+    // loads the entire table. Once we have fine-grained loading, we should just get
+    // the partitionNames here and then get the partition metadata after the pruning
+    // is done.
+    GetPartialCatalogObjectRequestBuilder catalogReq =
+        new GetPartialCatalogObjectRequestBuilder()
+            .db(dbName)
+            .tbl(tblName)
+            .wantPartitions();
+    // set the validWriteIdList if available
+    if (request.isSetValidWriteIdList()) {
+      catalogReq.writeId(request.getValidWriteIdList());
+    }
+    // set the table id if available
+    if (request.isSetId()) {
+      catalogReq.tableId(request.getId());
+    }
+    TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
+        catalogReq.build(), dbName, tblName,
+        "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_EXPR);
+    checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
+        "%s is not a partitioned table", tableName);
+    // create a mapping of the Partition name to the Partition so that we can return the
+    // filtered partitions later.
+    Map<String, TPartialPartitionInfo> partitionNameToPartInfo = new HashMap<>();
+    for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+      partitionNameToPartInfo.put(partInfo.getName(), partInfo);
+    }
+    List<String> filteredPartNames = Lists.newArrayList(partitionNameToPartInfo.keySet());
+    Stopwatch st = Stopwatch.createStarted();
+    boolean hasUnknownPartitions = expressionProxy
+        .filterPartitionsByExpr(response.table_info.hms_table.getPartitionKeys(),
+            request.getExpr(), request.getDefaultPartitionName(), filteredPartNames);
+    LOG.info("{}/{} partitions were selected for table {} after expression evaluation."
+            + " Time taken: {} msec.", filteredPartNames.size(),
+        partitionNameToPartInfo.size(), tableName,
+        st.stop().elapsed(TimeUnit.MILLISECONDS));
+    List<Partition> filteredPartitions = Lists
+        .newArrayListWithCapacity(filteredPartNames.size());
+    // TODO add file-metadata to Partitions. This would requires changes to HMS API
+    // so that request can pass a flag to send back filemetadata
+    for (String partName : filteredPartNames) {
+      // Note that we are not using String.format arguments here since String.format()
+      // throws a java.util.MissingFormatArgumentException for special characters like
+      // '%3A' which could be present in the PartitionName.
+      checkCondition(partitionNameToPartInfo.containsKey(partName),
+          "Could not find partition id for partition name " + partName);
+      TPartialPartitionInfo partInfo = partitionNameToPartInfo.get(partName);
+      checkCondition(partInfo != null,
+          "Catalog did not return the partition " + partName + " for table " + tableName,
+          partName, tableName);
+      filteredPartitions.add(partitionNameToPartInfo.get(partName).getHms_partition());
+    }
+    // confirm if the number of partitions is equal to number of filtered ids
+    checkCondition(filteredPartNames.size() == filteredPartitions.size(),
+        "Unexpected number of partitions received. Expected %s got %s",
+        filteredPartNames.size(), filteredPartitions.size());
+    PartitionsByExprResult result = new PartitionsByExprResult();
+    result.setPartitions(filteredPartitions);
+    result.setHasUnknownPartitions(hasUnknownPartitions);
+    return result;
+  }
+
+  /**
+   * Throws a {@code CatalogException} with the given message string and arguments if the
+   * condition is False. The message is passed through {@code String.format()} only when
+   * args are provided. A message without args is used as-is so that a message which was
+   * constructed by the caller can safely include '%' characters (eg. a partition name
+   * which has '%3A' in it).
+   *
+   * @param condition throws CatalogException when the condition is False.
+   * @param msg       msg compatible with {@code String.format()} format when args are
+   *                  provided, otherwise the literal message.
+   * @param args      args for the {@code String.format()} to used as message in the
+   *                  thrown exception.
+   * @throws CatalogException if condition is False.
+   */
+  private static void checkCondition(boolean condition, String msg, Object... args)
+      throws CatalogException {
+    if (condition) return;
+    boolean hasArgs = args != null && args.length > 0;
+    throw new CatalogException(hasArgs ? String.format(msg, args) : msg);
+  }
+
+  /**
+   * Catalog service does not support {@link org.apache.hadoop.hive.metastore.api.Catalog}
+   * currently. This method validates the name of the given catalog with default Catalog
+   * and throws a {@link MetaException} if it doesn't match.
+   */
+  public static void checkCatalogName(String catalogName, String defaultCatalogName)
+      throws MetaException {
+    if (catalogName == null) return;
+    try {
+      checkCondition(defaultCatalogName != null, "Default catalog name is null");
+      checkCondition(defaultCatalogName.equalsIgnoreCase(catalogName),
+          "Catalog service does not support non-default catalogs. Expected %s got %s",
+          defaultCatalogName, catalogName);
+    } catch (CatalogException ex) {
+      LOG.error(ex.getMessage(), ex);
+      throw new MetaException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Helper method to return a list of Partitions filtered by names. Currently partition
+   * level column statistics cannot be requested and this method throws an exception if
+   * the request has get_col_stats parameter set. If the request has {@code
+   * getFileMetadata} set the returned response will include the file-metadata as well.
+   */
+  public static GetPartitionsByNamesResult getPartitionsByNames(
+      CatalogServiceCatalog catalog, Configuration serverConf,
+      GetPartitionsByNamesRequest request)
+      throws CatalogException, NoSuchObjectException, MetaException {
+    // in case of GetPartitionsByNamesRequest there is no explicit catalogName
+    // field and hence the dbName is prepended to the database name.
+    // TODO this needs to be fixed in HMS APIs so that we have explicit catalog field.
+    String catAnddbName = request.getDb_name();
+    String tblName = request.getTbl_name();
+    checkCondition(!Strings.isNullOrEmpty(catAnddbName),
+        "Database name is empty or null");
+    checkCondition(!Strings.isNullOrEmpty(tblName), "Table name is empty or null");
+    String[] parsedCatDbName = MetaStoreUtils
+        .parseDbName(request.getDb_name(), serverConf);
+    checkCondition(parsedCatDbName.length == 2,
+        "Unexpected error during parsing the catalog and database name %s", catAnddbName);
+    checkCatalogName(parsedCatDbName[0], MetaStoreUtils.getDefaultCatalog(serverConf));
+    String dbName = parsedCatDbName[1];
+    //TODO partition granularity column statistics are not supported in catalogd currently
+    checkCondition(!request.isGet_col_stats(),
+        "Partition level column statistics are not supported in catalog");
+    GetPartialCatalogObjectRequestBuilder requestBuilder =
+        new GetPartialCatalogObjectRequestBuilder()
+            .db(dbName)
+            .tbl(tblName)
+            .wantPartitions();
+    // get the file metadata if the request has it set.
+    if (request.isGetFileMetadata()) {
+      requestBuilder.wantFiles();
+    }
+    if (request.isSetValidWriteIdList()) {
+      requestBuilder.writeId(request.getValidWriteIdList());
+    }
+    //TODO add table id in the request when client passes it. Also, looks like
+    // when HIVE-24662 is resolved, we may not need the table id in this request.
+    TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
+        requestBuilder.build(), dbName, tblName,
+        "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_NAMES);
+    checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
+        "%s.%s is not a partitioned table", dbName, tblName);
+    checkCondition(
+        !request.isGetFileMetadata() || response.table_info.network_addresses != null,
+        "Network addresses were not returned for %s.%s", dbName, tblName);
+    checkCondition(
+        !AcidUtils.isTransactionalTable(response.table_info.hms_table.getParameters())
+            || request.isSetValidWriteIdList(), "Table " + dbName + "." + tblName
+            + " is a transactional table but partitions were requested without "
+            + "providing validWriteIdList");
+    // create a mapping of the Partition name to the Partition so that we can return the
+    // filtered partitions later.
+    Set<String> requestedNames = new HashSet<>(request.getNames());
+    List<Partition> retPartitions = new ArrayList<>();
+    // filter by names
+    for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+      if (requestedNames.contains(partInfo.getName())) {
+        // as of writing this code, we know that the partInfo object has a copy of the
+        // the partition which is stored in the catalogd instead of the reference to the
+        // actual partition object of the catalogd. Hence modifying it below is okay.
+        // Should this assumption change in the future, we must make a deepCopy of the
+        // partition object here to make sure that we don't modify the state of the
+        // partition in the catalogd.
+        Partition part = partInfo.getHms_partition();
+        if (request.isGetFileMetadata()) {
+          FileMetadata fileMetadata = new FileMetadata();
+          checkCondition(partInfo.file_descriptors != null,
+              "Catalog did not return file descriptors for partition %s of table %s.%s",
+              partInfo.getName(), dbName, tblName);
+          for (THdfsFileDesc fd : partInfo.file_descriptors) {
+            fileMetadata.addToData(fd.file_desc_data);
+          }
+          part.setFileMetadata(fileMetadata);
+        }
+        retPartitions.add(part);
+      }
+    }
+    GetPartitionsByNamesResult result = new GetPartitionsByNamesResult(retPartitions);
+    if (request.isGetFileMetadata()) {
+      result.setDictionary(getSerializedNetworkAddress(
+          response.table_info.network_addresses));
+    }
+    return result;
+  }
+
+  /**
+   * Serializes each {@link TNetworkAddress} of the given list using the thrift compact
+   * protocol and returns them in a {@link ObjectDictionary} under the key {@link
+   * #IMPALA_TNETWORK_ADDRESSES}. An empty list results in a dictionary without any
+   * values.
+   *
+   * @throws CatalogException if the list is null or if an address cannot be serialized.
+   */
+  public static ObjectDictionary getSerializedNetworkAddress(
+      List<TNetworkAddress> networkAddresses) throws CatalogException {
+    checkCondition(networkAddresses != null, "Network addresses is null");
+    // callers only use this method when the file-metadata is returned in the response
+    // and hence a valid (possibly empty) ObjectDictionary is always handed back.
+    ObjectDictionary dictionary = new ObjectDictionary(Maps.newHashMap());
+    if (!networkAddresses.isEmpty()) {
+      List<ByteBuffer> buffers = new ArrayList<>();
+      TSerializer compactSerializer = new TSerializer(new TCompactProtocol.Factory());
+      for (TNetworkAddress addr : networkAddresses) {
+        try {
+          buffers.add(ByteBuffer.wrap(compactSerializer.serialize(addr)));
+        } catch (TException e) {
+          throw new CatalogException(
+              "Could not serialize network address " + addr.hostname + ":"
+                  + addr.port, e);
+        }
+      }
+      dictionary.putToValues(IMPALA_TNETWORK_ADDRESSES, buffers);
+    }
+    return dictionary;
+  }
+
+  /**
+   * Fallback for tables that are not loaded in catalogd: lists the table location on
+   * the filesystem and attaches the resulting file-metadata, together with the
+   * serialized host addresses, to the Table of the given GetTableResult. The load runs
+   * on the common fallback pool and is consistent with the given txn/writeId lists.
+   *
+   * @throws MetaException in case there were errors while loading file-metadata.
+   */
+  public static void loadAndSetFileMetadataFromFs(@Nullable ValidTxnList validTxnList,
+      @Nullable ValidWriteIdList writeIdList, GetTableResult result)
+      throws MetaException {
+    try {
+      Stopwatch timer = Stopwatch.createStarted();
+      Table hmsTable = result.getTable();
+      checkCondition(hmsTable != null, "Table is null");
+      boolean hasLocation =
+          hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null;
+      checkCondition(hasLocation, "Cannot get the location of table %s.%s",
+          hmsTable.getDbName(), hmsTable.getTableName());
+      // The table is not in catalogd, so a fresh host index is built for its files.
+      ListMap<TNetworkAddress> hosts = new ListMap<>();
+      FileMetadataLoader loader = new FileMetadataLoader(
+          new Path(hmsTable.getSd().getLocation()), true, Collections.EMPTY_LIST, hosts,
+          validTxnList, writeIdList);
+      boolean loaded = getFileMetadata(Arrays.asList(loader));
+      checkCondition(loaded,
+          "Could not load file-metadata for table %s.%s. See catalogd log for details",
+          hmsTable.getDbName(), hmsTable.getTableName());
+      FileMetadata tableFileMd = new FileMetadata();
+      for (FileDescriptor descriptor : loader.getLoadedFds()) {
+        tableFileMd.addToData(descriptor.toThrift().file_desc_data);
+      }
+      hmsTable.setFileMetadata(tableFileMd);
+      hmsTable.setDictionary(getSerializedNetworkAddress(hosts.getList()));
+      long elapsedMs = timer.stop().elapsed(TimeUnit.MILLISECONDS);
+      String logMsg = "Loading the filemetadata for table {}.{} on the fallback path. "
+          + "Time taken: {} msec";
+      if (elapsedMs > FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS) {
+        LOG.warn(logMsg, hmsTable.getDbName(), hmsTable.getTableName(), elapsedMs);
+      } else {
+        LOG.debug(logMsg, hmsTable.getDbName(), hmsTable.getTableName(), elapsedMs);
+      }
+    } catch (Exception ex) {
+      LOG.error("Unexpected error when loading filemetadata", ex);
+      throw new MetaException("Could not load filemetadata. Cause " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Sets the file metadata for given partitions from the file-system. In case of
+   * transactional tables it uses the given ValidTxnList and ValidWriteIdList to compute
+   * the snapshot of the file-metadata. Does nothing if the result has no partitions.
+   *
+   * @throws MetaException if the file-metadata of the partitions could not be loaded.
+   */
+  public static void loadAndSetFileMetadataFromFs(@Nullable ValidTxnList txnList,
+      @Nullable ValidWriteIdList writeIdList,
+      GetPartitionsByNamesResult getPartsResult) throws MetaException {
+    if (getPartsResult.getPartitionsSize() == 0) return;
+    final String dbName = getPartsResult.getPartitions().get(0).getDbName();
+    final String tblName = getPartsResult.getPartitions().get(0).getTableName();
+    try {
+      Stopwatch sw = Stopwatch.createStarted();
+      Map<Partition, FileMetadataLoader> fileMdLoaders = new HashMap<>();
+      ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+      for (Partition part : getPartsResult.getPartitions()) {
+        checkCondition(part.getSd() != null && part.getSd().getLocation() != null,
+            "Could not get the location for partition %s of table %s.%s",
+            part.getValues(), part.getDbName(), part.getTableName());
+        fileMdLoaders.put(part, new FileMetadataLoader(
+            new Path(part.getSd().getLocation()), true, Collections.emptyList(),
+            hostIndex, txnList, writeIdList));
+      }
+      boolean success = getFileMetadata(fileMdLoaders.values());
+      checkCondition(success,
+          "Could not load file-metadata for %s partitions of table %s.%s. See "
+              + "catalogd log for details", getPartsResult.getPartitionsSize(), dbName,
+          tblName);
+      for (Entry<Partition, FileMetadataLoader> entry : fileMdLoaders.entrySet()) {
+        FileMetadata filemetadata = new FileMetadata();
+        for (FileDescriptor fd : entry.getValue().getLoadedFds()) {
+          filemetadata.addToData(fd.toThrift().file_desc_data);
+        }
+        entry.getKey().setFileMetadata(filemetadata);
+      }
+      getPartsResult.setDictionary(getSerializedNetworkAddress(hostIndex.getList()));
+      long timeTaken = sw.stop().elapsed(TimeUnit.MILLISECONDS);
+      String msg = "Loading the file metadata for {} partitions of table {}.{} on the "
+          + "fallback path. Time taken: {} msec";
+      // Slow loads are reported at WARN, like the table-level variant of this method.
+      if (timeTaken > FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS) {
+        LOG.warn(msg, getPartsResult.getPartitionsSize(), dbName, tblName, timeTaken);
+      } else {
+        LOG.debug(msg, getPartsResult.getPartitionsSize(), dbName, tblName, timeTaken);
+      }
+    } catch (Exception ex) {
+      LOG.error(
+          "Unexpected error when loading file-metadata for partitions of table {}.{}",
+          dbName, tblName, ex);
+      throw new MetaException("Could not load file metadata. Cause " + ex.getMessage());
+    }
+  }
+
+  /**
+   * Loads the file-metadata in parallel using the common thread pool {@code
+   * fallbackFdLoaderPool} and waits for all the loaders to finish. Only the first 100
+   * failures are logged individually; the rest are summarized in a single message.
+   * @param loaders The FileMetadataLoader objects, one for each path to be loaded.
+   * @return false if there were errors, else returns true.
+   */
+  private static boolean getFileMetadata(Collection<FileMetadataLoader> loaders) {
+    List<Pair<Path, Future<Void>>> futures = new ArrayList<>(loaders.size());
+    for (FileMetadataLoader fmdLoader : loaders) {
+      futures.add(new Pair<>(fmdLoader.getPartDir(), fallbackFdLoaderPool.submit(() -> {
+        fmdLoader.load();
+        return null;
+      })));
+    }
+    int numberOfErrorsToLog = 100;
+    int errors = 0;
+    for (Pair<Path, Future<Void>> pair : futures) {
+      try {
+        pair.second.get();
+      } catch (InterruptedException | ExecutionException e) {
+        errors++;
+        if (errors <= numberOfErrorsToLog) {
+          LOG.error("Could not load file-metadata for path {}", pair.first, e);
+        }
+      }
+    }
+    if (errors > numberOfErrorsToLog) {
+      LOG.error("{} loading errors were not logged. Only logged the first {} errors",
+          errors - numberOfErrorsToLog, numberOfErrorsToLog);
+    }
+    return errors == 0;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 3703b43..67bdb26 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -54,6 +54,7 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.FeFsTable.Utils;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
@@ -63,6 +64,9 @@ import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogTableMetrics;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -273,6 +277,8 @@ public class CatalogServiceCatalog extends Catalog {
   // Manages the event processing from metastore for issuing invalidates on tables
   private ExternalEventsProcessor metastoreEventProcessor_;
 
+  private final ICatalogMetastoreServer catalogMetastoreServer_;
+
   /**
    * See the gflag definition in be/.../catalog-server.cc for details on these modes.
    */
@@ -349,6 +355,8 @@ public class CatalogServiceCatalog extends Catalog {
     Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
     // start polling for metastore events
     metastoreEventProcessor_.start();
+    catalogMetastoreServer_ = getCatalogMetastoreServer();
+    catalogMetastoreServer_.start();
   }
 
   /**
@@ -365,6 +373,20 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Returns an instance of CatalogMetastoreServer if start_hms_server configuration is
+   * true. Otherwise, returns a NoOpCatalogMetastoreServer
+   */
+  @VisibleForTesting
+  protected ICatalogMetastoreServer getCatalogMetastoreServer() {
+    boolean hmsServerEnabled = BackendConfig.INSTANCE.startHmsServer();
+    if (!hmsServerEnabled) return NoOpCatalogMetastoreServer.INSTANCE;
+    // Fail fast on a bad port; the port itself is not passed to the server here.
+    Preconditions.checkState(BackendConfig.INSTANCE.getHMSPort() > 0,
+        "Invalid port number for HMS service.");
+    return new CatalogMetastoreServer(this);
+  }
+
+  /**
    * Check whether the database is in blacklist
    */
   public boolean isBlacklistedDb(String dbName) {
@@ -3325,6 +3347,16 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public TGetPartialCatalogObjectResponse getPartialCatalogObject(
       TGetPartialCatalogObjectRequest req) throws CatalogException {
+    return getPartialCatalogObject(req, "needed by coordinator");
+  }
+
+  /**
+   * A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
+   * invocations.
+   */
+  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+      TGetPartialCatalogObjectRequest req, String reason) throws CatalogException {
+    Preconditions.checkNotNull(reason);
     try {
       if (!partialObjectFetchAccess_.tryAcquire(1,
           PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
@@ -3348,7 +3380,7 @@ public class CatalogServiceCatalog extends Catalog {
       try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
             "Get Partial Catalog Object - " +
             Catalog.toCatalogObjectKey(req.object_desc))) {
-        return doGetPartialCatalogObject(req);
+        return doGetPartialCatalogObject(req, reason);
       } finally {
         partialObjectFetchAccess_.release();
       }
@@ -3387,7 +3419,8 @@ public class CatalogServiceCatalog extends Catalog {
    * response's lookup_status.
    */
   private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
-      TGetPartialCatalogObjectRequest req) throws CatalogException {
+      TGetPartialCatalogObjectRequest req, String tableLoadReason)
+      throws CatalogException {
     TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
         "missing object_desc");
     switch (objectDesc.type) {
@@ -3423,7 +3456,7 @@ public class CatalogServiceCatalog extends Catalog {
         }
         table = getOrLoadTable(
             objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
-            "needed by coordinator", writeIdList, tableId);
+            tableLoadReason, writeIdList, tableId);
       } catch (DatabaseNotFoundException e) {
         return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
       }
@@ -3511,7 +3544,10 @@ public class CatalogServiceCatalog extends Catalog {
           .map(HdfsPartition.Builder::new)
           .collect(Collectors.toList());
       new ParallelFileMetadataLoader(
-          table, partBuilders, reqWriteIdList, validTxnList, null, logPrefix).load();
+          table.getFileSystem(), partBuilders, reqWriteIdList,
+          validTxnList, Utils.shouldRecursivelyListPartitions(table),
+          table.getHostIndex(), null, logPrefix)
+          .load();
       for (HdfsPartition.Builder builder : partBuilders) {
         // Let's retrieve the original partition instance from builder because this is
         // stored in the keys of 'partToPartialInfoMap'.
diff --git a/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java b/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java
new file mode 100644
index 0000000..c012eed
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+
+/**
+ * Simple Request builder class. Assumes all the metadata at higher granularity is
+ * required if a specific level is requested. For example, if files are requested,
+ * assumes that partition names and partitions are also requested.
+ */
+public class GetPartialCatalogObjectRequestBuilder {
+  private String dbName_;
+  private String tblName_;
+  private boolean wantPartitionNames_;
+  private boolean wantPartitionMeta_;
+  private boolean wantFileMetadata_;
+  private boolean wantStatsForAllColumns_;
+  private long tableId_ = CatalogServiceCatalog.TABLE_ID_UNAVAILABLE;
+  private ValidWriteIdList writeIdList_;
+
+  /**
+   * Sets the database name for the request object.
+   */
+  public GetPartialCatalogObjectRequestBuilder db(String db) {
+    dbName_ = db;
+    return this;
+  }
+
+  /**
+   * Sets the table name for the request object.
+   */
+  public GetPartialCatalogObjectRequestBuilder tbl(String tbl) {
+    tblName_ = tbl;
+    return this;
+  }
+
+  /**
+   * Sets the request fields required for fetching partition names,
+   * HMS Partition object and the file-metadata of the partitions.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantFiles() {
+    wantPartitionNames_ = true;
+    wantPartitionMeta_ = true;
+    wantFileMetadata_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the request fields required for fetching partition names,
+   * HMS Partition objects. No file-metadata will be fetched.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantPartitions() {
+    wantPartitionNames_ = true;
+    wantPartitionMeta_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the request fields required for fetching partition names. No
+   * partition metadata or file-metadata will be fetched.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantPartitionNames() {
+    wantPartitionNames_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the request fields for fetching column statistics for all columns.
+   */
+  public GetPartialCatalogObjectRequestBuilder wantStatsForAllColums() {
+    wantStatsForAllColumns_ = true;
+    return this;
+  }
+
+  /**
+   * Sets the table id for the request. Defaults to TABLE_ID_UNAVAILABLE if not set.
+   */
+  GetPartialCatalogObjectRequestBuilder tableId(long id) {
+    tableId_ = id;
+    return this;
+  }
+
+  /**
+   * Sets the ValidWriteIdList of the request by parsing the given (non-null) string.
+   */
+  GetPartialCatalogObjectRequestBuilder writeId(String writeIdList) {
+    Preconditions.checkNotNull(writeIdList);
+    writeIdList_ = new ValidReaderWriteIdList();
+    writeIdList_.readFromString(writeIdList);
+    return this;
+  }
+
+  /**
+   * Sets the ValidWriteIdList of the request. The given list must not be null.
+   */
+  GetPartialCatalogObjectRequestBuilder writeId(ValidWriteIdList validWriteIdList) {
+    writeIdList_ = Preconditions.checkNotNull(validWriteIdList);
+    return this;
+  }
+
+  /**
+   * Builds the {@link TGetPartialCatalogObjectRequest} object based on the fields
+   * set in this Builder. The HMS table is always requested.
+   */
+  public TGetPartialCatalogObjectRequest build() {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable(dbName_, tblName_);
+    TTableInfoSelector selector = new TTableInfoSelector();
+    selector.want_hms_table = true;
+    selector.table_id = tableId_;
+    if (writeIdList_ != null) {
+      selector.valid_write_ids = MetastoreShim.convertToTValidWriteIdList(writeIdList_);
+    }
+    if (wantPartitionNames_) selector.want_partition_names = true;
+    if (wantPartitionMeta_) selector.want_partition_metadata = true;
+    if (wantFileMetadata_) selector.want_partition_files = true;
+    if (wantStatsForAllColumns_) selector.want_stats_for_all_columns = true;
+    req.table_info_selector = selector;
+    return req;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index f0ecdee..0539387 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -742,9 +742,9 @@ public class HdfsTable extends Table implements FeFsTable {
     // we'll throw an exception here and end up bailing out of whatever catalog operation
     // we're in the middle of. This could cause a partial metadata update -- eg we may
     // have refreshed the top-level table properties without refreshing the files.
-    ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader(
-        this, partBuilders, validWriteIds_, validTxnList, debugActions, logPrefix);
-    loader.load();
+    new ParallelFileMetadataLoader(getFileSystem(), partBuilders, validWriteIds_,
+        validTxnList, Utils.shouldRecursivelyListPartitions(this),
+        getHostIndex(), debugActions, logPrefix).load();
 
     // TODO(todd): would be good to log a summary of the loading process:
     // - how many block locations did we reuse/load individually/load via batch
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 3f8fdcc..098fdc8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -20,6 +20,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import javax.annotation.Nullable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -36,6 +38,8 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.apache.impala.util.ThreadNameAnnotator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,10 +73,11 @@ public class ParallelFileMetadataLoader {
   private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
   private final FileSystem fs_;
 
-  public ParallelFileMetadataLoader(HdfsTable table,
+  public ParallelFileMetadataLoader(FileSystem fs,
       Collection<HdfsPartition.Builder> partBuilders,
-      ValidWriteIdList writeIdList, ValidTxnList validTxnList, String debugAction,
-      String logPrefix) throws CatalogException {
+      ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
+      @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix)
+      throws CatalogException {
     if (writeIdList != null || validTxnList != null) {
       // make sure that both either both writeIdList and validTxnList are set or both
       // of them are not.
@@ -91,8 +96,8 @@ public class ParallelFileMetadataLoader {
     for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
       FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
-          Utils.shouldRecursivelyListPartitions(table), oldFds, table.getHostIndex(),
-          validTxnList, writeIdList, e.getValue().get(0).getFileFormat());
+          isRecursive, oldFds, hostIndex, validTxnList, writeIdList,
+          e.getValue().get(0).getFileFormat());
       // If there is a cached partition mapped to this path, we recompute the block
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
@@ -103,7 +108,7 @@ public class ParallelFileMetadataLoader {
       loaders_.put(e.getKey(), loader);
     }
     this.logPrefix_ = logPrefix;
-    this.fs_ = table.getFileSystem();
+    this.fs_ = fs;
   }
 
   /**
@@ -153,7 +158,7 @@ public class ParallelFileMetadataLoader {
       List<Pair<FileMetadataLoader, Future<Void>>> futures =
           new ArrayList<>(loaders_.size());
       for (FileMetadataLoader loader : loaders_.values()) {
-        futures.add(new Pair<FileMetadataLoader, Future<Void>>(
+        futures.add(new Pair<>(
             loader, pool.submit(() -> { loader.load(); return null; })));
       }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 910bd49..3d8fa13 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -660,10 +660,13 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
       // is done while we continue to hold the table lock.
       resp.table_info.setHms_table(getMetaStoreTable().deepCopy());
     }
-    if (selector.want_stats_for_column_names != null) {
-      List<ColumnStatisticsObj> statsList = Lists.newArrayListWithCapacity(
-          selector.want_stats_for_column_names.size());
-      for (String colName: selector.want_stats_for_column_names) {
+    if (selector.want_stats_for_column_names != null ||
+        selector.want_stats_for_all_columns) {
+      List<String> colList = selector.want_stats_for_all_columns ? getColumnNames() :
+          selector.want_stats_for_column_names;
+      List<ColumnStatisticsObj> statsList =
+          Lists.newArrayListWithCapacity(colList.size());
+      for (String colName: colList) {
         Column col = getColumn(colName);
         if (col == null) continue;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
new file mode 100644
index 0000000..08e30ce
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.apache.impala.catalog.CatalogHmsAPIHelper.IMPALA_TNETWORK_ADDRESSES;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.FileMetadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.ObjectDictionary;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol.Factory;
+
+public class CatalogHmsClientUtils {
+
+  /**
+   * Util method useful to deserialize the {@link FileMetadata} in the
+   * {@link GetPartitionsByNamesResult} into a list of {@link FileDescriptor} objects.
+   * The returned Map contains a mapping of partitions to its List of FileDescriptors.
+   * @param getPartitionsResult The getPartitionsByNamesResult as returned by the HMS API
+   *                            {@code getPartitionsByNames}. If this param does not
+   *                            contain fileMetadata the method throws an exception.
+   * @param hostIndex The HostIndex to be used to clone the FileDescriptors before
+   *                  returning. This is useful to map the FileDescriptors to an existing
+   *                  HostIndex which is available for the table.
+   * @return Map of Partition to its FileDescriptors. Note that the map is empty if the
+   * result has no partitions and that a list can be empty in case there are no files
+   * in a partition.
+   * @throws CatalogException If there are any deserialization errors.
+   */
+  public static Map<Partition, List<FileDescriptor>> extractFileDescriptors(
+      GetPartitionsByNamesResult getPartitionsResult, ListMap<TNetworkAddress> hostIndex)
+      throws CatalogException {
+    // if there are no partitions in the result, return early
+    if (getPartitionsResult.getPartitionsSize() == 0) return new HashMap<>(0);
+    Preconditions.checkArgument(
+        getPartitionsResult.isSetDictionary(), "Host info is unavailable");
+    List<TNetworkAddress> hostInfo =
+        deserializeNetworkAddresses(getPartitionsResult.getDictionary());
+    Map<Partition, List<FileDescriptor>> fds =
+        Maps.newHashMapWithExpectedSize(getPartitionsResult.getPartitionsSize());
+    for (Partition part : getPartitionsResult.getPartitions()) {
+      Preconditions.checkArgument(part.isSetFileMetadata(),
+          "Filemetadata is not set for partition with values " + part.getValues());
+      fds.put(part, toFileDescriptors(part.getFileMetadata(), hostInfo, hostIndex));
+    }
+    return fds;
+  }
+
+  /**
+   * Util method to extract the FileDescriptors from the Table returned from HMS. The
+   * method expects that the result has FileMetadata set in the table along with the
+   * Host network addresses serialized in the table dictionary.
+   * @param tbl The table returned from HMS API getTableReq
+   * @param hostIndex The hostIndex of the table, used to clone the FileDescriptors.
+   * @return List of FileDescriptors for the table; empty if the table has no files.
+   * @throws CatalogException If the host network addresses cannot be deserialized.
+   */
+  public static List<FileDescriptor> extractFileDescriptors(Table tbl,
+      ListMap<TNetworkAddress> hostIndex) throws CatalogException {
+    String fullTblName = tbl.getDbName() + "." + tbl.getTableName();
+    Preconditions.checkArgument(tbl.isSetDictionary(),
+        "Host info is not available in the table " + fullTblName);
+    List<TNetworkAddress> hostInfo = deserializeNetworkAddresses(tbl.getDictionary());
+    Preconditions.checkArgument(tbl.isSetFileMetadata(),
+        "Filemetadata is not set for table " + fullTblName);
+    // it is possible that there are no files in the table.
+    if (tbl.getFileMetadata().getData() == null) return Collections.emptyList();
+    return toFileDescriptors(tbl.getFileMetadata(), hostInfo, hostIndex);
+  }
+
+  /**
+   * Converts every entry of the given FileMetadata into a {@link FileDescriptor}.
+   */
+  private static List<FileDescriptor> toFileDescriptors(FileMetadata fileMetadata,
+      List<TNetworkAddress> hostInfo, ListMap<TNetworkAddress> hostIndex)
+      throws CatalogException {
+    List<FileDescriptor> result =
+        Lists.newArrayListWithCapacity(fileMetadata.getDataSize());
+    if (fileMetadata.getData() == null) return result;
+    for (ByteBuffer data : fileMetadata.getData()) {
+      FileDescriptor fd = FileDescriptor.fromThrift(new THdfsFileDesc(data));
+      // NOTE(review): the value returned by cloneWithNewHostIndex (if any) is ignored
+      // here; confirm that the call remaps 'fd' in place, otherwise add its result.
+      fd.cloneWithNewHostIndex(hostInfo, hostIndex);
+      result.add(fd);
+    }
+    return result;
+  }
+
+  /**
+   * Util method to deserialize a given {@link ObjectDictionary} object into a list of
+   * {@link TNetworkAddress}. This is used to deserialize the output of
+   * {@link CatalogHmsAPIHelper#getSerializedNetworkAddress(List)}.
+   * @param dictionary The dictionary to deserialize; a null value results in null.
+   * @return list of deserialized TNetworkAddress; empty for an empty dictionary.
+   * @throws CatalogException if the expected key is missing or deserialization fails.
+   */
+  private static List<TNetworkAddress> deserializeNetworkAddresses(
+      ObjectDictionary dictionary) throws CatalogException {
+    if (dictionary == null) return null;
+    if (dictionary.getValuesSize() == 0) return Collections.emptyList();
+    List<ByteBuffer> serializedNetAddresses =
+        dictionary.getValues().get(IMPALA_TNETWORK_ADDRESSES);
+    if (serializedNetAddresses == null) {
+      throw new CatalogException("Key " + IMPALA_TNETWORK_ADDRESSES + " not found");
+    }
+    List<TNetworkAddress> networkAddresses =
+        Lists.newArrayListWithCapacity(serializedNetAddresses.size());
+    TDeserializer deserializer = new TDeserializer(new Factory());
+    for (ByteBuffer serializedData : serializedNetAddresses) {
+      // Copy exactly the readable bytes: array() would expose the whole backing array
+      // and ignore the buffer's offset and limit (and fails for non-array buffers).
+      ByteBuffer readable = serializedData.duplicate();
+      byte[] bytes = new byte[readable.remaining()];
+      readable.get(bytes);
+      TNetworkAddress networkAddress = new TNetworkAddress();
+      try {
+        deserializer.deserialize(networkAddress, bytes);
+      } catch (TException tException) {
+        throw new CatalogException("Could not deserialize network address at position "
+            + networkAddresses.size(), tException);
+      }
+      networkAddresses.add(networkAddress);
+    }
+    return networkAddresses;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
new file mode 100644
index 0000000..3414f4d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.service.BackendConfig;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CatalogMetastoreServer implements ThriftHiveMetastore interface. This is useful to
+ * expose HMS APIs via catalog server. Currently, most of the API implementations are
+ * "pass-through" to the Metastore server except for the following 3 which are mostly
+ * useful for getting table and partition level metadata during query planning.
+ * 1. get_table_req
+ * 2. get_partitions_by_expr
+ * 3. get_partitions_by_names_req
+ *
+ * This class mostly deals with the thrift server instantiation and its lifecycle
+ * management. The actual implementation of the HMS APIs is done in
+ * {@link CatalogMetastoreServiceHandler} class.
+ */
+public class CatalogMetastoreServer extends ThriftHiveMetastore implements
+    ICatalogMetastoreServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMetastoreServer.class);
+
+  // Maximum number of bytes to read from transport for variable length fields
+  // (strings, bytes). Also, used as a maximum number of elements to read for
+  // containers (maps, lists etc) fields.
+  private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
+
+  // Minimum number of thrift server threads (concurrent connections) which serve the
+  // clients. // TODO make this configurable
+  private static final int MIN_SERVER_THREADS = 1;
+  // Maximum number of thrift server threads (concurrent connections) which serve the
+  // clients. // TODO make this configurable. A connection which is beyond this limit
+  // will be blocked until a server thread is closed.
+  private static final int MAX_SERVER_THREADS = 500;
+
+  private static final String ACTIVE_CONNECTIONS_METRIC = "metastore.active.connections";
+  private static final String RPC_DURATION_FORMAT_METRIC = "metastore.rpc.duration.%s";
+
+  // flag to indicate if the server is started or not
+  private final AtomicBoolean started_ = new AtomicBoolean(false);
+
+  // the server is started in a daemon thread so that instantiating this is not
+  // a blocking call.
+  private CompletableFuture<Void> serverHandle_;
+
+  // reference to the catalog Service catalog object
+  private final CatalogServiceCatalog catalog_;
+
+  // Metrics for this Metastore server. Also this instance is passed to the
+  // TServerEventHandler when the server is started so that RPC metrics can be registered.
+  private final Metrics metrics_ = new Metrics();
+
+  public CatalogMetastoreServer(CatalogServiceCatalog catalogServiceCatalog) {
+    Preconditions.checkNotNull(catalogServiceCatalog);
+    catalog_ = catalogServiceCatalog;
+  }
+
+  /**
+   * Simple RpcEventHandler which adds metrics for this Metastore server
+   */
+  private class RpcMetricsEventHandler implements TServerEventHandler {
+
+    @Override
+    public void preServe() {}
+
+    @Override
+    public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+      metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
+      return null;
+    }
+
+    @Override
+    public void deleteContext(ServerContext serverContext, TProtocol tProtocol,
+        TProtocol tProtocol1) {
+      metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
+    }
+
+    @Override
+    public void processContext(ServerContext serverContext, TTransport tTransport,
+        TTransport tTransport1) {
+    }
+  }
+
+  /**
+   * Simple wrapper InvocationHandler which registers the duration metrics for each method
+   * called on the Proxy instance. The method execution is delegated to the the handler
+   * instance in the invoke method. Using such a invocation handler is much simpler than
+   * wrapping all the methods in the {@link CatalogMetastoreServiceHandler}. Additionally,
+   * this class also logs an error with the full trace in case the method invocation
+   * fails.
+   */
+  private class TimingInvocationHandler implements InvocationHandler {
+
+    private final CatalogMetastoreServiceHandler handler_;
+
+    TimingInvocationHandler(CatalogMetastoreServiceHandler handler) {
+      Preconditions.checkNotNull(handler);
+      handler_ = handler;
+    }
+
+    /**
+     * This method is called on every HMS API invocation. We invoke the method on the
+     * handler class with the given set of arguments. Additionally, this class is used to
+     * register the duration of such API calls.
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      Timer.Context context =
+          metrics_.getTimer(String.format(RPC_DURATION_FORMAT_METRIC, method.getName()))
+              .time();
+      try {
+        LOG.debug("Invoking HMS API: {}", method.getName());
+        return method.invoke(handler_, args);
+      } catch (Exception ex) {
+        Throwable unwrapped = unwrap(ex);
+        LOG.error("Received exception while executing " + method.getName() + " : ",
+            unwrapped);
+        throw unwrapped;
+      } finally {
+        context.stop();
+      }
+    }
+
+    /**
+     * The InvocationHandler throws an InvocationTargetException if the underlying method
+     * throws and exception. This method unwraps the underlying cause of such an exception
+     * and returns it if available.
+     */
+    private Throwable unwrap(Exception ex) {
+      if (ex instanceof InvocationTargetException) {
+        return ((InvocationTargetException) ex).getTargetException();
+      }
+      return ex;
+    }
+  }
+
+  @VisibleForTesting
+  protected int getPort() throws CatalogException {
+    return BackendConfig.INSTANCE.getHMSPort();
+  }
+
+  /**
+   * Starts the thrift server in a background thread and the configured port. Currently,
+   * only support NOSASL mode. TODO Add SASL and ssl support (IMPALA-10638)
+   *
+   * @throws CatalogException
+   */
+  public synchronized void start() throws CatalogException {
+    final int portNumber = getPort();
+    Preconditions.checkState(portNumber > 0);
+    Preconditions.checkState(!started_.get(), "Metastore server is already started");
+    LOG.info("Starting the Metastore server at port number {}", portNumber);
+    CatalogMetastoreServiceHandler handler =
+        new CatalogMetastoreServiceHandler(catalog_, metrics_,
+            BackendConfig.INSTANCE.fallbackToHMSOnErrors());
+    // create a proxy class for the ThriftMetastore.Iface and ICatalogMetastoreServer
+    // so that all the APIs can be invoked via a TimingInvocationHandler
+    ThriftHiveMetastore.Iface proxyCatalogHMSIFace =
+        (ThriftHiveMetastore.Iface) Proxy
+            .newProxyInstance(ThriftHiveMetastore.Iface.class.getClassLoader(),
+                new Class[]{ThriftHiveMetastore.Iface.class,
+                    ICatalogMetastoreServer.class},
+                new TimingInvocationHandler(handler));
+    //TODO Add Sasl support (IMPALA-10638)
+    final TProtocolFactory protocolFactory;
+    final TProtocolFactory inputProtoFactory;
+    //TODO add config for this (IMPALA-10639)
+    boolean useCompactProtocol = false;
+    if (useCompactProtocol) {
+      protocolFactory = new TCompactProtocol.Factory();
+      inputProtoFactory = new TCompactProtocol.Factory(MAX_MESSAGE_SIZE,
+          MAX_MESSAGE_SIZE);
+    } else {
+      protocolFactory = new TBinaryProtocol.Factory();
+      inputProtoFactory = new TBinaryProtocol.Factory(true, true, MAX_MESSAGE_SIZE,
+          MAX_MESSAGE_SIZE);
+    }
+
+    TProcessor processor;
+    try {
+      processor =
+          new ThriftHiveMetastore.Processor<>(proxyCatalogHMSIFace);
+    } catch (Exception e) {
+      throw new CatalogException("Unable to create processor for catalog metastore "
+          + "server", e);
+    }
+
+    //TODO add SSL support
+    boolean useSSL = false;
+    TServerSocket serverSocket;
+    try {
+      serverSocket =
+          new TServerSocketKeepAlive(
+              new TServerSocket(new InetSocketAddress(portNumber)));
+    } catch (TTransportException e) {
+      throw new CatalogException(
+          "Unable to create server socket at port number " + portNumber, e);
+    }
+
+    TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
+        .processor(processor)
+        .transportFactory(new TTransportFactory())
+        .protocolFactory(protocolFactory)
+        .inputProtocolFactory(inputProtoFactory)
+        .minWorkerThreads(MIN_SERVER_THREADS)
+        .maxWorkerThreads(MAX_SERVER_THREADS);
+
+    TServer tServer = new TThreadPoolServer(args);
+    TServerEventHandler rpcMetricsEventHandler = new RpcMetricsEventHandler();
+
+    tServer.setServerEventHandler(rpcMetricsEventHandler);
+    LOG.info("Started the new metaserver on port [" + portNumber
+        + "]...");
+    LOG.info("minWorkerThreads = "
+        + MIN_SERVER_THREADS);
+    LOG.info("maxWorkerThreads = "
+        + MAX_SERVER_THREADS);
+    LOG.info("Enable SSL = " + useSSL);
+    serverHandle_ = CompletableFuture.runAsync(() -> tServer.serve());
+    started_.set(true);
+  }
+
+  /**
+   * Returns the RPC and connection metrics for this metastore server. //TODO hook this
+   * method to the Catalog's debug UI
+   */
+  @Override
+  public String getMetrics() {
+    return metrics_.toString();
+  }
+
+  /**
+   * Stops this CatalogMetastoreServer on a best-effort basis. May interrupt running
+   * threads in the server.
+   * <p>
+   * // TODO currently this method is not used anywhere. We should hook this method to the
+   * shutdown process of catalogd
+   */
+  public void stop() throws CatalogException {
+    serverHandle_.cancel(true);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
new file mode 100644
index 0000000..be00d69
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetSchemaRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaResponse;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.service.BackendConfig;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the HMS APIs that are served by CatalogD
+ * and is exposed via {@link CatalogMetastoreServer}.
+ * HMS APIs that are redirected to HMS can be found in {@link MetastoreServiceHandler}.
+ *
+ */
+public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CatalogMetastoreServiceHandler.class);
+
+  public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+      boolean fallBackToHMSOnErrors) {
+    super(catalog, metrics, fallBackToHMSOnErrors);
+  }
+
+  /**
+   * Serves the table from catalogd when the catalogd HMS cache is enabled. In case of
+   * errors, this API falls back to HMS if {@code fallBackToHMSOnErrors_} is set.
+   */
+  @Override
+  public GetTableResult get_table_req(GetTableRequest getTableRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+      return super.get_table_req(getTableRequest);
+    }
+    try {
+      LOG.trace("Received get_Table_req for {}. File metadata is {}",
+          getTableRequest.getTblName(), getTableRequest.isGetFileMetadata());
+      return CatalogHmsAPIHelper.getTableReq(catalog_, defaultCatalogName_,
+          getTableRequest);
+    } catch (Exception e) {
+      // we catch all the exceptions (not only CatalogException) and fall-back to HMS
+      throwIfNoFallback(e, "get_table_req");
+    }
+    String tblName = getTableRequest.getDbName() + "." + getTableRequest.getTblName();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, "get_table_req", tblName));
+    return super.get_table_req(getTableRequest);
+  }
+
+  /**
+   * This is the main API which is used by Hive to get the partitions. In case of Hive it
+   * pushes the pruning logic to HMS by sending over the expression which is used to
+   * filter the partitions during query compilation. The expression is specific to Hive
+   * and loaded in the runtime based on whether we have hive-exec jar in the classpath
+   * or not. If the hive-exec jar is not present in the classpath, we fall-back to HMS
+   * since Catalog has no way to deserialize the expression sent over by the client.
+   */
+  @Override
+  public PartitionsByExprResult get_partitions_by_expr(
+      PartitionsByExprRequest partitionsByExprRequest) throws TException {
+    if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+      return super.get_partitions_by_expr(partitionsByExprRequest);
+    }
+    try {
+      // expressionProxy_ is null if there were errors when loading the
+      // PartitionExpressionProxy.
+      if (expressionProxy_ != null) {
+        return CatalogHmsAPIHelper.getPartitionsByExpr(
+            catalog_, defaultCatalogName_, partitionsByExprRequest, expressionProxy_);
+      } else {
+        throw new CatalogException("PartitionExpressionProxy could not be initialized");
+      }
+    } catch (Exception e) {
+      // we catch all the exceptions (not only CatalogException) and fall-back to HMS
+      throwIfNoFallback(e, GET_PARTITION_BY_EXPR);
+    }
+    String tblName =
+        partitionsByExprRequest.getDbName() + "." + partitionsByExprRequest.getTblName();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_EXPR, tblName));
+    return super.get_partitions_by_expr(partitionsByExprRequest);
+  }
+
+  /**
+   * HMS API to get the partitions filtered by a provided list of names. The request
+   * contains a list of partition names which the client is interested in. Catalog
+   * returns the partitions only for requested names. Additionally, this API also returns
+   * the file-metadata for the returned partitions if the request has
+   * {@code getFileMetadata} flag set. In case of errors, this API falls back to HMS if
+   * {@code fallBackToHMSOnErrors_} is set.
+   */
+  @Override
+  public GetPartitionsByNamesResult get_partitions_by_names_req(
+      GetPartitionsByNamesRequest getPartitionsByNamesRequest) throws TException {
+    if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+      return super.get_partitions_by_names_req(getPartitionsByNamesRequest);
+    }
+    try {
+      return CatalogHmsAPIHelper
+          .getPartitionsByNames(catalog_, serverConf_, getPartitionsByNamesRequest);
+    } catch (Exception ex) {
+      throwIfNoFallback(ex, GET_PARTITION_BY_NAMES);
+    }
+    String tblName =
+        getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
+            .getTbl_name();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+    return super.get_partitions_by_names_req(getPartitionsByNamesRequest);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
new file mode 100644
index 0000000..e5aa7a9
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * This is the main Interface which a CatalogMetastore service should implement. It
+ * provides lifecycle and monitoring methods which are called from
+ * CatalogServiceCatalog to instantiate a Metastore service.
+ */
+public interface ICatalogMetastoreServer {
+
+  /**
+   * Starts the metastore service
+   * @throws CatalogException if the service could not be started.
+   */
+  void start() throws CatalogException;
+
+  /**
+   * Stop the metastore service.
+   * @throws CatalogException if the service could not be stopped.
+   */
+  void stop() throws CatalogException;
+
+  /**
+   * Returns the metrics for this Catalog Metastore service.
+   * @return Encoded String (eg. Json format) representation of all the metrics for this
+   * metastore service.
+   */
+  String getMetrics();
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
new file mode 100644
index 0000000..9a1b6ba
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -0,0 +1,2736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import com.facebook.fb303.fb_status;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddCheckConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddDefaultConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddNotNullConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddUniqueConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.AlterCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.AlterISchemaRequest;
+import org.apache.hadoop.hive.metastore.api.AlterPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AlterPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.AlterTableRequest;
+import org.apache.hadoop.hive.metastore.api.AlterTableResponse;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.CheckConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
+import org.apache.hadoop.hive.metastore.api.CmRecycleResponse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
+import org.apache.hadoop.hive.metastore.api.CreateCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DefaultConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.DefaultConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.DropCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.DropConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.ExtendedTableInfo;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FindSchemasByColsResp;
+import org.apache.hadoop.hive.metastore.api.FindSchemasByColsRqst;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.GetCatalogResponse;
+import org.apache.hadoop.hive.metastore.api.GetCatalogsResponse;
+import org.apache.hadoop.hive.metastore.api.GetDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
+import org.apache.hadoop.hive.metastore.api.GetReplicationMetricsRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GetRuntimeStatsRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaResponse;
+import org.apache.hadoop.hive.metastore.api.GetSerdeRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.GetTablesExtRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesResult;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.ISchema;
+import org.apache.hadoop.hive.metastore.api.ISchemaName;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MapSchemaVersionToSerdeRequest;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.OptionalCompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.RenamePartitionRequest;
+import org.apache.hadoop.hive.metastore.api.RenamePartitionResponse;
+import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
+import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.ScheduledQuery;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.metastore.api.SchemaVersion;
+import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsResponse;
+import org.apache.hadoop.hive.metastore.api.SetSchemaVersionStateRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
+import org.apache.hadoop.hive.metastore.api.TableStatsResult;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface;
+import org.apache.hadoop.hive.metastore.api.TruncateTableRequest;
+import org.apache.hadoop.hive.metastore.api.TruncateTableResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterPoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterPoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMAlterResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMAlterTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrDropTriggerToPoolMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrDropTriggerToPoolMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrUpdateMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrUpdateMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreatePoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreatePoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropPoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropPoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetActiveResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetActiveResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetAllResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetAllResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.util.AcidUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the HMS APIs that are redirected to the HMS server from CatalogD.
+ * APIs that should be served from CatalogD must be overridden in {@link
+ * CatalogMetastoreServer}.
+ * <p>
+ * Implementation Notes: Care should be taken to use the
+ * {@link IMetaStoreClient#getThriftClient()}
+ * method when forwarding an API call to the HMS service, since IMetaStoreClient itself
+ * modifies the arguments before sending the RPC to the HMS server. This can lead to
+ * unexpected side-effects (for example, processorCapabilities that do not match the
+ * actual client).
+ */
+public abstract class MetastoreServiceHandler implements Iface {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MetastoreServiceHandler.class);
+  // Message template for unsupported APIs; the single %s is the method name.
+  protected static final String NOT_IMPLEMENTED_UNSUPPORTED = "%s method not supported"
+      + " by Catalog metastore service.";
+  // Message template for unexpected failures; takes what was being executed and the
+  // cause, in that order.
+  protected static final String METAEXCEPTION_MSG_FORMAT =
+      "Unexpected error occurred while"
+          + " executing %s. Cause: %s. See catalog logs for details.";
+  // Log template used when a request is forwarded to HMS; takes the request name and
+  // the table name, in that order.
+  protected static final String HMS_FALLBACK_MSG_FORMAT = "Forwarding the request %s for "
+      + "table %s to the backing HiveMetastore service";
+
+  // API names used when logging error messages.
+  public static final String GET_PARTITION_BY_EXPR = "get_partitions_by_expr";
+  public static final String GET_PARTITION_BY_NAMES = "get_partitions_by_names_req";
+  // Catalog from which the handler obtains its metastore clients; never null.
+  protected final CatalogServiceCatalog catalog_;
+  // Constructor-supplied flag; logged at startup as "fallback to hive metastore service
+  // on errors".
+  protected final boolean fallBackToHMSOnErrors_;
+  // Constructor-supplied metrics object; never null.
+  protected final Metrics metrics_;
+  // TODO handle session configuration
+  // Metastore configuration created via MetastoreConf.newMetastoreConf().
+  protected Configuration serverConf_;
+  // Partition expression proxy created in the constructor. Left null if instantiation
+  // fails or if only the DefaultPartitionExpressionProxy could be created.
+  protected PartitionExpressionProxy expressionProxy_;
+  // Default catalog name resolved from serverConf_ in the constructor.
+  protected final String defaultCatalogName_;
+
+  public MetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+      boolean fallBackToHMSOnErrors) {
+    catalog_ = Preconditions.checkNotNull(catalog);
+    metrics_ = Preconditions.checkNotNull(metrics);
+    fallBackToHMSOnErrors_ = fallBackToHMSOnErrors;
+    LOG.info("Fallback to hive metastore service on errors is {}",
+        fallBackToHMSOnErrors_);
+    // load the metastore configuration from the classpath
+    serverConf_ = Preconditions.checkNotNull(MetastoreConf.newMetastoreConf());
+    String className = MetastoreConf
+        .get(serverConf_, ConfVars.EXPRESSION_PROXY_CLASS.getVarname());
+    try {
+      Preconditions.checkNotNull(className);
+      LOG.info("Instantiating {}", className);
+      expressionProxy_ = PartFilterExprUtil.createExpressionProxy(serverConf_);
+      if (expressionProxy_ instanceof DefaultPartitionExpressionProxy) {
+        LOG.error("PartFilterExprUtil.createExpressionProxy returned"
+            + " DefaultPartitionExpressionProxy. Check if hive-exec"
+            + " jar is available in the classpath.");
+        expressionProxy_ = null;
+      }
+    } catch (Exception ex) {
+      LOG.error("Could not instantiate {}", className, ex);
+    }
+    defaultCatalogName_ =
+        MetaStoreUtils.getDefaultCatalog(serverConf_);
+  }
+
+  /**
+   * Forwards {@code get_hms_api_version} to the HMS thrift client and returns its
+   * answer. The metastore client is closed when the call returns or throws.
+   */
+  @Override
+  public String get_hms_api_version() throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final String apiVersion =
+          msClient.getHiveClient().getThriftClient().get_hms_api_version();
+      return apiVersion;
+    }
+  }
+
+  /**
+   * Forwards {@code getMetaConf} for {@code configKey} to the HMS thrift client and
+   * returns the value it reports.
+   */
+  @Override
+  public String getMetaConf(String configKey) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final String value =
+          msClient.getHiveClient().getThriftClient().getMetaConf(configKey);
+      return value;
+    }
+  }
+
+  /**
+   * Forwards {@code setMetaConf} with the given key and value to the HMS thrift client.
+   */
+  @Override
+  public void setMetaConf(String configKey, String configValue)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .setMetaConf(configKey, configValue);
+    }
+  }
+
+  /**
+   * Forwards {@code create_catalog} with the unmodified request to the HMS thrift
+   * client.
+   */
+  @Override
+  public void create_catalog(CreateCatalogRequest createCatalogRequest)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .create_catalog(createCatalogRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code alter_catalog} with the unmodified request to the HMS thrift client.
+   */
+  @Override
+  public void alter_catalog(AlterCatalogRequest alterCatalogRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .alter_catalog(alterCatalogRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code get_catalog} to the HMS thrift client and returns its response.
+   */
+  @Override
+  public GetCatalogResponse get_catalog(GetCatalogRequest getCatalogRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final GetCatalogResponse response =
+          msClient.getHiveClient().getThriftClient().get_catalog(getCatalogRequest);
+      return response;
+    }
+  }
+
+  /**
+   * Forwards {@code get_catalogs} to the HMS thrift client and returns its response.
+   */
+  @Override
+  public GetCatalogsResponse get_catalogs() throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final GetCatalogsResponse response =
+          msClient.getHiveClient().getThriftClient().get_catalogs();
+      return response;
+    }
+  }
+
+  /**
+   * Forwards {@code drop_catalog} with the unmodified request to the HMS thrift client.
+   */
+  @Override
+  public void drop_catalog(DropCatalogRequest dropCatalogRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .drop_catalog(dropCatalogRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code create_database} for the given database object to the HMS thrift
+   * client.
+   */
+  @Override
+  public void create_database(Database database)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .create_database(database);
+    }
+  }
+
+  /**
+   * Forwards {@code get_database} for {@code databaseName} to the HMS thrift client and
+   * returns the database it reports.
+   */
+  @Override
+  public Database get_database(String databaseName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Database db =
+          msClient.getHiveClient().getThriftClient().get_database(databaseName);
+      return db;
+    }
+  }
+
+  /**
+   * Forwards {@code get_database_req} with the unmodified request to the HMS thrift
+   * client and returns the database it reports.
+   */
+  @Override
+  public Database get_database_req(GetDatabaseRequest getDatabaseRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Database db = msClient.getHiveClient().getThriftClient()
+          .get_database_req(getDatabaseRequest);
+      return db;
+    }
+  }
+
+  /**
+   * Forwards {@code drop_database} to the HMS thrift client, passing the
+   * {@code deleteData} and {@code ignoreUnknownDb} flags through unchanged.
+   */
+  @Override
+  public void drop_database(String databaseName, boolean deleteData,
+      boolean ignoreUnknownDb)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().drop_database(
+          databaseName, deleteData, ignoreUnknownDb);
+    }
+  }
+
+  /**
+   * Forwards {@code get_databases} with the given pattern to the HMS thrift client and
+   * returns the names it reports.
+   */
+  @Override
+  public List<String> get_databases(String pattern) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<String> dbNames =
+          msClient.getHiveClient().getThriftClient().get_databases(pattern);
+      return dbNames;
+    }
+  }
+
+  /**
+   * Forwards {@code get_all_databases} to the HMS thrift client and returns the names
+   * it reports.
+   */
+  @Override
+  public List<String> get_all_databases() throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<String> dbNames =
+          msClient.getHiveClient().getThriftClient().get_all_databases();
+      return dbNames;
+    }
+  }
+
+  /**
+   * Forwards {@code alter_database} for {@code dbname} and the new database object to
+   * the HMS thrift client.
+   */
+  @Override
+  public void alter_database(String dbname, Database database)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .alter_database(dbname, database);
+    }
+  }
+
+  /**
+   * Forwards {@code get_type} for {@code name} to the HMS thrift client and returns the
+   * type it reports.
+   */
+  @Override
+  public Type get_type(String name) throws MetaException, NoSuchObjectException,
+      TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Type hmsType = msClient.getHiveClient().getThriftClient().get_type(name);
+      return hmsType;
+    }
+  }
+
+  /**
+   * Forwards {@code create_type} to the HMS thrift client and returns its boolean
+   * result.
+   */
+  @Override
+  public boolean create_type(Type type)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final boolean outcome =
+          msClient.getHiveClient().getThriftClient().create_type(type);
+      return outcome;
+    }
+  }
+
+  /**
+   * Forwards {@code drop_type} to the HMS thrift client and returns its boolean result.
+   */
+  @Override
+  public boolean drop_type(String type)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final boolean outcome =
+          msClient.getHiveClient().getThriftClient().drop_type(type);
+      return outcome;
+    }
+  }
+
+  /**
+   * Forwards {@code get_type_all} with the given argument to the HMS thrift client and
+   * returns the map it reports.
+   */
+  @Override
+  public Map<String, Type> get_type_all(String s) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Map<String, Type> types =
+          msClient.getHiveClient().getThriftClient().get_type_all(s);
+      return types;
+    }
+  }
+
+  /**
+   * Forwards {@code get_fields} for the given database and table to the HMS thrift
+   * client and returns the field schemas it reports.
+   */
+  @Override
+  public List<FieldSchema> get_fields(String dbname, String tblname)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<FieldSchema> fields =
+          msClient.getHiveClient().getThriftClient().get_fields(dbname, tblname);
+      return fields;
+    }
+  }
+
+  /**
+   * Forwards {@code get_fields_with_environment_context} to the HMS thrift client,
+   * passing the environment context through unchanged, and returns the field schemas it
+   * reports.
+   */
+  @Override
+  public List<FieldSchema> get_fields_with_environment_context(String dbName,
+      String tblName, EnvironmentContext environmentContext)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<FieldSchema> fields = msClient.getHiveClient().getThriftClient()
+          .get_fields_with_environment_context(dbName, tblName, environmentContext);
+      return fields;
+    }
+  }
+
+  /**
+   * Forwards {@code get_schema} for the given database and table to the HMS thrift
+   * client and returns the field schemas it reports.
+   */
+  @Override
+  public List<FieldSchema> get_schema(String dbname, String tblname)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<FieldSchema> schema =
+          msClient.getHiveClient().getThriftClient().get_schema(dbname, tblname);
+      return schema;
+    }
+  }
+
+  /**
+   * Forwards {@code get_schema_with_environment_context} to the HMS thrift client,
+   * passing the environment context through unchanged, and returns the field schemas it
+   * reports.
+   */
+  @Override
+  public List<FieldSchema> get_schema_with_environment_context(String dbname,
+      String tblname, EnvironmentContext environmentContext)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<FieldSchema> schema = msClient.getHiveClient().getThriftClient()
+          .get_schema_with_environment_context(dbname, tblname, environmentContext);
+      return schema;
+    }
+  }
+
+  /**
+   * Forwards {@code create_table} for the given table object to the HMS thrift client.
+   */
+  @Override
+  public void create_table(Table table)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .create_table(table);
+    }
+  }
+
+  /**
+   * Forwards {@code create_table_with_environment_context} to the HMS thrift client
+   * with the table and environment context unchanged.
+   */
+  @Override
+  public void create_table_with_environment_context(Table table,
+      EnvironmentContext environmentContext)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().create_table_with_environment_context(
+          table, environmentContext);
+    }
+  }
+
+  /**
+   * Forwards {@code create_table_with_constraints} to the HMS thrift client. The table
+   * and all six constraint lists are passed through in the order they were received.
+   */
+  @Override
+  public void create_table_with_constraints(Table table,
+      List<SQLPrimaryKey> sqlPrimaryKeys,
+      List<SQLForeignKey> sqlForeignKeys, List<SQLUniqueConstraint> sqlUniqueConstraints,
+      List<SQLNotNullConstraint> sqlNotNullConstraints,
+      List<SQLDefaultConstraint> sqlDefaultConstraints,
+      List<SQLCheckConstraint> sqlCheckConstraints)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().create_table_with_constraints(
+          table,
+          sqlPrimaryKeys,
+          sqlForeignKeys,
+          sqlUniqueConstraints,
+          sqlNotNullConstraints,
+          sqlDefaultConstraints,
+          sqlCheckConstraints);
+    }
+  }
+
+  /**
+   * Forwards {@code create_table_req} with the unmodified request to the HMS thrift
+   * client.
+   */
+  @Override
+  public void create_table_req(CreateTableRequest createTableRequest)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .create_table_req(createTableRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code drop_constraint} with the unmodified request to the HMS thrift
+   * client.
+   */
+  @Override
+  public void drop_constraint(DropConstraintRequest dropConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .drop_constraint(dropConstraintRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code add_primary_key} with the unmodified request to the HMS thrift
+   * client.
+   */
+  @Override
+  public void add_primary_key(AddPrimaryKeyRequest addPrimaryKeyRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .add_primary_key(addPrimaryKeyRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code add_foreign_key} with the unmodified request to the HMS thrift
+   * client.
+   */
+  @Override
+  public void add_foreign_key(AddForeignKeyRequest addForeignKeyRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .add_foreign_key(addForeignKeyRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code add_unique_constraint} with the unmodified request to the HMS
+   * thrift client.
+   */
+  @Override
+  public void add_unique_constraint(AddUniqueConstraintRequest addUniqueConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().add_unique_constraint(
+          addUniqueConstraintRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code add_not_null_constraint} with the unmodified request to the HMS
+   * thrift client.
+   */
+  @Override
+  public void add_not_null_constraint(
+      AddNotNullConstraintRequest addNotNullConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().add_not_null_constraint(
+          addNotNullConstraintRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code add_default_constraint} with the unmodified request to the HMS
+   * thrift client.
+   */
+  @Override
+  public void add_default_constraint(
+      AddDefaultConstraintRequest addDefaultConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().add_default_constraint(
+          addDefaultConstraintRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code add_check_constraint} with the unmodified request to the HMS thrift
+   * client.
+   */
+  @Override
+  public void add_check_constraint(AddCheckConstraintRequest addCheckConstraintRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().add_check_constraint(
+          addCheckConstraintRequest);
+    }
+  }
+
+  /**
+   * Forwards {@code drop_table} to the HMS thrift client, passing {@code deleteData}
+   * through unchanged.
+   */
+  @Override
+  public void drop_table(String dbname, String tblname, boolean deleteData)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .drop_table(dbname, tblname, deleteData);
+    }
+  }
+
+  /**
+   * Forwards {@code drop_table_with_environment_context} to the HMS thrift client with
+   * all arguments, including the environment context, unchanged.
+   */
+  @Override
+  public void drop_table_with_environment_context(String dbname, String tblname,
+      boolean deleteData,
+      EnvironmentContext environmentContext)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().drop_table_with_environment_context(
+          dbname, tblname, deleteData, environmentContext);
+    }
+  }
+
+  /**
+   * Forwards {@code truncate_table} for the given table and partition names to the HMS
+   * thrift client.
+   */
+  @Override
+  public void truncate_table(String dbName, String tblName, List<String> partNames)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .truncate_table(dbName, tblName, partNames);
+    }
+  }
+
+  /**
+   * Forwards {@code truncate_table_req} with the unmodified request to the HMS thrift
+   * client and returns its response.
+   */
+  @Override
+  public TruncateTableResponse truncate_table_req(
+      TruncateTableRequest truncateTableRequest) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final TruncateTableResponse response = msClient.getHiveClient().getThriftClient()
+          .truncate_table_req(truncateTableRequest);
+      return response;
+    }
+  }
+
+  /**
+   * Forwards {@code get_tables} with both arguments unchanged to the HMS thrift client
+   * and returns the table names it reports.
+   */
+  @Override
+  public List<String> get_tables(String dbname, String tblName)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<String> tableNames =
+          msClient.getHiveClient().getThriftClient().get_tables(dbname, tblName);
+      return tableNames;
+    }
+  }
+
+  /**
+   * Forwards {@code get_tables_by_type} with the table pattern and table type unchanged
+   * to the HMS thrift client and returns the table names it reports.
+   */
+  @Override
+  public List<String> get_tables_by_type(String dbname, String tablePattern,
+      String tableType)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<String> tableNames = msClient.getHiveClient().getThriftClient()
+          .get_tables_by_type(dbname, tablePattern, tableType);
+      return tableNames;
+    }
+  }
+
+  /**
+   * Forwards {@code get_all_materialized_view_objects_for_rewriting} to the HMS thrift
+   * client and returns the tables it reports.
+   */
+  @Override
+  public List<Table> get_all_materialized_view_objects_for_rewriting()
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<Table> views = msClient.getHiveClient().getThriftClient()
+          .get_all_materialized_view_objects_for_rewriting();
+      return views;
+    }
+  }
+
+  /**
+   * Forwards {@code get_materialized_views_for_rewriting} for {@code dbName} to the HMS
+   * thrift client and returns the names it reports.
+   */
+  @Override
+  public List<String> get_materialized_views_for_rewriting(String dbName)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<String> viewNames = msClient.getHiveClient().getThriftClient()
+          .get_materialized_views_for_rewriting(dbName);
+      return viewNames;
+    }
+  }
+
+  @Override
+  public List<TableMeta> get_table_meta(String dbnamePattern, String tblNamePattern,
+      List<String> tableTypes)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_table_meta(dbnamePattern,
+          tblNamePattern, tableTypes);
+    }
+  }
+
+  /**
+   * Forwards {@code get_all_tables} for {@code dbname} to the HMS thrift client and
+   * returns the table names it reports.
+   */
+  @Override
+  public List<String> get_all_tables(String dbname) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<String> tableNames =
+          msClient.getHiveClient().getThriftClient().get_all_tables(dbname);
+      return tableNames;
+    }
+  }
+
+  /**
+   * Forwards {@code get_table} for the given database and table to the HMS thrift
+   * client and returns the table it reports.
+   */
+  @Override
+  public Table get_table(String dbname, String tblname)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Table tbl =
+          msClient.getHiveClient().getThriftClient().get_table(dbname, tblname);
+      return tbl;
+    }
+  }
+
+  /**
+   * Forwards {@code get_table_objects_by_name} for {@code dbname} and the given list to
+   * the HMS thrift client and returns the tables it reports.
+   */
+  @Override
+  public List<Table> get_table_objects_by_name(String dbname, List<String> list)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<Table> tables = msClient.getHiveClient().getThriftClient()
+          .get_table_objects_by_name(dbname, list);
+      return tables;
+    }
+  }
+
+  /**
+   * Forwards {@code get_tables_ext} with the unmodified request to the HMS thrift
+   * client and returns the entries it reports.
+   */
+  @Override
+  public List<ExtendedTableInfo> get_tables_ext(GetTablesExtRequest getTablesExtRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final List<ExtendedTableInfo> infos = msClient.getHiveClient().getThriftClient()
+          .get_tables_ext(getTablesExtRequest);
+      return infos;
+    }
+  }
+
+  /**
+   * This method gets the table from the HMS directly. Additionally, if the request has
+   * {@code getFileMetadata} set it computes the file-metadata and returns it in the
+   * response. For transactional tables, it uses the ValidWriteIdList from the request and
+   * gets the current ValidTxnList to get the requested snapshot of the file-metadata for
+   * the table.
+   *
+   * @param getTableRequest request forwarded unchanged to the HMS thrift client
+   * @return the HMS result; file-metadata is set on it by
+   *     {@link CatalogHmsAPIHelper#loadAndSetFileMetadataFromFs} only when the request
+   *     asks for it
+   */
+  @Override
+  public GetTableResult get_table_req(GetTableRequest getTableRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    // Build the message only when debug logging is on, so the string concatenation and
+    // String.format are not paid on every request.
+    if (LOG.isDebugEnabled()) {
+      String tblName =
+          getTableRequest.getDbName() + "." + getTableRequest.getTblName();
+      LOG.debug(String.format(HMS_FALLBACK_MSG_FORMAT, "get_table_req", tblName));
+    }
+    GetTableResult result;
+    // Both stay null unless the table is transactional and the request carries a
+    // ValidWriteIdList; they are passed as-is to the file-metadata loader below.
+    ValidTxnList txnList = null;
+    ValidWriteIdList writeIdList = null;
+    String requestWriteIdList = getTableRequest.getValidWriteIdList();
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      result = client.getHiveClient().getThriftClient()
+          .get_table_req(getTableRequest);
+      Table tbl = result.getTable();
+      // return early if file-metadata is not requested
+      if (!getTableRequest.isGetFileMetadata()) {
+        LOG.trace("File metadata is not requested. Returning table {}",
+            tbl.getTableName());
+        return result;
+      }
+      // we need to get the current ValidTxnIdList to avoid returning
+      // file-metadata for in-progress compactions. If the request does not
+      // include ValidWriteIdList or if the table is not transactional we compute
+      // the file-metadata as seen on the file-system.
+      boolean isTransactional = tbl.getParameters() != null && AcidUtils
+          .isTransactionalTable(tbl.getParameters());
+      if (isTransactional && requestWriteIdList != null) {
+        txnList = MetastoreShim.getValidTxns(client.getHiveClient());
+        writeIdList = MetastoreShim
+            .getValidWriteIdListFromString(requestWriteIdList);
+      }
+    }
+    // The metastore client is already closed here; the file-metadata load runs after
+    // the try-with-resources block.
+    CatalogHmsAPIHelper.loadAndSetFileMetadataFromFs(txnList, writeIdList, result);
+    return result;
+  }
+
+  /**
+   * Forwards {@code get_table_objects_by_name_req} with the unmodified request to the
+   * HMS thrift client and returns its result.
+   */
+  @Override
+  public GetTablesResult get_table_objects_by_name_req(GetTablesRequest getTablesRequest)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final GetTablesResult tablesResult = msClient.getHiveClient().getThriftClient()
+          .get_table_objects_by_name_req(getTablesRequest);
+      return tablesResult;
+    }
+  }
+
+  /**
+   * Forwards {@code get_materialization_invalidation_info} with the creation metadata
+   * and transaction list string unchanged to the HMS thrift client and returns its
+   * result.
+   */
+  @Override
+  public Materialization get_materialization_invalidation_info(
+      CreationMetadata creationMetadata, String validTxnList)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Materialization materialization = msClient.getHiveClient().getThriftClient()
+          .get_materialization_invalidation_info(creationMetadata, validTxnList);
+      return materialization;
+    }
+  }
+
+  /**
+   * Forwards {@code update_creation_metadata} for the given catalog, database and table
+   * names to the HMS thrift client.
+   */
+  @Override
+  public void update_creation_metadata(String catName, String dbName, String tblName,
+      CreationMetadata creationMetadata)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().update_creation_metadata(
+          catName, dbName, tblName, creationMetadata);
+    }
+  }
+
+  @Override
+  public List<String> get_table_names_by_filter(String dbname, String tblname,
+      short maxParts)
+      throws MetaException, InvalidOperationException, UnknownDBException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_table_names_by_filter(dbname,
+          tblname, maxParts);
+    }
+  }
+
+  /**
+   * Forwards {@code alter_table} for the given table and its new definition to the HMS
+   * thrift client.
+   */
+  @Override
+  public void alter_table(String dbname, String tblName, Table newTable)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .alter_table(dbname, tblName, newTable);
+    }
+  }
+
+  /**
+   * Forwards {@code alter_table_with_environment_context} to the HMS thrift client with
+   * all arguments, including the environment context, unchanged.
+   */
+  @Override
+  public void alter_table_with_environment_context(String dbname, String tblName,
+      Table table,
+      EnvironmentContext environmentContext)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().alter_table_with_environment_context(
+          dbname, tblName, table, environmentContext);
+    }
+  }
+
+  /**
+   * Forwards {@code alter_table_with_cascade} to the HMS thrift client, passing the
+   * {@code cascade} flag through unchanged.
+   */
+  @Override
+  public void alter_table_with_cascade(String dbname, String tblName, Table table,
+      boolean cascade)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().alter_table_with_cascade(
+          dbname, tblName, table, cascade);
+    }
+  }
+
+  /**
+   * Forwards {@code alter_table_req} with the unmodified request to the HMS thrift
+   * client and returns its response.
+   */
+  @Override
+  public AlterTableResponse alter_table_req(AlterTableRequest alterTableRequest)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final AlterTableResponse response = msClient.getHiveClient().getThriftClient()
+          .alter_table_req(alterTableRequest);
+      return response;
+    }
+  }
+
+  /**
+   * Forwards {@code add_partition} for the given partition to the HMS thrift client and
+   * returns the partition it reports back.
+   */
+  @Override
+  public Partition add_partition(Partition partition)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Partition added =
+          msClient.getHiveClient().getThriftClient().add_partition(partition);
+      return added;
+    }
+  }
+
+  /**
+   * Forwards {@code add_partition_with_environment_context} to the HMS thrift client
+   * with the partition and environment context unchanged, and returns the partition it
+   * reports back.
+   */
+  @Override
+  public Partition add_partition_with_environment_context(Partition partition,
+      EnvironmentContext environmentContext)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      final Partition added = msClient.getHiveClient().getThriftClient()
+          .add_partition_with_environment_context(partition, environmentContext);
+      return added;
+    }
+  }
+
+  /**
+   * Delegates {@code add_partitions} to the HMS thrift client and returns the integer
+   * it produces; the metastore client is closed on exit.
+   */
+  @Override
+  public int add_partitions(List<Partition> partitionList)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .add_partitions(partitionList);
+    }
+  }
+
+  /**
+   * Delegates {@code add_partitions_pspec} to the HMS thrift client and returns the
+   * integer it produces; the metastore client is closed on exit.
+   */
+  @Override
+  public int add_partitions_pspec(List<PartitionSpec> list)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .add_partitions_pspec(list);
+    }
+  }
+
+  /**
+   * Hands {@code append_partition} to the HMS thrift client unchanged and returns its
+   * result; the metastore client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public Partition append_partition(String dbname, String tblName, List<String> partVals)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().append_partition(
+          dbname, tblName, partVals);
+    }
+  }
+
+  /**
+   * Forwards the {@code add_partitions_req} request object to the HMS thrift client and
+   * returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public AddPartitionsResult add_partitions_req(AddPartitionsRequest addPartitionsRequest)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().add_partitions_req(
+          addPartitionsRequest);
+    }
+  }
+
+  /**
+   * Hands {@code append_partition_with_environment_context} to the HMS thrift client
+   * unchanged and returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition append_partition_with_environment_context(String dbname,
+      String tblname, List<String> partVals, EnvironmentContext environmentContext)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .append_partition_with_environment_context(
+              dbname, tblname, partVals, environmentContext);
+    }
+  }
+
+  /**
+   * Hands {@code append_partition_by_name} to the HMS thrift client unchanged and
+   * returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition append_partition_by_name(String dbname, String tblname,
+      String partName)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().append_partition_by_name(
+          dbname, tblname, partName);
+    }
+  }
+
+  /**
+   * Hands {@code append_partition_by_name_with_environment_context} to the HMS thrift
+   * client unchanged and returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition append_partition_by_name_with_environment_context(String dbname,
+      String tblname, String partName, EnvironmentContext environmentContext)
+      throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .append_partition_by_name_with_environment_context(
+              dbname, tblname, partName, environmentContext);
+    }
+  }
+
+  /**
+   * Hands {@code drop_partition} to the HMS thrift client unchanged and returns the
+   * boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean drop_partition(String dbname, String tblanme, List<String> partVals,
+      boolean deleteData) throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().drop_partition(
+          dbname, tblanme, partVals, deleteData);
+    }
+  }
+
+  /**
+   * Hands {@code drop_partition_with_environment_context} to the HMS thrift client
+   * unchanged and returns the boolean it reports; the metastore client is closed on
+   * exit.
+   */
+  @Override
+  public boolean drop_partition_with_environment_context(String dbname, String tblname,
+      List<String> partNames, boolean deleteData, EnvironmentContext environmentContext)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .drop_partition_with_environment_context(
+              dbname, tblname, partNames, deleteData, environmentContext);
+    }
+  }
+
+  /**
+   * Hands {@code drop_partition_by_name} to the HMS thrift client unchanged and returns
+   * the boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean drop_partition_by_name(String dbname, String tblname, String partName,
+      boolean deleteData) throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .drop_partition_by_name(dbname, tblname, partName, deleteData);
+    }
+  }
+
+  /**
+   * Hands {@code drop_partition_by_name_with_environment_context} to the HMS thrift
+   * client unchanged and returns the boolean it reports; the metastore client is
+   * closed on exit.
+   */
+  @Override
+  public boolean drop_partition_by_name_with_environment_context(String dbName,
+      String tableName, String partName, boolean deleteData,
+      EnvironmentContext envContext)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .drop_partition_by_name_with_environment_context(
+              dbName, tableName, partName, deleteData, envContext);
+    }
+  }
+
+  /**
+   * Forwards the {@code drop_partitions_req} request object to the HMS thrift client
+   * and returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public DropPartitionsResult drop_partitions_req(
+      DropPartitionsRequest dropPartitionsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().drop_partitions_req(
+          dropPartitionsRequest);
+    }
+  }
+
+  /**
+   * Hands {@code get_partition} to the HMS thrift client unchanged and returns the
+   * partition it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition get_partition(String dbName, String tblName, List<String> values)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_partition(dbName, tblName, values);
+    }
+  }
+
+  /**
+   * Hands {@code exchange_partition} to the HMS thrift client unchanged and returns
+   * its result; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition exchange_partition(Map<String, String> partitionSpecMap,
+      String sourcedb, String sourceTbl, String destDb, String destTbl)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().exchange_partition(
+          partitionSpecMap, sourcedb, sourceTbl, destDb, destTbl);
+    }
+  }
+
+  /**
+   * Hands {@code exchange_partitions} to the HMS thrift client unchanged and returns
+   * the partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> exchange_partitions(Map<String, String> partitionSpecs,
+      String sourceDb, String sourceTable, String destDb, String destinationTableName)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .exchange_partitions(
+              partitionSpecs, sourceDb, sourceTable, destDb, destinationTableName);
+    }
+  }
+
+  /**
+   * Hands {@code get_partition_with_auth} to the HMS thrift client unchanged and
+   * returns the partition it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition get_partition_with_auth(String dbname, String tblName,
+      List<String> values, String user, List<String> groups)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_partition_with_auth(dbname, tblName, values, user, groups);
+    }
+  }
+
+  /**
+   * Hands {@code get_partition_by_name} to the HMS thrift client unchanged and returns
+   * the partition it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public Partition get_partition_by_name(String dbName, String tblName,
+      String partitionName) throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_partition_by_name(dbName, tblName, partitionName);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions} to the HMS thrift client unchanged and returns the
+   * partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> get_partitions(String dbName, String tblName, short maxLimit)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions(
+          dbName, tblName, maxLimit);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions_with_auth} to the HMS thrift client unchanged and
+   * returns the partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> get_partitions_with_auth(String dbName, String tblName,
+      short maxParts, String username, List<String> groups)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_partitions_with_auth(dbName, tblName, maxParts, username, groups);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions_pspec} to the HMS thrift client unchanged and returns
+   * the partition-spec list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<PartitionSpec> get_partitions_pspec(String dbName, String tblName,
+      int maxParts) throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_pspec(
+          dbName, tblName, maxParts);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_partitions_with_specs} request object to the HMS thrift
+   * client and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public GetPartitionsResponse get_partitions_with_specs(GetPartitionsRequest request)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_with_specs(
+          request);
+    }
+  }
+
+  /**
+   * Hands {@code get_partition_names} to the HMS thrift client unchanged and returns
+   * the list of strings it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<String> get_partition_names(String dbName, String tblName, short maxParts)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_partition_names(dbName, tblName, maxParts);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_partition_values} request object to the HMS thrift client
+   * and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public PartitionValuesResponse get_partition_values(
+      PartitionValuesRequest partitionValuesRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partition_values(
+          partitionValuesRequest);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions_ps} to the HMS thrift client unchanged and returns the
+   * partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> get_partitions_ps(String dbName, String tblName,
+      List<String> partValues, short maxParts)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_ps(
+          dbName, tblName, partValues, maxParts);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions_ps_with_auth} to the HMS thrift client unchanged and
+   * returns the partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> get_partitions_ps_with_auth(String dbName, String tblName,
+      List<String> partVals, short maxParts, String user, List<String> groups)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_ps_with_auth(
+          dbName, tblName, partVals, maxParts, user, groups);
+    }
+  }
+
+  /**
+   * Hands {@code get_partition_names_ps} to the HMS thrift client unchanged and returns
+   * the list of strings it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<String> get_partition_names_ps(String dbName, String tblName,
+      List<String> partitionNames, short maxParts)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partition_names_ps(
+          dbName, tblName, partitionNames, maxParts);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions_by_filter} to the HMS thrift client unchanged and
+   * returns the partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> get_partitions_by_filter(String dbName, String tblName,
+      String filter, short maxParts)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_by_filter(
+          dbName, tblName, filter, maxParts);
+    }
+  }
+
+  /**
+   * Hands {@code get_part_specs_by_filter} to the HMS thrift client unchanged and
+   * returns the partition-spec list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<PartitionSpec> get_part_specs_by_filter(String dbName, String tblName,
+      String filter, int maxParts)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_part_specs_by_filter(
+          dbName, tblName, filter, maxParts);
+    }
+  }
+
+  /**
+   * Serves {@code get_fields_req} through the thrift call
+   * {@code get_fields_with_environment_context}: the request's catalog name is
+   * prepended to its database name, and the field list that comes back is wrapped in a
+   * new {@link GetFieldsResponse}. The metastore client is closed on exit.
+   */
+  @Override
+  public GetFieldsResponse get_fields_req(GetFieldsRequest req)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      String catDbName = MetaStoreUtils.prependCatalogToDbName(
+          req.getCatName(), req.getDbName(), serverConf_);
+      List<FieldSchema> fieldSchemas = msClient.getHiveClient().getThriftClient()
+          .get_fields_with_environment_context(
+              catDbName, req.getTblName(), req.getEnvContext());
+      GetFieldsResponse response = new GetFieldsResponse();
+      response.setFields(fieldSchemas);
+      return response;
+    }
+  }
+
+  /**
+   * Serves {@code get_schema_req} through the thrift call
+   * {@code get_schema_with_environment_context}: the request's catalog name is
+   * prepended to its database name, and the field list that comes back is wrapped in a
+   * new {@link GetSchemaResponse}. The metastore client is closed on exit.
+   */
+  @Override
+  public GetSchemaResponse get_schema_req(GetSchemaRequest req)
+      throws MetaException, UnknownTableException, UnknownDBException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      String catDbName = MetaStoreUtils.prependCatalogToDbName(
+          req.getCatName(), req.getDbName(), serverConf_);
+      // TODO Remove the usage of old API here once this API is ported to cdpd-master
+      List<FieldSchema> fieldSchemas = msClient.getHiveClient().getThriftClient()
+          .get_schema_with_environment_context(
+              catDbName, req.getTblName(), req.getEnvContext());
+      GetSchemaResponse response = new GetSchemaResponse();
+      response.setFields(fieldSchemas);
+      return response;
+    }
+  }
+
+  /**
+   * Serves {@code get_partition_req} through the thrift call {@code get_partition}:
+   * the request's catalog name is prepended to its database name, and the partition
+   * that comes back is wrapped in a new {@link GetPartitionResponse}. The metastore
+   * client is closed on exit.
+   */
+  @Override
+  public GetPartitionResponse get_partition_req(GetPartitionRequest req)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      String catDbName = MetaStoreUtils.prependCatalogToDbName(
+          req.getCatName(), req.getDbName(), serverConf_);
+      Partition partition = msClient.getHiveClient().getThriftClient()
+          .get_partition(catDbName, req.getTblName(), req.getPartVals());
+      GetPartitionResponse response = new GetPartitionResponse();
+      response.setPartition(partition);
+      return response;
+    }
+  }
+
+  /**
+   * Serves {@code get_partitions_req} through the thrift call {@code get_partitions}:
+   * the request's catalog name is prepended to its database name, and the partition
+   * list that comes back is wrapped in a new {@link PartitionsResponse}. The metastore
+   * client is closed on exit.
+   */
+  @Override
+  public PartitionsResponse get_partitions_req(PartitionsRequest req)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      String catDbName = MetaStoreUtils.prependCatalogToDbName(
+          req.getCatName(), req.getDbName(), serverConf_);
+      List<Partition> parts = msClient.getHiveClient().getThriftClient()
+          .get_partitions(catDbName, req.getTblName(), req.getMaxParts());
+      PartitionsResponse response = new PartitionsResponse();
+      response.setPartitions(parts);
+      return response;
+    }
+  }
+
+  /**
+   * Serves {@code get_partition_names_ps_req} through the thrift call
+   * {@code get_partition_names_ps}: the request's catalog name is prepended to its
+   * database name, and the names that come back are wrapped in a new
+   * {@link GetPartitionNamesPsResponse}. The metastore client is closed on exit.
+   */
+  @Override
+  public GetPartitionNamesPsResponse get_partition_names_ps_req(
+      GetPartitionNamesPsRequest req)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      String catDbName = MetaStoreUtils.prependCatalogToDbName(
+          req.getCatName(), req.getDbName(), serverConf_);
+      List<String> partNames = msClient.getHiveClient().getThriftClient()
+          .get_partition_names_ps(
+              catDbName, req.getTblName(), req.getPartValues(), req.getMaxParts());
+      GetPartitionNamesPsResponse response = new GetPartitionNamesPsResponse();
+      response.setNames(partNames);
+      return response;
+    }
+  }
+
+ /**
+  * Serves {@code get_partitions_ps_with_auth_req} through the thrift call
+  * {@code get_partitions_ps_with_auth}: the request's catalog name is prepended to its
+  * database name, and the partition list that comes back is wrapped in a new
+  * {@link GetPartitionsPsWithAuthResponse}. The metastore client is closed on exit.
+  */
+ @Override
+ public GetPartitionsPsWithAuthResponse get_partitions_ps_with_auth_req(
+     GetPartitionsPsWithAuthRequest req)
+     throws MetaException, NoSuchObjectException, TException {
+   try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+     String catDbName = MetaStoreUtils.prependCatalogToDbName(
+         req.getCatName(), req.getDbName(), serverConf_);
+     List<Partition> parts = msClient.getHiveClient().getThriftClient()
+         .get_partitions_ps_with_auth(
+             catDbName, req.getTblName(), req.getPartVals(), req.getMaxParts(),
+             req.getUserName(), req.getGroupNames());
+     GetPartitionsPsWithAuthResponse response = new GetPartitionsPsWithAuthResponse();
+     response.setPartitions(parts);
+     return response;
+   }
+ }
+
+  /**
+   * Forwards the {@code get_partitions_by_expr} request object to the HMS thrift client
+   * and returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public PartitionsByExprResult get_partitions_by_expr(
+      PartitionsByExprRequest partitionsByExprRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_by_expr(
+          partitionsByExprRequest);
+    }
+  }
+
+  /**
+   * Hands {@code get_num_partitions_by_filter} to the HMS thrift client unchanged and
+   * returns the integer it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public int get_num_partitions_by_filter(String dbName, String tblName, String filter)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_num_partitions_by_filter(dbName, tblName, filter);
+    }
+  }
+
+  /**
+   * Hands {@code get_partitions_by_names} to the HMS thrift client unchanged and
+   * returns the partition list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<Partition> get_partitions_by_names(String dbName, String tblName,
+      List<String> partitionNames)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_by_names(
+          dbName, tblName, partitionNames);
+    }
+  }
+
+  /**
+   * Util method to evaluate if the received exception needs to be thrown to the Client
+   * based on the server configuration.
+   *
+   * <p>The exception is always logged at debug level. When
+   * {@code fallBackToHMSOnErrors_} is set the method then returns normally (presumably
+   * so that the caller can fall back to HMS); otherwise it always throws.
+   *
+   * @param cause   The underlying exception received from Catalog.
+   * @param apiName The HMS API name which threw the given exception.
+   * @throws TException The given exception itself when it already is a TException.
+   *                    Otherwise a MetaException whose message is built from
+   *                    {@code METAEXCEPTION_MSG_FORMAT} and whose cause is the given
+   *                    exception.
+   */
+  protected void throwIfNoFallback(Exception cause, String apiName)
+      throws TException {
+    LOG.debug("Received exception while executing {}", apiName, cause);
+    if (fallBackToHMSOnErrors_) return;
+    if (cause instanceof TException) throw (TException) cause;
+    // If this is not a TException we wrap it in a MetaException. The constructor used
+    // here only accepts a message, so the original exception is attached explicitly;
+    // without this its stack trace and cause chain would be dropped.
+    MetaException wrapped = new MetaException(
+        String.format(METAEXCEPTION_MSG_FORMAT, apiName, cause));
+    wrapped.initCause(cause);
+    throw wrapped;
+  }
+
+  /**
+   * This method gets the partitions for the given list of names from HMS. Additionally,
+   * if the {@code getFileMetadata} flag is set in the request, it also computes the file
+   * metadata and sets it in the partitions which are returned.
+   *
+   * <p>When file metadata is requested the table is fetched from HMS as well, to find
+   * out whether it is transactional. For a transactional table the request must carry
+   * a ValidWriteIdList; it is parsed and the current valid transaction list is read
+   * from HMS, and both are handed to the file-metadata loader. For other tables both
+   * are passed as {@code null}.
+   *
+   * @param getPartitionsByNamesRequest the thrift request; it is forwarded to HMS as-is.
+   * @return the result returned by HMS, augmented through
+   *         {@code CatalogHmsAPIHelper.loadAndSetFileMetadataFromFs} when file metadata
+   *         was requested.
+   * @throws TException if any HMS call fails; in particular a MetaException when the
+   *                    table is transactional and the request has no ValidWriteIdList.
+   */
+  public GetPartitionsByNamesResult get_partitions_by_names_req(
+      GetPartitionsByNamesRequest getPartitionsByNamesRequest) throws TException {
+    // "<db>.<table>" string, used only for the log message below
+    String tblName =
+        getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
+            .getTbl_name();
+    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+    boolean getFileMetadata = getPartitionsByNamesRequest.isGetFileMetadata();
+    GetPartitionsByNamesResult result;
+    // both remain null unless the table turns out to be transactional
+    ValidWriteIdList validWriteIdList = null;
+    ValidTxnList validTxnList = null;
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      result = client.getHiveClient().getThriftClient()
+          .get_partitions_by_names_req(getPartitionsByNamesRequest);
+      // if file-metadata is not requested; return early
+      if (!getFileMetadata) return result;
+      // we don't really know if the requested partitions are for a transactional table
+      // or not. Hence we should get the table from HMS to confirm.
+      // TODO: may be we could assume that if ValidWriteIdList is not set, the table is
+      // not transactional
+      // NOTE(review): parseDbName presumably yields {catalog name, db name}; confirm.
+      String[] parsedCatDbName = MetaStoreUtils
+          .parseDbName(getPartitionsByNamesRequest.getDb_name(), serverConf_);
+      Table tbl = client.getHiveClient().getTable(parsedCatDbName[0], parsedCatDbName[1],
+          getPartitionsByNamesRequest.getTbl_name(),
+          getPartitionsByNamesRequest.getValidWriteIdList());
+      // a table without parameters is treated as non-transactional
+      boolean isTransactional = tbl.getParameters() != null && AcidUtils
+          .isTransactionalTable(tbl.getParameters());
+      if (isTransactional) {
+        // a transactional table cannot be served without the caller's write-id list
+        if (getPartitionsByNamesRequest.getValidWriteIdList() == null) {
+          throw new MetaException(
+              "ValidWriteIdList is not set when requesting partitions for table " + tbl
+                  .getDbName() + "." + tbl.getTableName());
+        }
+        validWriteIdList = MetastoreShim
+            .getValidWriteIdListFromString(
+                getPartitionsByNamesRequest.getValidWriteIdList());
+        validTxnList = client.getHiveClient().getValidTxns();
+      }
+    }
+    // The file metadata is loaded after the try block, i.e. once the metastore client
+    // has already been closed.
+    CatalogHmsAPIHelper
+        .loadAndSetFileMetadataFromFs(validTxnList, validWriteIdList, result);
+    return result;
+  }
+
+  /**
+   * Hands {@code alter_partition} to the HMS thrift client unchanged; the metastore
+   * client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public void alter_partition(String dbName, String tblName, Partition partition)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().alter_partition(
+          dbName, tblName, partition);
+    }
+  }
+
+  /**
+   * Hands {@code alter_partitions} to the HMS thrift client unchanged; the metastore
+   * client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public void alter_partitions(String dbNme, String tblName, List<Partition> partitions)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().alter_partitions(
+          dbNme, tblName, partitions);
+    }
+  }
+
+  /**
+   * Hands {@code alter_partitions_with_environment_context} to the HMS thrift client
+   * unchanged; the metastore client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public void alter_partitions_with_environment_context(String s, String s1,
+      List<Partition> list, EnvironmentContext environmentContext)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .alter_partitions_with_environment_context(
+              s, s1, list, environmentContext);
+    }
+  }
+
+  /**
+   * Forwards the {@code alter_partitions_req} request object to the HMS thrift client
+   * and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public AlterPartitionsResponse alter_partitions_req(
+      AlterPartitionsRequest alterPartitionsRequest)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().alter_partitions_req(
+          alterPartitionsRequest);
+    }
+  }
+
+  /**
+   * Hands {@code alter_partition_with_environment_context} to the HMS thrift client
+   * unchanged; the metastore client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public void alter_partition_with_environment_context(String dbName, String tblName,
+      Partition partition, EnvironmentContext environmentContext)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient()
+          .alter_partition_with_environment_context(
+              dbName, tblName, partition, environmentContext);
+    }
+  }
+
+  /**
+   * Hands {@code rename_partition} to the HMS thrift client unchanged; the metastore
+   * client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public void rename_partition(String dbName, String tblName, List<String> list,
+      Partition partition) throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().rename_partition(
+          dbName, tblName, list, partition);
+    }
+  }
+
+  /**
+   * Forwards the {@code rename_partition_req} request object to the HMS thrift client
+   * and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public RenamePartitionResponse rename_partition_req(
+      RenamePartitionRequest renamePartitionRequest)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().rename_partition_req(
+          renamePartitionRequest);
+    }
+  }
+
+  /**
+   * Hands {@code partition_name_has_valid_characters} to the HMS thrift client
+   * unchanged and returns the boolean it reports; the metastore client is closed on
+   * exit.
+   */
+  @Override
+  public boolean partition_name_has_valid_characters(List<String> list, boolean b)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .partition_name_has_valid_characters(
+              list, b);
+    }
+  }
+
+  /**
+   * Hands {@code get_config_value} to the HMS thrift client unchanged and returns the
+   * string it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public String get_config_value(String key, String defaultVal)
+      throws ConfigValSecurityException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_config_value(key, defaultVal);
+    }
+  }
+
+  /**
+   * Hands {@code partition_name_to_vals} to the HMS thrift client unchanged and returns
+   * the list it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public List<String> partition_name_to_vals(String name)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .partition_name_to_vals(name);
+    }
+  }
+
+  /**
+   * Hands {@code partition_name_to_spec} to the HMS thrift client unchanged and returns
+   * the map it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public Map<String, String> partition_name_to_spec(String name)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .partition_name_to_spec(name);
+    }
+  }
+
+  /**
+   * Hands {@code markPartitionForEvent} to the HMS thrift client unchanged; the
+   * metastore client obtained from the catalog is closed on exit.
+   */
+  @Override
+  public void markPartitionForEvent(String s, String s1, Map<String, String> map,
+      PartitionEventType partitionEventType) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().getThriftClient().markPartitionForEvent(
+          s, s1, map, partitionEventType);
+    }
+  }
+
+  /**
+   * Hands {@code isPartitionMarkedForEvent} to the HMS thrift client unchanged and
+   * returns the boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean isPartitionMarkedForEvent(String s, String s1, Map<String, String> map,
+      PartitionEventType partitionEventType) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .isPartitionMarkedForEvent(s, s1, map, partitionEventType);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_primary_keys} request object to the HMS thrift client and
+   * returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public PrimaryKeysResponse get_primary_keys(PrimaryKeysRequest primaryKeysRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_primary_keys(
+          primaryKeysRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_foreign_keys} request object to the HMS thrift client and
+   * returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public ForeignKeysResponse get_foreign_keys(ForeignKeysRequest foreignKeysRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_foreign_keys(
+          foreignKeysRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_unique_constraints} request object to the HMS thrift client
+   * and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public UniqueConstraintsResponse get_unique_constraints(
+      UniqueConstraintsRequest uniqueConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_unique_constraints(
+          uniqueConstraintsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_not_null_constraints} request object to the HMS thrift
+   * client and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public NotNullConstraintsResponse get_not_null_constraints(
+      NotNullConstraintsRequest notNullConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_not_null_constraints(
+          notNullConstraintsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_default_constraints} request object to the HMS thrift
+   * client and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public DefaultConstraintsResponse get_default_constraints(
+      DefaultConstraintsRequest defaultConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_default_constraints(
+          defaultConstraintsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_check_constraints} request object to the HMS thrift client
+   * and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public CheckConstraintsResponse get_check_constraints(
+      CheckConstraintsRequest checkConstraintsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_check_constraints(
+          checkConstraintsRequest);
+    }
+  }
+
+  /**
+   * Hands {@code update_table_column_statistics} to the HMS thrift client unchanged and
+   * returns the boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean update_table_column_statistics(ColumnStatistics columnStatistics)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().update_table_column_statistics(
+          columnStatistics);
+    }
+  }
+
+  /**
+   * Hands {@code update_partition_column_statistics} to the HMS thrift client unchanged
+   * and returns the boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean update_partition_column_statistics(ColumnStatistics columnStatistics)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .update_partition_column_statistics(
+              columnStatistics);
+    }
+  }
+
+  /**
+   * Forwards the {@code update_table_column_statistics_req} request object to the HMS
+   * thrift client and returns its response; the metastore client is closed on exit.
+   */
+  @Override
+  public SetPartitionsStatsResponse update_table_column_statistics_req(
+      SetPartitionsStatsRequest setPartitionsStatsRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .update_table_column_statistics_req(
+              setPartitionsStatsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code update_partition_column_statistics_req} request object to the
+   * HMS thrift client and returns its response; the metastore client is closed on
+   * exit.
+   */
+  @Override
+  public SetPartitionsStatsResponse update_partition_column_statistics_req(
+      SetPartitionsStatsRequest setPartitionsStatsRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .update_partition_column_statistics_req(
+              setPartitionsStatsRequest);
+    }
+  }
+
+  /**
+   * Hands {@code get_table_column_statistics} to the HMS thrift client unchanged and
+   * returns the statistics object it yields; the metastore client is closed on exit.
+   */
+  @Override
+  public ColumnStatistics get_table_column_statistics(String s, String s1, String s2)
+      throws NoSuchObjectException, MetaException, InvalidInputException,
+      InvalidObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_table_column_statistics(s, s1, s2);
+    }
+  }
+
+  /**
+   * Hands {@code get_partition_column_statistics} to the HMS thrift client unchanged
+   * and returns the statistics object it yields; the metastore client is closed on
+   * exit.
+   */
+  @Override
+  public ColumnStatistics get_partition_column_statistics(String s, String s1, String s2,
+      String s3)
+      throws NoSuchObjectException, MetaException, InvalidInputException,
+      InvalidObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .get_partition_column_statistics(s, s1, s2, s3);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_table_statistics_req} request object to the HMS thrift
+   * client and returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public TableStatsResult get_table_statistics_req(TableStatsRequest tableStatsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_table_statistics_req(
+          tableStatsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_partitions_statistics_req} request object to the HMS thrift
+   * client and returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public PartitionsStatsResult get_partitions_statistics_req(
+      PartitionsStatsRequest partitionsStatsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_partitions_statistics_req(
+          partitionsStatsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code get_aggr_stats_for} request object to the HMS thrift client and
+   * returns its result; the metastore client is closed on exit.
+   */
+  @Override
+  public AggrStats get_aggr_stats_for(PartitionsStatsRequest partitionsStatsRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().get_aggr_stats_for(
+          partitionsStatsRequest);
+    }
+  }
+
+  /**
+   * Forwards the {@code set_aggr_stats_for} request object to the HMS thrift client and
+   * returns the boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean set_aggr_stats_for(SetPartitionsStatsRequest setPartitionsStatsRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException,
+      InvalidInputException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient().set_aggr_stats_for(
+          setPartitionsStatsRequest);
+    }
+  }
+
+  /**
+   * Hands {@code delete_partition_column_statistics} to the HMS thrift client unchanged
+   * and returns the boolean it reports; the metastore client is closed on exit.
+   */
+  @Override
+  public boolean delete_partition_column_statistics(String dbName, String tblName,
+      String partName, String colName, String engine)
+      throws NoSuchObjectException, MetaException, InvalidObjectException,
+      InvalidInputException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient().getThriftClient()
+          .delete_partition_column_statistics(
+              dbName, tblName, partName, colName, engine);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   *
+   * @param dbName forwarded unchanged as the first argument
+   * @param tblName forwarded unchanged as the second argument
+   * @param columnName forwarded unchanged as the third argument
+   * @param engine forwarded unchanged as the fourth argument (previously misspelled
+   *     "engien"; now named like the same parameter of
+   *     delete_partition_column_statistics)
+   */
+  @Override
+  public boolean delete_table_column_statistics(String dbName, String tblName,
+      String columnName, String engine)
+      throws NoSuchObjectException, MetaException, InvalidObjectException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .delete_table_column_statistics(dbName, tblName, columnName, engine);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void create_function(Function function)
+      throws AlreadyExistsException, InvalidObjectException, MetaException,
+      NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .create_function(function);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void drop_function(String dbName, String funcName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .drop_function(dbName, funcName);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void alter_function(String s, String s1, Function function)
+      throws InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .alter_function(s, s1, function);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<String> get_functions(String s, String s1)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_functions(s, s1);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public Function get_function(String s, String s1)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_function(s, s1);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetAllFunctionsResponse get_all_functions() throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_all_functions();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean create_role(Role role) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .create_role(role);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean drop_role(String s) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .drop_role(s);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<String> get_role_names() throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_role_names();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   * All six arguments are forwarded in the order they were received.
+   */
+  @Override
+  public boolean grant_role(String roleName, String userName, PrincipalType principalType,
+      String grantor, PrincipalType grantorType, boolean grantOption)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .grant_role(
+              roleName, userName, principalType, grantor, grantorType, grantOption);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean revoke_role(String s, String s1, PrincipalType principalType)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .revoke_role(s, s1, principalType);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<Role> list_roles(String s, PrincipalType principalType)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .list_roles(s, principalType);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GrantRevokeRoleResponse grant_revoke_role(
+      GrantRevokeRoleRequest grantRevokeRoleRequest) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .grant_revoke_role(grantRevokeRoleRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetPrincipalsInRoleResponse get_principals_in_role(
+      GetPrincipalsInRoleRequest getPrincipalsInRoleRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_principals_in_role(getPrincipalsInRoleRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetRoleGrantsForPrincipalResponse get_role_grants_for_principal(
+      GetRoleGrantsForPrincipalRequest getRoleGrantsForPrincipalRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_role_grants_for_principal(getRoleGrantsForPrincipalRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public PrincipalPrivilegeSet get_privilege_set(HiveObjectRef hiveObjectRef, String s,
+      List<String> list) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_privilege_set(hiveObjectRef, s, list);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<HiveObjectPrivilege> list_privileges(String s, PrincipalType principalType,
+      HiveObjectRef hiveObjectRef) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .list_privileges(s, principalType, hiveObjectRef);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean grant_privileges(PrivilegeBag privilegeBag)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .grant_privileges(privilegeBag);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean revoke_privileges(PrivilegeBag privilegeBag)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .revoke_privileges(privilegeBag);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GrantRevokePrivilegeResponse grant_revoke_privileges(
+      GrantRevokePrivilegeRequest grantRevokePrivilegeRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .grant_revoke_privileges(grantRevokePrivilegeRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GrantRevokePrivilegeResponse refresh_privileges(HiveObjectRef hiveObjectRef,
+      String s, GrantRevokePrivilegeRequest grantRevokePrivilegeRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .refresh_privileges(hiveObjectRef, s, grantRevokePrivilegeRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<String> set_ugi(String s, List<String> list)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .set_ugi(s, list);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public String get_delegation_token(String s, String s1)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_delegation_token(s, s1);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public long renew_delegation_token(String s) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .renew_delegation_token(s);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void cancel_delegation_token(String s) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .cancel_delegation_token(s);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean add_token(String tokenIdentifier, String delegationToken)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .add_token(tokenIdentifier, delegationToken);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean remove_token(String tokenIdentifier) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .remove_token(tokenIdentifier);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public String get_token(String s) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_token(s);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<String> get_all_token_identifiers() throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_all_token_identifiers();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public int add_master_key(String s) throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .add_master_key(s);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void update_master_key(int i, String s)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .update_master_key(i, s);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public boolean remove_master_key(int i) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .remove_master_key(i);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<String> get_master_keys() throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_master_keys();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetOpenTxnsResponse get_open_txns() throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_open_txns();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetOpenTxnsInfoResponse get_open_txns_info() throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_open_txns_info();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public OpenTxnsResponse open_txns(OpenTxnRequest openTxnRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .open_txns(openTxnRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void abort_txn(AbortTxnRequest abortTxnRequest)
+      throws NoSuchTxnException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .abort_txn(abortTxnRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void abort_txns(AbortTxnsRequest abortTxnsRequest)
+      throws NoSuchTxnException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .abort_txns(abortTxnsRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void commit_txn(CommitTxnRequest commitTxnRequest)
+      throws NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .commit_txn(commitTxnRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void repl_tbl_writeid_state(
+      ReplTblWriteIdStateRequest replTblWriteIdStateRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .repl_tbl_writeid_state(replTblWriteIdStateRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetValidWriteIdsResponse get_valid_write_ids(
+      GetValidWriteIdsRequest getValidWriteIdsRequest)
+      throws NoSuchTxnException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_valid_write_ids(getValidWriteIdsRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public AllocateTableWriteIdsResponse allocate_table_write_ids(
+      AllocateTableWriteIdsRequest allocateTableWriteIdsRequest)
+      throws NoSuchTxnException, TxnAbortedException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .allocate_table_write_ids(allocateTableWriteIdsRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public LockResponse lock(LockRequest lockRequest)
+      throws NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .lock(lockRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public LockResponse check_lock(CheckLockRequest checkLockRequest)
+      throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .check_lock(checkLockRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void unlock(UnlockRequest unlockRequest)
+      throws NoSuchLockException, TxnOpenException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .unlock(unlockRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public ShowLocksResponse show_locks(ShowLocksRequest showLocksRequest)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .show_locks(showLocksRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void heartbeat(HeartbeatRequest heartbeatRequest)
+      throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .heartbeat(heartbeatRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public HeartbeatTxnRangeResponse heartbeat_txn_range(
+      HeartbeatTxnRangeRequest heartbeatTxnRangeRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .heartbeat_txn_range(heartbeatTxnRangeRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void compact(CompactionRequest compactionRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .compact(compactionRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public CompactionResponse compact2(CompactionRequest compactionRequest)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .compact2(compactionRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public ShowCompactResponse show_compact(ShowCompactRequest showCompactRequest)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .show_compact(showCompactRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void add_dynamic_partitions(AddDynamicPartitions addDynamicPartitions)
+      throws NoSuchTxnException, TxnAbortedException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .add_dynamic_partitions(addDynamicPartitions);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public OptionalCompactionInfoStruct find_next_compact(String s)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .find_next_compact(s);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void update_compactor_state(CompactionInfoStruct compactionInfoStruct, long l)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .update_compactor_state(compactionInfoStruct, l);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public List<String> find_columns_with_stats(CompactionInfoStruct compactionInfoStruct)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .find_columns_with_stats(compactionInfoStruct);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void mark_cleaned(CompactionInfoStruct compactionInfoStruct)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .mark_cleaned(compactionInfoStruct);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   *
+   * @param compactionInfoStruct forwarded unchanged to the thrift client
+   */
+  @Override
+  public void mark_compacted(CompactionInfoStruct compactionInfoStruct)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      // Call the raw thrift method, exactly like mark_cleaned and mark_failed do,
+      // rather than going through the higher-level markCompacted() wrapper of the
+      // hive client.
+      client.getHiveClient().getThriftClient().mark_compacted(compactionInfoStruct);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void mark_failed(CompactionInfoStruct compactionInfoStruct)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .mark_failed(compactionInfoStruct);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public MaxAllocatedTableWriteIdResponse get_max_allocated_table_write_id(
+      MaxAllocatedTableWriteIdRequest rqst)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_max_allocated_table_write_id(rqst);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void seed_write_id(SeedTableWriteIdsRequest rqst)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .seed_write_id(rqst);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void seed_txn_id(SeedTxnIdRequest rqst)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .seed_txn_id(rqst);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void set_hadoop_jobid(String s, long l) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .set_hadoop_jobid(s, l);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public NotificationEventResponse get_next_notification(
+      NotificationEventRequest notificationEventRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_next_notification(notificationEventRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public CurrentNotificationEventId get_current_notificationEventId() throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_current_notificationEventId();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public NotificationEventsCountResponse get_notification_events_count(
+      NotificationEventsCountRequest notificationEventsCountRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_notification_events_count(notificationEventsCountRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public FireEventResponse fire_listener_event(FireEventRequest fireEventRequest)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .fire_listener_event(fireEventRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void flushCache() throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .flushCache();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WriteNotificationLogResponse add_write_notification_log(
+      WriteNotificationLogRequest writeNotificationLogRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .add_write_notification_log(writeNotificationLogRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public CmRecycleResponse cm_recycle(CmRecycleRequest cmRecycleRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .cm_recycle(cmRecycleRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetFileMetadataByExprResult get_file_metadata_by_expr(
+      GetFileMetadataByExprRequest getFileMetadataByExprRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_file_metadata_by_expr(getFileMetadataByExprRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public GetFileMetadataResult get_file_metadata(
+      GetFileMetadataRequest getFileMetadataRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_file_metadata(getFileMetadataRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public PutFileMetadataResult put_file_metadata(
+      PutFileMetadataRequest putFileMetadataRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .put_file_metadata(putFileMetadataRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public ClearFileMetadataResult clear_file_metadata(
+      ClearFileMetadataRequest clearFileMetadataRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .clear_file_metadata(clearFileMetadataRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public CacheFileMetadataResult cache_file_metadata(
+      CacheFileMetadataRequest cacheFileMetadataRequest) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .cache_file_metadata(cacheFileMetadataRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public String get_metastore_db_uuid() throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_metastore_db_uuid();
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMCreateResourcePlanResponse create_resource_plan(
+      WMCreateResourcePlanRequest wmCreateResourcePlanRequest)
+      throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .create_resource_plan(wmCreateResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMGetResourcePlanResponse get_resource_plan(
+      WMGetResourcePlanRequest wmGetResourcePlanRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_resource_plan(wmGetResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMGetActiveResourcePlanResponse get_active_resource_plan(
+      WMGetActiveResourcePlanRequest wmGetActiveResourcePlanRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_active_resource_plan(wmGetActiveResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMGetAllResourcePlanResponse get_all_resource_plans(
+      WMGetAllResourcePlanRequest wmGetAllResourcePlanRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_all_resource_plans(wmGetAllResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMAlterResourcePlanResponse alter_resource_plan(
+      WMAlterResourcePlanRequest wmAlterResourcePlanRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .alter_resource_plan(wmAlterResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMValidateResourcePlanResponse validate_resource_plan(
+      WMValidateResourcePlanRequest wmValidateResourcePlanRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .validate_resource_plan(wmValidateResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMDropResourcePlanResponse drop_resource_plan(
+      WMDropResourcePlanRequest wmDropResourcePlanRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .drop_resource_plan(wmDropResourcePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMCreateTriggerResponse create_wm_trigger(
+      WMCreateTriggerRequest wmCreateTriggerRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .create_wm_trigger(wmCreateTriggerRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMAlterTriggerResponse alter_wm_trigger(
+      WMAlterTriggerRequest wmAlterTriggerRequest)
+      throws NoSuchObjectException, InvalidObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .alter_wm_trigger(wmAlterTriggerRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMDropTriggerResponse drop_wm_trigger(WMDropTriggerRequest wmDropTriggerRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .drop_wm_trigger(wmDropTriggerRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMGetTriggersForResourePlanResponse get_triggers_for_resourceplan(
+      WMGetTriggersForResourePlanRequest wmGetTriggersForResourePlanRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_triggers_for_resourceplan(wmGetTriggersForResourePlanRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMCreatePoolResponse create_wm_pool(WMCreatePoolRequest wmCreatePoolRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .create_wm_pool(wmCreatePoolRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMAlterPoolResponse alter_wm_pool(WMAlterPoolRequest wmAlterPoolRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .alter_wm_pool(wmAlterPoolRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMDropPoolResponse drop_wm_pool(WMDropPoolRequest wmDropPoolRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .drop_wm_pool(wmDropPoolRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMCreateOrUpdateMappingResponse create_or_update_wm_mapping(
+      WMCreateOrUpdateMappingRequest wmCreateOrUpdateMappingRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .create_or_update_wm_mapping(wmCreateOrUpdateMappingRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMDropMappingResponse drop_wm_mapping(WMDropMappingRequest wmDropMappingRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .drop_wm_mapping(wmDropMappingRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public WMCreateOrDropTriggerToPoolMappingResponse
+    create_or_drop_wm_trigger_to_pool_mapping(
+      WMCreateOrDropTriggerToPoolMappingRequest wmCreateOrDropTriggerToPoolMappingRequest)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+      MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .create_or_drop_wm_trigger_to_pool_mapping(
+              wmCreateOrDropTriggerToPoolMappingRequest);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void create_ischema(ISchema iSchema)
+      throws AlreadyExistsException, NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .create_ischema(iSchema);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void alter_ischema(AlterISchemaRequest alterISchemaRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .alter_ischema(alterISchemaRequest);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public ISchema get_ischema(ISchemaName iSchemaName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_ischema(iSchemaName);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void drop_ischema(ISchemaName iSchemaName)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .drop_ischema(iSchemaName);
+    }
+  }
+
+  /**
+   * Pass-through: forwards this call to the thrift client of a catalog-provided
+   * metastore client; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public void add_schema_version(SchemaVersion schemaVersion)
+      throws AlreadyExistsException, NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient()
+          .getThriftClient()
+          .add_schema_version(schemaVersion);
+    }
+  }
+
+  /**
+   * Pass-through: returns whatever the thrift client of a catalog-provided metastore
+   * client returns for this call; try-with-resources closes that client afterwards.
+   */
+  @Override
+  public SchemaVersion get_schema_version(SchemaVersionDescriptor schemaVersionDescriptor)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      return msClient.getHiveClient()
+          .getThriftClient()
+          .get_schema_version(schemaVersionDescriptor);
+    }
+  }
+
+  @Override
+  public SchemaVersion get_schema_latest_version(ISchemaName iSchemaName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schema_latest_version(iSchemaName);
+    }
+  }
+
+  @Override
+  public List<SchemaVersion> get_schema_all_versions(ISchemaName iSchemaName)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schema_all_versions(iSchemaName);
+    }
+  }
+
+  @Override
+  public void drop_schema_version(SchemaVersionDescriptor schemaVersionDescriptor)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .drop_schema_version(schemaVersionDescriptor);
+    }
+  }
+
+  @Override
+  public FindSchemasByColsResp get_schemas_by_cols(
+      FindSchemasByColsRqst findSchemasByColsRqst) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_schemas_by_cols(findSchemasByColsRqst);
+    }
+  }
+
+  @Override
+  public void map_schema_version_to_serde(
+      MapSchemaVersionToSerdeRequest mapSchemaVersionToSerdeRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .map_schema_version_to_serde(mapSchemaVersionToSerdeRequest);
+    }
+  }
+
+  @Override
+  public void set_schema_version_state(
+      SetSchemaVersionStateRequest setSchemaVersionStateRequest)
+      throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .set_schema_version_state(setSchemaVersionStateRequest);
+    }
+  }
+
+  @Override
+  public void add_serde(SerDeInfo serDeInfo)
+      throws AlreadyExistsException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_serde(serDeInfo);
+    }
+  }
+
+  @Override
+  public SerDeInfo get_serde(GetSerdeRequest getSerdeRequest)
+      throws NoSuchObjectException, MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_serde(getSerdeRequest);
+    }
+  }
+
+  @Override
+  public LockResponse get_lock_materialization_rebuild(String s, String s1, long l)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_lock_materialization_rebuild(s, s1, l);
+    }
+  }
+
+  @Override
+  public boolean heartbeat_lock_materialization_rebuild(String s, String s1, long l)
+      throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .heartbeat_lock_materialization_rebuild(s, s1, l);
+    }
+  }
+
+  @Override
+  public void add_runtime_stats(RuntimeStat runtimeStat)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient().add_runtime_stats(runtimeStat);
+    }
+  }
+
+  @Override
+  public List<RuntimeStat> get_runtime_stats(
+      GetRuntimeStatsRequest getRuntimeStatsRequest) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_runtime_stats(getRuntimeStatsRequest);
+    }
+  }
+
+  @Override
+  public ScheduledQueryPollResponse scheduled_query_poll(
+      ScheduledQueryPollRequest scheduledQueryPollRequest)
+      throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .scheduled_query_poll(scheduledQueryPollRequest);
+    }
+  }
+
+  @Override
+  public void scheduled_query_maintenance(
+      ScheduledQueryMaintenanceRequest scheduledQueryMaintenanceRequest)
+      throws MetaException, NoSuchObjectException, AlreadyExistsException,
+      InvalidInputException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .scheduled_query_maintenance(scheduledQueryMaintenanceRequest);
+    }
+  }
+
+  @Override
+  public void scheduled_query_progress(
+      ScheduledQueryProgressInfo scheduledQueryProgressInfo)
+      throws MetaException, InvalidOperationException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .scheduled_query_progress(scheduledQueryProgressInfo);
+    }
+  }
+
+  @Override
+  public ScheduledQuery get_scheduled_query(ScheduledQueryKey scheduledQueryKey)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_scheduled_query(scheduledQueryKey);
+    }
+  }
+
+  @Override
+  public void add_replication_metrics(ReplicationMetricList replicationMetricList)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      client.getHiveClient().getThriftClient()
+          .add_replication_metrics(replicationMetricList);
+    }
+  }
+
+  @Override
+  public ReplicationMetricList get_replication_metrics(
+      GetReplicationMetricsRequest getReplicationMetricsRequest)
+      throws MetaException, NoSuchObjectException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient()
+          .get_replication_metrics(getReplicationMetricsRequest);
+    }
+  }
+
+  @Override
+  public long get_latest_txnid_in_conflict(long txnId) throws MetaException, TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getThriftClient().get_latest_txnid_in_conflict(txnId);
+    }
+  }
+
+  @Override
+  public String getName() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getName"));
+  }
+
+  @Override
+  public String getVersion() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getVersion"));
+  }
+
+  @Override
+  public fb_status getStatus() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getStatus"));
+  }
+
+  @Override
+  public String getStatusDetails() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getStatusDetails"));
+  }
+
+  @Override
+  public Map<String, Long> getCounters() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getCounters"));
+  }
+
+  @Override
+  public long getCounter(String s) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getCounter"));
+  }
+
+  @Override
+  public void setOption(String s, String s1) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "setOption"));
+
+  }
+
+  @Override
+  public String getOption(String s) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getOption"));
+  }
+
+  @Override
+  public Map<String, String> getOptions() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getOptions"));
+  }
+
+  @Override
+  public String getCpuProfile(int i) throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "getCpuProfile"));
+  }
+
+  @Override
+  public long aliveSince() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "aliveSince"));
+  }
+
+  @Override
+  public void reinitialize() throws TException {
+    throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+        "reinitialize"));
+
+  }
+
+  @Override
+  public void shutdown() throws TException {
+    // Nothing to do here. This call can be used to do any session-specific clean-up.
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
new file mode 100644
index 0000000..399ed09
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * NoOpCatalogMetastoreServer which is used when metastore service is not configured.
+ */
+public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
+  public static final NoOpCatalogMetastoreServer INSTANCE =
+      new NoOpCatalogMetastoreServer();
+  private static final String EMPTY = "";
+
+  @Override
+  public void start() throws CatalogException {
+    // no-op
+  }
+
+  @Override
+  public void stop() throws CatalogException {
+    // no-op
+  }
+
+  @Override
+  public String getMetrics() {
+    return EMPTY;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 4f68669..7ae4dc7 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.impala.analysis.SqlScanner;
 import org.apache.impala.thrift.TBackendGflags;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
@@ -288,4 +289,25 @@ public class BackendConfig {
   public int getMaxWaitTimeForSyncDdlSecs() {
     return backendCfg_.max_wait_time_for_sync_ddl_s;
   }
+
+  public boolean startHmsServer() {
+    return backendCfg_.start_hms_server;
+  }
+
+  public int getHMSPort() {
+    return backendCfg_.hms_port;
+  }
+
+  public boolean fallbackToHMSOnErrors() {
+    return backendCfg_.fallback_to_hms_on_errors;
+  }
+
+  @VisibleForTesting
+  public void setEnableCatalogdHMSCache(boolean flag) {
+    backendCfg_.enable_catalogd_hms_cache = flag;
+  }
+
+  public boolean enableCatalogdHMSCache() {
+    return backendCfg_.enable_catalogd_hms_cache;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java
new file mode 100644
index 0000000..158b359
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileBlock;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.fb.FbFileBlock;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.CatalogServiceTestCatalog.CatalogServiceTestHMSCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CatalogHmsFileMetadataTest {
+  private static CatalogServiceCatalog catalog_;
+  private static HiveMetaStoreClient catalogHmsClient_;
+  private static final Configuration CONF = MetastoreConf.newMetastoreConf();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    catalog_ = CatalogServiceTestCatalog.createTestCatalogMetastoreServer();
+    MetastoreConf.setVar(CONF, ConfVars.THRIFT_URIS,
+        "thrift://localhost:" + ((CatalogServiceTestHMSCatalog) catalog_).getPort());
+    // metastore clients which connect to catalogd's HMS endpoint need this
+    // configuration set since the forwarded HMS call use catalogd's HMS client
+    // not the end-user's UGI.
+    CONF.set("hive.metastore.execute.setugi", "false");
+    catalogHmsClient_ = new HiveMetaStoreClient(CONF);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    if (catalogHmsClient_ != null) catalogHmsClient_.close();
+    catalog_.close();
+  }
+
+  /**
+   * The test fetches partitions of a table over HMS API and then compares if the
+   * deserialized filemetadata from the response matches with what we have in catalogd.
+   */
+  @Test
+  public void testFileMetadataForPartitions() throws Exception {
+    // get partitions from catalog directly
+    HdfsTable tbl = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsPartition hdfsPartition1 = tbl
+        .getPartitionsForNames(Arrays.asList("year=2009/month=1")).get(0);
+    HdfsPartition hdfsPartition2 = tbl
+        .getPartitionsForNames(Arrays.asList("year=2009/month=2")).get(0);
+
+    // test empty partitions result case.
+    GetPartitionsByNamesRequest request = new GetPartitionsByNamesRequest();
+    String dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    // no names are set so the result is expected to be empty
+    request.setNames(new ArrayList<>());
+    request.setGetFileMetadata(true);
+    GetPartitionsByNamesResult result = catalogHmsClient_.getPartitionsByNames(request);
+    assertTrue(result.getPartitions().isEmpty());
+    Map<Partition, List<FileDescriptor>> fds = CatalogHmsClientUtils
+        .extractFileDescriptors(result, tbl.getHostIndex());
+    assertTrue(fds.isEmpty());
+
+    // get the partitions over HMS API.
+    request = new GetPartitionsByNamesRequest();
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    request.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+    request.setGetFileMetadata(true);
+    result = catalogHmsClient_.getPartitionsByNames(request);
+    for (Partition part : result.getPartitions()) {
+      assertNotNull(part.getFileMetadata());
+    }
+    assertNotNull(result.getDictionary());
+    fds = CatalogHmsClientUtils
+        .extractFileDescriptors(result, tbl.getHostIndex());
+    assertEquals(2, fds.size());
+    for (List<FileDescriptor> partFds : fds.values()) {
+      assertFalse(partFds.isEmpty());
+      assertEquals(1, partFds.size());
+    }
+    // make sure that the FileDescriptors from catalog and over HMS API are the same
+    // for the same hostIndex
+    assertFdsAreSame(hdfsPartition1.getFileDescriptors(),
+        fds.get(result.getPartitions().get(0)));
+    assertFdsAreSame(hdfsPartition2.getFileDescriptors(),
+        fds.get(result.getPartitions().get(1)));
+  }
+
+  /** Asserts pairwise equality of the descriptors' fields and block debug strings. */
+  public static void assertFdsAreSame(List<FileDescriptor> fdsFromCatalog,
+      List<FileDescriptor> fdsFromHMS) {
+    assertEquals(fdsFromCatalog.size(), fdsFromHMS.size());
+    for (int i=0; i<fdsFromCatalog.size(); i++) {
+      FileDescriptor fdFromCatalog = fdsFromCatalog.get(i);
+      FileDescriptor fdFromHMS = fdsFromHMS.get(i);
+      assertEquals(fdFromCatalog.getRelativePath(), fdFromHMS.getRelativePath());
+      assertEquals(fdFromCatalog.getFileCompression(), fdFromHMS.getFileCompression());
+      assertEquals(fdFromCatalog.getFileLength(), fdFromHMS.getFileLength());
+      assertEquals(fdFromCatalog.getIsEc(), fdFromHMS.getIsEc());
+      assertEquals(fdFromCatalog.getModificationTime(), fdFromHMS.getModificationTime());
+      assertEquals(fdFromCatalog.getNumFileBlocks(), fdFromHMS.getNumFileBlocks());
+      for (int j=0; j<fdFromCatalog.getNumFileBlocks(); j++) {
+        FbFileBlock blockFromCat = fdFromCatalog.getFbFileBlock(j);
+        FbFileBlock blockFromHMS = fdFromHMS.getFbFileBlock(j);
+        assertEquals(FileBlock.debugString(blockFromCat),
+            FileBlock.debugString(blockFromHMS));
+      }
+    }
+  }
+
+  /**
+   * Test requests a table over HMS API with file-metadata and then compares if the
+   * file-metadata returned is same as what we have in catalogd.
+   */
+  @Test
+  public void testFileMetadataForTable() throws Exception {
+    Table tbl = catalogHmsClient_
+        .getTable(null, "functional", "zipcode_incomes", null, false, null, true);
+    assertNotNull(tbl.getFileMetadata());
+    HdfsTable catTbl = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "zipcode_incomes", "test", null);
+    HdfsPartition part = (HdfsPartition) Iterables.getOnlyElement(catTbl.getPartitions());
+    List<FileDescriptor> hmsTblFds = CatalogHmsClientUtils
+        .extractFileDescriptors(tbl, catTbl.getHostIndex());
+    assertFdsAreSame(part.getFileDescriptors(), hmsTblFds);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java
new file mode 100644
index 0000000..9409ddb
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.CatalogServiceTestCatalog.CatalogServiceTestHMSCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class EnableCatalogdHmsCacheFlagTest {
+  private static CatalogServiceCatalog catalog_;
+  private static HiveMetaStoreClient catalogHmsClient_;
+  private static final Configuration CONF = MetastoreConf.newMetastoreConf();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    catalog_ = CatalogServiceTestCatalog.createTestCatalogMetastoreServer();
+    MetastoreConf.setVar(CONF, ConfVars.THRIFT_URIS,
+        "thrift://localhost:" + ((CatalogServiceTestHMSCatalog) catalog_).getPort());
+    // metastore clients which connect to catalogd's HMS endpoint need this
+    // configuration set since the forwarded HMS call use catalogd's HMS client
+    // not the end-user's UGI.
+    CONF.set("hive.metastore.execute.setugi", "false");
+    catalogHmsClient_ = new HiveMetaStoreClient(CONF);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    catalog_.close();
+  }
+
+  /**
+   * The test fetches partitions of a table over HMS API and then compares if the
+   * deserialized filemetadata from the response matches with what we have in catalogd.
+   */
+  @Test
+  public void testEnableCatalogdCachingFlag() throws Exception {
+
+    BackendConfig.INSTANCE.setEnableCatalogdHMSCache(true);
+
+    // get partitions from catalog directly
+    HdfsTable tbl = (HdfsTable) catalog_
+            .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsPartition hdfsPartition1 = tbl
+            .getPartitionsForNames(Arrays.asList("year=2009/month=1")).get(0);
+    HdfsPartition hdfsPartition2 = tbl
+            .getPartitionsForNames(Arrays.asList("year=2009/month=2")).get(0);
+
+    BackendConfig.INSTANCE.setEnableCatalogdHMSCache(false);
+
+    // test empty partitions result case.
+    GetPartitionsByNamesRequest request = new GetPartitionsByNamesRequest();
+    String dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    // no names are set so the result is expected to be empty
+    request.setNames(new ArrayList<>());
+    request.setGetFileMetadata(true);
+    GetPartitionsByNamesResult result = catalogHmsClient_.getPartitionsByNames(request);
+    assertTrue(result.getPartitions().isEmpty());
+    Map<Partition, List<FileDescriptor>> fds = CatalogHmsClientUtils
+            .extractFileDescriptors(result, tbl.getHostIndex());
+    assertTrue(fds.isEmpty());
+
+    // get the partitions over HMS API.
+    request = new GetPartitionsByNamesRequest();
+    dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+    request.setDb_name(dbName);
+    request.setTbl_name("alltypes");
+    request.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+    request.setGetFileMetadata(true);
+    result = catalogHmsClient_.getPartitionsByNames(request);
+    for (Partition part : result.getPartitions()) {
+      assertNotNull(part.getFileMetadata());
+    }
+    assertNotNull(result.getDictionary());
+    fds = CatalogHmsClientUtils
+            .extractFileDescriptors(result, tbl.getHostIndex());
+    assertEquals(2, fds.size());
+    for (List<FileDescriptor> partFds : fds.values()) {
+      assertFalse(partFds.isEmpty());
+      assertEquals(1, partFds.size());
+    }
+
+    // make sure that the FileDescriptors from catalog and over HMS API are the same
+    // for the same hostIndex
+    CatalogHmsFileMetadataTest.assertFdsAreSame(hdfsPartition1.getFileDescriptors(),
+            fds.get(result.getPartitions().get(0)));
+    CatalogHmsFileMetadataTest.assertFdsAreSame(hdfsPartition2.getFileDescriptors(),
+            fds.get(result.getPartitions().get(1)));
+  }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 637a114..a942ee2 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -17,12 +17,15 @@
 
 package org.apache.impala.testutil;
 
+import java.net.ServerSocket;
 import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.NoopAuthorizationFactory;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.FeSupport;
@@ -54,19 +57,29 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     return createWithAuth(new NoopAuthorizationFactory());
   }
 
+  public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory) {
+    return createWithAuth(authzFactory, false);
+  }
+
   /**
    * Creates a catalog server that reads authorization policy metadata from the
    * authorization config.
    */
-  public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory) {
+  public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory,
+      boolean startCatalogHms) {
     FeSupport.loadLibrary();
     CatalogServiceCatalog cs;
     try {
       if (MetastoreShim.getMajorVersion() > 2) {
         MetastoreShim.setHiveClientCapabilities();
       }
-      cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
-          new MetaStoreClientPool(0, 0));
+      if (startCatalogHms) {
+        cs = new CatalogServiceTestHMSCatalog(false, 16, new TUniqueId(),
+            new MetaStoreClientPool(0, 0));
+      } else {
+        cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
+            new MetaStoreClientPool(0, 0));
+      }
       cs.setAuthzManager(authzFactory.newAuthorizationManager(cs));
       cs.reset();
     } catch (ImpalaException e) {
@@ -95,6 +108,76 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     return cs;
   }
 
+  /** Test CatalogMetastoreServer whose port is a free one picked at construction. */
+  private static class CatalogTestMetastoreServer extends CatalogMetastoreServer {
+    private final int port;
+    public CatalogTestMetastoreServer(
+        CatalogServiceCatalog catalogServiceCatalog) throws ImpalaException {
+      super(catalogServiceCatalog);
+      try {
+        port = getRandomPort();
+      } catch (Exception e) {
+        throw new CatalogException(e.getMessage());
+      }
+    }
+
+    @Override
+    public int getPort() { return port; }
+
+    /** Returns a free local port chosen by the OS, retrying up to 5 times on failure. */
+    private static int getRandomPort() throws Exception {
+      Exception lastError = null;
+      for (int i=0; i<5; i++) {
+        // Port 0 lets the OS pick a free port; the probe socket is closed right away.
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+          return serverSocket.getLocalPort();
+        } catch (java.io.IOException e) {
+          lastError = e;
+        }
+      }
+      throw new Exception("Could not find a free port: " + lastError, lastError);
+    }
+  }
+
+  public static class CatalogServiceTestHMSCatalog extends CatalogServiceTestCatalog {
+    private CatalogTestMetastoreServer metastoreServer;  // created lazily; may be null
+    public CatalogServiceTestHMSCatalog(boolean loadInBackground, int numLoadingThreads,
+        TUniqueId catalogServiceId,
+        MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
+      super(loadInBackground, numLoadingThreads, catalogServiceId, metaStoreClientPool);
+    }
+
+    @Override
+    protected CatalogMetastoreServer getCatalogMetastoreServer() {
+      synchronized (this) {
+        if (metastoreServer != null) return metastoreServer;
+        try {
+          metastoreServer = new CatalogTestMetastoreServer(this);
+        } catch (ImpalaException e) {
+          return null;  // creation failed; metastoreServer stays null
+        }
+        return metastoreServer;
+      }
+    }
+
+    public int getPort() { return metastoreServer.getPort(); }
+
+    @Override
+    public void close() {
+      super.close();
+      try {
+        if (metastoreServer != null) metastoreServer.stop();
+      } catch (CatalogException e) {
+        // ignored: failing to stop the test server must not fail the clean-up
+      }
+    }
+  }
+
+  public static CatalogServiceCatalog createTestCatalogMetastoreServer()
+      throws ImpalaException {
+    return createWithAuth(new NoopAuthorizationFactory(), true);
+  }
+
   @Override
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
 }
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 7978f91..6145b1d 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -30,6 +30,7 @@ import requests
 import socket
 import subprocess
 import time
+import string
 from functools import wraps
 from getpass import getuser
 from random import choice
@@ -151,6 +152,24 @@ class ImpalaTestSuite(BaseTestSuite):
     # to work with the HS2 client can add HS2 in addition to or instead of beeswax.
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax'))
 
+  @staticmethod
+  def create_hive_client(port):
+    """
+    Creates an HMS client to an external metastore service running at the given port
+    """
+    trans_type = 'buffered'
+    if pytest.config.option.use_kerberos:
+      trans_type = 'kerberos'
+    hive_transport = create_transport(
+      host=pytest.config.option.metastore_server.split(':')[0],
+      port=port,
+      service=pytest.config.option.hive_service_name,
+      transport_type=trans_type)
+    protocol = TBinaryProtocol.TBinaryProtocol(hive_transport)
+    hive_client = ThriftHiveMetastore.Client(protocol)
+    hive_transport.open()
+    return hive_client, hive_transport
+
   @classmethod
   def setup_class(cls):
     """Setup section that runs before each test suite"""
@@ -1185,3 +1204,11 @@ class ImpalaTestSuite(BaseTestSuite):
         LOG.info("Expected log lines could not be found, sleeping before retrying: %s",
             str(e))
         time.sleep(1)
+
+  @staticmethod
+  def get_random_name(prefix='', length=5):
+    """
+    Gets a random name used to create unique database or table
+    """
+    assert length > 0
+    return prefix + ''.join(choice(string.ascii_lowercase) for i in range(length))
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
new file mode 100644
index 0000000..c60fedf
--- /dev/null
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -0,0 +1,707 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from hive_metastore.ttypes import Database
+from hive_metastore.ttypes import FieldSchema
+from hive_metastore.ttypes import GetTableRequest
+from hive_metastore.ttypes import GetPartitionsByNamesRequest
+from hive_metastore.ttypes import Table
+from hive_metastore.ttypes import StorageDescriptor
+from hive_metastore.ttypes import SerDeInfo
+
+from tests.util.event_processor_utils import EventProcessorUtils
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestMetastoreService(CustomClusterTestSuite):
+    """
+    Tests for the Catalog Metastore service. Each test in this class should
+    start a hms_server using the catalogd flag --start_hms_server=true
+    """
+
+    part_tbl = ImpalaTestSuite.get_random_name("test_metastore_part_tbl")
+    unpart_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_tbl")
+    acid_part_tbl = ImpalaTestSuite.get_random_name("test_metastore_acid_part_tbl")
+    unpart_acid_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_acid_tbl")
+    default_unknowntbl = ImpalaTestSuite.get_random_name("test_metastore_default_tbl")
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899"
+    )
+    def test_passthrough_apis(self):
+        """
+        This test exercises some of the Catalog HMS APIs which are directly
+        passed through to the backing HMS service. This is by no means an
+        exhaustive set but is merely used as a sanity check to make sure that a
+        hive_client can connect to the Catalog's metastore service and is
+        able to execute calls to the backing HMS service.
+        """
+        catalog_hms_client = None
+        db_name = ImpalaTestSuite.get_random_name("test_passthrough_apis_db")
+        try:
+            catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+            assert catalog_hms_client is not None
+            # get_databases
+            databases = catalog_hms_client.get_all_databases()
+            assert databases is not None
+            assert len(databases) > 0
+            assert "functional" in databases
+            # get_database
+            database = catalog_hms_client.get_database("functional")
+            assert database is not None
+            assert "functional" == database.name
+            # get_tables
+            tables = catalog_hms_client.get_tables("functional", "*")
+            assert tables is not None
+            assert len(tables) > 0
+            assert "alltypes" in tables
+            # get table
+            table = catalog_hms_client.get_table("functional", "alltypes")
+            assert table is not None
+            assert "alltypes" == table.tableName
+            assert table.sd is not None
+            # get partitions
+            partitions = catalog_hms_client.get_partitions("functional", "alltypes", -1)
+            assert partitions is not None
+            assert len(partitions) > 0
+            # get partition names
+            part_names = catalog_hms_client.get_partition_names("functional", "alltypes",
+                                                                -1)
+            assert part_names is not None
+            assert len(part_names) > 0
+            assert "year=2009/month=1" in part_names
+            # notification APIs
+            event_id = EventProcessorUtils.get_current_notification_id(catalog_hms_client)
+            assert event_id is not None
+            assert event_id > 0
+            # DDLs
+            catalog_hms_client.create_database(self.__get_test_database(db_name))
+            database = catalog_hms_client.get_database(db_name)
+            assert database is not None
+            assert db_name == database.name
+            tbl_name = ImpalaTestSuite.get_random_name(
+                "test_passthrough_apis_tbl")
+            cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+            part_cols = [["part", "string", "part col"]]
+            catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name,
+                cols, part_cols))
+            table = catalog_hms_client.get_table(db_name, tbl_name)
+            assert table is not None
+            assert tbl_name == table.tableName
+            self.__compare_cols(cols, table.sd.cols)
+            self.__compare_cols(part_cols, table.partitionKeys)
+        finally:
+            if catalog_hms_client is not None:
+                catalog_hms_client.drop_database(db_name, True, True)
+                catalog_hms_client.shutdown()
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899"
+    )
+    def test_get_table_req_with_fallback(self):
+      """
+      Test the get_table_req APIs with fallback to HMS enabled. These calls
+      succeed even if catalog throws exceptions since we fallback to HMS.
+      """
+      catalog_hms_client = None
+      db_name = ImpalaTestSuite.get_random_name(
+          "test_get_table_req_with_fallback_db")
+      try:
+        catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_hms_client is not None
+
+        # Test simple get_table_req without stats.
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = "functional"
+        get_table_request.tblName = "alltypes"
+        get_table_request.getFileMetadata = True
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "alltypes"
+        # Request did not ask for stats, verify colStats are not populated.
+        assert get_table_response.table.colStats is None
+        # assert that fileMetadata is in the response
+        self.__assert_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+
+        # Test get_table_request with stats and engine set to Impala.
+        get_table_request.getColumnStats = True
+        get_table_request.engine = "impala"
+        get_table_request.getFileMetadata = False
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "alltypes"
+        assert get_table_response.table.colStats is not None
+        # Verify the column stats objects are populated for
+        # non-clustering columns.
+        assert len(get_table_response.table.colStats.statsObj) > 0
+        # file-metadata is not requested and hence should not be set;
+        # assert that fileMetadata is absent from the response
+        self.__assert_no_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+
+        # Create table in Hive and test this is properly falling back to HMS.
+        # Create table via Hive.
+        catalog_hms_client.create_database(self.__get_test_database(db_name))
+        database = catalog_hms_client.get_database(db_name)
+        assert database is not None
+        assert db_name == database.name
+        tbl_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_with_fallback_tbl")
+        cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+        catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
+
+        get_table_request.dbName = db_name
+        get_table_request.tblName = tbl_name
+        get_table_request.getColumnStats = True
+        get_table_request.getFileMetadata = True
+
+        # Engine is not specified, this should throw an exception even if
+        # fallback_to_hms_on_errors is true.
+        expected_exception = "isGetColumnStats() is true in the request but " \
+            "engine is not specified."
+        try:
+          catalog_hms_client.get_table_req(get_table_request)
+        except Exception as e:
+          if expected_exception is not None:
+            assert expected_exception in str(e)
+
+        # Set engine and request impala, this should fallback to HMS since
+        # the table is not in catalog cache. File metadata should be set even if the
+        # request is served via HMS fallback.
+        get_table_request.engine = "Impala"
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == db_name
+        assert get_table_response.table.tableName == tbl_name
+        assert get_table_response.table.fileMetadata is not None
+        # The table does not have any data so we expect the fileMetadata.data to be None
+        assert get_table_response.table.fileMetadata.data is None
+        # even if there are no files, the object dictionary should be not null and empty
+        assert get_table_response.table.dictionary is not None
+        assert len(get_table_response.table.dictionary.values) == 0
+      finally:
+        if catalog_hms_client is not None:
+            catalog_hms_client.shutdown()
+        if self.__get_database_no_throw(db_name) is not None:
+          self.hive_client.drop_database(db_name, True, True)
+
+    def __get_database_no_throw(self, db_name):
+      try:
+        return self.hive_client.get_database(db_name)
+      except Exception:
+        return None
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899 "
+                      "--fallback_to_hms_on_errors=false"
+    )
+    def test_get_table_req_without_fallback(self):
+      """
+      Test the get_table_req APIs with fallback to HMS disabled. These calls
+      throw exceptions since we do not fall back to HMS if the db/table is not
+      in Catalog cache.
+      """
+      catalog_hms_client = None
+      db_name = ImpalaTestSuite.get_random_name(
+          "test_get_table_req_without_fallback_db")
+      new_db_name = None
+      try:
+        catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_hms_client is not None
+
+        # Create table via Hive.
+        self.hive_client.create_database(self.__get_test_database(db_name))
+        database = self.hive_client.get_database(db_name)
+        assert database is not None
+        assert db_name == database.name
+        tbl_name = ImpalaTestSuite.get_random_name("test_get_table_req_tbl")
+        cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+        self.hive_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
+
+        # Build a get_table_req that asks for column stats but leaves the
+        # engine unset.
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = db_name
+        get_table_request.tblName = tbl_name
+        get_table_request.getColumnStats = True
+        # Engine is not set, this should throw an exception without falling
+        # back to HMS.
+        expected_exception_str = "Column stats are requested " \
+            "but engine is not set in the request."
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+        # Verify DB not found exception is thrown. Engine does not matter,
+        # we only return Impala table level column stats but this field is
+        # required to be consistent with Hive's implementation.
+        get_table_request.engine = "Impala"
+        expected_exception_str = "Database " + db_name + " not found"
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+
+        # Create database in Impala
+        new_db_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_without_fallback_db")
+        query = "create database " + new_db_name
+        self.execute_query_expect_success(self.client, query)
+        new_tbl_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_without_fallback_tbl")
+        new_cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+        # DDLs currently only pass-through to HMS so this call will not create a table
+        # in catalogd.
+        catalog_hms_client.create_table(self.__get_test_tbl(new_db_name, new_tbl_name,
+            new_cols))
+        # Verify table not found exception is thrown
+        get_table_request.dbName = new_db_name
+        get_table_request.tblName = new_tbl_name
+        expected_exception_str = "Table " + new_db_name + "." + new_tbl_name \
+                                 + " not found"
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+
+        # Create a table in Impala and verify that the get_table_req gets the
+        # table and stats for non-clustering columns immediately.
+        impala_tbl_name = ImpalaTestSuite.get_random_name(
+            "test_get_table_req_without_fallback_impala_tbl")
+        query = "create table " + new_db_name + "." + impala_tbl_name + \
+            "(id int) partitioned by (year int)"
+        self.execute_query_expect_success(self.client, query)
+        get_table_request.dbName = new_db_name
+        get_table_request.tblName = impala_tbl_name
+        get_table_request.getColumnStats = True
+        # Engine does not matter, we only return Impala table level column
+        # stats but this field is required to be consistent with Hive's
+        # implementation.
+        get_table_request.engine = "impala"
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == new_db_name
+        assert get_table_response.table.tableName == impala_tbl_name
+        assert get_table_response.table.colStats is not None
+        assert get_table_response.table.colStats.statsObj is not None
+        # Verify there is a ColumnStatisticsObj only for non-clustering columns
+        # (id in this case).
+        assert len(get_table_response.table.colStats.statsObj) == 1
+        assert get_table_response.table.colStats.statsObj[0].colName == "id"
+
+        # Verify a non-default catalog in the request throws an exception.
+        get_table_request.catName = "testCatalog"
+        expected_exception_str = "Catalog service does not support " \
+            "non-default catalogs"
+        self.__call_get_table_req_expect_exception(catalog_hms_client,
+            get_table_request, expected_exception_str)
+        # fetch file-metadata of a non-partitioned table
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = "functional"
+        get_table_request.tblName = "tinytable"
+        get_table_request.getFileMetadata = True
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "tinytable"
+        self.__assert_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+
+        # fetch file-metadata along with stats
+        get_table_request = GetTableRequest()
+        get_table_request.dbName = "functional"
+        get_table_request.tblName = "tinytable"
+        get_table_request.getFileMetadata = True
+        get_table_request.getColumnStats = True
+        get_table_request.engine = "impala"
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == "functional"
+        assert get_table_response.table.tableName == "tinytable"
+        self.__assert_filemd(get_table_response.table.fileMetadata,
+                             get_table_response.table.dictionary)
+        assert get_table_response.table.colStats is not None
+        assert get_table_response.table.colStats.statsObj is not None
+      finally:
+        if catalog_hms_client is not None:
+          catalog_hms_client.shutdown()
+        if self.__get_database_no_throw(db_name) is not None:
+          self.hive_client.drop_database(db_name, True, True)
+        if self.__get_database_no_throw(new_db_name) is not None:
+          self.hive_client.drop_database(new_db_name, True, True)
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899 "
+                      "--fallback_to_hms_on_errors=false"
+    )
+    def test_get_partitions_by_names(self):
+      catalog_hms_client = None
+      try:
+        catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_hms_client is not None
+        valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
+           "year=2009/month=4"]
+        self.__run_partitions_by_names_tests(catalog_hms_client, "functional_parquet",
+          "alltypestiny", valid_part_names)
+      finally:
+        if catalog_hms_client is not None:
+          catalog_hms_client.shutdown()
+
+    @pytest.mark.execute_serially
+    @CustomClusterTestSuite.with_args(
+        impalad_args="--use_local_catalog=true",
+        catalogd_args="--catalog_topic_mode=minimal "
+                      "--start_hms_server=true "
+                      "--hms_port=5899 "
+                      "--fallback_to_hms_on_errors=true"
+    )
+    def test_fallback_get_partitions_by_names(self, unique_database):
+      """
+      Test makes sure that the fall-back path is working in case of errors in catalogd's
+      implementation of get_partitions_by_names_req.
+      """
+      catalog_client = None
+      try:
+        catalog_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+        assert catalog_client is not None
+        assert self.hive_client is not None
+        self.__create_test_tbls_from_hive(unique_database)
+        valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
+                            "year=2009/month=4"]
+        self.__run_partitions_by_names_tests(catalog_client, unique_database,
+          TestMetastoreService.part_tbl, valid_part_names, True)
+      finally:
+        if catalog_client is not None:
+          catalog_client.shutdown()
+
+    def __create_test_tbls_from_hive(self, db_name):
+      """Util method to create test tables from hive in the given database. It creates
+      an external partitioned and an external unpartitioned table there plus one table
+      in the default database. Acid tables are a TODO and nothing is returned."""
+      # create a partitioned table
+      # Creating a table from hive and inserting into it takes very long. Hence we create
+      # a external table pointing to an existing table location
+      tbl_location = self.hive_client.get_table("functional", "alltypessmall").sd.location
+      self.run_stmt_in_hive(
+        "create external table {0}.{1} like functional.alltypessmall location '{2}'"
+          .format(db_name, TestMetastoreService.part_tbl, tbl_location))
+      self.run_stmt_in_hive(
+        "msck repair table {0}.{1}".format(db_name, TestMetastoreService.part_tbl))
+      # TODO create a acid partitioned table
+      tbl_location = self.hive_client.get_table("functional", "tinytable").sd.location
+      self.run_stmt_in_hive(
+        "create external table {0}.{1} like functional.tinytable location '{2}'"
+          .format(db_name, TestMetastoreService.unpart_tbl, tbl_location))
+      self.run_stmt_in_hive(
+        "create table default.{0} (c1 int)"
+          .format(TestMetastoreService.default_unknowntbl))
+
+    @classmethod
+    def __run_get_table_tests(cls, catalog_hms_client, db_name, tbl_name,
+                              fallback_expected=False):
+      """
+      Issues get_table_req for various cases and validates the response.
+      """
+      # Test simple get_table_req without stats.
+      get_table_request = GetTableRequest()
+      get_table_request.dbName = db_name
+      get_table_request.tblName = tbl_name
+      get_table_response = catalog_hms_client.get_table_req(get_table_request)
+      assert get_table_response.table.dbName == db_name
+      assert get_table_response.table.tableName == tbl_name
+      # Request did not ask for stats, verify colStats are not populated.
+      assert get_table_response.table.colStats is None
+
+      # Test get_table_request with stats and engine set to Impala.
+      get_table_request.getColumnStats = True
+      get_table_request.engine = "impala"
+      get_table_response = catalog_hms_client.get_table_req(get_table_request)
+      assert get_table_response.table.dbName == db_name
+      assert get_table_response.table.tableName == tbl_name
+      assert get_table_response.table.colStats is not None
+      # Verify the column stats objects are populated for
+      # non-clustering columns.
+      assert len(get_table_response.table.colStats.statsObj) > 0
+
+      # We only return Impala table level column stats but engine field is
+      # required to be consistent with Hive's implementation.
+      get_table_request.engine = "hive"
+      get_table_response = catalog_hms_client.get_table_req(get_table_request)
+      assert get_table_response.table.dbName == db_name
+      assert get_table_response.table.tableName == tbl_name
+      assert get_table_response.table.colStats is not None
+      # Verify the column stats objects are populated for
+      # non-clustering columns.
+      assert len(get_table_response.table.colStats.statsObj) > 0
+
+      # Requests for tables unknown to catalogd should fall back to HMS if
+      # fallback_expected is True. NOTE(review): confirm test_db_name is defined.
+      get_table_request.dbName = TestMetastoreService.test_db_name
+      get_table_request.tblName = TestMetastoreService.unpart_tbl
+      get_table_request.getColumnStats = True
+
+      # Engine is not specified, this should throw an exception even if
+      # fallback_to_hms_on_errors is true.
+      expected_exception = "isGetColumnStats() is true in the request but " \
+                           "engine is not specified."
+      try:
+        catalog_hms_client.get_table_req(get_table_request)
+      except Exception as e:
+        if expected_exception is not None:
+          assert expected_exception in str(e)
+
+      # Set engine and request impala, this should fallback to HMS since
+      # the table is not in catalog cache.
+      get_table_request.engine = "Impala"
+      if fallback_expected:
+        get_table_response = catalog_hms_client.get_table_req(get_table_request)
+        assert get_table_response.table.dbName == db_name
+        assert get_table_response.table.tableName == tbl_name
+      else:
+        expected_exception = None
+        try:
+          catalog_hms_client.get_table_req(get_table_request)
+        except Exception as e:
+          expected_exception = e
+        assert expected_exception is not None
+
+    def __run_partitions_by_names_tests(self, catalog_hms_client, db_name, tbl_name,
+                                        valid_part_names, expect_fallback=False):
+      """This method runs all the test cases for fetching partitions. The method
+      expects that the partition keys are year and month for the given table and
+      that more than 3 valid partition names are provided for the table."""
+      # Test get_partitions_by_names_req
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      # must have more than 3 partitions to test the filtering below
+      assert len(valid_part_names) > 3
+      part_names = valid_part_names[:3]
+      part_name_format = "year={0}/month={1}"
+      get_parts_req.names = part_names
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 3
+      self.__validate_partitions(part_names, response, part_name_format)
+      # request partitions with file-metadata
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = part_names
+      get_parts_req.getFileMetadata = True
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 3
+      self.__validate_partitions(part_names, response, part_name_format,
+                                 True)
+      # request contains unavailable partitions
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      part_names = [x for x in valid_part_names]
+      part_names[0] = "year=2009/month=13"
+      get_parts_req.names = part_names
+      get_parts_req.getFileMetadata = True
+      assert get_parts_req.names is not None
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 3
+      # remove the bad partname for comparison below
+      part_names.pop(0)
+      self.__validate_partitions(part_names, response, part_name_format, True)
+      # request contains empty partition names
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = []
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      assert response.partitions is not None
+      assert len(response.partitions) == 0
+      # Negative test cases
+      # invalid partition keys
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = ["invalid-key=1"]
+      response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+      # in this case we replicate what HMS does which is to return 0 partitions
+      assert len(response.partitions) == 0
+      # empty table name
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = ""
+      get_parts_req.names = []
+      if expect_fallback:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "NoSuchObjectException")
+      else:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                 "Table name is empty or null")
+      # empty db name
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = ""
+      get_parts_req.tbl_name = tbl_name
+      get_parts_req.names = []
+      if expect_fallback:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "NoSuchObjectException")
+      else:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                 "Database name is empty or null")
+      # table does not exist
+      get_parts_req = GetPartitionsByNamesRequest()
+      get_parts_req.db_name = db_name
+      get_parts_req.tbl_name = "table-does-not-exist"
+      get_parts_req.names = []
+      if expect_fallback:
+        # TODO HMS actually throws an InvalidObjectException but the HMS API signature
+        # doesn't declare it in the signature.
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+                                                   "Internal error")
+      else:
+        self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+          "Table {0}.table-does-not-exist not found".format(
+            db_name))
+
+    @classmethod
+    def __validate_partitions(cls, part_names, get_parts_by_names_result, name_format,
+                              expect_files=False):
+      """
+      Validates that the given list of partitions contains all the partitions from the
+      given list of partition names.
+      """
+      test_part_names = list(part_names)
+      if expect_files:
+        assert get_parts_by_names_result.dictionary is not None
+        assert len(get_parts_by_names_result.dictionary.values) > 0
+      else:
+        assert get_parts_by_names_result.dictionary is None
+      partitions = get_parts_by_names_result.partitions
+      for part in partitions:
+        assert part is not None
+        # we create the part name here since partition object only has keys
+        assert len(part.values) == 2
+        name = name_format.format(part.values[0], part.values[1])
+        assert name in test_part_names
+        if expect_files:
+          assert part.fileMetadata is not None
+          assert part.fileMetadata.data is not None
+        else:
+          assert part.fileMetadata is None
+        test_part_names.remove(name)
+      assert len(test_part_names) == 0
+
+    def __assert_filemd(self, filemetadata, obj_dict):
+      """
+      Util method which asserts that the given file-metadata is valid.
+      """
+      assert filemetadata is not None
+      assert filemetadata.data is not None
+      assert obj_dict is not None
+      assert len(obj_dict.values) > 0
+
+    def __assert_no_filemd(self, filemetadata, obj_dict):
+      """
+      Util method which asserts that the given file-metadata not valid. Used to verify
+      that the HMS response objects do not contain the file-metadata.
+      """
+      assert filemetadata is None
+      assert obj_dict is None
+
+    @classmethod
+    def __call_get_table_req_expect_exception(cls, client,
+        get_table_request, expected_exception_str=None):
+      exception_recieved = None
+      try:
+        client.get_table_req(get_table_request)
+      except Exception as e:
+        exception_recieved = e
+        if expected_exception_str is not None:
+          assert expected_exception_str in str(e)
+      assert exception_recieved is not None
+
+    @classmethod
+    def __get_parts_by_names_expect_exception(self, catalog_hms_client,
+        request, expected_exception_str=None):
+      """
+      Calls get_partitions_by_names_req on the HMS client and expects
+      it to call with the exception msg as provided in expected_exception_str
+      """
+      exception = None
+      try:
+        catalog_hms_client.get_partitions_by_names_req(request)
+      except Exception as e:
+        exception = e
+        if expected_exception_str is not None:
+          assert expected_exception_str in str(e)
+      assert exception is not None
+
+    def __compare_cols(self, cols, fieldSchemas):
+        """
+        Compares the given list of fieldSchemas with the expected cols
+        """
+        assert len(cols) == len(fieldSchemas)
+        for i in range(len(cols)):
+            assert cols[i][0] == fieldSchemas[i].name
+            assert cols[i][1] == fieldSchemas[i].type
+            assert cols[i][2] == fieldSchemas[i].comment
+
+    def __get_test_database(self, db_name,
+        description="test_db_for_metastore_service"):
+        db = Database()
+        db.name = db_name
+        db.desription = description
+        return db
+
+    def __get_test_tbl(self, db_name, tbl_name, cols, part_cols=None):
+        tbl = Table()
+        tbl.dbName = db_name
+        tbl.tableName = tbl_name
+        sd = StorageDescriptor()
+        tbl.sd = sd
+        sd.cols = self.__create_field_schemas(cols)
+        sd.inputFormat = \
+          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+        sd.outputFormat = \
+          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+        if part_cols is not None:
+            tbl.partitionKeys = self.__create_field_schemas(part_cols)
+        serde = SerDeInfo()
+        serde.name = ""
+        serde.serializationLib = \
+          "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+        tbl.sd.serdeInfo = serde
+        return tbl
+
+    def __create_field_schemas(self, cols):
+        fieldSchemas = []
+        for col in cols:
+            f = FieldSchema()
+            f.name = col[0]
+            f.type = col[1]
+            f.comment = col[2]
+            fieldSchemas.append(f)
+        return fieldSchemas