You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2019/06/20 06:17:28 UTC

[hive] branch master updated: HIVE-21787: Metastore table cache LRU eviction (Sam An, reviewed by Daniel Dai)

This is an automated email from the ASF dual-hosted git repository.

daijy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b36b89c  HIVE-21787: Metastore table cache LRU eviction (Sam An, reviewed by Daniel Dai)
b36b89c is described below

commit b36b89ce8eb542038990af0bd2cd8de57176acfa
Author: Daniel Dai <da...@cloudera.com>
AuthorDate: Wed Jun 19 23:17:17 2019 -0700

    HIVE-21787: Metastore table cache LRU eviction (Sam An, reviewed by Daniel Dai)
---
 .../cache/TestCachedStoreUpdateUsingEvents.java    |   10 +-
 .../hadoop/hive/metastore/cache/CacheUtils.java    |    2 +-
 .../hadoop/hive/metastore/cache/CachedStore.java   | 1389 ++++++++------------
 .../hadoop/hive/metastore/cache/SharedCache.java   |  723 ++++++----
 .../hive/metastore/cache/TestCachedStore.java      |  316 ++++-
 5 files changed, 1264 insertions(+), 1176 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
index cdfc60c..285f30b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
@@ -53,6 +53,10 @@ public class TestCachedStoreUpdateUsingEvents {
 
     rawStore = new ObjectStore();
     rawStore.setConf(hmsHandler.getConf());
+
+    CachedStore cachedStore = new CachedStore();
+    CachedStore.clearSharedCache();
+    cachedStore.setConfForTest(conf);
     sharedCache = CachedStore.getSharedCache();
 
     // Stop the CachedStore cache update service. We'll start it explicitly to control the test
@@ -190,7 +194,7 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.drop_database(dbName, true, true);
     hmsHandler.drop_database(dbName2, true, true);
     sharedCache.getDatabaseCache().clear();
-    sharedCache.getTableCache().clear();
+    sharedCache.clearTableCache();
     sharedCache.getSdCache().clear();
   }
 
@@ -267,7 +271,7 @@ public class TestCachedStoreUpdateUsingEvents {
     Assert.assertNull(tblRead);
 
     sharedCache.getDatabaseCache().clear();
-    sharedCache.getTableCache().clear();
+    sharedCache.clearTableCache();
     sharedCache.getSdCache().clear();
   }
 
@@ -379,7 +383,7 @@ public class TestCachedStoreUpdateUsingEvents {
     // Clean up
     rawStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName);
     sharedCache.getDatabaseCache().clear();
-    sharedCache.getTableCache().clear();
+    sharedCache.clearTableCache();
     sharedCache.getSdCache().clear();
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index d50fa13..bb673f4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -65,7 +65,7 @@ public class CacheUtils {
     return buildKey(catName, dbName, tableName, colName);
   }
 
-  private static String buildKey(String... elements) {
+  public static String buildKey(String... elements) {
     return org.apache.commons.lang.StringUtils.join(elements, delimit);
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 07f325d..511e6c1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.metastore.cache;
 
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.HiveAlterHandler;
-import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
@@ -120,14 +118,15 @@ public class CachedStore implements RawStore, Configurable {
   private Configuration conf;
   private static boolean areTxnStatsSupported;
   private PartitionExpressionProxy expressionProxy = null;
+  private static String lock = "L";
+  private static boolean sharedCacheInited = false;
   private static SharedCache sharedCache = new SharedCache();
-  private static  boolean canUseEvents = false;
+  private static boolean canUseEvents = false;
   private static long lastEventId;
 
-  static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
 
-  @Override
-  public void setConf(Configuration conf) {
+  @Override public void setConf(Configuration conf) {
     setConfInternal(conf);
     initBlackListWhiteList(conf);
     initSharedCache(conf);
@@ -140,12 +139,16 @@ public class CachedStore implements RawStore, Configurable {
    * @param conf
    */
   void setConfForTest(Configuration conf) {
+    setConfForTestExceptSharedCache(conf);
+    initSharedCache(conf);
+  }
+
+  void setConfForTestExceptSharedCache(Configuration conf) {
     setConfInternal(conf);
     initBlackListWhiteList(conf);
-    initSharedCache(conf);
   }
 
-  synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) {
+  private static synchronized void triggerUpdateUsingEvent(RawStore rawStore) {
     if (!isCachePrewarmed.get()) {
       LOG.error("cache update should be done only after prewarm");
       throw new RuntimeException("cache update should be done only after prewarm");
@@ -159,12 +162,12 @@ public class CachedStore implements RawStore, Configurable {
       throw new RuntimeException(e.getMessage());
     } finally {
       long endTime = System.nanoTime();
-      LOG.info("Time taken in updateUsingNotificationEvents for num events : " +  (lastEventId - preEventId) + " = " +
-              (endTime - startTime) / 1000000 + "ms");
+      LOG.info("Time taken in updateUsingNotificationEvents for num events : " + (lastEventId - preEventId) + " = "
+          + (endTime - startTime) / 1000000 + "ms");
     }
   }
 
-  synchronized private static void triggerPreWarm(RawStore rawStore) {
+  private static synchronized void triggerPreWarm(RawStore rawStore) {
     lastEventId = rawStore.getCurrentNotificationEventId().getEventId();
     prewarm(rawStore);
   }
@@ -177,8 +180,7 @@ public class CachedStore implements RawStore, Configurable {
     }
     LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store");
 
-    String rawStoreClassName =
-        MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
+    String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
     if (rawStore == null) {
       try {
         rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
@@ -198,38 +200,37 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   private void initSharedCache(Configuration conf) {
-    long maxSharedCacheSizeInBytes =
-        MetastoreConf.getSizeVar(conf, ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY);
-    sharedCache.initialize(maxSharedCacheSizeInBytes);
-    if (maxSharedCacheSizeInBytes > 0) {
-      LOG.info("Maximum memory that the cache will use: {} KB",
-          maxSharedCacheSizeInBytes / (1024));
+    synchronized (lock) {
+      if (!sharedCacheInited) {
+        sharedCacheInited = true;
+        sharedCache.initialize(conf);
+      }
     }
   }
 
-  @VisibleForTesting
-  public static SharedCache getSharedCache() {
-   return sharedCache;
+  @VisibleForTesting public static SharedCache getSharedCache() {
+    return sharedCache;
   }
 
-  static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName,
-                                          String dbName, String tableName, Partition part) throws Exception {
+  private static ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName,
+      String dbName, String tableName, Partition part) throws Exception {
     ColumnStatistics colStats;
     List<String> deletedCols = new ArrayList<>();
-    colStats = HiveAlterHandler.updateOrGetPartitionColumnStats(rawStore, catalogName, dbName, tableName,
-                    part.getValues(), part.getSd().getCols(), before, part, null, deletedCols);
+    colStats = HiveAlterHandler
+        .updateOrGetPartitionColumnStats(rawStore, catalogName, dbName, tableName, part.getValues(),
+            part.getSd().getCols(), before, part, null, deletedCols);
     for (String column : deletedCols) {
       sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), column);
     }
     if (colStats != null) {
-      sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(),
-              part.getValues(), part.getParameters(), colStats.getStatsObj());
+      sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(), part.getValues(),
+          part.getParameters(), colStats.getStatsObj());
     }
     return colStats;
   }
 
-  static private void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName,
-                                          String dbName, String tableName) throws Exception {
+  private static void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName,
+      String dbName, String tableName) throws Exception {
     ColumnStatistics colStats = null;
     List<String> deletedCols = new ArrayList<>();
     if (tblBefore.isSetPartitionKeys()) {
@@ -239,19 +240,19 @@ public class CachedStore implements RawStore, Configurable {
       }
     }
 
-    List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, tblBefore,
-            tblAfter,null, null, rawStore.getConf(), deletedCols);
+    List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler
+        .alterTableUpdateTableColumnStats(rawStore, tblBefore, tblAfter, null, null, rawStore.getConf(), deletedCols);
     if (colStats != null) {
-      sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(),
-              statisticsObjs, tblAfter.getParameters());
+      sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(), statisticsObjs,
+          tblAfter.getParameters());
     }
     for (String column : deletedCols) {
       sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column);
     }
   }
 
-  @VisibleForTesting
-  public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId) throws Exception {
+  @VisibleForTesting public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId)
+      throws Exception {
     LOG.debug("updating cache using notification events starting from event id " + lastEventId);
     NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
 
@@ -305,96 +306,95 @@ public class CachedStore implements RawStore, Configurable {
         continue;
       }
       switch (event.getEventType()) {
-        case MessageBuilder.ADD_PARTITION_EVENT:
-          AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message);
-          sharedCache.addPartitionsToCache(catalogName,
-                  dbName, tableName, addPartMessage.getPartitionObjs());
-          break;
-        case MessageBuilder.ALTER_PARTITION_EVENT:
-          AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message);
-          sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
-                  alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter());
-          //TODO : Use the stat object stored in the alter table message to update the stats in cache.
-          updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(),
-                  catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter());
-          break;
-        case MessageBuilder.DROP_PARTITION_EVENT:
-          DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
-          for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) {
-            sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values()));
-          }
-          break;
-        case MessageBuilder.CREATE_TABLE_EVENT:
-          CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message);
-          sharedCache.addTableToCache(catalogName, dbName,
-                  tableName, createTableMessage.getTableObj());
-          break;
-        case MessageBuilder.ALTER_TABLE_EVENT:
-          AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
-          sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
-          //TODO : Use the stat object stored in the alter table message to update the stats in cache.
-          updateStatsForAlterTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
-                  catalogName, dbName, tableName);
-          break;
-        case MessageBuilder.DROP_TABLE_EVENT:
-          DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message);
-          int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
-          String tableDnsPath = null;
-          Path tablePath = new Path(dropTableMessage.getTableObj().getSd().getLocation());
-          if (tablePath != null) {
-            tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(tablePath).toString();
-          }
+      case MessageBuilder.ADD_PARTITION_EVENT:
+        AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message);
+        sharedCache.addPartitionsToCache(catalogName, dbName, tableName, addPartMessage.getPartitionObjs());
+        break;
+      case MessageBuilder.ALTER_PARTITION_EVENT:
+        AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message);
+        sharedCache
+            .alterPartitionInCache(catalogName, dbName, tableName, alterPartitionMessage.getPtnObjBefore().getValues(),
+                alterPartitionMessage.getPtnObjAfter());
+        //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+        updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(), catalogName, dbName, tableName,
+            alterPartitionMessage.getPtnObjAfter());
+        break;
+      case MessageBuilder.DROP_PARTITION_EVENT:
+        DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
+        for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) {
+          sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values()));
+        }
+        break;
+      case MessageBuilder.CREATE_TABLE_EVENT:
+        CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message);
+        sharedCache.addTableToCache(catalogName, dbName, tableName, createTableMessage.getTableObj());
+        break;
+      case MessageBuilder.ALTER_TABLE_EVENT:
+        AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
+        sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
+        //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+        updateStatsForAlterTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
+            catalogName, dbName, tableName);
+        break;
+      case MessageBuilder.DROP_TABLE_EVENT:
+        DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message);
+        int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
+        String tableDnsPath = null;
+        Path tablePath = new Path(dropTableMessage.getTableObj().getSd().getLocation());
+        if (tablePath != null) {
+          tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(tablePath).toString();
+        }
 
-          while (true) {
-            Map<String, String> partitionLocations = rawStore.getPartitionLocations(catalogName, dbName, tableName,
-                    tableDnsPath, batchSize);
-            if (partitionLocations == null || partitionLocations.isEmpty()) {
-              break;
-            }
-            sharedCache.removePartitionFromCache(catalogName, dbName, tableName,
-                    new ArrayList<>(partitionLocations.values()));
+        while (true) {
+          Map<String, String> partitionLocations =
+              rawStore.getPartitionLocations(catalogName, dbName, tableName, tableDnsPath, batchSize);
+          if (partitionLocations == null || partitionLocations.isEmpty()) {
+            break;
           }
-          sharedCache.removeTableFromCache(catalogName, dbName, tableName);
-          break;
-        case MessageBuilder.CREATE_DATABASE_EVENT:
-          CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message);
-          sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject());
-          break;
-        case MessageBuilder.ALTER_DATABASE_EVENT:
-          AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message);
-          sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter());
-          break;
-        case MessageBuilder.DROP_DATABASE_EVENT:
-          sharedCache.removeDatabaseFromCache(catalogName, dbName);
-          break;
-        case MessageBuilder.CREATE_CATALOG_EVENT:
-        case MessageBuilder.DROP_CATALOG_EVENT:
-        case MessageBuilder.ALTER_CATALOG_EVENT:
-          // TODO : Need to add cache invalidation for catalog events
-          LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType());
-          break;
-        case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
-          UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
-          sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(),
-                  msg.getColumnStatistics().getStatsObj(), msg.getParameters());
-          break;
-        case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
-          DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
-          sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName());
-          break;
-        case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
-          UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
-          sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(),
-                  msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
-                  msgPartUpdate.getColumnStatistics().getStatsObj());
-          break;
-        case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
-          DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message);
-          sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName,
-                  msgPart.getPartValues(), msgPart.getColName());
-          break;
-        default:
-          LOG.error("Event is not supported for cache invalidation : " + event.getEventType());
+          sharedCache
+              .removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partitionLocations.values()));
+        }
+        sharedCache.removeTableFromCache(catalogName, dbName, tableName);
+        break;
+      case MessageBuilder.CREATE_DATABASE_EVENT:
+        CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message);
+        sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject());
+        break;
+      case MessageBuilder.ALTER_DATABASE_EVENT:
+        AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message);
+        sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter());
+        break;
+      case MessageBuilder.DROP_DATABASE_EVENT:
+        sharedCache.removeDatabaseFromCache(catalogName, dbName);
+        break;
+      case MessageBuilder.CREATE_CATALOG_EVENT:
+      case MessageBuilder.DROP_CATALOG_EVENT:
+      case MessageBuilder.ALTER_CATALOG_EVENT:
+        // TODO : Need to add cache invalidation for catalog events
+        LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType());
+        break;
+      case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
+        UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
+        sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(),
+            msg.getColumnStatistics().getStatsObj(), msg.getParameters());
+        break;
+      case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
+        DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
+        sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName());
+        break;
+      case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
+        UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
+        sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(),
+            msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
+            msgPartUpdate.getColumnStatistics().getStatsObj());
+        break;
+      case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
+        DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message);
+        sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, msgPart.getPartValues(),
+            msgPart.getColName());
+        break;
+      default:
+        LOG.error("Event is not supported for cache invalidation : " + event.getEventType());
       }
     }
     return lastEventId;
@@ -535,8 +535,9 @@ public class CachedStore implements RawStore, Configurable {
                 Deadline.stopTimer();
               }
               // If the table could not cached due to memory limit, stop prewarm
-              boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
-                  aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+              boolean isSuccess = sharedCache
+                  .populateTableInCache(table, tableColStats, partitions, partitionColStats, aggrStatsAllPartitions,
+                      aggrStatsAllButDefaultPartition);
               if (isSuccess) {
                 LOG.trace("Cached Database: {}'s Table: {}.", dbName, tblName);
               } else {
@@ -565,8 +566,16 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
+  /**
+   * This method is only used for testing. Test method will init a new cache and use the new handle to query the cache
+   * to get content in the cache. In production, no code would/should call this method, because SharedCache should be
+   * a singleton.
+   */
   @VisibleForTesting
   static void clearSharedCache() {
+    synchronized (lock) {
+      sharedCacheInited = false;
+    }
     sharedCache = new SharedCache();
   }
 
@@ -605,23 +614,20 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @VisibleForTesting
-  static void setCachePrewarmedState(boolean state) {
+  @VisibleForTesting static void setCachePrewarmedState(boolean state) {
     isCachePrewarmed.set(state);
   }
 
   private static void initBlackListWhiteList(Configuration conf) {
-    whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf,
-        MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
-    blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
-        MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
+    whitelistPatterns = createPatterns(
+        MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
+    blacklistPatterns = createPatterns(
+        MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
   }
 
   private static Collection<String> catalogsToCache(RawStore rs) throws MetaException {
-    Collection<String> confValue =
-        MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE);
-    if (confValue == null || confValue.isEmpty() ||
-        (confValue.size() == 1 && confValue.contains(""))) {
+    Collection<String> confValue = MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE);
+    if (confValue == null || confValue.isEmpty() || (confValue.size() == 1 && confValue.contains(""))) {
       return rs.getCatalogs();
     } else {
       return confValue;
@@ -636,19 +642,17 @@ public class CachedStore implements RawStore, Configurable {
    * @param conf
    * @param runOnlyOnce
    * @param shouldRunPrewarm
-   */
-  static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+   */ static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
       boolean shouldRunPrewarm) {
     if (cacheUpdateMaster == null) {
       initBlackListWhiteList(conf);
       if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
-        cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
-            ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
+        cacheRefreshPeriodMS =
+            MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
       }
       LOG.info("CachedStore: starting cache update service (run every {} ms)", cacheRefreshPeriodMS);
       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-        @Override
-        public Thread newThread(Runnable r) {
+        @Override public Thread newThread(Runnable r) {
           Thread t = Executors.defaultThreadFactory().newThread(r);
           t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
           t.setDaemon(true);
@@ -656,24 +660,23 @@ public class CachedStore implements RawStore, Configurable {
         }
       });
       if (!runOnlyOnce) {
-        cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
-            cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+        cacheUpdateMaster
+            .scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, cacheRefreshPeriodMS,
+                TimeUnit.MILLISECONDS);
       }
-    } 
+    }
     if (runOnlyOnce) {
       // Some tests control the execution of the background update thread
       cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS);
     }
   }
 
-  @VisibleForTesting
-  static synchronized boolean stopCacheUpdateService(long timeout) {
+  @VisibleForTesting static synchronized boolean stopCacheUpdateService(long timeout) {
     boolean tasksStoppedBeforeShutdown = false;
     if (cacheUpdateMaster != null) {
       LOG.info("CachedStore: shutting down cache update service");
       try {
-        tasksStoppedBeforeShutdown =
-            cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+        tasksStoppedBeforeShutdown = cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
             + "complete before shutting down. Will make a hard stop now.");
@@ -684,8 +687,7 @@ public class CachedStore implements RawStore, Configurable {
     return tasksStoppedBeforeShutdown;
   }
 
-  @VisibleForTesting
-  static void setCacheRefreshPeriod(long time) {
+  @VisibleForTesting static void setCacheRefreshPeriod(long time) {
     cacheRefreshPeriodMS = time;
   }
 
@@ -693,7 +695,6 @@ public class CachedStore implements RawStore, Configurable {
     private boolean shouldRunPrewarm = true;
     private final RawStore rawStore;
 
-
     CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
       this.shouldRunPrewarm = shouldRunPrewarm;
       String rawStoreClassName =
@@ -708,8 +709,7 @@ public class CachedStore implements RawStore, Configurable {
       }
     }
 
-    @Override
-    public void run() {
+    @Override public void run() {
       if (!shouldRunPrewarm) {
         if (canUseEvents) {
           try {
@@ -814,8 +814,9 @@ public class CachedStore implements RawStore, Configurable {
             if (!shouldCacheTable(catName, dbName, tblName)) {
               continue;
             }
-            Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName),
-                StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName));
+            Table table = rawStore
+                .getTable(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName),
+                    StringUtils.normalizeIdentifier(tblName));
             tables.add(table);
           }
           success = sharedCache.refreshTablesInCache(catName, dbName, tables);
@@ -864,8 +865,9 @@ public class CachedStore implements RawStore, Configurable {
         Deadline.startTimer("getPartitions");
         List<Partition> partitions = rawStore.getPartitions(catName, dbName, tblName, -1);
         Deadline.stopTimer();
-        sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName),
-            StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), partitions);
+        sharedCache
+            .refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), partitions);
         LOG.debug("CachedStore: updated cached partition objects for catalog: {}, database: {}, table: {}", catName,
             dbName, tblName);
       } catch (MetaException | NoSuchObjectException e) {
@@ -886,7 +888,7 @@ public class CachedStore implements RawStore, Configurable {
           // Get partition column stats for this table
           Deadline.startTimer("getPartitionColumnStatistics");
           List<ColumnStatistics> partitionColStats =
-                  rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
+              rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
           Deadline.stopTimer();
           sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats);
           Deadline.startTimer("getPartitionsByNames");
@@ -914,7 +916,8 @@ public class CachedStore implements RawStore, Configurable {
     // but default partition
     private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
         String tblName) {
-      LOG.debug("CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}",
+      LOG.debug(
+          "CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}",
           catName, dbName, tblName);
       try {
         Table table = rawStore.getTable(catName, dbName, tblName);
@@ -943,10 +946,10 @@ public class CachedStore implements RawStore, Configurable {
               rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
           Deadline.stopTimer();
           sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
-                  StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+              StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
               aggrStatsAllButDefaultPartition, null);
-          LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}",
-              catName, dbName, tblName);
+          LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog:"
+              + " {}, database: {}, table: {}", catName, dbName, tblName);
         }
       } catch (MetaException | NoSuchObjectException e) {
         LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, e);
