Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:34:15 UTC

[GitHub] [hive] jcamachor commented on a change in pull request #1317: HIVE-23949: Introduce caching layer in HS2 to accelerate query compilation

jcamachor commented on a change in pull request #1317:
URL: https://github.com/apache/hive/pull/1317#discussion_r462584582



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3665,33 +3678,39 @@ public boolean dropPartition(String dbName, String tableName, List<String> parti
    * @return list of partition objects
    */
   public List<Partition> getPartitions(Table tbl) throws HiveException {
-    if (tbl.isPartitioned()) {
-      List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
-      try {
-        GetPartitionsPsWithAuthRequest req = new GetPartitionsPsWithAuthRequest();
-        req.setTblName(tbl.getTableName());
-        req.setDbName(tbl.getDbName());
-        req.setUserName(getUserName());
-        req.setMaxParts((short) -1);
-        req.setGroupNames(getGroupNames());
-        if (AcidUtils.isTransactionalTable(tbl)) {
-          ValidWriteIdList validWriteIdList = getValidWriteIdList(tbl.getDbName(), tbl.getTableName());
-          req.setValidWriteIdList(validWriteIdList != null ? validWriteIdList.toString() : null);
-        }
-        GetPartitionsPsWithAuthResponse res = getMSC().listPartitionsWithAuthInfoRequest(req);
-        tParts = res.getPartitions();
+    long t1 = System.nanoTime();
+    try {
+      if (tbl.isPartitioned()) {
+        List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
+        try {
+          GetPartitionsPsWithAuthRequest req = new GetPartitionsPsWithAuthRequest();
+          req.setTblName(tbl.getTableName());
+          req.setDbName(tbl.getDbName());
+          req.setUserName(getUserName());
+          req.setMaxParts((short) -1);
+          req.setGroupNames(getGroupNames());
+          if (AcidUtils.isTransactionalTable(tbl)) {
+            ValidWriteIdList validWriteIdList = getValidWriteIdList(tbl.getDbName(), tbl.getTableName());
+            req.setValidWriteIdList(validWriteIdList != null ? validWriteIdList.toString() : null);
+          }
+          GetPartitionsPsWithAuthResponse res = getMSC().listPartitionsWithAuthInfoRequest(req);
+          tParts = res.getPartitions();
 
-      } catch (Exception e) {
-        LOG.error(StringUtils.stringifyException(e));
-        throw new HiveException(e);
-      }
-      List<Partition> parts = new ArrayList<>(tParts.size());
-      for (org.apache.hadoop.hive.metastore.api.Partition tpart : tParts) {
-        parts.add(new Partition(tbl, tpart));
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw new HiveException(e);
+        }
+        List<Partition> parts = new ArrayList<>(tParts.size());
+        for (org.apache.hadoop.hive.metastore.api.Partition tpart : tParts) {
+          parts.add(new Partition(tbl, tpart));
+        }
+        return parts;
+      } else {
+        return Collections.singletonList(new Partition(tbl));
       }
-      return parts;
-    } else {
-      return Collections.singletonList(new Partition(tbl));
+    } finally {

Review comment:
       I did not catch this comment in the review. I was taking a look at the code and I think there are two problems with the current approach (sorry about the back and forth).
   1) Currently we are logging this information twice: PerfLogger already has its own logging mechanism when logging is set to debug level. We should remove the duplicate logging lines outside of it.
   2) I think we should have left the PerfLogEnd call in the `finally` block. I saw that some methods catch an exception and ignore it, which could leak begin/end pairs since PerfLogEnd is never called. Even if this is not a problem today, it would be hard to verify that nothing changes there in the future. The safest path is to make that call in the `finally` block so it is always executed.
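   To illustrate, a minimal sketch of the shape I have in mind (the `CLASS_NAME`/`GET_PARTITIONS` constants and the `doGetPartitions` helper are placeholders, not the actual names in `Hive.java`):
   ```java
   public List<Partition> getPartitions(Table tbl) throws HiveException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, GET_PARTITIONS);
     try {
       // existing body, including the inner try/catch that wraps
       // metastore failures into HiveException
       return doGetPartitions(tbl);
     } finally {
       // runs on every exit path, so a begin without a matching end can never leak
       perfLogger.PerfLogEnd(CLASS_NAME, GET_PARTITIONS);
     }
   }
   ```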
   
   Thoughts?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3665,33 +3678,39 @@ public boolean dropPartition(String dbName, String tableName, List<String> parti
+    } finally {

Review comment:
       I just checked the code and, for 2), there are quite a few occurrences. I think we should at least move the call into `finally` for the methods where we are already catching exceptions. For the others, we can create a follow-up to explore it, since this may be a more widespread issue that also affects other code paths where PerfLogger is used.

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+package org.apache.hadoop.hive.metastore;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsSpecByExprResult;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.thrift.TException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+
+public class HiveMetaStoreClientWithLocalCache extends HiveMetaStoreClient {
+
+  private static Cache<CacheKey, Object> mscLocalCache = null;
+  //TODO: initialize in the init method

Review comment:
       nit. This line can be removed?

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -487,6 +487,13 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
    * in the underlying Hadoop configuration.
    */
   public static enum ConfVars {
+    MSC_CACHE_ENABLED("hive.metastore.client.cache.enabled", true,
+            "This property enables a Caffeiene Cache for Metastore client"),
+    MSC_CACHE_MAX_SIZE("hive.metastore.client.cache.maxSize", "1Gb", new SizeValidator(),
+            "Set the maximum size (number of bytes) of the metastore client cache (DEFAULT: 1GB). " +
+                    "Only in effect when the cache is enabled"),
+    MSC_CACHE_RECORD_STATS("hive.metastore.client.cache.recordStats", true,

Review comment:
       The default for this property should be `false` (you can set it to `true` in the test `hive-site` files though).
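   For example, something like this (the description string here is only a placeholder; keep whatever wording the patch settles on):
   ```java
   MSC_CACHE_RECORD_STATS("hive.metastore.client.cache.recordStats", false,
           "Whether to record stats for the metastore client cache. Only in effect when the cache is enabled"),
   ```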

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+package org.apache.hadoop.hive.metastore;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsSpecByExprResult;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
+import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.thrift.TException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+
+public class HiveMetaStoreClientWithLocalCache extends HiveMetaStoreClient {
+
+  private static Cache<CacheKey, Object> mscLocalCache = null;
+  //TODO: initialize in the init method
+  private static boolean IS_CACHE_ENABLED;
+  private static long MAX_SIZE;
+  private static boolean RECORD_STATS;
+  private static HashMap<Class<?>, ObjectEstimator> sizeEstimator = null;
+  private static String cacheObjName = null;
+
+  public static synchronized void init() {
+    if (mscLocalCache != null) return; // init cache only once
+    Configuration metaConf = MetastoreConf.newMetastoreConf();
+    LOG.debug("Initializing local cache in HiveMetaStoreClient...");
+    MAX_SIZE = MetastoreConf.getSizeVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_MAX_SIZE);
+    IS_CACHE_ENABLED = MetastoreConf.getBoolVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_ENABLED);
+    RECORD_STATS = MetastoreConf.getBoolVar(metaConf, MetastoreConf.ConfVars.MSC_CACHE_RECORD_STATS);
+    initSizeEstimator();
+    initCache();
+    LOG.debug("Local cache initialized in HiveMetaStoreClient: " + mscLocalCache);
+  }
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf) throws MetaException {
+    this(conf, null, true);
+  }
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf, HiveMetaHookLoader hookLoader) throws MetaException {
+    this(conf, hookLoader, true);
+  }
+
+  public HiveMetaStoreClientWithLocalCache(Configuration conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded) throws MetaException {
+    super(conf, hookLoader, allowEmbedded);
+  }
+
+  private static void initSizeEstimator() {
+    sizeEstimator = new HashMap<>();
+    IncrementalObjectSizeEstimator.createEstimators(CacheKey.class, sizeEstimator);
+    Arrays.stream(KeyType.values()).forEach(e -> {
+      IncrementalObjectSizeEstimator.createEstimators(e.keyClass, sizeEstimator);
+      IncrementalObjectSizeEstimator.createEstimators(e.valueClass, sizeEstimator);}
+    );
+  }
+
+  /**
+   * KeyType is used to differentiate the request types. More types can be added in future.
+   */
+  public enum KeyType {
+    PARTITIONS_BY_EXPR(PartitionsByExprRequest.class, PartitionsByExprResult.class),
+    PARTITIONS_SPEC_BY_EXPR(PartitionsByExprRequest.class, PartitionsSpecByExprResult.class);
+
+    private final Class<?> keyClass;
+    private final Class<?> valueClass;
+
+    KeyType(Class<?> keyClass, Class<?> valueClass) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+  }
+
+  /**
+   * CacheKey objects are used as key for the cache.
+   */
+  public static class CacheKey{
+    KeyType IDENTIFIER;
+    Object obj;
+
+    public CacheKey(KeyType IDENTIFIER, Object obj) {
+      this.IDENTIFIER = IDENTIFIER;
+      this.obj = obj;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CacheKey cacheKey = (CacheKey) o;
+      return IDENTIFIER == cacheKey.IDENTIFIER &&
+              Objects.equals(obj, cacheKey.obj);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(IDENTIFIER, obj);
+    }
+  }
+
+  private static int getWeight(CacheKey key, Object val) {
+    ObjectEstimator keySizeEstimator = sizeEstimator.get(key.getClass());
+    ObjectEstimator valSizeEstimator = sizeEstimator.get(key.IDENTIFIER.valueClass);
+    int keySize = keySizeEstimator.estimate(key, sizeEstimator);
+    int valSize = valSizeEstimator.estimate(val, sizeEstimator);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cache entry weight - key: {}, value: {}, total: {}", keySize, valSize, keySize + valSize);
+    }
+    return keySize + valSize;
+  }
+
+  private Object load(CacheKey key) {
+    try {
+      return getResultObject(key);
+    } catch (TException e) {
+      throw new UncheckedCacheException(e);
+    }
+  }
+
+/**
+ * Initializes the cache
+ */
+  private static void initCache() {
+    int initSize = 100;
+    Caffeine<CacheKey, Object> cacheBuilder = Caffeine.newBuilder()
+            .initialCapacity(initSize)
+            .maximumWeight(MAX_SIZE)
+            .weigher(HiveMetaStoreClientWithLocalCache::getWeight)
+            .removalListener((key, val, cause) -> {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Caffeine - ({}, {}) was removed ({})", key, val, cause);
+              }});
+    if (RECORD_STATS) {
+      cacheBuilder.recordStats();
+    }
+    mscLocalCache = cacheBuilder.build();
+    cacheObjName = mscLocalCache.toString().substring(mscLocalCache.toString().indexOf("Cache@"));
+  }
+
+  /**
+   * This method is used to load the cache by calling relevant APIs, depending on the type of the request.
+   *
+   * @param cacheKey key of the cache, containing an identifier and a request object
+   * @return Result object / null
+   * @throws TException
+   */
+  private Object getResultObject(CacheKey cacheKey) throws TException {
+    Object result = null;
+
+    switch (cacheKey.IDENTIFIER) {
+      case PARTITIONS_BY_EXPR:
+        result = super.getPartitionsByExprResult((PartitionsByExprRequest)cacheKey.obj);
+        break;
+      case PARTITIONS_SPEC_BY_EXPR:
+        result = super.getPartitionsSpecByExprResult((PartitionsByExprRequest)cacheKey.obj);
+        break;
+      default:
+        break;
+    }
+
+    return result;
+  }
+
+  @Override
+  protected PartitionsByExprResult getPartitionsByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_BY_EXPR)) {
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_EXPR, req);
+      try {
+        r = (PartitionsByExprResult) mscLocalCache.get(cacheKey, this::load); // get either the result or an Exception
+
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
+
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {
+          throw new TException(e.getCause());
+        }
+      }
+    } else {
+         r = client.get_partitions_by_expr(req);

Review comment:
       nit. whitespace

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+  @Override
+  protected PartitionsByExprResult getPartitionsByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_BY_EXPR)) {
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_BY_EXPR, req);
+      try {
+        r = (PartitionsByExprResult) mscLocalCache.get(cacheKey, this::load); // get either the result or an Exception
+
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
+
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {

Review comment:
       nit. Maybe we could check whether the cause is already a `TException` so we do not wrap it in a second one.
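   Concretely, a sketch of just the catch block (since `MetaException` extends `TException`, the first branch is kept only to mirror the existing code):
   ```java
   } catch (UncheckedCacheException e) {
     Throwable cause = e.getCause();
     if (cause instanceof MetaException) {
       throw (MetaException) cause;
     } else if (cause instanceof TException) {
       // already a TException: rethrow as-is instead of wrapping it a second time
       throw (TException) cause;
     } else {
       throw new TException(cause);
     }
   }
   ```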

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+  @Override
+  protected PartitionsSpecByExprResult getPartitionsSpecByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsSpecByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_SPEC_BY_EXPR)) {
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, req);
+      try {
+        r = (PartitionsSpecByExprResult) mscLocalCache.get(cacheKey, this::load); // get either the result or an Exception
+
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
+
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {

Review comment:
       nit. Same comment as above (TException).

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+  @Override
+  protected PartitionsSpecByExprResult getPartitionsSpecByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsSpecByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_SPEC_BY_EXPR)) {
+      CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, req);
+      try {
+        r = (PartitionsSpecByExprResult) mscLocalCache.get(cacheKey, this::load); // get either the result or an Exception
+
+        if (LOG.isDebugEnabled() && RECORD_STATS) {
+          LOG.debug(cacheObjName + ": " + mscLocalCache.stats().toString());
+        }
+
+      } catch (UncheckedCacheException e) {
+        if (e.getCause() instanceof MetaException) {
+          throw (MetaException) e.getCause();
+        } else {
+          throw new TException(e.getCause());
+        }
+      }
+    } else {
+        r = client.get_partitions_spec_by_expr(req);

Review comment:
       nit. whitespace

##########
File path: standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientWithLocalCache.java
##########
@@ -0,0 +1,264 @@
+  @Override
+  protected PartitionsSpecByExprResult getPartitionsSpecByExprResult(PartitionsByExprRequest req) throws TException {
+    PartitionsSpecByExprResult r;
+
+    // table should be transactional to get responses from the cache
+    if (isCacheEnabledAndInitialized() && isRequestCachable(req, KeyType.PARTITIONS_SPEC_BY_EXPR)) {

Review comment:
       nit. Maybe we could create the CacheKey before the `if` clause and pass it to `isRequestCachable`. Feel free to ignore if you think it does not make sense.
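   For illustration, the shape I have in mind (the `isRequestCachable(CacheKey)` overload here is hypothetical; the current method takes the request and the key type separately):
   ```java
   CacheKey cacheKey = new CacheKey(KeyType.PARTITIONS_SPEC_BY_EXPR, req);
   if (isCacheEnabledAndInitialized() && isRequestCachable(cacheKey)) {
     r = (PartitionsSpecByExprResult) mscLocalCache.get(cacheKey, this::load);
     // stats logging and UncheckedCacheException handling as in the patch
   } else {
     r = client.get_partitions_spec_by_expr(req);
   }
   ```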




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org