@@ -954,23 +957,19 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @Override
-  public Configuration getConf() {
+  @Override public Configuration getConf() {
     return rawStore.getConf();
   }
 
-  @Override
-  public void shutdown() {
+  @Override public void shutdown() {
     rawStore.shutdown();
   }
 
-  @Override
-  public boolean openTransaction() {
+  @Override public boolean openTransaction() {
     return rawStore.openTransaction();
   }
 
-  @Override
-  public boolean commitTransaction() {
+  @Override public boolean commitTransaction() {
     if (!rawStore.commitTransaction()) {
       return false;
     }
@@ -994,18 +993,15 @@ public class CachedStore implements RawStore, Configurable {
     return true;
   }
 
-  @Override
-  public boolean isActiveTransaction() {
+  @Override public boolean isActiveTransaction() {
     return rawStore.isActiveTransaction();
   }
 
-  @Override
-  public void rollbackTransaction() {
+  @Override public void rollbackTransaction() {
     rawStore.rollbackTransaction();
   }
 
-  @Override
-  public void createCatalog(Catalog cat) throws MetaException {
+  @Override public void createCatalog(Catalog cat) throws MetaException {
     rawStore.createCatalog(cat);
     // in case of event based cache update, cache will not be updated for catalog.
     if (!canUseEvents) {
@@ -1013,9 +1009,7 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @Override
-  public void alterCatalog(String catName, Catalog cat) throws MetaException,
-      InvalidOperationException {
+  @Override public void alterCatalog(String catName, Catalog cat) throws MetaException, InvalidOperationException {
     rawStore.alterCatalog(catName, cat);
     // in case of event based cache update, cache will not be updated for catalog.
     if (!canUseEvents) {
@@ -1023,8 +1017,7 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @Override
-  public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+  @Override public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
     // in case of event based cache update, cache will not be updated for catalog.
     if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
       return rawStore.getCatalog(catalogName);
@@ -1036,8 +1029,7 @@ public class CachedStore implements RawStore, Configurable {
     return cat;
   }
 
-  @Override
-  public List<String> getCatalogs() throws MetaException {
+  @Override public List<String> getCatalogs() throws MetaException {
     // in case of event based cache update, cache will not be updated for catalog.
     if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
       return rawStore.getCatalogs();
@@ -1045,8 +1037,7 @@ public class CachedStore implements RawStore, Configurable {
     return sharedCache.listCachedCatalogs();
   }
 
-  @Override
-  public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+  @Override public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
     rawStore.dropCatalog(catalogName);
 
     // in case of event based cache update, cache will not be updated for catalog.
@@ -1056,8 +1047,7 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @Override
-  public void createDatabase(Database db) throws InvalidObjectException, MetaException {
+  @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException {
     rawStore.createDatabase(db);
     // in case of event based cache update, cache will be updated during commit.
     if (!canUseEvents) {
@@ -1065,8 +1055,7 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @Override
-  public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
+  @Override public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
     // in case of  event based cache update, cache will be updated during commit. So within active transaction, read
     // directly from rawStore to avoid reading stale data as the data updated during same transaction will not be
     // updated in the cache.
@@ -1074,65 +1063,58 @@ public class CachedStore implements RawStore, Configurable {
       return rawStore.getDatabase(catName, dbName);
     }
     dbName = dbName.toLowerCase();
-    Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
-            StringUtils.normalizeIdentifier(dbName));
+    Database db = sharedCache
+        .getDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName));
     if (db == null) {
       throw new NoSuchObjectException();
     }
     return db;
   }
 
-  @Override
-  public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
+  @Override public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.dropDatabase(catName, dbName);
     if (succ && !canUseEvents) {
       // in case of event based cache update, cache will be updated during commit.
-      sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
-          StringUtils.normalizeIdentifier(dbName));
+      sharedCache
+          .removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName));
     }
     return succ;
   }
 
-  @Override
-  public boolean alterDatabase(String catName, String dbName, Database db)
+  @Override public boolean alterDatabase(String catName, String dbName, Database db)
       throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.alterDatabase(catName, dbName, db);
     if (succ && !canUseEvents) {
       // in case of event based cache update, cache will be updated during commit.
-      sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName),
-          StringUtils.normalizeIdentifier(dbName), db);
+      sharedCache
+          .alterDatabaseInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), db);
     }
     return succ;
   }
 
-  @Override
-  public List<String> getDatabases(String catName, String pattern) throws MetaException {
+  @Override public List<String> getDatabases(String catName, String pattern) throws MetaException {
     if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getDatabases(catName, pattern);
     }
     return sharedCache.listCachedDatabases(catName, pattern);
   }
 
-  @Override
-  public List<String> getAllDatabases(String catName) throws MetaException {
+  @Override public List<String> getAllDatabases(String catName) throws MetaException {
     if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
       return rawStore.getAllDatabases(catName);
     }
     return sharedCache.listCachedDatabases(catName);
   }
 
-  @Override
-  public boolean createType(Type type) {
+  @Override public boolean createType(Type type) {
     return rawStore.createType(type);
   }
 
-  @Override
-  public Type getType(String typeName) {
+  @Override public Type getType(String typeName) {
     return rawStore.getType(typeName);
   }
 
-  @Override
-  public boolean dropType(String typeName) {
+  @Override public boolean dropType(String typeName) {
     return rawStore.dropType(typeName);
   }
 
@@ -1154,8 +1136,7 @@ public class CachedStore implements RawStore, Configurable {
     tbl.setTableType(tableType);
   }
 
-  @Override
-  public void createTable(Table tbl) throws InvalidObjectException, MetaException {
+  @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException {
     rawStore.createTable(tbl);
     // in case of event based cache update, cache will be updated during commit.
     if (canUseEvents) {
@@ -1171,8 +1152,7 @@ public class CachedStore implements RawStore, Configurable {
     sharedCache.addTableToCache(catName, dbName, tblName, tbl);
   }
 
-  @Override
-  public boolean dropTable(String catName, String dbName, String tblName)
+  @Override public boolean dropTable(String catName, String dbName, String tblName)
       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropTable(catName, dbName, tblName);
     // in case of event based cache update, cache will be updated during commit.
@@ -1188,13 +1168,12 @@ public class CachedStore implements RawStore, Configurable {
     return succ;
   }
 
-  @Override
-  public Table getTable(String catName, String dbName, String tblName) throws MetaException {
+  @Override public Table getTable(String catName, String dbName, String tblName) throws MetaException {
     return getTable(catName, dbName, tblName, null);
   }
 
-  @Override
-  public Table getTable(String catName, String dbName, String tblName, String validWriteIds) throws MetaException {
+  @Override public Table getTable(String catName, String dbName, String tblName, String validWriteIds)
+      throws MetaException {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1209,7 +1188,11 @@ public class CachedStore implements RawStore, Configurable {
       // let's move this table to the top of tblNamesBeingPrewarmed stack,
       // so that it gets loaded to the cache faster and is available for subsequent requests
       tblsPendingPrewarm.prioritizeTableForPrewarm(tblName);
-      return rawStore.getTable(catName, dbName, tblName, validWriteIds);
+      Table t = rawStore.getTable(catName, dbName, tblName, validWriteIds);
+      if (t != null) {
+        sharedCache.addTableToCache(catName, dbName, tblName, t);
+      }
+      return t;
     }
     if (validWriteIds != null) {
       tbl.setParameters(
@@ -1237,8 +1220,7 @@ public class CachedStore implements RawStore, Configurable {
     return tbl;
   }
 
-  @Override
-  public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
+  @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartition(part);
     // in case of event based cache update, cache will be updated during commit.
     if (succ && !canUseEvents) {
@@ -1253,8 +1235,7 @@ public class CachedStore implements RawStore, Configurable {
     return succ;
   }
 
-  @Override
-  public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
+  @Override public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
       throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts);
     // in case of event based cache update, cache will be updated during commit.
@@ -1270,9 +1251,8 @@ public class CachedStore implements RawStore, Configurable {
     return succ;
   }
 
-  @Override
-  public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec,
-      boolean ifNotExists) throws InvalidObjectException, MetaException {
+  @Override public boolean addPartitions(String catName, String dbName, String tblName,
+      PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException {
     boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists);
     // in case of event based cache update, cache will be updated during commit.
     if (succ && !canUseEvents) {
@@ -1291,65 +1271,56 @@ public class CachedStore implements RawStore, Configurable {
     return succ;
   }
 
-  @Override
-  public Partition getPartition(String catName, String dbName, String tblName, List<String> part_vals)
+  @Override public Partition getPartition(String catName, String dbName, String tblName, List<String> partVals)
       throws MetaException, NoSuchObjectException {
-    return getPartition(catName, dbName, tblName, part_vals, null);
+    return getPartition(catName, dbName, tblName, partVals, null);
   }
 
-  @Override
-  public Partition getPartition(String catName, String dbName, String tblName,
-                                List<String> part_vals, String validWriteIds)
-      throws MetaException, NoSuchObjectException {
+  @Override public Partition getPartition(String catName, String dbName, String tblName, List<String> partVals,
+      String validWriteIds) throws MetaException, NoSuchObjectException {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
     if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.getPartition(
-          catName, dbName, tblName, part_vals, validWriteIds);
+      return rawStore.getPartition(catName, dbName, tblName, partVals, validWriteIds);
     }
-    Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, part_vals);
+    Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partVals);
     if (part == null) {
       // The table containing the partition is not yet loaded in cache
-      return rawStore.getPartition(
-          catName, dbName, tblName, part_vals, validWriteIds);
+      return rawStore.getPartition(catName, dbName, tblName, partVals, validWriteIds);
     }
     if (validWriteIds != null) {
       Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
       if (table == null) {
         // The table containing the partition is not yet loaded in cache
-        return rawStore.getPartition(
-            catName, dbName, tblName, part_vals, validWriteIds);
+        return rawStore.getPartition(catName, dbName, tblName, partVals, validWriteIds);
       }
-      part.setParameters(adjustStatsParamsForGet(table.getParameters(),
-          part.getParameters(), part.getWriteId(), validWriteIds));
+      part.setParameters(
+          adjustStatsParamsForGet(table.getParameters(), part.getParameters(), part.getWriteId(), validWriteIds));
     }
 
     return part;
   }
 
-  @Override
-  public boolean doesPartitionExist(String catName, String dbName, String tblName,
-      List<FieldSchema> partKeys, List<String> part_vals)
-      throws MetaException, NoSuchObjectException {
+  @Override public boolean doesPartitionExist(String catName, String dbName, String tblName, List<FieldSchema> partKeys,
+      List<String> partVals) throws MetaException, NoSuchObjectException {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
     if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
+      return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, partVals);
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
     if (tbl == null) {
       // The table containing the partition is not yet loaded in cache
-      return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
+      return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, partVals);
     }
-    return sharedCache.existPartitionFromCache(catName, dbName, tblName, part_vals);
+    return sharedCache.existPartitionFromCache(catName, dbName, tblName, partVals);
   }
 
-  @Override
-  public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals)
+  @Override public boolean dropPartition(String catName, String dbName, String tblName, List<String> partVals)
       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
-    boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals);
+    boolean succ = rawStore.dropPartition(catName, dbName, tblName, partVals);
     // in case of event based cache update, cache will be updated during commit.
     if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
@@ -1358,13 +1329,12 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(catName, dbName, tblName)) {
         return succ;
       }
-      sharedCache.removePartitionFromCache(catName, dbName, tblName, part_vals);
+      sharedCache.removePartitionFromCache(catName, dbName, tblName, partVals);
     }
     return succ;
   }
 
-  @Override
-  public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
+  @Override public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
       throws MetaException, NoSuchObjectException {
     rawStore.dropPartitions(catName, dbName, tblName, partNames);
     // in case of event based cache update, cache will be updated during commit.
@@ -1384,8 +1354,7 @@ public class CachedStore implements RawStore, Configurable {
     sharedCache.removePartitionsFromCache(catName, dbName, tblName, partVals);
   }
 
-  @Override
-  public List<Partition> getPartitions(String catName, String dbName, String tblName, int max)
+  @Override public List<Partition> getPartitions(String catName, String dbName, String tblName, int max)
       throws MetaException, NoSuchObjectException {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1402,26 +1371,23 @@ public class CachedStore implements RawStore, Configurable {
     return parts;
   }
 
-  @Override
-  public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
+  @Override public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
       String baseLocationToNotShow, int max) {
     return rawStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max);
   }
 
-  @Override
-  public Table alterTable(String catName, String dbName, String tblName, Table newTable,
-      String validWriteIds) throws InvalidObjectException, MetaException {
+  @Override public Table alterTable(String catName, String dbName, String tblName, Table newTable, String validWriteIds)
+      throws InvalidObjectException, MetaException {
     newTable = rawStore.alterTable(catName, dbName, tblName, newTable, validWriteIds);
     // in case of event based cache update, cache will be updated during commit.
     if (canUseEvents) {
-      return  newTable;
+      return newTable;
     }
     catName = normalizeIdentifier(catName);
     dbName = normalizeIdentifier(dbName);
     tblName = normalizeIdentifier(tblName);
     String newTblName = normalizeIdentifier(newTable.getTableName());
-    if (!shouldCacheTable(catName, dbName, tblName) &&
-        !shouldCacheTable(catName, dbName, newTblName)) {
+    if (!shouldCacheTable(catName, dbName, tblName) && !shouldCacheTable(catName, dbName, newTblName)) {
       return newTable;
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1442,60 +1408,36 @@ public class CachedStore implements RawStore, Configurable {
     return newTable;
   }
 
-  @Override
-  public void updateCreationMetadata(String catName, String dbname, String tablename, CreationMetadata cm)
+  @Override public void updateCreationMetadata(String catName, String dbname, String tablename, CreationMetadata cm)
       throws MetaException {
     rawStore.updateCreationMetadata(catName, dbname, tablename, cm);
   }
 
-  @Override
-  public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() ||
-            (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.getTables(catName, dbName, pattern);
-    }
-    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
-        StringUtils.normalizeIdentifier(dbName), pattern, -1);
+  @Override public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
+    return rawStore.getTables(catName, dbName, pattern);
   }
 
-  @Override
-  public List<String> getTables(String catName, String dbName, String pattern, TableType tableType, int limit)
+  @Override public List<String> getTables(String catName, String dbName, String pattern, TableType tableType, int limit)
       throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get()
-            || (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.getTables(catName, dbName, pattern, tableType, limit);
-    }
-    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
-        StringUtils.normalizeIdentifier(dbName), pattern, tableType, limit);
+    return rawStore.getTables(catName, dbName, pattern, tableType, limit);
   }
 
-  @Override
-  public List<Table> getAllMaterializedViewObjectsForRewriting(String catName) throws MetaException {
+  @Override public List<Table> getAllMaterializedViewObjectsForRewriting(String catName) throws MetaException {
     // TODO fucntionCache
     return rawStore.getAllMaterializedViewObjectsForRewriting(catName);
   }
 
-  @Override
-  public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
+  @Override public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
       throws MetaException, NoSuchObjectException {
     return rawStore.getMaterializedViewsForRewriting(catName, dbName);
   }
 
-  @Override
-  public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes)
-      throws MetaException {
-    // TODO Check if all required tables are allowed, if so, get it from cache
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() ||
-            (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
-    }
-    return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName),
-        StringUtils.normalizeIdentifier(dbNames),
-        StringUtils.normalizeIdentifier(tableNames), tableTypes);
+  @Override public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
+      List<String> tableTypes) throws MetaException {
+    return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
   }
 
-  @Override
-  public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
+  @Override public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
       throws MetaException, UnknownDBException {
     if (canUseEvents && rawStore.isActiveTransaction()) {
       return rawStore.getTableObjectsByName(catName, dbName, tblNames);
@@ -1523,6 +1465,7 @@ public class CachedStore implements RawStore, Configurable {
       Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
       if (tbl == null) {
         tbl = rawStore.getTable(catName, dbName, tblName);
+        sharedCache.addTableToCache(catName, dbName, tblName, tbl);
       }
       if (tbl != null) {
         tables.add(tbl);
@@ -1532,58 +1475,48 @@ public class CachedStore implements RawStore, Configurable {
     return tables;
   }
 
-  @Override
-  public List<String> getAllTables(String catName, String dbName) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() ||
-            (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.getAllTables(catName, dbName);
-    }
-    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
-        StringUtils.normalizeIdentifier(dbName));
+  @Override public List<String> getAllTables(String catName, String dbName) throws MetaException {
+    return rawStore.getAllTables(catName, dbName);
   }
 
   @Override
   // TODO: implement using SharedCache
-  public List<String> listTableNamesByFilter(String catName, String dbName, String filter, short max_tables)
+  public List<String> listTableNamesByFilter(String catName, String dbName, String filter, short maxTables)
       throws MetaException, UnknownDBException {
-    return rawStore.listTableNamesByFilter(catName, dbName, filter, max_tables);
+    return rawStore.listTableNamesByFilter(catName, dbName, filter, maxTables);
   }
 
-  @Override
-  public List<String> listPartitionNames(String catName, String dbName, String tblName,
-      short max_parts) throws MetaException {
+  @Override public List<String> listPartitionNames(String catName, String dbName, String tblName, short maxParts)
+      throws MetaException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
     if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
-      return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
+      return rawStore.listPartitionNames(catName, dbName, tblName, maxParts);
     }
     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
     if (tbl == null) {
       // The table is not yet loaded in cache
-      return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
+      return rawStore.listPartitionNames(catName, dbName, tblName, maxParts);
     }
     List<String> partitionNames = new ArrayList<>();
     int count = 0;
-    for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, max_parts)) {
-      if (max_parts == -1 || count < max_parts) {
+    for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
+      if (maxParts == -1 || count < maxParts) {
         partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
       }
     }
     return partitionNames;
   }
 
-  @Override
-  public PartitionValuesResponse listPartitionValues(String catName, String db_name, String tbl_name,
-      List<FieldSchema> cols, boolean applyDistinct, String filter, boolean ascending,
-      List<FieldSchema> order, long maxParts) throws MetaException {
+  @Override public PartitionValuesResponse listPartitionValues(String catName, String dbName, String tblName,
+      List<FieldSchema> cols, boolean applyDistinct, String filter, boolean ascending, List<FieldSchema> order,
+      long maxParts) throws MetaException {
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  public Partition alterPartition(String catName, String dbName, String tblName,
-      List<String> partVals, Partition newPart, String validWriteIds)
-          throws InvalidObjectException, MetaException {
+  @Override public Partition alterPartition(String catName, String dbName, String tblName, List<String> partVals,
+      Partition newPart, String validWriteIds) throws InvalidObjectException, MetaException {
     newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds);
     // in case of event based cache update, cache will be updated during commit.
     if (canUseEvents) {
@@ -1599,13 +1532,10 @@ public class CachedStore implements RawStore, Configurable {
     return newPart;
   }
 
-  @Override
-  public List<Partition> alterPartitions(String catName, String dbName, String tblName,
-                              List<List<String>> partValsList, List<Partition> newParts,
-                              long writeId, String validWriteIds)
+  @Override public List<Partition> alterPartitions(String catName, String dbName, String tblName,
+      List<List<String>> partValsList, List<Partition> newParts, long writeId, String validWriteIds)
       throws InvalidObjectException, MetaException {
-    newParts = rawStore.alterPartitions(
-        catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds);
+    newParts = rawStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds);
     // in case of event based cache update, cache will be updated during commit.
     if (canUseEvents) {
       return newParts;
@@ -1620,43 +1550,37 @@ public class CachedStore implements RawStore, Configurable {
     return newParts;
   }
 
-  private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
-      String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
-      throws MetaException, NoSuchObjectException {
-    List<Partition> parts =
-        sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getCatName()),
-            StringUtils.normalizeIdentifier(table.getDbName()),
-            StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+  private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, String defaultPartName, short maxParts,
+      List<String> result, SharedCache sharedCache) throws MetaException, NoSuchObjectException {
+    List<Partition> parts = sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getCatName()),
+        StringUtils.normalizeIdentifier(table.getDbName()), StringUtils.normalizeIdentifier(table.getTableName()),
+        maxParts);
     for (Partition part : parts) {
       result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
     }
     if (defaultPartName == null || defaultPartName.isEmpty()) {
       defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
     }
-    return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
-        result);
+    return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, result);
   }
 
   @Override
   // TODO: implement using SharedCache
-  public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName,
-      String filter, short maxParts)
-      throws MetaException, NoSuchObjectException {
+  public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName, String filter,
+      short maxParts) throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts);
   }
 
   @Override
   /**
    * getPartitionSpecsByFilterAndProjection interface is currently non-cacheable.
-   */
-  public List<Partition> getPartitionSpecsByFilterAndProjection(Table table,
+   */ public List<Partition> getPartitionSpecsByFilterAndProjection(Table table,
       GetPartitionsProjectionSpec projectionSpec, GetPartitionsFilterSpec filterSpec)
       throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionSpecsByFilterAndProjection(table, projectionSpec, filterSpec);
   }
 
-  @Override
-  public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
+  @Override public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
       String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1680,14 +1604,12 @@ public class CachedStore implements RawStore, Configurable {
     return hasUnknownPartitions;
   }
 
-  @Override
-  public int getNumPartitionsByFilter(String catName, String dbName, String tblName, String filter)
+  @Override public int getNumPartitionsByFilter(String catName, String dbName, String tblName, String filter)
       throws MetaException, NoSuchObjectException {
     return rawStore.getNumPartitionsByFilter(catName, dbName, tblName, filter);
   }
 
-  @Override
-  public int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr)
+  @Override public int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr)
       throws MetaException, NoSuchObjectException {
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1702,13 +1624,11 @@ public class CachedStore implements RawStore, Configurable {
       // The table is not yet loaded in cache
       return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
     }
-    getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
-        sharedCache);
+    getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache);
     return partNames.size();
   }
 
-  @VisibleForTesting
-  public static List<String> partNameToVals(String name) {
+  @VisibleForTesting public static List<String> partNameToVals(String name) {
     if (name == null) {
       return null;
     }
@@ -1720,8 +1640,7 @@ public class CachedStore implements RawStore, Configurable {
     return vals;
   }
 
-  @Override
-  public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
+  @Override public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
       List<String> partNames) throws MetaException, NoSuchObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1737,179 +1656,144 @@ public class CachedStore implements RawStore, Configurable {
     List<Partition> partitions = new ArrayList<>();
     for (String partName : partNames) {
       Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partNameToVals(partName));
-      if (part!=null) {
+      if (part != null) {
         partitions.add(part);
       }
     }
     return partitions;
   }
 
-  @Override
-  public Table markPartitionForEvent(String catName, String dbName, String tblName,
+  @Override public Table markPartitionForEvent(String catName, String dbName, String tblName,
       Map<String, String> partVals, PartitionEventType evtType)
-      throws MetaException, UnknownTableException, InvalidPartitionException,
-      UnknownPartitionException {
+      throws MetaException, UnknownTableException, InvalidPartitionException, UnknownPartitionException {
     return rawStore.markPartitionForEvent(catName, dbName, tblName, partVals, evtType);
   }
 
-  @Override
-  public boolean isPartitionMarkedForEvent(String catName, String dbName, String tblName,
+  @Override public boolean isPartitionMarkedForEvent(String catName, String dbName, String tblName,
       Map<String, String> partName, PartitionEventType evtType)
-      throws MetaException, UnknownTableException, InvalidPartitionException,
-      UnknownPartitionException {
+      throws MetaException, UnknownTableException, InvalidPartitionException, UnknownPartitionException {
     return rawStore.isPartitionMarkedForEvent(catName, dbName, tblName, partName, evtType);
   }
 
-  @Override
-  public boolean addRole(String rowName, String ownerName)
+  @Override public boolean addRole(String rowName, String ownerName)
       throws InvalidObjectException, MetaException, NoSuchObjectException {
     return rawStore.addRole(rowName, ownerName);
   }
 
-  @Override
-  public boolean removeRole(String roleName)
-      throws MetaException, NoSuchObjectException {
+  @Override public boolean removeRole(String roleName) throws MetaException, NoSuchObjectException {
     return rawStore.removeRole(roleName);
   }
 
-  @Override
-  public boolean grantRole(Role role, String userName,
-      PrincipalType principalType, String grantor, PrincipalType grantorType,
-      boolean grantOption)
+  @Override public boolean grantRole(Role role, String userName, PrincipalType principalType, String grantor,
+      PrincipalType grantorType, boolean grantOption)
       throws MetaException, NoSuchObjectException, InvalidObjectException {
     return rawStore.grantRole(role, userName, principalType, grantor, grantorType, grantOption);
   }
 
-  @Override
-  public boolean revokeRole(Role role, String userName,
-      PrincipalType principalType, boolean grantOption)
+  @Override public boolean revokeRole(Role role, String userName, PrincipalType principalType, boolean grantOption)
       throws MetaException, NoSuchObjectException {
     return rawStore.revokeRole(role, userName, principalType, grantOption);
   }
 
-  @Override
-  public PrincipalPrivilegeSet getUserPrivilegeSet(String userName,
-      List<String> groupNames) throws InvalidObjectException, MetaException {
+  @Override public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, List<String> groupNames)
+      throws InvalidObjectException, MetaException {
     return rawStore.getUserPrivilegeSet(userName, groupNames);
   }
 
-  @Override
-  public PrincipalPrivilegeSet getDBPrivilegeSet(String catName, String dbName, String userName,
+  @Override public PrincipalPrivilegeSet getDBPrivilegeSet(String catName, String dbName, String userName,
       List<String> groupNames) throws InvalidObjectException, MetaException {
     return rawStore.getDBPrivilegeSet(catName, dbName, userName, groupNames);
   }
 
-  @Override
-  public PrincipalPrivilegeSet getTablePrivilegeSet(String catName, String dbName,
-      String tableName, String userName, List<String> groupNames)
-      throws InvalidObjectException, MetaException {
+  @Override public PrincipalPrivilegeSet getTablePrivilegeSet(String catName, String dbName, String tableName,
+      String userName, List<String> groupNames) throws InvalidObjectException, MetaException {
     return rawStore.getTablePrivilegeSet(catName, dbName, tableName, userName, groupNames);
   }
 
-  @Override
-  public PrincipalPrivilegeSet getPartitionPrivilegeSet(String catName, String dbName,
-      String tableName, String partition, String userName,
-      List<String> groupNames) throws InvalidObjectException, MetaException {
+  @Override public PrincipalPrivilegeSet getPartitionPrivilegeSet(String catName, String dbName, String tableName,
+      String partition, String userName, List<String> groupNames) throws InvalidObjectException, MetaException {
     return rawStore.getPartitionPrivilegeSet(catName, dbName, tableName, partition, userName, groupNames);
   }
 
-  @Override
-  public PrincipalPrivilegeSet getColumnPrivilegeSet(String catName, String dbName,
-      String tableName, String partitionName, String columnName,
-      String userName, List<String> groupNames)
+  @Override public PrincipalPrivilegeSet getColumnPrivilegeSet(String catName, String dbName, String tableName,
+      String partitionName, String columnName, String userName, List<String> groupNames)
       throws InvalidObjectException, MetaException {
     return rawStore.getColumnPrivilegeSet(catName, dbName, tableName, partitionName, columnName, userName, groupNames);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalGlobalGrants(
-      String principalName, PrincipalType principalType) {
+  @Override public List<HiveObjectPrivilege> listPrincipalGlobalGrants(String principalName,
+      PrincipalType principalType) {
     return rawStore.listPrincipalGlobalGrants(principalName, principalType);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName,
-      PrincipalType principalType, String catName, String dbName) {
+  @Override public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName, PrincipalType principalType,
+      String catName, String dbName) {
     return rawStore.listPrincipalDBGrants(principalName, principalType, catName, dbName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listAllTableGrants(String principalName,
-      PrincipalType principalType, String catName, String dbName, String tableName) {
+  @Override public List<HiveObjectPrivilege> listAllTableGrants(String principalName, PrincipalType principalType,
+      String catName, String dbName, String tableName) {
     return rawStore.listAllTableGrants(principalName, principalType, catName, dbName, tableName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalPartitionGrants(
-      String principalName, PrincipalType principalType, String catName, String dbName,
-      String tableName, List<String> partValues, String partName) {
-    return rawStore.listPrincipalPartitionGrants(principalName, principalType, catName, dbName, tableName, partValues, partName);
+  @Override public List<HiveObjectPrivilege> listPrincipalPartitionGrants(String principalName,
+      PrincipalType principalType, String catName, String dbName, String tableName, List<String> partValues,
+      String partName) {
+    return rawStore
+        .listPrincipalPartitionGrants(principalName, principalType, catName, dbName, tableName, partValues, partName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalTableColumnGrants(
-      String principalName, PrincipalType principalType, String catName, String dbName,
-      String tableName, String columnName) {
-    return rawStore.listPrincipalTableColumnGrants(principalName, principalType, catName, dbName, tableName, columnName);
+  @Override public List<HiveObjectPrivilege> listPrincipalTableColumnGrants(String principalName,
+      PrincipalType principalType, String catName, String dbName, String tableName, String columnName) {
+    return rawStore
+        .listPrincipalTableColumnGrants(principalName, principalType, catName, dbName, tableName, columnName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants(
-      String principalName, PrincipalType principalType, String catName, String dbName,
-      String tableName, List<String> partValues, String partName,
-      String columnName) {
-    return rawStore.listPrincipalPartitionColumnGrants(principalName, principalType, catName, dbName, tableName, partValues, partName, columnName);
+  @Override public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants(String principalName,
+      PrincipalType principalType, String catName, String dbName, String tableName, List<String> partValues,
+      String partName, String columnName) {
+    return rawStore
+        .listPrincipalPartitionColumnGrants(principalName, principalType, catName, dbName, tableName, partValues,
+            partName, columnName);
   }
 
-  @Override
-  public boolean grantPrivileges(PrivilegeBag privileges)
+  @Override public boolean grantPrivileges(PrivilegeBag privileges)
       throws InvalidObjectException, MetaException, NoSuchObjectException {
     return rawStore.grantPrivileges(privileges);
   }
 
-  @Override
-  public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
+  @Override public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
       throws InvalidObjectException, MetaException, NoSuchObjectException {
     return rawStore.revokePrivileges(privileges, grantOption);
   }
 
-  @Override
-  public boolean refreshPrivileges(HiveObjectRef objToRefresh, String authorizer, PrivilegeBag grantPrivileges)
-      throws InvalidObjectException, MetaException, NoSuchObjectException {
+  @Override public boolean refreshPrivileges(HiveObjectRef objToRefresh, String authorizer,
+      PrivilegeBag grantPrivileges) throws InvalidObjectException, MetaException, NoSuchObjectException {
     return rawStore.refreshPrivileges(objToRefresh, authorizer, grantPrivileges);
   }
 
-  @Override
-  public Role getRole(String roleName) throws NoSuchObjectException {
+  @Override public Role getRole(String roleName) throws NoSuchObjectException {
     return rawStore.getRole(roleName);
   }
 
-  @Override
-  public List<String> listRoleNames() {
+  @Override public List<String> listRoleNames() {
     return rawStore.listRoleNames();
   }
 
-  @Override
-  public List<Role> listRoles(String principalName,
-      PrincipalType principalType) {
+  @Override public List<Role> listRoles(String principalName, PrincipalType principalType) {
     return rawStore.listRoles(principalName, principalType);
   }
 
-  @Override
-  public List<RolePrincipalGrant> listRolesWithGrants(String principalName,
-      PrincipalType principalType) {
+  @Override public List<RolePrincipalGrant> listRolesWithGrants(String principalName, PrincipalType principalType) {
     return rawStore.listRolesWithGrants(principalName, principalType);
   }
 
-  @Override
-  public List<RolePrincipalGrant> listRoleMembers(String roleName) {
+  @Override public List<RolePrincipalGrant> listRoleMembers(String roleName) {
     return rawStore.listRoleMembers(roleName);
   }
 
-  @Override
-  public Partition getPartitionWithAuth(String catName, String dbName, String tblName,
-      List<String> partVals, String userName, List<String> groupNames)
-      throws MetaException, NoSuchObjectException, InvalidObjectException {
+  @Override public Partition getPartitionWithAuth(String catName, String dbName, String tblName, List<String> partVals,
+      String userName, List<String> groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1924,8 +1808,7 @@ public class CachedStore implements RawStore, Configurable {
     Partition p = sharedCache.getPartitionFromCache(catName, dbName, tblName, partVals);
     if (p != null) {
       String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
-      PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName,
-          userName, groupNames);
+      PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName, userName, groupNames);
       p.setPrivileges(privs);
     } else {
       throw new NoSuchObjectException("partition values=" + partVals.toString());
@@ -1933,10 +1816,8 @@ public class CachedStore implements RawStore, Configurable {
     return p;
   }
 
-  @Override
-  public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName,
-      short maxParts, String userName, List<String> groupNames)
-      throws MetaException, NoSuchObjectException, InvalidObjectException {
+  @Override public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName, short maxParts,
+      String userName, List<String> groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1953,8 +1834,8 @@ public class CachedStore implements RawStore, Configurable {
     for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
       if (maxParts == -1 || count < maxParts) {
         String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
-        PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName,
-            userName, groupNames);
+        PrincipalPrivilegeSet privs =
+            getPartitionPrivilegeSet(catName, dbName, tblName, partName, userName, groupNames);
         part.setPrivileges(privs);
         partitions.add(part);
         count++;
@@ -1963,9 +1844,8 @@ public class CachedStore implements RawStore, Configurable {
     return partitions;
   }
 
-  @Override
-  public List<String> listPartitionNamesPs(String catName, String dbName, String tblName, List<String> partSpecs,
-      short maxParts) throws MetaException, NoSuchObjectException {
+  @Override public List<String> listPartitionNamesPs(String catName, String dbName, String tblName,
+      List<String> partSpecs, short maxParts) throws MetaException, NoSuchObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1991,9 +1871,8 @@ public class CachedStore implements RawStore, Configurable {
     return partitionNames;
   }
 
-  @Override
-  public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName, List<String> partSpecs,
-      short maxParts, String userName, List<String> groupNames)
+  @Override public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName,
+      List<String> partSpecs, short maxParts, String userName, List<String> groupNames)
       throws MetaException, InvalidObjectException, NoSuchObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
@@ -2045,11 +1924,13 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   // Note: ideally this should be above both CachedStore and ObjectStore.
-  private Map<String, String> adjustStatsParamsForGet(Map<String, String> tableParams,
-      Map<String, String> params, long statsWriteId, String validWriteIds) throws MetaException {
-    if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a txn table.
-    if (areTxnStatsSupported && ((validWriteIds == null)
-        || ObjectStore.isCurrentStatsValidForTheQuery(params, statsWriteId, validWriteIds, false))) {
+  private Map<String, String> adjustStatsParamsForGet(Map<String, String> tableParams, Map<String, String> params,
+      long statsWriteId, String validWriteIds) throws MetaException {
+    if (!TxnUtils.isTransactionalTable(tableParams)) {
+      return params; // Not a txn table.
+    }
+    if (areTxnStatsSupported && ((validWriteIds == null) || ObjectStore
+        .isCurrentStatsValidForTheQuery(params, statsWriteId, validWriteIds, false))) {
       // Valid stats are supported for txn tables, and either no verification was requested by the
       // caller, or the verification has succeeded.
       return params;
@@ -2060,16 +1941,15 @@ public class CachedStore implements RawStore, Configurable {
     return params;
   }
 
-
   // Note: ideally this should be above both CachedStore and ObjectStore.
-  public static ColumnStatistics adjustColStatForGet(Map<String, String> tableParams,
-                                               ColumnStatistics colStat, long statsWriteId,
-      String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
+  public static ColumnStatistics adjustColStatForGet(Map<String, String> tableParams, ColumnStatistics colStat,
+      long statsWriteId, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
     colStat.setIsStatsCompliant(true);
-    if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a txn table.
-    if (areTxnStatsSupported && ((validWriteIds == null)
-        || ObjectStore.isCurrentStatsValidForTheQuery(
-            tableParams, statsWriteId, validWriteIds, false))) {
+    if (!TxnUtils.isTransactionalTable(tableParams)) {
+      return colStat; // Not a txn table.
+    }
+    if (areTxnStatsSupported && ((validWriteIds == null) || ObjectStore
+        .isCurrentStatsValidForTheQuery(tableParams, statsWriteId, validWriteIds, false))) {
       // Valid stats are supported for txn tables, and either no verification was requested by the
       // caller, or the verification has succeeded.
       return colStat;
@@ -2080,11 +1960,9 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   private static void updateTableColumnsStatsInternal(Configuration conf, ColumnStatistics colStats,
-                                                      Map<String, String> newParams, String validWriteIds,
-                                                      long writeId) throws MetaException {
-    String catName = colStats.getStatsDesc().isSetCatName() ?
-            normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
-            getDefaultCatalog(conf);
+      Map<String, String> newParams, String validWriteIds, long writeId) throws MetaException {
+    String catName = colStats.getStatsDesc().isSetCatName() ? normalizeIdentifier(
+        colStats.getStatsDesc().getCatName()) : getDefaultCatalog(conf);
     String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
     String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
     if (!shouldCacheTable(catName, dbName, tblName)) {
@@ -2101,17 +1979,17 @@ public class CachedStore implements RawStore, Configurable {
       if (!areTxnStatsSupported) {
         StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
       } else {
-        String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName),
-                table.getParameters(), newParams, writeId, validWriteIds, true);
+        String errorMsg = ObjectStore
+            .verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName), table.getParameters(), newParams, writeId,
+                validWriteIds, true);
         if (errorMsg != null) {
           throw new MetaException(errorMsg);
         }
-        if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(),
-                validWriteIds, true)) {
+        if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(), validWriteIds, true)) {
           // Make sure we set the flag to invalid regardless of the current value.
           StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
-          LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
-                  + table.getDbName() + "." + table.getTableName());
+          LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + table.getDbName() + "." + table
+              .getTableName());
         }
       }
     }
@@ -2122,12 +2000,9 @@ public class CachedStore implements RawStore, Configurable {
     sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj());
   }
 
-  @Override
-  public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats,
-      String validWriteIds, long writeId)
-      throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
-    Map<String, String> newParams = rawStore.updateTableColumnStatistics(
-        colStats, validWriteIds, writeId);
+  @Override public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds,
+      long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+    Map<String, String> newParams = rawStore.updateTableColumnStatistics(colStats, validWriteIds, writeId);
     // in case of event based cache update, cache will be updated during commit.
     if (newParams != null && !canUseEvents) {
       updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId);
@@ -2135,43 +2010,35 @@ public class CachedStore implements RawStore, Configurable {
     return newParams;
   }
 
-  @Override
-  public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
+  @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
       List<String> colNames) throws MetaException, NoSuchObjectException {
     return getTableColumnStatistics(catName, dbName, tblName, colNames, null);
   }
 
-  @Override
-  public ColumnStatistics getTableColumnStatistics(
-      String catName, String dbName, String tblName, List<String> colNames,
-      String validWriteIds)
-      throws MetaException, NoSuchObjectException {
+  @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
+      List<String> colNames, String validWriteIds) throws MetaException, NoSuchObjectException {
     catName = StringUtils.normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
     if (!shouldCacheTable(catName, dbName, tblName)) {
-      return rawStore.getTableColumnStatistics(
-          catName, dbName, tblName, colNames, validWriteIds);
+      return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
     if (table == null) {
       // The table is not yet loaded in cache
-      return rawStore.getTableColumnStatistics(
-          catName, dbName, tblName, colNames, validWriteIds);
+      return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
     }
     ColumnStatistics columnStatistics =
         sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames, validWriteIds, areTxnStatsSupported);
     if (columnStatistics == null) {
-      LOG.info("Stat of Table {}.{} for column {} is not present in cache." +
-              "Getting from raw store", dbName, tblName, colNames);
+      LOG.info("Stat of Table {}.{} for column {} is not present in cache." + "Getting from raw store", dbName, tblName,
+          colNames);
       return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
     }
     return columnStatistics;
   }
 
-  @Override
-  public boolean deleteTableColumnStatistics(String catName, String dbName, String tblName,
-                                             String colName)
+  @Override public boolean deleteTableColumnStatistics(String catName, String dbName, String tblName, String colName)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName);
     // in case of event based cache update, cache is updated during commit txn
@@ -2187,16 +2054,15 @@ public class CachedStore implements RawStore, Configurable {
     return succ;
   }
 
-  @Override
-  public Map<String, String> updatePartitionColumnStatistics(ColumnStatistics colStats,
-      List<String> partVals, String validWriteIds, long writeId)
+  @Override public Map<String, String> updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals,
+      String validWriteIds, long writeId)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
-    Map<String, String> newParams = rawStore.updatePartitionColumnStatistics(
-        colStats, partVals, validWriteIds, writeId);
+    Map<String, String> newParams =
+        rawStore.updatePartitionColumnStatistics(colStats, partVals, validWriteIds, writeId);
     // in case of event based cache update, cache is updated during commit txn
     if (newParams != null && !canUseEvents) {
-      String catName = colStats.getStatsDesc().isSetCatName() ?
-          normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
+      String catName = colStats.getStatsDesc().isSetCatName() ? normalizeIdentifier(
+          colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
       String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
       String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
       if (!shouldCacheTable(catName, dbName, tblName)) {
@@ -2210,36 +2076,31 @@ public class CachedStore implements RawStore, Configurable {
     return newParams;
   }
 
-  @Override
-  public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
+  @Override public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
       List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
     return getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, null);
   }
 
-  @Override
-  public List<ColumnStatistics> getPartitionColumnStatistics(
-      String catName, String dbName, String tblName, List<String> partNames,
-      List<String> colNames, String writeIdList)
-      throws MetaException, NoSuchObjectException {
+  @Override public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
+      List<String> partNames, List<String> colNames, String writeIdList) throws MetaException, NoSuchObjectException {
 
     // If writeIdList is not null, that means stats are requested within a txn context. So set stats compliant to false,
     // if areTxnStatsSupported is false or the write id which has updated the stats in not compatible with writeIdList.
     // This is done within table lock as the number of partitions may be more than one and we need a consistent view
     // for all the partitions.
-    List<ColumnStatistics> columnStatistics = sharedCache.getPartitionColStatsListFromCache(catName, dbName, tblName,
-            partNames, colNames, writeIdList, areTxnStatsSupported);
+    List<ColumnStatistics> columnStatistics = sharedCache
+        .getPartitionColStatsListFromCache(catName, dbName, tblName, partNames, colNames, writeIdList,
+            areTxnStatsSupported);
     if (columnStatistics == null) {
       return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, writeIdList);
     }
     return columnStatistics;
   }
 
-  @Override
-  public boolean deletePartitionColumnStatistics(String catName, String dbName, String tblName, String partName,
-      List<String> partVals, String colName)
+  @Override public boolean deletePartitionColumnStatistics(String catName, String dbName, String tblName,
+      String partName, List<String> partVals, String colName)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
-    boolean succ =
-        rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
+    boolean succ = rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
     // in case of event based cache update, cache is updated during commit txn.
     if (succ && !canUseEvents) {
       catName = normalizeIdentifier(catName);
@@ -2253,17 +2114,13 @@ public class CachedStore implements RawStore, Configurable {
     return succ;
   }
 
-  @Override
-  public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
+  @Override public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
       List<String> colNames) throws MetaException, NoSuchObjectException {
     return get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, null);
   }
 
-  @Override
-  public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName,
-                                      List<String> partNames, List<String> colNames,
-                                      String writeIdList)
-      throws MetaException, NoSuchObjectException {
+  @Override public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
+      List<String> colNames, String writeIdList) throws MetaException, NoSuchObjectException {
     List<ColumnStatisticsObj> colStats;
     catName = normalizeIdentifier(catName);
     dbName = StringUtils.normalizeIdentifier(dbName);
@@ -2272,14 +2129,12 @@ public class CachedStore implements RawStore, Configurable {
     //       (incl. due to lack of sync w.r.t. the below rawStore call).
     // In case the cache is updated using events, aggregate is calculated locally and thus can be read from cache.
     if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && !canUseEvents)) {
-      return rawStore.get_aggr_stats_for(
-          catName, dbName, tblName, partNames, colNames, writeIdList);
+      return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
     }
     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
     if (table == null) {
       // The table is not yet loaded in cache
-      return rawStore.get_aggr_stats_for(
-          catName, dbName, tblName, partNames, colNames, writeIdList);
+      return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
     }
 
     List<String> allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1);
@@ -2301,21 +2156,21 @@ public class CachedStore implements RawStore, Configurable {
       }
     }
 
-    LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}",
-        tblName, partNames, colNames);
-    MergedColumnStatsForPartitions mergedColStats = mergeColStatsForPartitions(catName, dbName, tblName,
-              partNames, colNames, sharedCache, type, writeIdList);
+    LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", tblName, partNames,
+        colNames);
+    MergedColumnStatsForPartitions mergedColStats =
+        mergeColStatsForPartitions(catName, dbName, tblName, partNames, colNames, sharedCache, type, writeIdList);
     if (mergedColStats == null) {
-      LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." +
-              partNames + " for columns " + colNames + " is not present in cache. Getting it from raw store");
+      LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." + partNames
+          + " for columns " + colNames + " is not present in cache. Getting it from raw store");
       return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
     }
     return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound());
   }
 
-  private MergedColumnStatsForPartitions mergeColStatsForPartitions(
-      String catName, String dbName, String tblName, List<String> partNames, List<String> colNames,
-      SharedCache sharedCache, StatsType type, String writeIdList) throws MetaException {
+  private MergedColumnStatsForPartitions mergeColStatsForPartitions(String catName, String dbName, String tblName,
+      List<String> partNames, List<String> colNames, SharedCache sharedCache, StatsType type, String writeIdList)
+      throws MetaException {
     final boolean useDensityFunctionForNDVEstimation =
         MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION);
     final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER);
@@ -2335,8 +2190,8 @@ public class CachedStore implements RawStore, Configurable {
         //    the behavior same as object store.
         // 3. Partition is missing or its stat is updated by live(not yet committed) or aborted txn. In this case,
         //    colStatsWriteId is null. Thus null is returned to keep the behavior same as object store.
-        SharedCache.ColumStatsWithWriteId colStatsWriteId = sharedCache.getPartitionColStatsFromCache(catName, dbName,
-                tblName, partValue, colName, writeIdList);
+        SharedCache.ColumStatsWithWriteId colStatsWriteId =
+            sharedCache.getPartitionColStatsFromCache(catName, dbName, tblName, partValue, colName, writeIdList);
         if (colStatsWriteId == null) {
           return null;
         }
@@ -2349,15 +2204,14 @@ public class CachedStore implements RawStore, Configurable {
               new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, tblName, partName);
           colStatsWithPartInfoList.add(colStatsWithPartInfo);
           if (colStatsAggregator == null) {
-            colStatsAggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
-                colStatsForPart.getStatsData().getSetField(), useDensityFunctionForNDVEstimation,
-                ndvTuner);
+            colStatsAggregator = ColumnStatsAggregatorFactory
+                .getColumnStatsAggregator(colStatsForPart.getStatsData().getSetField(),
+                    useDensityFunctionForNDVEstimation, ndvTuner);
           }
           partsFoundForColumn++;
         } else {
-          LOG.debug(
-              "Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}",
-              dbName, tblName, partName, colName);
+          LOG.debug("Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", dbName, tblName,
+              partName, colName);
         }
       }
       if (colStatsWithPartInfoList.size() > 0) {
@@ -2369,27 +2223,26 @@ public class CachedStore implements RawStore, Configurable {
         partsFound = partsFoundForColumn;
       }
       if (colStatsMap.size() < 1) {
-        LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName,
-            tblName, partNames, colNames);
+        LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName, tblName, partNames,
+            colNames);
         return new MergedColumnStatsForPartitions(new ArrayList<ColumnStatisticsObj>(), 0);
       }
     }
     // Note that enableBitVector does not apply here because ColumnStatisticsObj
     // itself will tell whether bitvector is null or not and aggr logic can automatically apply.
-    List<ColumnStatisticsObj> colAggrStats = MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
-            partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation, ndvTuner);
+    List<ColumnStatisticsObj> colAggrStats = MetaStoreServerUtils
+        .aggrPartitionStats(colStatsMap, partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation,
+            ndvTuner);
 
     if (canUseEvents) {
       if (type == StatsType.ALL) {
         sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
-                StringUtils.normalizeIdentifier(dbName),
-                StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound),
-                null, partNameToWriteId);
+            StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName),
+            new AggrStats(colAggrStats, partsFound), null, partNameToWriteId);
       } else if (type == StatsType.ALLBUTDEFAULT) {
         sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
-                StringUtils.normalizeIdentifier(dbName),
-                StringUtils.normalizeIdentifier(tblName), null,
-                new AggrStats(colAggrStats, partsFound), partNameToWriteId);
+            StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), null,
+            new AggrStats(colAggrStats, partsFound), partNameToWriteId);
       }
     }
     return new MergedColumnStatsForPartitions(colAggrStats, partsFound);
@@ -2413,440 +2266,355 @@ public class CachedStore implements RawStore, Configurable {
     }
   }
 
-  @Override
-  public long cleanupEvents() {
+  @Override public long cleanupEvents() {
     return rawStore.cleanupEvents();
   }
 
-  @Override
-  public boolean addToken(String tokenIdentifier, String delegationToken) {
+  @Override public boolean addToken(String tokenIdentifier, String delegationToken) {
     return rawStore.addToken(tokenIdentifier, delegationToken);
   }
 
-  @Override
-  public boolean removeToken(String tokenIdentifier) {
+  @Override public boolean removeToken(String tokenIdentifier) {
     return rawStore.removeToken(tokenIdentifier);
   }
 
-  @Override
-  public String getToken(String tokenIdentifier) {
+  @Override public String getToken(String tokenIdentifier) {
     return rawStore.getToken(tokenIdentifier);
   }
 
-  @Override
-  public List<String> getAllTokenIdentifiers() {
+  @Override public List<String> getAllTokenIdentifiers() {
     return rawStore.getAllTokenIdentifiers();
   }
 
-  @Override
-  public int addMasterKey(String key) throws MetaException {
+  @Override public int addMasterKey(String key) throws MetaException {
     return rawStore.addMasterKey(key);
   }
 
-  @Override
-  public void updateMasterKey(Integer seqNo, String key)
-      throws NoSuchObjectException, MetaException {
+  @Override public void updateMasterKey(Integer seqNo, String key) throws NoSuchObjectException, MetaException {
     rawStore.updateMasterKey(seqNo, key);
   }
 
-  @Override
-  public boolean removeMasterKey(Integer keySeq) {
+  @Override public boolean removeMasterKey(Integer keySeq) {
     return rawStore.removeMasterKey(keySeq);
   }
 
-  @Override
-  public String[] getMasterKeys() {
+  @Override public String[] getMasterKeys() {
     return rawStore.getMasterKeys();
   }
 
-  @Override
-  public void verifySchema() throws MetaException {
+  @Override public void verifySchema() throws MetaException {
     rawStore.verifySchema();
   }
 
-  @Override
-  public String getMetaStoreSchemaVersion() throws MetaException {
+  @Override public String getMetaStoreSchemaVersion() throws MetaException {
     return rawStore.getMetaStoreSchemaVersion();
   }
 
-  @Override
-  public void setMetaStoreSchemaVersion(String version, String comment)
-      throws MetaException {
+  @Override public void setMetaStoreSchemaVersion(String version, String comment) throws MetaException {
     rawStore.setMetaStoreSchemaVersion(version, comment);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalDBGrantsAll(
-      String principalName, PrincipalType principalType) {
+  @Override public List<HiveObjectPrivilege> listPrincipalDBGrantsAll(String principalName,
+      PrincipalType principalType) {
     return rawStore.listPrincipalDBGrantsAll(principalName, principalType);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalTableGrantsAll(
-      String principalName, PrincipalType principalType) {
+  @Override public List<HiveObjectPrivilege> listPrincipalTableGrantsAll(String principalName,
+      PrincipalType principalType) {
     return rawStore.listPrincipalTableGrantsAll(principalName, principalType);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalPartitionGrantsAll(
-      String principalName, PrincipalType principalType) {
+  @Override public List<HiveObjectPrivilege> listPrincipalPartitionGrantsAll(String principalName,
+      PrincipalType principalType) {
     return rawStore.listPrincipalPartitionGrantsAll(principalName, principalType);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalTableColumnGrantsAll(
-      String principalName, PrincipalType principalType) {
+  @Override public List<HiveObjectPrivilege> listPrincipalTableColumnGrantsAll(String principalName,
+      PrincipalType principalType) {
     return rawStore.listPrincipalTableColumnGrantsAll(principalName, principalType);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrantsAll(
-      String principalName, PrincipalType principalType) {
+  @Override public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrantsAll(String principalName,
+      PrincipalType principalType) {
     return rawStore.listPrincipalPartitionColumnGrantsAll(principalName, principalType);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listGlobalGrantsAll() {
+  @Override public List<HiveObjectPrivilege> listGlobalGrantsAll() {
     return rawStore.listGlobalGrantsAll();
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listDBGrantsAll(String catName, String dbName) {
+  @Override public List<HiveObjectPrivilege> listDBGrantsAll(String catName, String dbName) {
     return rawStore.listDBGrantsAll(catName, dbName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPartitionColumnGrantsAll(String catName, String dbName,
+  @Override public List<HiveObjectPrivilege> listPartitionColumnGrantsAll(String catName, String dbName,
       String tableName, String partitionName, String columnName) {
     return rawStore.listPartitionColumnGrantsAll(catName, dbName, tableName, partitionName, columnName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listTableGrantsAll(String catName, String dbName,
-      String tableName) {
+  @Override public List<HiveObjectPrivilege> listTableGrantsAll(String catName, String dbName, String tableName) {
     return rawStore.listTableGrantsAll(catName, dbName, tableName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listPartitionGrantsAll(String catName, String dbName,
-      String tableName, String partitionName) {
+  @Override public List<HiveObjectPrivilege> listPartitionGrantsAll(String catName, String dbName, String tableName,
+      String partitionName) {
     return rawStore.listPartitionGrantsAll(catName, dbName, tableName, partitionName);
   }
 
-  @Override
-  public List<HiveObjectPrivilege> listTableColumnGrantsAll(String catName, String dbName,
-      String tableName, String columnName) {
+  @Override public List<HiveObjectPrivilege> listTableColumnGrantsAll(String catName, String dbName, String tableName,
+      String columnName) {
     return rawStore.listTableColumnGrantsAll(catName, dbName, tableName, columnName);
   }
 
-  @Override
-  public void createFunction(Function func)
-      throws InvalidObjectException, MetaException {
+  @Override public void createFunction(Function func) throws InvalidObjectException, MetaException {
     // TODO fucntionCache
     rawStore.createFunction(func);
   }
 
-  @Override
-  public void alterFunction(String catName, String dbName, String funcName,
-      Function newFunction) throws InvalidObjectException, MetaException {
+  @Override public void alterFunction(String catName, String dbName, String funcName, Function newFunction)
+      throws InvalidObjectException, MetaException {
     // TODO fucntionCache
     rawStore.alterFunction(catName, dbName, funcName, newFunction);
   }
 
-  @Override
-  public void dropFunction(String catName, String dbName, String funcName) throws MetaException,
-      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+  @Override public void dropFunction(String catName, String dbName, String funcName)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     // TODO fucntionCache
     rawStore.dropFunction(catName, dbName, funcName);
   }
 
-  @Override
-  public Function getFunction(String catName, String dbName, String funcName)
-      throws MetaException {
+  @Override public Function getFunction(String catName, String dbName, String funcName) throws MetaException {
     // TODO fucntionCache
     return rawStore.getFunction(catName, dbName, funcName);
   }
 
-  @Override
-  public List<Function> getAllFunctions(String catName) throws MetaException {
+  @Override public List<Function> getAllFunctions(String catName) throws MetaException {
     // TODO fucntionCache
     return rawStore.getAllFunctions(catName);
   }
 
-  @Override
-  public List<String> getFunctions(String catName, String dbName, String pattern)
-      throws MetaException {
+  @Override public List<String> getFunctions(String catName, String dbName, String pattern) throws MetaException {
     // TODO fucntionCache
     return rawStore.getFunctions(catName, dbName, pattern);
   }
 
-  @Override
-  public NotificationEventResponse getNextNotification(
-      NotificationEventRequest rqst) {
+  @Override public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
     return rawStore.getNextNotification(rqst);
   }
 
-  @Override
-  public void addNotificationEvent(NotificationEvent event) throws MetaException {
+  @Override public void addNotificationEvent(NotificationEvent event) throws MetaException {
     rawStore.addNotificationEvent(event);
   }
 
-  @Override
-  public void cleanNotificationEvents(int olderThan) {
+  @Override public void cleanNotificationEvents(int olderThan) {
     rawStore.cleanNotificationEvents(olderThan);
   }
 
-  @Override
-  public CurrentNotificationEventId getCurrentNotificationEventId() {
+  @Override public CurrentNotificationEventId getCurrentNotificationEventId() {
     return rawStore.getCurrentNotificationEventId();
   }
 
-  @Override
-  public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+  @Override public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) {
     return rawStore.getNotificationEventsCount(rqst);
   }
 
-  @Override
-  public void flushCache() {
+  @Override public void flushCache() {
     rawStore.flushCache();
   }
 
-  @Override
-  public ByteBuffer[] getFileMetadata(List<Long> fileIds) throws MetaException {
+  @Override public ByteBuffer[] getFileMetadata(List<Long> fileIds) throws MetaException {
     return rawStore.getFileMetadata(fileIds);
   }
 
-  @Override
-  public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata,
-      FileMetadataExprType type) throws MetaException {
+  @Override public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata, FileMetadataExprType type)
+      throws MetaException {
     rawStore.putFileMetadata(fileIds, metadata, type);
   }
 
-  @Override
-  public boolean isFileMetadataSupported() {
+  @Override public boolean isFileMetadataSupported() {
     return rawStore.isFileMetadataSupported();
   }
 
-  @Override
-  public void getFileMetadataByExpr(List<Long> fileIds,
-      FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas,
-      ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException {
+  @Override public void getFileMetadataByExpr(List<Long> fileIds, FileMetadataExprType type, byte[] expr,
+      ByteBuffer[] metadatas, ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException {
     rawStore.getFileMetadataByExpr(fileIds, type, expr, metadatas, exprResults, eliminated);
   }
 
-  @Override
-  public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
+  @Override public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
     return rawStore.getFileMetadataHandler(type);
   }
 
-  @Override
-  public int getTableCount() throws MetaException {
+  @Override public int getTableCount() throws MetaException {
     return rawStore.getTableCount();
   }
 
-  @Override
-  public int getPartitionCount() throws MetaException {
+  @Override public int getPartitionCount() throws MetaException {
     return rawStore.getPartitionCount();
   }
 
-  @Override
-  public int getDatabaseCount() throws MetaException {
+  @Override public int getDatabaseCount() throws MetaException {
     return rawStore.getDatabaseCount();
   }
 
-  @Override
-  public List<SQLPrimaryKey> getPrimaryKeys(String catName, String db_name, String tbl_name)
+  @Override public List<SQLPrimaryKey> getPrimaryKeys(String catName, String dbName, String tblName)
       throws MetaException {
     // TODO constraintCache
-    return rawStore.getPrimaryKeys(catName, db_name, tbl_name);
+    return rawStore.getPrimaryKeys(catName, dbName, tblName);
   }
 
-  @Override
-  public List<SQLForeignKey> getForeignKeys(String catName, String parent_db_name,
-      String parent_tbl_name, String foreign_db_name, String foreign_tbl_name)
-      throws MetaException {
+  @Override public List<SQLForeignKey> getForeignKeys(String catName, String parentDbName, String parentTblName,
+      String foreignDbName, String foreignTblName) throws MetaException {
     // TODO constraintCache
-    return rawStore.getForeignKeys(catName, parent_db_name, parent_tbl_name, foreign_db_name, foreign_tbl_name);
+    return rawStore.getForeignKeys(catName, parentDbName, parentTblName, foreignDbName, foreignTblName);
   }
 
-  @Override
-  public List<SQLUniqueConstraint> getUniqueConstraints(String catName, String db_name, String tbl_name)
+  @Override public List<SQLUniqueConstraint> getUniqueConstraints(String catName, String dbName, String tblName)
       throws MetaException {
     // TODO constraintCache
-    return rawStore.getUniqueConstraints(catName, db_name, tbl_name);
+    return rawStore.getUniqueConstraints(catName, dbName, tblName);
   }
 
-  @Override
-  public List<SQLNotNullConstraint> getNotNullConstraints(String catName, String db_name, String tbl_name)
+  @Override public List<SQLNotNullConstraint> getNotNullConstraints(String catName, String dbName, String tblName)
       throws MetaException {
     // TODO constraintCache
-    return rawStore.getNotNullConstraints(catName, db_name, tbl_name);
+    return rawStore.getNotNullConstraints(catName, dbName, tblName);
   }
 
-  @Override
-  public List<SQLDefaultConstraint> getDefaultConstraints(String catName, String db_name, String tbl_name)
+  @Override public List<SQLDefaultConstraint> getDefaultConstraints(String catName, String dbName, String tblName)
       throws MetaException {
     // TODO constraintCache
-    return rawStore.getDefaultConstraints(catName, db_name, tbl_name);
+    return rawStore.getDefaultConstraints(catName, dbName, tblName);
   }
 
-  @Override
-  public List<SQLCheckConstraint> getCheckConstraints(String catName, String db_name, String tbl_name)
+  @Override public List<SQLCheckConstraint> getCheckConstraints(String catName, String dbName, String tblName)
       throws MetaException {
     // TODO constraintCache
-    return rawStore.getCheckConstraints(catName, db_name, tbl_name);
+    return rawStore.getCheckConstraints(catName, dbName, tblName);
   }
 
-  @Override
-  public List<String> createTableWithConstraints(Table tbl, List<SQLPrimaryKey> primaryKeys,
+  @Override public List<String> createTableWithConstraints(Table tbl, List<SQLPrimaryKey> primaryKeys,
       List<SQLForeignKey> foreignKeys, List<SQLUniqueConstraint> uniqueConstraints,
-      List<SQLNotNullConstraint> notNullConstraints,
-      List<SQLDefaultConstraint> defaultConstraints,
+      List<SQLNotNullConstraint> notNullConstraints, List<SQLDefaultConstraint> defaultConstraints,
       List<SQLCheckConstraint> checkConstraints) throws InvalidObjectException, MetaException {
     // TODO constraintCache
-    List<String> constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys,
-        foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+    List<String> constraintNames = rawStore
+        .createTableWithConstraints(tbl, primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints,
+            defaultConstraints, checkConstraints);
     // in case of event based cache update, cache is updated during commit.
     if (canUseEvents) {
       return constraintNames;
     }
     String dbName = normalizeIdentifier(tbl.getDbName());
     String tblName = normalizeIdentifier(tbl.getTableName());
-    String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) :
-        DEFAULT_CATALOG_NAME;
+    String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) : DEFAULT_CATALOG_NAME;
     if (!shouldCacheTable(catName, dbName, tblName)) {
       return constraintNames;
     }
     sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getCatName()),
-        StringUtils.normalizeIdentifier(tbl.getDbName()),
-        StringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+        StringUtils.normalizeIdentifier(tbl.getDbName()), StringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
     return constraintNames;
   }
 
-  @Override
-  public void dropConstraint(String catName, String dbName, String tableName,
-      String constraintName, boolean missingOk) throws NoSuchObjectException {
+  @Override public void dropConstraint(String catName, String dbName, String tableName, String constraintName,
+      boolean missingOk) throws NoSuchObjectException {
     // TODO constraintCache
     rawStore.dropConstraint(catName, dbName, tableName, constraintName, missingOk);
   }
 
-  @Override
-  public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks)
-      throws InvalidObjectException, MetaException {
+  @Override public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException {
     // TODO constraintCache
     return rawStore.addPrimaryKeys(pks);
   }
 
-  @Override
-  public List<String> addForeignKeys(List<SQLForeignKey> fks)
-      throws InvalidObjectException, MetaException {
+  @Override public List<String> addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException {
     // TODO constraintCache
     return rawStore.addForeignKeys(fks);
   }
 
-  @Override
-  public List<String> addUniqueConstraints(List<SQLUniqueConstraint> uks)
+  @Override public List<String> addUniqueConstraints(List<SQLUniqueConstraint> uks)
       throws InvalidObjectException, MetaException {
     // TODO constraintCache
     return rawStore.addUniqueConstraints(uks);
   }
 
-  @Override
-  public List<String> addNotNullConstraints(List<SQLNotNullConstraint> nns)
+  @Override public List<String> addNotNullConstraints(List<SQLNotNullConstraint> nns)
       throws InvalidObjectException, MetaException {
     // TODO constraintCache
     return rawStore.addNotNullConstraints(nns);
   }
 
-  @Override
-  public List<String> addDefaultConstraints(List<SQLDefaultConstraint> nns)
+  @Override public List<String> addDefaultConstraints(List<SQLDefaultConstraint> nns)
       throws InvalidObjectException, MetaException {
     // TODO constraintCache
     return rawStore.addDefaultConstraints(nns);
   }
 
-  @Override
-  public List<String> addCheckConstraints(List<SQLCheckConstraint> nns)
+  @Override public List<String> addCheckConstraints(List<SQLCheckConstraint> nns)
       throws InvalidObjectException, MetaException {
     // TODO constraintCache
     return rawStore.addCheckConstraints(nns);
   }
 
   // TODO - not clear if we should cache these or not.  For now, don't bother
-  @Override
-  public void createISchema(ISchema schema)
+  @Override public void createISchema(ISchema schema)
       throws AlreadyExistsException, NoSuchObjectException, MetaException {
     rawStore.createISchema(schema);
   }
 
-  @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String catName, String dbName)
+  @Override public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String catName, String dbName)
       throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionColStatsForDatabase(catName, dbName);
   }
 
-  @Override
-  public void alterISchema(ISchemaName schemaName, ISchema newSchema)
+  @Override public void alterISchema(ISchemaName schemaName, ISchema newSchema)
       throws NoSuchObjectException, MetaException {
     rawStore.alterISchema(schemaName, newSchema);
   }
 
-  @Override
-  public ISchema getISchema(ISchemaName schemaName) throws MetaException {
+  @Override public ISchema getISchema(ISchemaName schemaName) throws MetaException {
     return rawStore.getISchema(schemaName);
   }
 
-  @Override
-  public void dropISchema(ISchemaName schemaName) throws NoSuchObjectException, MetaException {
+  @Override public void dropISchema(ISchemaName schemaName) throws NoSuchObjectException, MetaException {
     rawStore.dropISchema(schemaName);
   }
 
-  @Override
-  public void addSchemaVersion(SchemaVersion schemaVersion) throws
-      AlreadyExistsException, InvalidObjectException, NoSuchObjectException, MetaException {
+  @Override public void addSchemaVersion(SchemaVersion schemaVersion)
+      throws AlreadyExistsException, InvalidObjectException, NoSuchObjectException, MetaException {
     rawStore.addSchemaVersion(schemaVersion);
   }
 
-  @Override
-  public void alterSchemaVersion(SchemaVersionDescriptor version, SchemaVersion newVersion) throws
-      NoSuchObjectException, MetaException {
+  @Override public void alterSchemaVersion(SchemaVersionDescriptor version, SchemaVersion newVersion)
+      throws NoSuchObjectException, MetaException {
     rawStore.alterSchemaVersion(version, newVersion);
   }
 
-  @Override
-  public SchemaVersion getSchemaVersion(SchemaVersionDescriptor version) throws MetaException {
+  @Override public SchemaVersion getSchemaVersion(SchemaVersionDescriptor version) throws MetaException {
     return rawStore.getSchemaVersion(version);
   }
 
-  @Override
-  public SchemaVersion getLatestSchemaVersion(ISchemaName schemaName) throws MetaException {
+  @Override public SchemaVersion getLatestSchemaVersion(ISchemaName schemaName) throws MetaException {
     return rawStore.getLatestSchemaVersion(schemaName);
   }
 
-  @Override
-  public List<SchemaVersion> getAllSchemaVersion(ISchemaName schemaName) throws MetaException {
+  @Override public List<SchemaVersion> getAllSchemaVersion(ISchemaName schemaName) throws MetaException {
     return rawStore.getAllSchemaVersion(schemaName);
   }
 
-  @Override
-  public List<SchemaVersion> getSchemaVersionsByColumns(String colName, String colNamespace,
-                                                        String type) throws MetaException {
+  @Override public List<SchemaVersion> getSchemaVersionsByColumns(String colName, String colNamespace, String type)
+      throws MetaException {
     return rawStore.getSchemaVersionsByColumns(colName, colNamespace, type);
   }
 
-  @Override
-  public void dropSchemaVersion(SchemaVersionDescriptor version) throws NoSuchObjectException,
-      MetaException {
+  @Override public void dropSchemaVersion(SchemaVersionDescriptor version) throws NoSuchObjectException, MetaException {
     rawStore.dropSchemaVersion(version);
   }
 
-  @Override
-  public SerDeInfo getSerDeInfo(String serDeName) throws NoSuchObjectException, MetaException {
+  @Override public SerDeInfo getSerDeInfo(String serDeName) throws NoSuchObjectException, MetaException {
     return rawStore.getSerDeInfo(serDeName);
   }
 
-  @Override
-  public void addSerde(SerDeInfo serde) throws AlreadyExistsException, MetaException {
+  @Override public void addSerde(SerDeInfo serde) throws AlreadyExistsException, MetaException {
     rawStore.addSerde(serde);
   }
 
@@ -2854,124 +2622,99 @@ public class CachedStore implements RawStore, Configurable {
     return rawStore;
   }
 
-  @VisibleForTesting
-  public void setRawStore(RawStore rawStore) {
+  @VisibleForTesting public void setRawStore(RawStore rawStore) {
     this.rawStore = rawStore;
   }
 
-  @Override
-  public String getMetastoreDbUuid() throws MetaException {
+  @Override public String getMetastoreDbUuid() throws MetaException {
     return rawStore.getMetastoreDbUuid();
   }
 
-  @Override
-  public void createResourcePlan(WMResourcePlan resourcePlan, String copyFrom, int defaultPoolSize)
+  @Override public void createResourcePlan(WMResourcePlan resourcePlan, String copyFrom, int defaultPoolSize)
       throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException {
     rawStore.createResourcePlan(resourcePlan, copyFrom, defaultPoolSize);
   }
 
-  @Override
-  public WMFullResourcePlan getResourcePlan(String name, String ns)
+  @Override public WMFullResourcePlan getResourcePlan(String name, String ns)
       throws NoSuchObjectException, MetaException {
     return rawStore.getResourcePlan(name, ns);
   }
 
-  @Override
-  public List<WMResourcePlan> getAllResourcePlans(String ns) throws MetaException {
+  @Override public List<WMResourcePlan> getAllResourcePlans(String ns) throws MetaException {
     return rawStore.getAllResourcePlans(ns);
   }
 
-  @Override
-  public WMFullResourcePlan alterResourcePlan(String name, String ns, WMNullableResourcePlan resourcePlan,
-    boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
-      throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException,
-          MetaException {
-    return rawStore.alterResourcePlan(
-      name, ns, resourcePlan, canActivateDisabled, canDeactivate, isReplace);
+  @Override public WMFullResourcePlan alterResourcePlan(String name, String ns, WMNullableResourcePlan resourcePlan,
+      boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
+    return rawStore.alterResourcePlan(name, ns, resourcePlan, canActivateDisabled, canDeactivate, isReplace);
   }
 
-  @Override
-  public WMFullResourcePlan getActiveResourcePlan(String ns) throws MetaException {
+  @Override public WMFullResourcePlan getActiveResourcePlan(String ns) throws MetaException {
     return rawStore.getActiveResourcePlan(ns);
   }
 
-  @Override
-  public WMValidateResourcePlanResponse validateResourcePlan(String name, String ns)
+  @Override public WMValidateResourcePlanResponse validateResourcePlan(String name, String ns)
       throws NoSuchObjectException, InvalidObjectException, MetaException {
     return rawStore.validateResourcePlan(name, ns);
   }
 
-  @Override
-  public void dropResourcePlan(String name, String ns) throws NoSuchObjectException, MetaException {
+  @Override public void dropResourcePlan(String name, String ns) throws NoSuchObjectException, MetaException {
     rawStore.dropResourcePlan(name, ns);
   }
 
-  @Override
-  public void createWMTrigger(WMTrigger trigger)
-      throws AlreadyExistsException, MetaException, NoSuchObjectException,
-          InvalidOperationException {
+  @Override public void createWMTrigger(WMTrigger trigger)
+      throws AlreadyExistsException, MetaException, NoSuchObjectException, InvalidOperationException {
     rawStore.createWMTrigger(trigger);
   }
 
-  @Override
-  public void alterWMTrigger(WMTrigger trigger)
+  @Override public void alterWMTrigger(WMTrigger trigger)
       throws NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.alterWMTrigger(trigger);
   }
 
-  @Override
-  public void dropWMTrigger(String resourcePlanName, String triggerName, String ns)
+  @Override public void dropWMTrigger(String resourcePlanName, String triggerName, String ns)
       throws NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.dropWMTrigger(resourcePlanName, triggerName, ns);
   }
 
-  @Override
-  public List<WMTrigger> getTriggersForResourcePlan(String resourcePlanName, String ns)
+  @Override public List<WMTrigger> getTriggersForResourcePlan(String resourcePlanName, String ns)
       throws NoSuchObjectException, MetaException {
     return rawStore.getTriggersForResourcePlan(resourcePlanName, ns);
   }
 
-  @Override
-  public void createPool(WMPool pool) throws AlreadyExistsException, NoSuchObjectException,
-      InvalidOperationException, MetaException {
+  @Override public void createPool(WMPool pool)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.createPool(pool);
   }
 
-  @Override
-  public void alterPool(WMNullablePool pool, String poolPath) throws AlreadyExistsException,
-      NoSuchObjectException, InvalidOperationException, MetaException {
+  @Override public void alterPool(WMNullablePool pool, String poolPath)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.alterPool(pool, poolPath);
   }
 
-  @Override
-  public void dropWMPool(String resourcePlanName, String poolPath, String ns)
+  @Override public void dropWMPool(String resourcePlanName, String poolPath, String ns)
       throws NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.dropWMPool(resourcePlanName, poolPath, ns);
   }
 
-  @Override
-  public void createOrUpdateWMMapping(WMMapping mapping, boolean update)
-      throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException,
-      MetaException {
+  @Override public void createOrUpdateWMMapping(WMMapping mapping, boolean update)
+      throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.createOrUpdateWMMapping(mapping, update);
   }
 
-  @Override
-  public void dropWMMapping(WMMapping mapping)
+  @Override public void dropWMMapping(WMMapping mapping)
       throws NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.dropWMMapping(mapping);
   }
 
-  @Override
-  public void createWMTriggerToPoolMapping(String resourcePlanName, String triggerName,
-      String poolPath, String ns) throws AlreadyExistsException, NoSuchObjectException,
-      InvalidOperationException, MetaException {
+  @Override public void createWMTriggerToPoolMapping(String resourcePlanName, String triggerName, String poolPath,
+      String ns) throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.createWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath, ns);
   }
 
-  @Override
-  public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerName,
-      String poolPath, String ns) throws NoSuchObjectException, InvalidOperationException, MetaException {
+  @Override public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerName, String poolPath,
+      String ns) throws NoSuchObjectException, InvalidOperationException, MetaException {
     rawStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath, ns);
   }
 
@@ -2979,14 +2722,12 @@ public class CachedStore implements RawStore, Configurable {
     return sharedCache.getUpdateCount();
   }
 
-  @Override
-  public void cleanWriteNotificationEvents(int olderThan) {
+  @Override public void cleanWriteNotificationEvents(int olderThan) {
     rawStore.cleanWriteNotificationEvents(olderThan);
   }
 
-
-  @Override
-  public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+  @Override public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName)
+      throws MetaException {
     return rawStore.getAllWriteEventInfo(txnId, dbName, tableName);
   }
 
@@ -2996,8 +2737,8 @@ public class CachedStore implements RawStore, Configurable {
       LOG.debug("Trying to match: {} against blacklist pattern: {}", str, pattern);
       Matcher matcher = pattern.matcher(str);
       if (matcher.matches()) {
-        LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(),
-            matcher.start(), matcher.end());
+        LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(), matcher.start(),
+            matcher.end());
         return false;
       }
     }
@@ -3010,8 +2751,8 @@ public class CachedStore implements RawStore, Configurable {
       LOG.debug("Trying to match: {} against whitelist pattern: {}", str, pattern);
       Matcher matcher = pattern.matcher(str);
       if (matcher.matches()) {
-        LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(),
-            matcher.start(), matcher.end());
+        LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(), matcher.start(),
+            matcher.end());
         return true;
       }
     }
@@ -3052,45 +2793,39 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   static boolean isBlacklistWhitelistEmpty(Configuration conf) {
-    return MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)
-        .equals(".*")
-        && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST).isEmpty();
+    return
+        MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST).equals(".*")
+            && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)
+            .isEmpty();
   }
 
-  @VisibleForTesting
-  void resetCatalogCache() {
+  @VisibleForTesting void resetCatalogCache() {
     sharedCache.resetCatalogCache();
     setCachePrewarmedState(false);
   }
 
-  @Override
-  public void addRuntimeStat(RuntimeStat stat) throws MetaException {
+  @Override public void addRuntimeStat(RuntimeStat stat) throws MetaException {
     rawStore.addRuntimeStat(stat);
   }
 
-  @Override
-  public List<RuntimeStat> getRuntimeStats(int maxEntries, int maxCreateTime) throws MetaException {
+  @Override public List<RuntimeStat> getRuntimeStats(int maxEntries, int maxCreateTime) throws MetaException {
     return rawStore.getRuntimeStats(maxEntries, maxCreateTime);
   }
 
-  @Override
-  public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
+  @Override public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
     return rawStore.deleteRuntimeStats(maxRetainSecs);
   }
 
-  @Override
-  public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+  @Override public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
     return rawStore.getTableNamesWithStats();
   }
 
-  @Override
-  public List<TableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+  @Override public List<TableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
     return rawStore.getAllTableNamesForStats();
   }
 
-  @Override
-  public Map<String, List<String>> getPartitionColsWithStats(String catName,
-      String dbName, String tableName) throws MetaException, NoSuchObjectException {
+  @Override public Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName, String tableName)
+      throws MetaException, NoSuchObjectException {
     return rawStore.getPartitionColsWithStats(catName, dbName, tableName);
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 2c7354a..45b1b0d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.cache;
 
+import java.lang.reflect.Field;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -27,19 +28,31 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.TreeMap;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.Catalog;
@@ -52,21 +65,23 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
 import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import static org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 public class SharedCache {
   private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
+  private static final long MAX_DEFAULT_CACHE_SIZE = 1024 * 1024;
   private boolean isCatalogCachePrewarmed = false;
   private Map<String, Catalog> catalogCache = new TreeMap<>();
   private HashSet<String> catalogsDeletedDuringPrewarm = new HashSet<>();
@@ -79,17 +94,22 @@ public class SharedCache {
   private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
 
   // For caching TableWrapper objects. Key is aggregate of database name and table name
-  private Map<String, TableWrapper> tableCache = new TreeMap<>();
+  private Cache<String, TableWrapper> tableCache = null;
+  private int concurrencyLevel = -1;
+  private int refreshInterval = 10000;
+
   private boolean isTableCachePrewarmed = false;
   private HashSet<String> tablesDeletedDuringPrewarm = new HashSet<>();
   private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
   private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
   private static MessageDigest md;
-  static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
   private AtomicLong cacheUpdateCount = new AtomicLong(0);
-  private static long maxCacheSizeInBytes = -1;
-  private static long currentCacheSizeInBytes = 0;
-  private static HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
+  private long maxCacheSizeInBytes = -1;
+  private HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
+  private Set<String> tableToUpdateSize = new ConcurrentHashSet<>();
+  private ScheduledExecutorService executor = null;
+  private Map<String, Integer> tableSizeMap = null;
 
   enum StatsType {
     ALL(0), ALLBUTDEFAULT(1), PARTIAL(2);
@@ -105,6 +125,10 @@ public class SharedCache {
     }
   }
 
+  private enum MemberName {
+    TABLE_COL_STATS_CACHE, PARTITION_CACHE, PARTITION_COL_STATS_CACHE, AGGR_COL_STATS_CACHE
+  }
+
   static {
     try {
       md = MessageDigest.getInstance("MD5");
@@ -113,16 +137,98 @@ public class SharedCache {
     }
   }
 
+  static class TableWrapperSizeUpdater implements Runnable {
+    private Set<String> setToUpdate;
+    private Cache<String, TableWrapper> cache;
+
+    TableWrapperSizeUpdater(Set<String> set, Cache<String, TableWrapper> cache1) {
+      setToUpdate = set;
+      cache = cache1;
+    }
+
+    @Override
+    public void run() {
+      for (String s : setToUpdate) {
+        refreshTableWrapperInCache(s);
+      }
+      setToUpdate.clear();
+    }
+
+    void refreshTableWrapperInCache(String tblKey) {
+      TableWrapper tw = cache.getIfPresent(tblKey);
+      if (tw != null) {
+        //cache will re-weigh the TableWrapper and record new weight.
+        cache.put(tblKey, tw);
+      }
+    }
+  }
+
+  //concurrency level of table cache. Set to -1 to let Guava use default.
+  public void setConcurrencyLevel(int cl){
+    this.concurrencyLevel = cl;
+  }
+  //number of miliseconds between size updates.
+  public void setRefreshInterval(int interval){
+    this.refreshInterval = interval;
+  }
+  //set the table size map to fake table size. This is for testing only.
+  public void setTableSizeMap(Map<String, Integer> map){
+    this.tableSizeMap = map;
+  }
+
+  public void initialize(Configuration conf) {
+    maxCacheSizeInBytes = MetastoreConf.getSizeVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY);
 
-  public void initialize(long maxSharedCacheSizeInBytes) {
-    maxCacheSizeInBytes = maxSharedCacheSizeInBytes;
     // Create estimators
     if ((maxCacheSizeInBytes > 0) && (sizeEstimators == null)) {
       sizeEstimators = IncrementalObjectSizeEstimator.createEstimators(SharedCache.class);
     }
+
+    if (tableCache == null) {
+      CacheBuilder<String, TableWrapper> b = CacheBuilder.newBuilder()
+          .maximumWeight(maxCacheSizeInBytes > 0 ? maxCacheSizeInBytes : MAX_DEFAULT_CACHE_SIZE)
+          .weigher(new Weigher<String, TableWrapper>() {
+            @Override
+            public int weigh(String key, TableWrapper value) {
+              return value.getSize();
+            }
+          }).removalListener(new RemovalListener<String, TableWrapper>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, TableWrapper> notification) {
+              LOG.debug("Eviction happened for table " + notification.getKey());
+              LOG.debug("current table cache contains " + tableCache.size() + "entries");
+              TableWrapper tblWrapper = notification.getValue();
+              RemovalCause cause = notification.getCause();
+              if (cause.equals(RemovalCause.COLLECTED) || cause.equals(RemovalCause.EXPIRED)) {
+                byte[] sdHash = tblWrapper.getSdHash();
+                if (sdHash != null) {
+                  decrSd(sdHash);
+                }
+              }
+            }
+          });
+
+      if (concurrencyLevel > 0) {
+        b.concurrencyLevel(concurrencyLevel);
+      }
+      tableCache = b.recordStats().build();
+    }
+
+    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = Executors.defaultThreadFactory().newThread(r);
+        t.setName("SharedCache table size updater: Thread-" + t.getId());
+        t.setDaemon(true);
+        return t;
+      }
+    });
+    executor.scheduleAtFixedRate(new TableWrapperSizeUpdater(tableToUpdateSize, tableCache), 0,
+        refreshInterval, TimeUnit.MILLISECONDS);
+
   }
 
-  private static ObjectEstimator getMemorySizeEstimator(Class<?> clazz) {
+  private ObjectEstimator getMemorySizeEstimator(Class<?> clazz) {
     ObjectEstimator estimator = sizeEstimators.get(clazz);
     if (estimator == null) {
       IncrementalObjectSizeEstimator.createEstimators(clazz, sizeEstimators);
@@ -131,21 +237,43 @@ public class SharedCache {
     return estimator;
   }
 
-  static class TableWrapper {
-    Table t;
-    String location;
-    Map<String, String> parameters;
-    byte[] sdHash;
-    ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
+  public int getObjectSize(Class<?> clazz, Object obj) {
+    if (sizeEstimators == null) {
+      return 0;
+    }
+
+    try {
+      ObjectEstimator oe = getMemorySizeEstimator(clazz);
+      return oe.estimate(obj, sizeEstimators);
+    } catch (Exception e) {
+      LOG.error("Error while getting object size.", e);
+    }
+    return 0;
+  }
+
+  enum SizeMode {
+    Delta, Snapshot
+  }
+
+  class TableWrapper {
+    private Table t;
+    private String location;
+    private Map<String, String> parameters;
+    private byte[] sdHash;
+    private int otherSize;
+    private int tableColStatsCacheSize;
+    private int partitionCacheSize;
+    private int partitionColStatsCacheSize;
+    private int aggrColStatsCacheSize;
+
+    private ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
     // For caching column stats for an unpartitioned table
     // Key is column name and the value is the col stat object
-    private Map<String, ColumnStatisticsObj> tableColStatsCache =
-        new ConcurrentHashMap<String, ColumnStatisticsObj>();
+    private Map<String, ColumnStatisticsObj> tableColStatsCache = new ConcurrentHashMap<String, ColumnStatisticsObj>();
     private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
     // For caching partition objects
     // Ket is partition values and the value is a wrapper around the partition object
-    private Map<String, PartitionWrapper> partitionCache =
-        new ConcurrentHashMap<String, PartitionWrapper>();
+    private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<String, PartitionWrapper>();
     private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
     // For caching column stats for a partitioned table
     // Key is aggregate of partition values, column name and the value is the col stat object
@@ -164,6 +292,52 @@ public class SharedCache {
       this.sdHash = sdHash;
       this.location = location;
       this.parameters = parameters;
+      this.tableColStatsCacheSize = 0;
+      this.partitionCacheSize = 0;
+      this.partitionColStatsCacheSize = 0;
+      this.aggrColStatsCacheSize = 0;
+      this.otherSize = getTableWrapperSizeWithoutMaps();
+    }
+
+    private int getTableWrapperSizeWithoutMaps() {
+      Class<?> clazz = TableWrapper.class;
+      Field[] fields = clazz.getDeclaredFields();
+      int size = 0;
+      for (Field field : fields) {
+        if (field.getType().equals(ConcurrentHashMap.class)) {
+          continue;
+        }
+        if (field.getType().equals(SharedCache.class)) {
+          continue;
+        }
+        try {
+          field.setAccessible(true);
+          Object val = field.get(this);
+          ObjectEstimator oe = getMemorySizeEstimator(field.getType());
+          if (oe != null) {
+            size += oe.estimate(val, sizeEstimators);
+          }
+        } catch (Exception ex) {
+          LOG.error("Not able to estimate size.", ex);
+        }
+      }
+
+      return size;
+    }
+
+    public int getSize() {
+      //facilitate testing only. In production we won't use tableSizeMap at all.
+      if (tableSizeMap != null) {
+        String tblKey = CacheUtils.buildTableKey(this.t.getCatName(), this.t.getDbName(), this.t.getTableName());
+        if (tableSizeMap.containsKey(tblKey)) {
+          return tableSizeMap.get(tblKey);
+        }
+      }
+      if (sizeEstimators == null) {
+        return 0;
+      }
+      return otherSize + tableColStatsCacheSize + partitionCacheSize + partitionColStatsCacheSize
+          + aggrColStatsCacheSize;
     }
 
     public Table getTable() {
@@ -202,15 +376,69 @@ public class SharedCache {
       return catName.equals(t.getCatName()) && dbName.equals(t.getDbName());
     }
 
+    private void updateMemberSize(MemberName mn, Integer size, SizeMode mode) {
+      if (sizeEstimators == null) {
+        return;
+      }
+
+      switch (mn) {
+      case TABLE_COL_STATS_CACHE:
+        if (mode == SizeMode.Delta) {
+          tableColStatsCacheSize += size;
+        } else {
+          tableColStatsCacheSize = size;
+        }
+        break;
+      case PARTITION_CACHE:
+        if (mode == SizeMode.Delta) {
+          partitionCacheSize += size;
+        } else {
+          partitionCacheSize = size;
+        }
+        break;
+      case PARTITION_COL_STATS_CACHE:
+        if (mode == SizeMode.Delta) {
+          partitionColStatsCacheSize += size;
+        } else {
+          partitionColStatsCacheSize = size;
+        }
+        break;
+      case AGGR_COL_STATS_CACHE:
+        if (mode == SizeMode.Delta) {
+          aggrColStatsCacheSize += size;
+        } else {
+          aggrColStatsCacheSize = size;
+        }
+        break;
+      default:
+        break;
+      }
+
+      String tblKey = getTblKey();
+      tableToUpdateSize.add(tblKey);
+    }
+
+    String getTblKey() {
+      Table tbl = this.t;
+      String catName = tbl.getCatName();
+      String dbName = tbl.getDbName();
+      String tblName = tbl.getTableName();
+      return CacheUtils.buildTableKey(catName, dbName, tblName);
+    }
+
     void cachePartition(Partition part, SharedCache sharedCache) {
       try {
         tableLock.writeLock().lock();
         PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
         partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+        int size = getObjectSize(PartitionWrapper.class, wrapper);
+        updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Delta);
         isPartitionCacheDirty.set(true);
+
         // Invalidate cached aggregate stats
         if (!aggrColStatsCache.isEmpty()) {
           aggrColStatsCache.clear();
+          updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
       } finally {
         tableLock.writeLock().unlock();
@@ -220,31 +448,21 @@ public class SharedCache {
     boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache, boolean fromPrewarm) {
       try {
         tableLock.writeLock().lock();
+        int size = 0;
         for (Partition part : parts) {
-          PartitionWrapper ptnWrapper = makePartitionWrapper(part, sharedCache);
-          if (maxCacheSizeInBytes > 0) {
-            ObjectEstimator ptnWrapperSizeEstimator =
-                getMemorySizeEstimator(PartitionWrapper.class);
-            long estimatedMemUsage = ptnWrapperSizeEstimator.estimate(ptnWrapper, sizeEstimators);
-            LOG.trace("Memory needed to cache Partition: {} is {} bytes", part, estimatedMemUsage);
-            if (isCacheMemoryFull(estimatedMemUsage)) {
-              LOG.debug(
-                  "Cannot cache Partition: {}. Memory needed is {} bytes, whereas the memory remaining is: {} bytes.",
-                  part, estimatedMemUsage, (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
-              return false;
-            } else {
-              currentCacheSizeInBytes += estimatedMemUsage;
-            }
-            LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes);
-          }
-          partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), ptnWrapper);
+          PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+          partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+          size += getObjectSize(PartitionWrapper.class, wrapper);
+
           if (!fromPrewarm) {
             isPartitionCacheDirty.set(true);
           }
         }
+        updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Delta);
         // Invalidate cached aggregate stats
         if (!aggrColStatsCache.isEmpty()) {
           aggrColStatsCache.clear();
+          updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
         return true;
       } finally {
@@ -300,30 +518,36 @@ public class SharedCache {
       Partition part = null;
       try {
         tableLock.writeLock().lock();
-        PartitionWrapper wrapper =
-            partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+        PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
         if (wrapper == null) {
           return null;
         }
         isPartitionCacheDirty.set(true);
+
+        int size = getObjectSize(PartitionWrapper.class, wrapper);
+        updateMemberSize(MemberName.PARTITION_CACHE, -1 * size, SizeMode.Delta);
+
         part = CacheUtils.assemble(wrapper, sharedCache);
         if (wrapper.getSdHash() != null) {
           sharedCache.decrSd(wrapper.getSdHash());
         }
         // Remove col stats
         String partialKey = CacheUtils.buildPartitionCacheKey(partVal);
-        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-            partitionColStatsCache.entrySet().iterator();
+        Iterator<Entry<String, ColumnStatisticsObj>> iterator = partitionColStatsCache.entrySet().iterator();
         while (iterator.hasNext()) {
           Entry<String, ColumnStatisticsObj> entry = iterator.next();
           String key = entry.getKey();
           if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+            int statsSize = getObjectSize(ColumnStatisticsObj.class, entry.getValue());
+            updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, -1 * statsSize, SizeMode.Delta);
             iterator.remove();
           }
         }
+
         // Invalidate cached aggregate stats
         if (!aggrColStatsCache.isEmpty()) {
           aggrColStatsCache.clear();
+          updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
       } finally {
         tableLock.writeLock().unlock();
@@ -353,7 +577,7 @@ public class SharedCache {
     }
 
     public void alterPartitionAndStats(List<String> partVals, SharedCache sharedCache, long writeId,
-                                       Map<String,String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
+        Map<String, String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
       try {
         tableLock.writeLock().lock();
         PartitionWrapper partitionWrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
@@ -372,8 +596,7 @@ public class SharedCache {
       }
     }
 
-    public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
-        SharedCache sharedCache) {
+    public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts, SharedCache sharedCache) {
       try {
         tableLock.writeLock().lock();
         for (int i = 0; i < partValsList.size(); i++) {
@@ -390,6 +613,7 @@ public class SharedCache {
       Map<String, PartitionWrapper> newPartitionCache = new HashMap<String, PartitionWrapper>();
       try {
         tableLock.writeLock().lock();
+        int size = 0;
         for (Partition part : partitions) {
           if (isPartitionCacheDirty.compareAndSet(true, false)) {
             LOG.debug("Skipping partition cache update for table: " + getTable().getTableName()
@@ -405,8 +629,10 @@ public class SharedCache {
           }
           wrapper = makePartitionWrapper(part, sharedCache);
           newPartitionCache.put(key, wrapper);
+          size += getObjectSize(PartitionWrapper.class, wrapper);
         }
         partitionCache = newPartitionCache;
+        updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Snapshot);
       } finally {
         tableLock.writeLock().unlock();
       }
@@ -415,38 +641,23 @@ public class SharedCache {
     public boolean updateTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
       try {
         tableLock.writeLock().lock();
+        int statsSize = 0;
         for (ColumnStatisticsObj colStatObj : colStatsForTable) {
           // Get old stats object if present
           String key = colStatObj.getColName();
           ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
           if (oldStatsObj != null) {
             // Update existing stat object's field
+            statsSize -= getObjectSize(ColumnStatisticsObj.class, oldStatsObj);
             StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
           } else {
             // No stats exist for this key; add a new object to the cache
             // TODO: get rid of deepCopy after making sure callers don't use references
-            if (maxCacheSizeInBytes > 0) {
-              ObjectEstimator tblColStatsSizeEstimator =
-                  getMemorySizeEstimator(ColumnStatisticsObj.class);
-              long estimatedMemUsage =
-                  tblColStatsSizeEstimator.estimate(colStatObj, sizeEstimators);
-              LOG.trace("Memory needed to cache Table Column Statistics Object: {} is {} bytes",
-                  colStatObj, estimatedMemUsage);
-              if (isCacheMemoryFull(estimatedMemUsage)) {
-                LOG.debug(
-                    "Cannot cache Table Column Statistics Object: {}. Memory needed is {} bytes, "
-                        + "whereas the memory remaining is: {} bytes.",
-                    colStatObj, estimatedMemUsage,
-                    (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
-                return false;
-              } else {
-                currentCacheSizeInBytes += estimatedMemUsage;
-              }
-              LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes);
-            }
             tableColStatsCache.put(key, colStatObj.deepCopy());
           }
+          statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
         }
+        updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, statsSize, SizeMode.Delta);
         isTableColStatsCacheDirty.set(true);
         return true;
       } finally {
@@ -455,29 +666,30 @@ public class SharedCache {
     }
 
     public void refreshTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
-      Map<String, ColumnStatisticsObj> newTableColStatsCache =
-          new HashMap<String, ColumnStatisticsObj>();
+      Map<String, ColumnStatisticsObj> newTableColStatsCache = new HashMap<String, ColumnStatisticsObj>();
       try {
         tableLock.writeLock().lock();
+        int statsSize = 0;
         for (ColumnStatisticsObj colStatObj : colStatsForTable) {
           if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping table col stats cache update for table: "
-                + getTable().getTableName() + "; the table col stats list we have is dirty.");
+            LOG.debug("Skipping table col stats cache update for table: " + getTable().getTableName()
+                + "; the table col stats list we have is dirty.");
             return;
           }
           String key = colStatObj.getColName();
           // TODO: get rid of deepCopy after making sure callers don't use references
           newTableColStatsCache.put(key, colStatObj.deepCopy());
+          statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
         }
         tableColStatsCache = newTableColStatsCache;
+        updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
       } finally {
         tableLock.writeLock().unlock();
       }
     }
 
     public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, List<String> colNames,
-                                                            String validWriteIds, boolean areTxnStatsSupported)
-            throws MetaException {
+        String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
       List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
       try {
         tableLock.readLock().lock();
@@ -488,7 +700,7 @@ public class SharedCache {
           }
         }
         return CachedStore.adjustColStatForGet(getTable().getParameters(), new ColumnStatistics(csd, colStatObjs),
-                getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
+            getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
       } finally {
         tableLock.readLock().unlock();
       }
@@ -499,8 +711,10 @@ public class SharedCache {
         tableLock.writeLock().lock();
         if (colName == null) {
           tableColStatsCache.clear();
+          updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         } else {
           tableColStatsCache.remove(colName);
+          updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
         isTableColStatsCacheDirty.set(true);
       } finally {
@@ -512,6 +726,7 @@ public class SharedCache {
       try {
         tableLock.writeLock().lock();
         tableColStatsCache.clear();
+        updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         isTableColStatsCacheDirty.set(true);
       } finally {
         tableLock.writeLock().unlock();
@@ -522,7 +737,7 @@ public class SharedCache {
       try {
         tableLock.readLock().lock();
         ColumnStatisticsObj statisticsObj =
-                partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+            partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
         if (statisticsObj == null || writeIdList == null) {
           return new ColumStatsWithWriteId(-1, statisticsObj);
         }
@@ -546,23 +761,22 @@ public class SharedCache {
       }
     }
 
-    public List<ColumnStatistics> getPartColStatsList(List<String> partNames, List<String> colNames,
-                                              String writeIdList, boolean txnStatSupported) throws MetaException {
+    public List<ColumnStatistics> getPartColStatsList(List<String> partNames, List<String> colNames, String writeIdList,
+        boolean txnStatSupported) throws MetaException {
       List<ColumnStatistics> colStatObjs = new ArrayList<>();
       try {
         tableLock.readLock().lock();
         Table tbl = getTable();
         for (String partName : partNames) {
-          ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false,
-                  tbl.getDbName(), tbl.getTableName());
+          ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false, tbl.getDbName(), tbl.getTableName());
           csd.setCatName(tbl.getCatName());
           csd.setPartName(partName);
           csd.setLastAnalyzed(0); //TODO : Need to get last analysed. This is not being used by anybody now.
           List<ColumnStatisticsObj> statObject = new ArrayList<>();
-          List<String> partVal =  Warehouse.getPartValuesFromPartName(partName);
+          List<String> partVal = Warehouse.getPartValuesFromPartName(partName);
           for (String colName : colNames) {
             ColumnStatisticsObj statisticsObj =
-                    partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+                partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
             if (statisticsObj != null) {
               statObject.add(statisticsObj);
             } else {
@@ -576,17 +790,17 @@ public class SharedCache {
             if (!txnStatSupported) {
               columnStatistics.setIsStatsCompliant(false);
             } else {
-              PartitionWrapper wrapper =
-                      partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+              PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
               if (wrapper == null) {
                 columnStatistics.setIsStatsCompliant(false);
               } else {
                 Partition partition = wrapper.getPartition();
-                if (!ObjectStore.isCurrentStatsValidForTheQuery(partition.getParameters(),
-                        partition.getWriteId(), writeIdList, false)) {
+                if (!ObjectStore
+                    .isCurrentStatsValidForTheQuery(partition.getParameters(), partition.getWriteId(), writeIdList,
+                        false)) {
                   LOG.debug("The current cached store transactional partition column statistics for {}.{}.{} "
-                                  + "(write ID {}) are not valid for current query ({})", tbl.getDbName(),
-                          tbl.getTableName(), partName, partition.getWriteId(), writeIdList);
+                          + "(write ID {}) are not valid for current query ({})", tbl.getDbName(), tbl.getTableName(),
+                      partName, partition.getWriteId(), writeIdList);
                   columnStatistics.setIsStatsCompliant(false);
                 }
               }
@@ -600,46 +814,31 @@ public class SharedCache {
       return colStatObjs;
     }
 
-    public boolean updatePartitionColStats(List<String> partVal,
-        List<ColumnStatisticsObj> colStatsObjs) {
+    public boolean updatePartitionColStats(List<String> partVal, List<ColumnStatisticsObj> colStatsObjs) {
       try {
         tableLock.writeLock().lock();
+        int statsSize = 0;
         for (ColumnStatisticsObj colStatObj : colStatsObjs) {
           // Get old stats object if present
           String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
           ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
           if (oldStatsObj != null) {
             // Update existing stat object's field
+            statsSize -= getObjectSize(ColumnStatisticsObj.class, oldStatsObj);
             StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
           } else {
             // No stats exist for this key; add a new object to the cache
             // TODO: get rid of deepCopy after making sure callers don't use references
-            if (maxCacheSizeInBytes > 0) {
-              ObjectEstimator ptnColStatsSizeEstimator =
-                  getMemorySizeEstimator(ColumnStatisticsObj.class);
-              long estimatedMemUsage =
-                  ptnColStatsSizeEstimator.estimate(colStatObj, sizeEstimators);
-              LOG.trace("Memory needed to cache Partition Column Statistics Object: {} is {} bytes",
-                  colStatObj, estimatedMemUsage);
-              if (isCacheMemoryFull(estimatedMemUsage)) {
-                LOG.debug(
-                    "Cannot cache Partition Column Statistics Object: {}. Memory needed is {} bytes, "
-                    + "whereas the memory remaining is: {} bytes.",
-                    colStatObj, estimatedMemUsage,
-                    (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
-                return false;
-              } else {
-                currentCacheSizeInBytes += estimatedMemUsage;
-              }
-              LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes);
-            }
             partitionColStatsCache.put(key, colStatObj.deepCopy());
           }
+          statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
         }
+        updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, statsSize, SizeMode.Delta);
         isPartitionColStatsCacheDirty.set(true);
         // Invalidate cached aggregate stats
         if (!aggrColStatsCache.isEmpty()) {
           aggrColStatsCache.clear();
+          updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
       } finally {
         tableLock.writeLock().unlock();
@@ -650,11 +849,17 @@ public class SharedCache {
     public void removePartitionColStats(List<String> partVals, String colName) {
       try {
         tableLock.writeLock().lock();
-        partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+        ColumnStatisticsObj statsObj =
+            partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+        if (statsObj != null) {
+          int statsSize = getObjectSize(ColumnStatisticsObj.class, statsObj);
+          updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, -1 * statsSize, SizeMode.Delta);
+        }
         isPartitionColStatsCacheDirty.set(true);
         // Invalidate cached aggregate stats
         if (!aggrColStatsCache.isEmpty()) {
           aggrColStatsCache.clear();
+          updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
       } finally {
         tableLock.writeLock().unlock();
@@ -665,10 +870,12 @@ public class SharedCache {
       try {
         tableLock.writeLock().lock();
         partitionColStatsCache.clear();
+        updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         isPartitionColStatsCacheDirty.set(true);
         // Invalidate cached aggregate stats
         if (!aggrColStatsCache.isEmpty()) {
           aggrColStatsCache.clear();
+          updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
         }
       } finally {
         tableLock.writeLock().unlock();
@@ -676,15 +883,15 @@ public class SharedCache {
     }
 
     public void refreshPartitionColStats(List<ColumnStatistics> partitionColStats) {
-      Map<String, ColumnStatisticsObj> newPartitionColStatsCache =
-          new HashMap<String, ColumnStatisticsObj>();
+      Map<String, ColumnStatisticsObj> newPartitionColStatsCache = new HashMap<String, ColumnStatisticsObj>();
       try {
         tableLock.writeLock().lock();
         String tableName = StringUtils.normalizeIdentifier(getTable().getTableName());
+        int statsSize = 0;
         for (ColumnStatistics cs : partitionColStats) {
           if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping partition column stats cache update for table: "
-                + getTable().getTableName() + "; the partition column stats list we have is dirty");
+            LOG.debug("Skipping partition column stats cache update for table: " + getTable().getTableName()
+                + "; the partition column stats list we have is dirty");
             return;
           }
           List<String> partVal;
@@ -693,26 +900,26 @@ public class SharedCache {
             List<ColumnStatisticsObj> colStatsObjs = cs.getStatsObj();
             for (ColumnStatisticsObj colStatObj : colStatsObjs) {
               if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
-                LOG.debug("Skipping partition column stats cache update for table: "
-                    + getTable().getTableName() + "; the partition column list we have is dirty");
+                LOG.debug("Skipping partition column stats cache update for table: " + getTable().getTableName()
+                    + "; the partition column list we have is dirty");
                 return;
               }
-              String key =
-                  CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+              String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
               newPartitionColStatsCache.put(key, colStatObj.deepCopy());
+              statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
             }
           } catch (MetaException e) {
             LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
           }
         }
         partitionColStatsCache = newPartitionColStatsCache;
+        updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
       } finally {
         tableLock.writeLock().unlock();
       }
     }
 
-    public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames,
-        StatsType statsType) {
+    public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames, StatsType statsType) {
       List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
       try {
         tableLock.readLock().lock();
@@ -739,12 +946,14 @@ public class SharedCache {
         AggrStats aggrStatsAllButDefaultPartition) {
       try {
         tableLock.writeLock().lock();
+        int statsSize = 0;
         if (aggrStatsAllPartitions != null) {
           for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
             if (statObj != null) {
               List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
               aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
               aggrColStatsCache.put(statObj.getColName(), aggrStats);
+              statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
             }
           }
         }
@@ -756,9 +965,11 @@ public class SharedCache {
                 aggrStats = new ArrayList<ColumnStatisticsObj>();
               }
               aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+              statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
             }
           }
         }
+        updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
         isAggrPartitionColStatsCacheDirty.set(true);
       } finally {
         tableLock.writeLock().unlock();
@@ -767,10 +978,10 @@ public class SharedCache {
 
     public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
         AggrStats aggrStatsAllButDefaultPartition, SharedCache sharedCache, Map<List<String>, Long> partNameToWriteId) {
-      Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
-          new HashMap<String, List<ColumnStatisticsObj>>();
+      Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache = new HashMap<String, List<ColumnStatisticsObj>>();
       try {
         tableLock.writeLock().lock();
+        int statsSize = 0;
         if (partNameToWriteId != null) {
           for (Entry<List<String>, Long> partValuesWriteIdSet : partNameToWriteId.entrySet()) {
             List<String> partValues = partValuesWriteIdSet.getKey();
@@ -784,8 +995,8 @@ public class SharedCache {
             // skip updating the aggregate stats in the cache.
             long writeId = partition.getWriteId();
             if (writeId != partValuesWriteIdSet.getValue()) {
-              LOG.info("Could not refresh the aggregate stat as partition " + partValues + " has write id " +
-                      partValuesWriteIdSet.getValue() + " instead of " + writeId);
+              LOG.info("Could not refresh the aggregate stat as partition " + partValues + " has write id "
+                  + partValuesWriteIdSet.getValue() + " instead of " + writeId);
               return;
             }
           }
@@ -793,22 +1004,23 @@ public class SharedCache {
         if (aggrStatsAllPartitions != null) {
           for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
             if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
-              LOG.debug("Skipping aggregate stats cache update for table: "
-                  + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+              LOG.debug("Skipping aggregate stats cache update for table: " + getTable().getTableName()
+                  + "; the aggregate stats list we have is dirty");
               return;
             }
             if (statObj != null) {
               List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
               aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
               newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+              statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
             }
           }
         }
         if (aggrStatsAllButDefaultPartition != null) {
           for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
             if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
-              LOG.debug("Skipping aggregate stats cache update for table: "
-                  + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+              LOG.debug("Skipping aggregate stats cache update for table: " + getTable().getTableName()
+                  + "; the aggregate stats list we have is dirty");
               return;
             }
             if (statObj != null) {
@@ -817,10 +1029,12 @@ public class SharedCache {
                 aggrStats = new ArrayList<ColumnStatisticsObj>();
               }
               aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+              statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
             }
           }
         }
         aggrColStatsCache = newAggrColStatsCache;
+        updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
       } finally {
         tableLock.writeLock().unlock();
       }
@@ -871,10 +1085,10 @@ public class SharedCache {
   }
 
   static class PartitionWrapper {
-    Partition p;
-    String location;
-    Map<String, String> parameters;
-    byte[] sdHash;
+    private Partition p;
+    private String location;
+    private Map<String, String> parameters;
+    private byte[] sdHash;
 
     PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
       this.p = p;
@@ -901,8 +1115,8 @@ public class SharedCache {
   }
 
   static class StorageDescriptorWrapper {
-    StorageDescriptor sd;
-    int refCount = 0;
+    private StorageDescriptor sd;
+    private int refCount = 0;
 
     StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
       this.sd = sd;
@@ -921,6 +1135,7 @@ public class SharedCache {
   public static class ColumStatsWithWriteId {
     private long writeId;
     private ColumnStatisticsObj columnStatisticsObj;
+
     public ColumStatsWithWriteId(long writeId, ColumnStatisticsObj columnStatisticsObj) {
       this.writeId = writeId;
       this.columnStatisticsObj = columnStatisticsObj;
@@ -1049,8 +1264,7 @@ public class SharedCache {
         // 1. Don't add databases that were deleted while we were preparing list for prewarm
         // 2. Skip overwriting exisiting db object
         // (which is present because it was added after prewarm started)
-        String key = CacheUtils.buildDbKey(dbCopy.getCatalogName().toLowerCase(),
-            dbCopy.getName().toLowerCase());
+        String key = CacheUtils.buildDbKey(dbCopy.getCatalogName().toLowerCase(), dbCopy.getName().toLowerCase());
         if (databasesDeletedDuringPrewarm.contains(key)) {
           continue;
         }
@@ -1104,8 +1318,9 @@ public class SharedCache {
       cacheLock.readLock().lock();
       for (String pair : databaseCache.keySet()) {
         String[] n = CacheUtils.splitDbName(pair);
-        if (catName.equals(n[0]))
+        if (catName.equals(n[0])) {
           results.add(n[1]);
+        }
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1173,9 +1388,9 @@ public class SharedCache {
     }
   }
 
-  public boolean populateTableInCache(Table table, ColumnStatistics tableColStats,
-      List<Partition> partitions, List<ColumnStatistics> partitionColStats,
-      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+  public boolean populateTableInCache(Table table, ColumnStatistics tableColStats, List<Partition> partitions,
+      List<ColumnStatistics> partitionColStats, AggrStats aggrStatsAllPartitions,
+      AggrStats aggrStatsAllButDefaultPartition) {
     String catName = StringUtils.normalizeIdentifier(table.getCatName());
     String dbName = StringUtils.normalizeIdentifier(table.getDbName());
     String tableName = StringUtils.normalizeIdentifier(table.getTableName());
@@ -1185,23 +1400,6 @@ public class SharedCache {
       return false;
     }
     TableWrapper tblWrapper = createTableWrapper(catName, dbName, tableName, table);
-    if (maxCacheSizeInBytes > 0) {
-      ObjectEstimator tblWrapperSizeEstimator = getMemorySizeEstimator(TableWrapper.class);
-      long estimatedMemUsage = tblWrapperSizeEstimator.estimate(tblWrapper, sizeEstimators);
-      LOG.debug("Memory needed to cache Database: {}'s Table: {}, is {} bytes", dbName, tableName,
-          estimatedMemUsage);
-      if (isCacheMemoryFull(estimatedMemUsage)) {
-        LOG.debug(
-            "Cannot cache Database: {}'s Table: {}. Memory needed is {} bytes, "
-                + "whereas the memory we have remaining is: {} bytes.",
-            dbName, tableName, estimatedMemUsage,
-            (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
-        return false;
-      } else {
-        currentCacheSizeInBytes += estimatedMemUsage;
-      }
-      LOG.debug("Current cache size: {} bytes", currentCacheSizeInBytes);
-    }
     if (!table.isSetPartitionKeys() && (tableColStats != null)) {
       if (table.getPartitionKeys().isEmpty() && (tableColStats != null)) {
         return false;
@@ -1227,8 +1425,7 @@ public class SharedCache {
           }
         }
       }
-      tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
-          aggrStatsAllButDefaultPartition);
+      tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
     }
     tblWrapper.isPartitionCacheDirty.set(false);
     tblWrapper.isTableColStatsCacheDirty.set(false);
@@ -1238,17 +1435,13 @@ public class SharedCache {
       cacheLock.writeLock().lock();
       // 2. Skip overwriting exisiting table object
       // (which is present because it was added after prewarm started)
-      tableCache.putIfAbsent(CacheUtils.buildTableKey(catName, dbName, tableName), tblWrapper);
+      tableCache.put(CacheUtils.buildTableKey(catName, dbName, tableName), tblWrapper);
       return true;
     } finally {
       cacheLock.writeLock().unlock();
     }
   }
 
-  private static boolean isCacheMemoryFull(long estimatedMemUsage) {
-    return (0.8*maxCacheSizeInBytes) < (currentCacheSizeInBytes + estimatedMemUsage);
-  }
-
   public void completeTableCachePrewarm() {
     try {
       cacheLock.writeLock().lock();
@@ -1263,8 +1456,7 @@ public class SharedCache {
     Table t = null;
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper =
-          tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         t = CacheUtils.assemble(tblWrapper, this);
       }
@@ -1286,8 +1478,7 @@ public class SharedCache {
     }
   }
 
-  private TableWrapper createTableWrapper(String catName, String dbName, String tblName,
-      Table tbl) {
+  private TableWrapper createTableWrapper(String catName, String dbName, String tblName, Table tbl) {
     TableWrapper wrapper;
     Table tblCopy = tbl.deepCopy();
     tblCopy.setCatName(normalizeIdentifier(catName));
@@ -1318,19 +1509,19 @@ public class SharedCache {
       if (!isTableCachePrewarmed) {
         tablesDeletedDuringPrewarm.add(CacheUtils.buildTableKey(catName, dbName, tblName));
       }
-      TableWrapper tblWrapper =
-          tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+      String tblKey = CacheUtils.buildTableKey(catName, dbName, tblName);
+      TableWrapper tblWrapper = tableCache.getIfPresent(tblKey);
       if (tblWrapper == null) {
         //in case of retry, ignore second try.
         return;
       }
-      if (tblWrapper != null) {
-        byte[] sdHash = tblWrapper.getSdHash();
-        if (sdHash != null) {
-          decrSd(sdHash);
-        }
-        isTableCacheDirty.set(true);
+
+      byte[] sdHash = tblWrapper.getSdHash();
+      if (sdHash != null) {
+        decrSd(sdHash);
       }
+      tableCache.invalidate(tblKey);
+      isTableCacheDirty.set(true);
     } finally {
       cacheLock.writeLock().unlock();
     }
@@ -1339,8 +1530,7 @@ public class SharedCache {
   public void alterTableInCache(String catName, String dbName, String tblName, Table newTable) {
     try {
       cacheLock.writeLock().lock();
-      TableWrapper tblWrapper =
-          tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.updateTableObj(newTable, this);
         String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
@@ -1354,11 +1544,10 @@ public class SharedCache {
   }
 
   public void alterTableAndStatsInCache(String catName, String dbName, String tblName, long writeId,
-                                        List<ColumnStatisticsObj> colStatsObjs, Map<String,String> newParams) {
+      List<ColumnStatisticsObj> colStatsObjs, Map<String, String> newParams) {
     try {
       cacheLock.writeLock().lock();
-      TableWrapper tblWrapper =
-              tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper == null) {
         LOG.info("Table " + tblName + " is missing from cache. Cannot update table stats in cache");
         return;
@@ -1369,8 +1558,8 @@ public class SharedCache {
       //tblWrapper.updateTableObj(newTable, this);
       String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
       String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
-      tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper);
       tblWrapper.updateTableColStats(colStatsObjs);
+      tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper);
       isTableCacheDirty.set(true);
     } finally {
       cacheLock.writeLock().unlock();
@@ -1381,7 +1570,7 @@ public class SharedCache {
     List<Table> tables = new ArrayList<>();
     try {
       cacheLock.readLock().lock();
-      for (TableWrapper wrapper : tableCache.values()) {
+      for (TableWrapper wrapper : tableCache.asMap().values()) {
         if (wrapper.sameDatabase(catName, dbName)) {
           tables.add(CacheUtils.assemble(wrapper, this));
         }
@@ -1396,7 +1585,7 @@ public class SharedCache {
     List<String> tableNames = new ArrayList<>();
     try {
       cacheLock.readLock().lock();
-      for (TableWrapper wrapper : tableCache.values()) {
+      for (TableWrapper wrapper : tableCache.asMap().values()) {
         if (wrapper.sameDatabase(catName, dbName)) {
           tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
         }
@@ -1407,16 +1596,14 @@ public class SharedCache {
     return tableNames;
   }
 
-  public List<String> listCachedTableNames(String catName, String dbName, String pattern,
-      int maxTables) {
+  public List<String> listCachedTableNames(String catName, String dbName, String pattern, int maxTables) {
     List<String> tableNames = new ArrayList<>();
     try {
       cacheLock.readLock().lock();
       int count = 0;
-      for (TableWrapper wrapper : tableCache.values()) {
-        if (wrapper.sameDatabase(catName, dbName)
-            && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
-            && (maxTables == -1 || count < maxTables)) {
+      for (TableWrapper wrapper : tableCache.asMap().values()) {
+        if (wrapper.sameDatabase(catName, dbName) && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) && (
+            maxTables == -1 || count < maxTables)) {
           tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
           count++;
         }
@@ -1427,17 +1614,15 @@ public class SharedCache {
     return tableNames;
   }
 
-  public List<String> listCachedTableNames(String catName, String dbName, String pattern,
-      TableType tableType, int limit) {
+  public List<String> listCachedTableNames(String catName, String dbName, String pattern, TableType tableType,
+      int limit) {
     List<String> tableNames = new ArrayList<>();
     try {
       cacheLock.readLock().lock();
       int count = 0;
-      for (TableWrapper wrapper : tableCache.values()) {
-        if (wrapper.sameDatabase(catName, dbName)
-            && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
-            && wrapper.getTable().getTableType().equals(tableType.toString())
-            && (limit == -1 || count < limit)) {
+      for (TableWrapper wrapper : tableCache.asMap().values()) {
+        if (wrapper.sameDatabase(catName, dbName) && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+            && wrapper.getTable().getTableType().equals(tableType.toString()) && (limit == -1 || count < limit)) {
           tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
           count++;
         }
@@ -1456,7 +1641,7 @@ public class SharedCache {
     Map<String, TableWrapper> newCacheForDB = new TreeMap<>();
     for (Table tbl : tables) {
       String tblName = StringUtils.normalizeIdentifier(tbl.getTableName());
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.updateTableObj(tbl, this);
       } else {
@@ -1466,7 +1651,7 @@ public class SharedCache {
     }
     try {
       cacheLock.writeLock().lock();
-      Iterator<Entry<String, TableWrapper>> entryIterator = tableCache.entrySet().iterator();
+      Iterator<Entry<String, TableWrapper>> entryIterator = tableCache.asMap().entrySet().iterator();
       while (entryIterator.hasNext()) {
         String key = entryIterator.next().getKey();
         if (key.startsWith(CacheUtils.buildDbKeyWithDelimiterSuffix(catName, dbName))) {
@@ -1480,11 +1665,11 @@ public class SharedCache {
     }
   }
 
-  public ColumnStatistics getTableColStatsFromCache(String catName, String dbName,
-      String tblName, List<String> colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
+  public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, String tblName,
+      List<String> colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper == null) {
         LOG.info("Table " + tblName + " is missing from cache.");
         return null;
@@ -1496,11 +1681,10 @@ public class SharedCache {
     }
   }
 
-  public void removeTableColStatsFromCache(String catName, String dbName, String tblName,
-      String colName) {
+  public void removeTableColStatsFromCache(String catName, String dbName, String tblName, String colName) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeTableColStats(colName);
       } else {
@@ -1514,7 +1698,7 @@ public class SharedCache {
   public void removeAllTableColStatsFromCache(String catName, String dbName, String tblName) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeAllTableColStats();
       } else {
@@ -1529,8 +1713,7 @@ public class SharedCache {
       List<ColumnStatisticsObj> colStatsForTable) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper =
-          tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.updateTableColStats(colStatsForTable);
       } else {
@@ -1545,8 +1728,7 @@ public class SharedCache {
       List<ColumnStatisticsObj> colStatsForTable) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper =
-          tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.refreshTableColStats(colStatsForTable);
       } else {
@@ -1560,14 +1742,13 @@ public class SharedCache {
   public int getCachedTableCount() {
     try {
       cacheLock.readLock().lock();
-      return tableCache.size();
+      return tableCache.asMap().size();
     } finally {
       cacheLock.readLock().unlock();
     }
   }
 
-  public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
-      List<String> tableTypes) {
+  public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes) {
     List<TableMeta> tableMetas = new ArrayList<>();
     try {
       cacheLock.readLock().lock();
@@ -1576,8 +1757,7 @@ public class SharedCache {
           for (Table table : listCachedTables(catName, dbName)) {
             if (CacheUtils.matches(table.getTableName(), tableNames)) {
               if (tableTypes == null || tableTypes.contains(table.getTableType())) {
-                TableMeta metaData =
-                    new TableMeta(dbName, table.getTableName(), table.getTableType());
+                TableMeta metaData = new TableMeta(dbName, table.getTableName(), table.getTableType());
                 metaData.setCatName(catName);
                 metaData.setComments(table.getParameters().get("comment"));
                 tableMetas.add(metaData);
@@ -1595,7 +1775,8 @@ public class SharedCache {
   public void addPartitionToCache(String catName, String dbName, String tblName, Partition part) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      String tblKey = CacheUtils.buildTableKey(catName, dbName, tblName);
+      TableWrapper tblWrapper = tableCache.getIfPresent(tblKey);
       if (tblWrapper != null) {
         tblWrapper.cachePartition(part, this);
       }
@@ -1604,11 +1785,10 @@ public class SharedCache {
     }
   }
 
-  public void addPartitionsToCache(String catName, String dbName, String tblName,
-      Iterable<Partition> parts) {
+  public void addPartitionsToCache(String catName, String dbName, String tblName, Iterable<Partition> parts) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.cachePartitions(parts, this, false);
       }
@@ -1617,12 +1797,11 @@ public class SharedCache {
     }
   }
 
-  public Partition getPartitionFromCache(String catName, String dbName, String tblName,
-      List<String> partVals) {
+  public Partition getPartitionFromCache(String catName, String dbName, String tblName, List<String> partVals) {
     Partition part = null;
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         part = tblWrapper.getPartition(partVals, this);
       }
@@ -1632,12 +1811,11 @@ public class SharedCache {
     return part;
   }
 
-  public boolean existPartitionFromCache(String catName, String dbName, String tblName,
-      List<String> partVals) {
+  public boolean existPartitionFromCache(String catName, String dbName, String tblName, List<String> partVals) {
     boolean existsPart = false;
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         existsPart = tblWrapper.containsPartition(partVals);
       }
@@ -1647,14 +1825,15 @@ public class SharedCache {
     return existsPart;
   }
 
-  public Partition removePartitionFromCache(String catName, String dbName, String tblName,
-      List<String> partVals) {
+  public Partition removePartitionFromCache(String catName, String dbName, String tblName, List<String> partVals) {
     Partition part = null;
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         part = tblWrapper.removePartition(partVals, this);
+      } else {
+        LOG.warn("This is abnormal");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1662,11 +1841,10 @@ public class SharedCache {
     return part;
   }
 
-  public void removePartitionsFromCache(String catName, String dbName, String tblName,
-      List<List<String>> partVals) {
+  public void removePartitionsFromCache(String catName, String dbName, String tblName, List<List<String>> partVals) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removePartitions(partVals, this);
       }
@@ -1675,12 +1853,11 @@ public class SharedCache {
     }
   }
 
-  public List<Partition> listCachedPartitions(String catName, String dbName, String tblName,
-      int max) {
+  public List<Partition> listCachedPartitions(String catName, String dbName, String tblName, int max) {
     List<Partition> parts = new ArrayList<Partition>();
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         parts = tblWrapper.listPartitions(max, this);
       }
@@ -1690,11 +1867,11 @@ public class SharedCache {
     return parts;
   }
 
-  public void alterPartitionInCache(String catName, String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
+  public void alterPartitionInCache(String catName, String dbName, String tblName, List<String> partVals,
+      Partition newPart) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.alterPartition(partVals, newPart, this);
       }
@@ -1704,11 +1881,10 @@ public class SharedCache {
   }
 
   public void alterPartitionAndStatsInCache(String catName, String dbName, String tblName, long writeId,
-                                            List<String> partVals, Map<String,String> parameters,
-                                            List<ColumnStatisticsObj> colStatsObjs) {
+      List<String> partVals, Map<String, String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters, colStatsObjs);
       }
@@ -1717,11 +1893,11 @@ public class SharedCache {
     }
   }
 
-  public void alterPartitionsInCache(String catName, String dbName, String tblName,
-      List<List<String>> partValsList, List<Partition> newParts) {
+  public void alterPartitionsInCache(String catName, String dbName, String tblName, List<List<String>> partValsList,
+      List<Partition> newParts) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.alterPartitions(partValsList, newParts, this);
       }
@@ -1730,11 +1906,10 @@ public class SharedCache {
     }
   }
 
-  public void refreshPartitionsInCache(String catName, String dbName, String tblName,
-      List<Partition> partitions) {
+  public void refreshPartitionsInCache(String catName, String dbName, String tblName, List<Partition> partitions) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.refreshPartitions(partitions, this);
       }
@@ -1743,11 +1918,11 @@ public class SharedCache {
     }
   }
 
-  public void removePartitionColStatsFromCache(String catName, String dbName, String tblName,
-      List<String> partVals, String colName) {
+  public void removePartitionColStatsFromCache(String catName, String dbName, String tblName, List<String> partVals,
+      String colName) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removePartitionColStats(partVals, colName);
       }
@@ -1759,7 +1934,7 @@ public class SharedCache {
   public void removeAllPartitionColStatsFromCache(String catName, String dbName, String tblName) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeAllPartitionColStats();
       }
@@ -1768,12 +1943,11 @@ public class SharedCache {
     }
   }
 
-  public void updatePartitionColStatsInCache(String catName, String dbName, String tableName,
-      List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
+  public void updatePartitionColStatsInCache(String catName, String dbName, String tableName, List<String> partVals,
+      List<ColumnStatisticsObj> colStatsObjs) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper =
-          tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
       }
@@ -1782,12 +1956,12 @@ public class SharedCache {
     }
   }
 
-  public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, String dbName,
-      String tblName, List<String> partVal, String colName, String writeIdList) {
+  public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, String dbName, String tblName,
+      List<String> partVal, String colName, String writeIdList) {
     ColumStatsWithWriteId colStatObj = null;
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         colStatObj = tblWrapper.getPartitionColStats(partVal, colName, writeIdList);
       }
@@ -1798,12 +1972,11 @@ public class SharedCache {
   }
 
   public List<ColumnStatistics> getPartitionColStatsListFromCache(String catName, String dbName, String tblName,
-                                                                  List<String> partNames, List<String> colNames,
-                                                                  String writeIdList, boolean txnStatSupported) {
+      List<String> partNames, List<String> colNames, String writeIdList, boolean txnStatSupported) {
     List<ColumnStatistics> colStatObjs = null;
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames, writeIdList, txnStatSupported);
       }
@@ -1819,7 +1992,7 @@ public class SharedCache {
       List<ColumnStatistics> partitionColStats) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.refreshPartitionColStats(partitionColStats);
       }
@@ -1828,11 +2001,11 @@ public class SharedCache {
     }
   }
 
-  public List<ColumnStatisticsObj> getAggrStatsFromCache(String catName, String dbName,
-      String tblName, List<String> colNames, StatsType statsType) {
+  public List<ColumnStatisticsObj> getAggrStatsFromCache(String catName, String dbName, String tblName,
+      List<String> colNames, StatsType statsType) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         return tblWrapper.getAggrPartitionColStats(colNames, statsType);
       }
@@ -1842,14 +2015,13 @@ public class SharedCache {
     return null;
   }
 
-  public void addAggregateStatsToCache(String catName, String dbName, String tblName,
-      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+  public void addAggregateStatsToCache(String catName, String dbName, String tblName, AggrStats aggrStatsAllPartitions,
+      AggrStats aggrStatsAllButDefaultPartition) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
-        tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
-            aggrStatsAllButDefaultPartition);
+        tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1858,13 +2030,13 @@ public class SharedCache {
 
   public void refreshAggregateStatsInCache(String catName, String dbName, String tblName,
       AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition,
-                                           Map<List<String>, Long> partNameToWriteId) {
+      Map<List<String>, Long> partNameToWriteId) {
     try {
       cacheLock.readLock().lock();
-      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
-        tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
-            aggrStatsAllButDefaultPartition, this, partNameToWriteId);
+        tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition, this,
+            partNameToWriteId);
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1903,8 +2075,8 @@ public class SharedCache {
   }
 
   @VisibleForTesting
-  Map<String, TableWrapper> getTableCache() {
-    return tableCache;
+  void clearTableCache() {
+    tableCache.invalidateAll();
   }
 
   @VisibleForTesting
@@ -1928,6 +2100,11 @@ public class SharedCache {
     isTableCacheDirty.set(false);
   }
 
+  public void printCacheStats() {
+    CacheStats cs = tableCache.stats();
+    LOG.info(cs.toString());
+  }
+
   public long getUpdateCount() {
     return cacheUpdateCount.get();
   }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index e30d4a8..420369d 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
 import org.apache.hadoop.hive.metastore.Deadline;
@@ -61,7 +62,6 @@ import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataI
 import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
 import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -69,12 +69,12 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import jline.internal.Log;
-
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 
-@Category(MetastoreCheckinTest.class)
-public class TestCachedStore {
+/**
+ * Unit tests for CachedStore
+ */
+@Category(MetastoreCheckinTest.class) public class TestCachedStore {
   // cs_db1
   Database db1;
   // cs_db2
@@ -94,8 +94,7 @@ public class TestCachedStore {
   List<Partition> db2Ptbl1Ptns;
   List<String> db2Ptbl1PtnNames;
 
-  @Before
-  public void setUp() throws Exception {
+  @Before public void setUp() throws Exception {
     Deadline.registerIfNot(10000000);
     Deadline.startTimer("");
     Configuration conf = MetastoreConf.newMetastoreConf();
@@ -130,8 +129,7 @@ public class TestCachedStore {
     objectStore.shutdown();
   }
 
-  @After
-  public void teardown() throws Exception {
+  @After public void teardown() throws Exception {
     Deadline.startTimer("");
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
@@ -155,8 +153,7 @@ public class TestCachedStore {
    * Methods that test CachedStore
    *********************************************************************************************/
 
-  @Test
-  public void testPrewarm() throws Exception {
+  @Test public void testPrewarm() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -199,8 +196,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testPrewarmBlackList() throws Exception {
+  @Test public void testPrewarmBlackList() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -223,8 +219,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testPrewarmWhiteList() throws Exception {
+  @Test public void testPrewarmWhiteList() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -270,8 +265,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testCacheUpdate() throws Exception {
+  @Test public void testCacheUpdate() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -353,8 +347,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testCreateAndGetDatabase() throws Exception {
+  @Test public void testCreateAndGetDatabase() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -379,12 +372,12 @@ public class TestCachedStore {
     Assert.assertEquals(3, allDatabases.size());
     // Add another db via CachedStore
     String dbName1 = "testCreateAndGetDatabase1";
-    Database db1 = createDatabaseObject(dbName1, dbOwner);
-    cachedStore.createDatabase(db1);
-    db1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
+    Database localDb1 = createDatabaseObject(dbName1, dbOwner);
+    cachedStore.createDatabase(localDb1);
+    localDb1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
     // Read db via ObjectStore
     dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
-    Assert.assertEquals(db1, dbRead);
+    Assert.assertEquals(localDb1, dbRead);
     allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME);
     Assert.assertEquals(4, allDatabases.size());
     // Clean up
@@ -393,8 +386,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testDropDatabase() throws Exception {
+  @Test public void testDropDatabase() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -424,12 +416,12 @@ public class TestCachedStore {
     Assert.assertEquals(2, allDatabases.size());
     // Create another db via CachedStore and drop via ObjectStore
     String dbName1 = "testDropDatabase1";
-    Database db1 = createDatabaseObject(dbName1, dbOwner);
-    cachedStore.createDatabase(db1);
-    db1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
+    Database localDb1 = createDatabaseObject(dbName1, dbOwner);
+    cachedStore.createDatabase(localDb1);
+    localDb1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
     // Read db via ObjectStore
     dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
-    Assert.assertEquals(db1, dbRead);
+    Assert.assertEquals(localDb1, dbRead);
     allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME);
     Assert.assertEquals(3, allDatabases.size());
     objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1);
@@ -440,8 +432,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testAlterDatabase() throws Exception {
+  @Test public void testAlterDatabase() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -480,8 +471,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testCreateAndGetTable() throws Exception {
+  @Test public void testCreateAndGetTable() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -527,7 +517,6 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
   // Note: the 44Kb approximation has been determined based on trial/error. 
   // If this starts failing on different env, might need another look.
   public void testGetAllTablesPrewarmMemoryLimit() throws Exception {
@@ -553,8 +542,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testGetAllTablesBlacklist() throws Exception {
+  @Test public void testGetAllTablesBlacklist() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -579,8 +567,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testGetAllTablesWhitelist() throws Exception {
+  @Test public void testGetAllTablesWhitelist() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -605,8 +592,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testGetTableByPattern() throws Exception {
+  @Test public void testGetTableByPattern() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -631,8 +617,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testAlterTable() throws Exception {
+  @Test public void testAlterTable() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -653,8 +638,8 @@ public class TestCachedStore {
     String newOwner = "newOwner";
     Table db1Utbl1ReadAlt = new Table(db1Utbl1Read);
     db1Utbl1ReadAlt.setOwner(newOwner);
-    cachedStore.alterTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName(), db1Utbl1ReadAlt,
-        "0");
+    cachedStore
+        .alterTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName(), db1Utbl1ReadAlt, "0");
     db1Utbl1Read =
         cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1ReadAlt.getDbName(), db1Utbl1ReadAlt.getTableName());
     Table db1Utbl1ReadOS =
@@ -664,8 +649,8 @@ public class TestCachedStore {
     Table db2Utbl1Read = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
     Table db2Utbl1ReadAlt = new Table(db2Utbl1Read);
     db2Utbl1ReadAlt.setOwner(newOwner);
-    objectStore.alterTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName(), db2Utbl1ReadAlt,
-        "0");
+    objectStore
+        .alterTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName(), db2Utbl1ReadAlt, "0");
     updateCache(cachedStore);
     db2Utbl1Read =
         objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1ReadAlt.getDbName(), db2Utbl1ReadAlt.getTableName());
@@ -675,8 +660,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testDropTable() throws Exception {
+  @Test public void testDropTable() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -716,12 +700,11 @@ public class TestCachedStore {
 
   /**********************************************************************************************
    * Methods that test SharedCache
-   * @throws MetaException 
-   * @throws NoSuchObjectException 
+   * @throws MetaException
+   * @throws NoSuchObjectException
    *********************************************************************************************/
 
-  @Test
-  public void testSharedStoreDb() throws NoSuchObjectException, MetaException {
+  @Test public void testSharedStoreDb() throws NoSuchObjectException, MetaException {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -731,13 +714,13 @@ public class TestCachedStore {
     cachedStore.setConfForTest(conf);
     SharedCache sharedCache = CachedStore.getSharedCache();
 
-    Database db1 = createDatabaseObject("db1", "user1");
-    Database db2 = createDatabaseObject("db2", "user1");
-    Database db3 = createDatabaseObject("db3", "user1");
+    Database localDb1 = createDatabaseObject("db1", "user1");
+    Database localDb2 = createDatabaseObject("db2", "user1");
+    Database localDb3 = createDatabaseObject("db3", "user1");
     Database newDb1 = createDatabaseObject("newdb1", "user1");
-    sharedCache.addDatabaseToCache(db1);
-    sharedCache.addDatabaseToCache(db2);
-    sharedCache.addDatabaseToCache(db3);
+    sharedCache.addDatabaseToCache(localDb1);
+    sharedCache.addDatabaseToCache(localDb2);
+    sharedCache.addDatabaseToCache(localDb3);
     Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
     sharedCache.alterDatabaseInCache(DEFAULT_CATALOG_NAME, "db1", newDb1);
     Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
@@ -750,8 +733,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testSharedStoreTable() {
+  @Test public void testSharedStoreTable() {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -836,8 +818,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testSharedStorePartition() {
+  @Test public void testSharedStorePartition() {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -1002,14 +983,14 @@ public class TestCachedStore {
     Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100);
     aggrStats = cachedStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, aggrPartVals, colNames);
     Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100);
-    
+
     objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(),
         Warehouse.makePartName(tbl.getPartitionKeys(), partVals1), partVals1, colName);
     objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(),
         Warehouse.makePartName(tbl.getPartitionKeys(), partVals2), partVals2, colName);
     objectStore.dropPartition(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), partVals1);
     objectStore.dropPartition(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), partVals2);
-    objectStore.dropTable(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName()) ;
+    objectStore.dropTable(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName());
     objectStore.dropDatabase(DEFAULT_CATALOG_NAME, db.getName());
     cachedStore.shutdown();
   }
@@ -1185,8 +1166,7 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
-  @Test
-  public void testMultiThreadedSharedCacheOps() throws Exception {
+  @Test public void testMultiThreadedSharedCacheOps() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -1199,8 +1179,7 @@ public class TestCachedStore {
     List<String> dbNames = new ArrayList<String>(Arrays.asList("db1", "db2", "db3", "db4", "db5"));
     List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
     ExecutorService executor = Executors.newFixedThreadPool(50, new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
+      @Override public Thread newThread(Runnable r) {
         Thread t = Executors.defaultThreadFactory().newThread(r);
         t.setDaemon(true);
         return t;
@@ -1332,6 +1311,199 @@ public class TestCachedStore {
     cachedStore.shutdown();
   }
 
+  @Test public void testPartitionSize() {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5Kb");
+    MetaStoreTestUtils.setConfForStandloneMode(conf);
+    CachedStore cachedStore = new CachedStore();
+    CachedStore.clearSharedCache();
+    cachedStore.setConfForTestExceptSharedCache(conf);
+
+    String dbName = "db1";
+    String tbl1Name = "tbl1";
+    String tbl2Name = "tbl2";
+    String owner = "user1";
+    Database db = createDatabaseObject(dbName, owner);
+
+    FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+    FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(col1);
+    cols.add(col2);
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    Table tbl1 = createTestTbl(dbName, tbl1Name, owner, cols, ptnCols);
+    Table tbl2 = createTestTbl(dbName, tbl2Name, owner, cols, ptnCols);
+
+    Map<String, Integer> tableSizeMap = new HashMap<>();
+    String tbl1Key = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, dbName, tbl1Name);
+    String tbl2Key = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, dbName, tbl2Name);
+    tableSizeMap.put(tbl1Key, 1000);
+    tableSizeMap.put(tbl2Key, 4500);
+
+    Partition part1 = new Partition();
+    StorageDescriptor sd1 = new StorageDescriptor();
+    List<FieldSchema> cols1 = new ArrayList<>();
+    cols1.add(new FieldSchema("col1", "int", ""));
+    Map<String, String> params1 = new HashMap<>();
+    params1.put("key", "value");
+    sd1.setCols(cols1);
+    sd1.setParameters(params1);
+    sd1.setLocation("loc1");
+    part1.setSd(sd1);
+    part1.setValues(Arrays.asList("201701"));
+
+    Partition part2 = new Partition();
+    StorageDescriptor sd2 = new StorageDescriptor();
+    List<FieldSchema> cols2 = new ArrayList<>();
+    cols2.add(new FieldSchema("col1", "int", ""));
+    Map<String, String> params2 = new HashMap<>();
+    params2.put("key", "value");
+    sd2.setCols(cols2);
+    sd2.setParameters(params2);
+    sd2.setLocation("loc2");
+    part2.setSd(sd2);
+    part2.setValues(Arrays.asList("201702"));
+
+    Partition part3 = new Partition();
+    StorageDescriptor sd3 = new StorageDescriptor();
+    List<FieldSchema> cols3 = new ArrayList<>();
+    cols3.add(new FieldSchema("col3", "int", ""));
+    Map<String, String> params3 = new HashMap<>();
+    params3.put("key2", "value2");
+    sd3.setCols(cols3);
+    sd3.setParameters(params3);
+    sd3.setLocation("loc3");
+    part3.setSd(sd3);
+    part3.setValues(Arrays.asList("201703"));
+
+    Partition newPart1 = new Partition();
+    newPart1.setDbName(dbName);
+    newPart1.setTableName(tbl1Name);
+    StorageDescriptor newSd1 = new StorageDescriptor();
+    List<FieldSchema> newCols1 = new ArrayList<>();
+    newCols1.add(new FieldSchema("newcol1", "int", ""));
+    Map<String, String> newParams1 = new HashMap<>();
+    newParams1.put("key", "value");
+    newSd1.setCols(newCols1);
+    newSd1.setParameters(params1);
+    newSd1.setLocation("loc1new");
+    newPart1.setSd(newSd1);
+    newPart1.setValues(Arrays.asList("201701"));
+
+    SharedCache sharedCache = cachedStore.getSharedCache();
+    sharedCache.setConcurrencyLevel(1);
+    sharedCache.setTableSizeMap(tableSizeMap);
+    sharedCache.initialize(conf);
+
+    sharedCache.addDatabaseToCache(db);
+    sharedCache.addTableToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, tbl1);
+    sharedCache.addTableToCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, tbl2);
+
+    sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part1);
+    sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part2);
+    sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part3);
+
+    Partition p = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, Arrays.asList("201701"));
+    Assert.assertNull(p);
+
+    sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, newPart1);
+    p = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, Arrays.asList("201701"));
+    Assert.assertNotNull(p);
+    cachedStore.shutdown();
+  }
+
+  @Test public void testShowTables() throws Exception {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5kb");
+    MetaStoreTestUtils.setConfForStandloneMode(conf);
+    CachedStore cachedStore = new CachedStore();
+    CachedStore.clearSharedCache();
+
+    cachedStore.setConfForTestExceptSharedCache(conf);
+    ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore();
+    //set up table size map
+    Map<String, Integer> tableSizeMap = new HashMap<>();
+    String db1Utbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName());
+    String db1Ptbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName());
+    String db2Utbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
+    String db2Ptbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName());
+    tableSizeMap.put(db1Utbl1TblKey, 4000);
+    tableSizeMap.put(db1Ptbl1TblKey, 4000);
+    tableSizeMap.put(db2Utbl1TblKey, 4000);
+    tableSizeMap.put(db2Ptbl1TblKey, 4000);
+
+    SharedCache sc = cachedStore.getSharedCache();
+    sc.setConcurrencyLevel(1);
+    sc.setTableSizeMap(tableSizeMap);
+    sc.initialize(conf);
+
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(objectStore);
+
+    List<String> db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName());
+    Assert.assertEquals(2, db1Tables.size());
+    List<String> db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName());
+    Assert.assertEquals(2, db2Tables.size());
+
+    cachedStore.shutdown();
+  }
+
+  @Test public void testTableEviction() throws Exception {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5kb");
+    MetaStoreTestUtils.setConfForStandloneMode(conf);
+    CachedStore cachedStore = new CachedStore();
+    CachedStore.clearSharedCache();
+
+    cachedStore.setConfForTestExceptSharedCache(conf);
+    ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore();
+    //set up table size map
+    Map<String, Integer> tableSizeMap = new HashMap<>();
+    String db1Utbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName());
+    String db1Ptbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName());
+    String db2Utbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
+    String db2Ptbl1TblKey =
+        CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName());
+    tableSizeMap.put(db1Utbl1TblKey, 4000);
+    tableSizeMap.put(db1Ptbl1TblKey, 4000);
+    tableSizeMap.put(db2Utbl1TblKey, 4000);
+    tableSizeMap.put(db2Ptbl1TblKey, 4000);
+    Table tblDb1Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName());
+    Table tblDb1Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName());
+    Table tblDb2Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
+    Table tblDb2Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName());
+
+    SharedCache sc = cachedStore.getSharedCache();
+    sc.setConcurrencyLevel(1);
+    sc.setTableSizeMap(tableSizeMap);
+    sc.initialize(conf);
+
+    sc.addDatabaseToCache(db1);
+    sc.addDatabaseToCache(db2);
+    sc.addTableToCache(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName(), tblDb1Utbl1);
+    sc.addTableToCache(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), tblDb1Ptbl1);
+    sc.addTableToCache(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName(), tblDb2Utbl1);
+    sc.addTableToCache(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName(), tblDb2Ptbl1);
+
+    List<String> db1Tables = sc.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName());
+    Assert.assertEquals(0, db1Tables.size());
+    List<String> db2Tables = sc.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName());
+    Assert.assertEquals(1, db2Tables.size());
+
+    cachedStore.shutdown();
+  }
+
   private Table createTestTbl(String dbName, String tblName, String tblOwner, List<FieldSchema> cols,
       List<FieldSchema> ptnCols) {
     String serdeLocation = "file:/tmp";
@@ -1535,8 +1707,8 @@ public class TestCachedStore {
   }
 
   class TableAndColStats {
-    Table table;
-    ColumnStatistics colStats;
+    private Table table;
+    private ColumnStatistics colStats;
 
     TableAndColStats(Table table, ColumnStatistics colStats) {
       this.table = table;
@@ -1592,8 +1764,8 @@ public class TestCachedStore {
   }
 
   class PartitionObjectsAndNames {
-    List<Partition> ptns;
-    List<String> ptnNames;
+    private List<Partition> ptns;
+    private List<String> ptnNames;
 
     PartitionObjectsAndNames(List<Partition> ptns, List<String> ptnNames) {
       this.ptns = ptns;