You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/03/19 17:54:32 UTC

[3/4] hive git commit: HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index d28b196..d37b201 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hive.metastore.cache;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.api.ISchemaName;
 import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -95,8 +94,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.SchemaVersion;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.Type;
@@ -124,130 +121,50 @@ import com.google.common.annotations.VisibleForTesting;
 // TODO constraintCache
 // TODO need sd nested copy?
 // TODO String intern
-// TODO restructure HBaseStore
 // TODO monitor event queue
 // TODO initial load slow?
 // TODO size estimation
-// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
 
 public class CachedStore implements RawStore, Configurable {
   private static ScheduledExecutorService cacheUpdateMaster = null;
-  private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
-      true);
-  private static ReentrantReadWriteLock partitionAggrColStatsCacheLock =
-      new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false);
-  private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
   private static List<Pattern> whitelistPatterns = null;
   private static List<Pattern> blacklistPatterns = null;
+  // Default value set to 100 milliseconds for test purposes
+  private static long DEFAULT_CACHE_REFRESH_PERIOD = 100;
+  // Time after which metastore cache is updated from metastore DB by the background update thread
+  private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
+  private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
   private RawStore rawStore = null;
   private Configuration conf;
   private PartitionExpressionProxy expressionProxy = null;
-  // Default value set to 100 milliseconds for test purpose
-  private static long cacheRefreshPeriod = 100;
-
-  /** A wrapper over SharedCache. Allows one to get SharedCache safely; should be merged
-   *  into SharedCache itself (see the TODO on the class). */
-  private static final SharedCacheWrapper sharedCacheWrapper = new SharedCacheWrapper();
+  private static final SharedCache sharedCache = new SharedCache();
 
   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
 
-  static class TableWrapper {
-    Table t;
-    String location;
-    Map<String, String> parameters;
-    byte[] sdHash;
-    TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
-      this.t = t;
-      this.sdHash = sdHash;
-      this.location = location;
-      this.parameters = parameters;
-    }
-    public Table getTable() {
-      return t;
-    }
-    public byte[] getSdHash() {
-      return sdHash;
-    }
-    public String getLocation() {
-      return location;
-    }
-    public Map<String, String> getParameters() {
-      return parameters;
-    }
-  }
-
-  static class PartitionWrapper {
-    Partition p;
-    String location;
-    Map<String, String> parameters;
-    byte[] sdHash;
-    PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
-      this.p = p;
-      this.sdHash = sdHash;
-      this.location = location;
-      this.parameters = parameters;
-    }
-    public Partition getPartition() {
-      return p;
-    }
-    public byte[] getSdHash() {
-      return sdHash;
-    }
-    public String getLocation() {
-      return location;
-    }
-    public Map<String, String> getParameters() {
-      return parameters;
-    }
-  }
+  public CachedStore() {
 
-  static class StorageDescriptorWrapper {
-    StorageDescriptor sd;
-    int refCount = 0;
-    StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
-      this.sd = sd;
-      this.refCount = refCount;
-    }
-    public StorageDescriptor getSd() {
-      return sd;
-    }
-    public int getRefCount() {
-      return refCount;
-    }
   }
 
-  public CachedStore() {
+  @Override
+  public void setConf(Configuration conf) {
+    setConfInternal(conf);
+    initBlackListWhiteList(conf);
+    startCacheUpdateService(conf, false, true);
   }
 
-  public static void initSharedCacheAsync(Configuration conf) {
-    String clazzName = null;
-    boolean isEnabled = false;
-    try {
-      clazzName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL);
-      isEnabled = JavaUtils.getClass(clazzName, RawStore.class).isAssignableFrom(CachedStore.class);
-    } catch (MetaException e) {
-      LOG.error("Cannot instantiate metastore class", e);
-    }
-    if (!isEnabled) {
-      LOG.debug("CachedStore is not enabled; using " + clazzName);
-      return;
-    }
-    sharedCacheWrapper.startInit(conf);
+  /**
+   * Similar to setConf but used from within the tests
+   * This does not start the background thread for prewarm and update
+   * @param conf
+   */
+  void setConfForTest(Configuration conf) {
+    setConfInternal(conf);
+    initBlackListWhiteList(conf);
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
-        ObjectStore.class.getName());
+  private void setConfInternal(Configuration conf) {
+    String rawStoreClassName =
+        MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
     if (rawStore == null) {
       try {
         rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
@@ -260,94 +177,145 @@ public class CachedStore implements RawStore, Configurable {
     this.conf = conf;
     if (expressionProxy != null && conf != oldConf) {
       LOG.warn("Unexpected setConf when we were already configured");
-    }
-    if (expressionProxy == null || conf != oldConf) {
+    } else {
       expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
     }
-    initBlackListWhiteList(conf);
   }
 
   @VisibleForTesting
-  static void prewarm(RawStore rawStore) throws Exception {
-    // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
-    Deadline.registerIfNot(1000000);
-    List<String> dbNames = rawStore.getAllDatabases();
-    LOG.info("Number of databases to prewarm: " + dbNames.size());
-    SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
-    for (int i = 0; i < dbNames.size(); i++) {
-      String dbName = StringUtils.normalizeIdentifier(dbNames.get(i));
-      // Cache partition column stats
-      Deadline.startTimer("getColStatsForDatabase");
-      List<ColStatsObjWithSourceInfo> colStatsForDB =
-          rawStore.getPartitionColStatsForDatabase(dbName);
-      Deadline.stopTimer();
-      if (colStatsForDB != null) {
-        sharedCache.addPartitionColStatsToCache(colStatsForDB);
+  /**
+   * This initializes the caches in SharedCache by getting the objects from Metastore DB via
+   * ObjectStore and populating the respective caches
+   *
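+   * @param rawStore the store used to read databases, tables, partitions and statistics
+   *          (checked read failures are retried or skipped here, not propagated to the caller)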
+   */
+  static void prewarm(RawStore rawStore) {
+    if (isCachePrewarmed.get()) {
+      return;
+    }
+    long startTime = System.nanoTime();
+    LOG.info("Prewarming CachedStore");
+    while (!isCachePrewarmed.get()) {
+      // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+      Deadline.registerIfNot(1000000);
+      List<String> dbNames;
+      try {
+        dbNames = rawStore.getAllDatabases();
+      } catch (MetaException e) {
+        // Try again
+        continue;
       }
-      LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size());
-      Database db = rawStore.getDatabase(dbName);
-      sharedCache.addDatabaseToCache(dbName, db);
-      List<String> tblNames = rawStore.getAllTables(dbName);
-      LOG.debug("Tables in database: {} : {}", dbName, tblNames);
-      for (int j = 0; j < tblNames.size(); j++) {
-        String tblName = StringUtils.normalizeIdentifier(tblNames.get(j));
-        if (!shouldCacheTable(dbName, tblName)) {
-          LOG.info("Not caching database: {}'s table: {}", dbName, tblName);
+      LOG.info("Number of databases to prewarm: {}", dbNames.size());
+      List<Database> databases = new ArrayList<>(dbNames.size());
+      for (String dbName : dbNames) {
+        try {
+          databases.add(rawStore.getDatabase(dbName));
+        } catch (NoSuchObjectException e) {
+          // Continue with next database
           continue;
         }
-        LOG.info("Caching database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
-            tblName, j, tblNames.size());
-        Table table = null;
-        table = rawStore.getTable(dbName, tblName);
-        // It is possible the table is deleted during fetching tables of the database,
-        // in that case, continue with the next table
-        if (table == null) {
+      }
+      sharedCache.populateDatabasesInCache(databases);
+      LOG.debug(
+          "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache");
+      int numberOfDatabasesCachedSoFar = 0;
+      for (String dbName : dbNames) {
+        dbName = StringUtils.normalizeIdentifier(dbName);
+        List<String> tblNames;
+        try {
+          tblNames = rawStore.getAllTables(dbName);
+        } catch (MetaException e) {
+          // Continue with next database
           continue;
         }
-        sharedCache.addTableToCache(dbName, tblName, table);
-        if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
-          Deadline.startTimer("getPartitions");
-          List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
-          Deadline.stopTimer();
-          for (Partition partition : partitions) {
-            sharedCache.addPartitionToCache(dbName, tblName, partition);
+        int numberOfTablesCachedSoFar = 0;
+        for (String tblName : tblNames) {
+          tblName = StringUtils.normalizeIdentifier(tblName);
+          if (!shouldCacheTable(dbName, tblName)) {
+            continue;
           }
-        }
-        // Cache table column stats
-        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-        Deadline.startTimer("getTableColumnStatistics");
-        ColumnStatistics tableColStats =
-            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
-        Deadline.stopTimer();
-        if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
-          sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj());
-        }
-        // Cache aggregate stats for all partitions of a table and for all but default partition
-        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
-        if ((partNames != null) && (partNames.size() > 0)) {
-          AggrStats aggrStatsAllPartitions =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          // Remove default partition from partition names and get aggregate
-          // stats again
-          List<FieldSchema> partKeys = table.getPartitionKeys();
-          String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
-          List<String> partCols = new ArrayList<String>();
-          List<String> partVals = new ArrayList<String>();
-          for (FieldSchema fs : partKeys) {
-            partCols.add(fs.getName());
-            partVals.add(defaultPartitionValue);
+          Table table;
+          try {
+            table = rawStore.getTable(dbName, tblName);
+          } catch (MetaException e) {
+            // It is possible the table is deleted during fetching tables of the database,
+            // in that case, continue with the next table
+            continue;
           }
-          String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
-          partNames.remove(defaultPartitionName);
-          AggrStats aggrStatsAllButDefaultPartition =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
-              aggrStatsAllButDefaultPartition);
+          List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+          try {
+            ColumnStatistics tableColStats = null;
+            List<Partition> partitions = null;
+            List<ColumnStatistics> partitionColStats = null;
+            AggrStats aggrStatsAllPartitions = null;
+            AggrStats aggrStatsAllButDefaultPartition = null;
+            if (table.isSetPartitionKeys()) {
+              Deadline.startTimer("getPartitions");
+              partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+              Deadline.stopTimer();
+              List<String> partNames = new ArrayList<>(partitions.size());
+              for (Partition p : partitions) {
+                partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+              }
+              if (!partNames.isEmpty()) {
+                // Get partition column stats for this table
+                Deadline.startTimer("getPartitionColumnStatistics");
+                partitionColStats =
+                    rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
+                Deadline.stopTimer();
+                // Get aggregate stats for all partitions of a table and for all but default
+                // partition
+                Deadline.startTimer("getAggrPartitionColumnStatistics");
+                aggrStatsAllPartitions =
+                    rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+                Deadline.stopTimer();
+                // Remove default partition from partition names and get aggregate
+                // stats again
+                List<FieldSchema> partKeys = table.getPartitionKeys();
+                String defaultPartitionValue =
+                    MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+                List<String> partCols = new ArrayList<>();
+                List<String> partVals = new ArrayList<>();
+                for (FieldSchema fs : partKeys) {
+                  partCols.add(fs.getName());
+                  partVals.add(defaultPartitionValue);
+                }
+                String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+                partNames.remove(defaultPartitionName);
+                Deadline.startTimer("getAggrPartitionColumnStatistics");
+                aggrStatsAllButDefaultPartition =
+                    rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+                Deadline.stopTimer();
+              }
+            } else {
+              Deadline.startTimer("getTableColumnStatistics");
+              tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+              Deadline.stopTimer();
+            }
+            sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
+                aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+          } catch (MetaException | NoSuchObjectException e) {
+            // Continue with next table
+            continue;
+          }
+          LOG.debug("Processed database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
+              tblName, ++numberOfTablesCachedSoFar, tblNames.size());
         }
+        LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
+            ++numberOfDatabasesCachedSoFar, dbNames.size());
       }
+      isCachePrewarmed.set(true);
     }
-    // Notify all blocked threads that prewarm is complete now
-    sharedCacheWrapper.notifyAllBlocked();
+    LOG.info("CachedStore initialized");
+    long endTime = System.nanoTime();
+    LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
+    sharedCache.completeTableCachePrewarm();
+  }
+
+  @VisibleForTesting
+  static void setCachePrewarmedState(boolean state) {
+    isCachePrewarmed.set(state);
   }
 
   private static void initBlackListWhiteList(Configuration conf) {
@@ -356,20 +324,27 @@ public class CachedStore implements RawStore, Configurable {
           MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
       blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
           MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
-      // The last specified blacklist pattern gets precedence
-      Collections.reverse(blacklistPatterns);
     }
   }
 
   @VisibleForTesting
-  synchronized static void startCacheUpdateService(Configuration conf) {
+  /**
+   * This starts a background thread, which initially populates the SharedCache and later
+   * periodically gets updates from the metastore db
+   *
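+   * @param conf configuration used to read the cache settings and create the worker's RawStore
+   * @param runOnlyOnce if true, schedule a single run instead of the periodic fixed-rate task
+   * @param shouldRunPrewarm if true, the worker runs prewarm; otherwise it runs update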
+   */
+  static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+      boolean shouldRunPrewarm) {
     if (cacheUpdateMaster == null) {
       initBlackListWhiteList(conf);
       if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
-        cacheRefreshPeriod = MetastoreConf.getTimeVar(conf,
+        cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
             ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
       }
-      LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriod);
+      LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS);
       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
@@ -379,13 +354,20 @@ public class CachedStore implements RawStore, Configurable {
           return t;
         }
       });
-      cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod,
+      if (!runOnlyOnce) {
+        cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
+            cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+      }
+    }
+    if (runOnlyOnce) {
+      // Some tests control the execution of the background update thread
+      cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
           TimeUnit.MILLISECONDS);
     }
   }
 
   @VisibleForTesting
-  synchronized static boolean stopCacheUpdateService(long timeout) {
+  static synchronized boolean stopCacheUpdateService(long timeout) {
     boolean tasksStoppedBeforeShutdown = false;
     if (cacheUpdateMaster != null) {
       LOG.info("CachedStore: shutting down cache update service");
@@ -404,167 +386,83 @@ public class CachedStore implements RawStore, Configurable {
 
   @VisibleForTesting
   static void setCacheRefreshPeriod(long time) {
-    cacheRefreshPeriod = time;
+    cacheRefreshPeriodMS = time;
   }
 
   static class CacheUpdateMasterWork implements Runnable {
-    private boolean isFirstRun = true;
+    private boolean shouldRunPrewarm = true;
     private final RawStore rawStore;
 
-    public CacheUpdateMasterWork(Configuration conf) {
-      String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
-          ObjectStore.class.getName());
+    CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
+      this.shouldRunPrewarm = shouldRunPrewarm;
+      String rawStoreClassName =
+          MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
       try {
         rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance();
         rawStore.setConf(conf);
       } catch (InstantiationException | IllegalAccessException | MetaException e) {
         // MetaException here really means ClassNotFound (see the utility method).
         // So, if any of these happen, that means we can never succeed.
-        sharedCacheWrapper.updateInitState(e, true);
         throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
       }
     }
 
     @Override
     public void run() {
-      if (isFirstRun) {
-        while (isFirstRun) {
-          try {
-            long startTime = System.nanoTime();
-            LOG.info("Prewarming CachedStore");
-            prewarm(rawStore);
-            LOG.info("CachedStore initialized");
-            long endTime = System.nanoTime();
-            LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
-          } catch (Exception e) {
-            LOG.error("Prewarm failure", e);
-            sharedCacheWrapper.updateInitState(e, false);
-            return;
-          }
-          sharedCacheWrapper.updateInitState(null, false);
-          isFirstRun = false;
-        }
-      } else {
+      if (!shouldRunPrewarm) {
         // TODO: prewarm and update can probably be merged.
         update();
+      } else {
+        try {
+          prewarm(rawStore);
+        } catch (Exception e) {
+          LOG.error("Prewarm failure", e);
+          return;
+        }
       }
     }
 
-    public void update() {
+    void update() {
       Deadline.registerIfNot(1000000);
       LOG.debug("CachedStore: updating cached objects");
+      List<String> dbNames;
       try {
-        List<String> dbNames = rawStore.getAllDatabases();
-        if (dbNames != null) {
-          // Update the database in cache
-          updateDatabases(rawStore, dbNames);
-          for (String dbName : dbNames) {
-            updateDatabasePartitionColStats(rawStore, dbName);
-            // Update the tables in cache
-            updateTables(rawStore, dbName);
-            List<String> tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe());
-            for (String tblName : tblNames) {
-              if (!shouldCacheTable(dbName, tblName)) {
-                continue;
-              }
-              // Update the partitions for a table in cache
-              updateTablePartitions(rawStore, dbName, tblName);
-              // Update the table column stats for a table in cache
-              updateTableColStats(rawStore, dbName, tblName);
-              // Update aggregate column stats cache
-              updateAggregateStatsCache(rawStore, dbName, tblName);
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOG.error("Updating CachedStore: error happen when refresh; ignoring", e);
+        dbNames = rawStore.getAllDatabases();
+      } catch (MetaException e) {
+        LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e);
+        return;
       }
-    }
-
-    private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) {
-      try {
-        Deadline.startTimer("getColStatsForDatabasePartitions");
-        List<ColStatsObjWithSourceInfo> colStatsForDB =
-            rawStore.getPartitionColStatsForDatabase(dbName);
-        Deadline.stopTimer();
-        if (colStatsForDB != null) {
-          if (partitionColStatsCacheLock.writeLock().tryLock()) {
-            // Skip background updates if we detect change
-            if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
-              LOG.debug("Skipping partition column stats cache update; the partition column stats "
-                  + "list we have is dirty.");
-              return;
-            }
-            sharedCacheWrapper.getUnsafe()
-                .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB);
-          }
-        }
-      } catch (MetaException | NoSuchObjectException e) {
-        LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}",
-            dbName, e);
-      } finally {
-        if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
-          partitionColStatsCacheLock.writeLock().unlock();
+      // Update the database in cache
+      updateDatabases(rawStore, dbNames);
+      for (String dbName : dbNames) {
+        // Update the tables in cache
+        updateTables(rawStore, dbName);
+        List<String> tblNames;
+        try {
+          tblNames = rawStore.getAllTables(dbName);
+        } catch (MetaException e) {
+          // Continue with next database
+          continue;
         }
-      }
-    }
-
-    // Update cached aggregate stats for all partitions of a table and for all
-    // but default partition
-    private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) {
-      try {
-        Table table = rawStore.getTable(dbName, tblName);
-        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
-        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-        if ((partNames != null) && (partNames.size() > 0)) {
-          Deadline.startTimer("getAggregareStatsForAllPartitions");
-          AggrStats aggrStatsAllPartitions =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          Deadline.stopTimer();
-          // Remove default partition from partition names and get aggregate stats again
-          List<FieldSchema> partKeys = table.getPartitionKeys();
-          String defaultPartitionValue =
-              MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
-          List<String> partCols = new ArrayList<String>();
-          List<String> partVals = new ArrayList<String>();
-          for (FieldSchema fs : partKeys) {
-            partCols.add(fs.getName());
-            partVals.add(defaultPartitionValue);
-          }
-          String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
-          partNames.remove(defaultPartitionName);
-          Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
-          AggrStats aggrStatsAllButDefaultPartition =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          Deadline.stopTimer();
-          if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) {
-            if (partitionAggrColStatsCacheLock.writeLock().tryLock()) {
-              // Skip background updates if we detect change
-              if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) {
-                LOG.debug(
-                    "Skipping aggregate column stats cache update; the aggregate column stats we "
-                        + "have is dirty.");
-                return;
-              }
-              sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache(
-                  StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName),
-                  aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
-            }
+        for (String tblName : tblNames) {
+          if (!shouldCacheTable(dbName, tblName)) {
+            continue;
           }
-        }
-      } catch (MetaException | NoSuchObjectException e) {
-        LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
-            e);
-      } finally {
-        if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) {
-          partitionAggrColStatsCacheLock.writeLock().unlock();
+          // Update the table column stats for a table in cache
+          updateTableColStats(rawStore, dbName, tblName);
+          // Update the partitions for a table in cache
+          updateTablePartitions(rawStore, dbName, tblName);
+          // Update the partition col stats for a table in cache
+          updateTablePartitionColStats(rawStore, dbName, tblName);
+          // Update aggregate partition column stats for a table in cache
+          updateTableAggregatePartitionColStats(rawStore, dbName, tblName);
         }
       }
+      sharedCache.incrementUpdateCount();
     }
 
     private void updateDatabases(RawStore rawStore, List<String> dbNames) {
-      // Prepare the list of databases
-      List<Database> databases = new ArrayList<>();
+      List<Database> databases = new ArrayList<>(dbNames.size());
       for (String dbName : dbNames) {
         Database db;
         try {
@@ -574,24 +472,9 @@ public class CachedStore implements RawStore, Configurable {
           LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
         }
       }
-      // Update the cached database objects
-      try {
-        if (databaseCacheLock.writeLock().tryLock()) {
-          // Skip background updates if we detect change
-          if (isDatabaseCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping database cache update; the database list we have is dirty.");
-            return;
-          }
-          sharedCacheWrapper.getUnsafe().refreshDatabases(databases);
-        }
-      } finally {
-        if (databaseCacheLock.isWriteLockedByCurrentThread()) {
-          databaseCacheLock.writeLock().unlock();
-        }
-      }
+      sharedCache.refreshDatabasesInCache(databases);
     }
 
-    // Update the cached table objects
     private void updateTables(RawStore rawStore, String dbName) {
       List<Table> tables = new ArrayList<>();
       try {
@@ -600,81 +483,99 @@ public class CachedStore implements RawStore, Configurable {
           if (!shouldCacheTable(dbName, tblName)) {
             continue;
           }
-          Table table =
-              rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
-                  StringUtils.normalizeIdentifier(tblName));
+          Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
+              StringUtils.normalizeIdentifier(tblName));
           tables.add(table);
         }
-        if (tableCacheLock.writeLock().tryLock()) {
-          // Skip background updates if we detect change
-          if (isTableCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping table cache update; the table list we have is dirty.");
-            return;
-          }
-          sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables);
-        }
+        sharedCache.refreshTablesInCache(dbName, tables);
       } catch (MetaException e) {
-        LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
-      } finally {
-        if (tableCacheLock.isWriteLockedByCurrentThread()) {
-          tableCacheLock.writeLock().unlock();
+        LOG.debug("Unable to refresh cached tables for database: " + dbName, e);
+      }
+    }
+
+    private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Table table = rawStore.getTable(dbName, tblName);
+        if (!table.isSetPartitionKeys()) {
+          List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+          Deadline.startTimer("getTableColumnStatistics");
+          ColumnStatistics tableColStats =
+              rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+          Deadline.stopTimer();
+          if (tableColStats != null) {
+            sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+          }
         }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Unable to refresh table column stats for table: " + tblName, e);
       }
     }
 
-    // Update the cached partition objects for a table
     private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
       try {
         Deadline.startTimer("getPartitions");
         List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
         Deadline.stopTimer();
-        if (partitionCacheLock.writeLock().tryLock()) {
-          // Skip background updates if we detect change
-          if (isPartitionCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
-            return;
-          }
-          sharedCacheWrapper.getUnsafe().refreshPartitions(
-              StringUtils.normalizeIdentifier(dbName),
-              StringUtils.normalizeIdentifier(tblName), partitions);
-        }
+        sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(dbName),
+            StringUtils.normalizeIdentifier(tblName), partitions);
       } catch (MetaException | NoSuchObjectException e) {
         LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
-      } finally {
-        if (partitionCacheLock.isWriteLockedByCurrentThread()) {
-          partitionCacheLock.writeLock().unlock();
-        }
       }
     }
 
-    // Update the cached col stats for this table
-    private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+    private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
       try {
         Table table = rawStore.getTable(dbName, tblName);
         List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-        Deadline.startTimer("getTableColumnStatistics");
-        ColumnStatistics tableColStats =
-            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+        // Get partition column stats for this table
+        Deadline.startTimer("getPartitionColumnStatistics");
+        List<ColumnStatistics> partitionColStats =
+            rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
         Deadline.stopTimer();
-        if (tableColStats != null) {
-          if (tableColStatsCacheLock.writeLock().tryLock()) {
-            // Skip background updates if we detect change
-            if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
-              LOG.debug("Skipping table column stats cache update; the table column stats list we "
-                  + "have is dirty.");
-              return;
-            }
-            sharedCacheWrapper.getUnsafe().refreshTableColStats(
-                StringUtils.normalizeIdentifier(dbName),
-                StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+        sharedCache.refreshPartitionColStatsInCache(dbName, tblName, partitionColStats);
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Unable to refresh partition column stats for table: " + tblName, e);
+      }
+    }
+
+    // Update cached aggregate stats for all partitions of a table and for all
+    // but default partition
+    private void updateTableAggregatePartitionColStats(RawStore rawStore, String dbName,
+        String tblName) {
+      try {
+        Table table = rawStore.getTable(dbName, tblName);
+        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+        if ((partNames != null) && (partNames.size() > 0)) {
+          Deadline.startTimer("getAggregateStatsForAllPartitions");
+          AggrStats aggrStatsAllPartitions =
+              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+          Deadline.stopTimer();
+          // Remove default partition from partition names and get aggregate stats again
+          List<FieldSchema> partKeys = table.getPartitionKeys();
+          String defaultPartitionValue =
+              MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+          List<String> partCols = new ArrayList<String>();
+          List<String> partVals = new ArrayList<String>();
+          for (FieldSchema fs : partKeys) {
+            partCols.add(fs.getName());
+            partVals.add(defaultPartitionValue);
           }
+          String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+          partNames.remove(defaultPartitionName);
+          Deadline.startTimer("getAggregateStatsForAllPartitionsExceptDefault");
+          AggrStats aggrStatsAllButDefaultPartition =
+              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+          Deadline.stopTimer();
+          sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(dbName),
+              StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+              aggrStatsAllButDefaultPartition);
         }
       } catch (MetaException | NoSuchObjectException e) {
-        LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
-      } finally {
-        if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
-          tableColStatsCacheLock.writeLock().unlock();
-        }
+        LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
+            e);
       }
     }
   }
@@ -712,35 +613,17 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public void createDatabase(Database db) throws InvalidObjectException, MetaException {
     rawStore.createDatabase(db);
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    try {
-      // Wait if background cache update is happening
-      databaseCacheLock.readLock().lock();
-      isDatabaseCacheDirty.set(true);
-      sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()),
-          db.deepCopy());
-    } finally {
-      databaseCacheLock.readLock().unlock();
-    }
+    sharedCache.addDatabaseToCache(db);
   }
 
   @Override
   public Database getDatabase(String dbName) throws NoSuchObjectException {
-    SharedCache sharedCache;
-
-    if (!sharedCacheWrapper.isInitialized()) {
+    if (!sharedCache.isDatabaseCachePrewarmed()) {
       return rawStore.getDatabase(dbName);
     }
-
-    try {
-      sharedCache = sharedCacheWrapper.get();
-    } catch (MetaException e) {
-      throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx?
-    }
-    Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
+    dbName = dbName.toLowerCase();
+    Database db =
+        sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
     if (db == null) {
       throw new NoSuchObjectException();
     }
@@ -748,68 +631,39 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
-    boolean succ = rawStore.dropDatabase(dbname);
+  public boolean dropDatabase(String dbName) throws NoSuchObjectException, MetaException {
+    boolean succ = rawStore.dropDatabase(dbName);
     if (succ) {
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        databaseCacheLock.readLock().lock();
-        isDatabaseCacheDirty.set(true);
-        sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname));
-      } finally {
-        databaseCacheLock.readLock().unlock();
-      }
+      dbName = dbName.toLowerCase();
+      sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
     }
     return succ;
   }
 
   @Override
-  public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
-      MetaException {
+  public boolean alterDatabase(String dbName, Database db)
+      throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.alterDatabase(dbName, db);
     if (succ) {
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        databaseCacheLock.readLock().lock();
-        isDatabaseCacheDirty.set(true);
-        sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
-      } finally {
-        databaseCacheLock.readLock().unlock();
-      }
+      dbName = dbName.toLowerCase();
+      sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
     }
     return succ;
   }
 
   @Override
   public List<String> getDatabases(String pattern) throws MetaException {
-    if (!sharedCacheWrapper.isInitialized()) {
+    if (!sharedCache.isDatabaseCachePrewarmed()) {
       return rawStore.getDatabases(pattern);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> results = new ArrayList<>();
-    for (String dbName : sharedCache.listCachedDatabases()) {
-      dbName = StringUtils.normalizeIdentifier(dbName);
-      if (CacheUtils.matches(dbName, pattern)) {
-        results.add(dbName);
-      }
-    }
-    return results;
+    return sharedCache.listCachedDatabases(pattern);
   }
 
   @Override
   public List<String> getAllDatabases() throws MetaException {
-    if (!sharedCacheWrapper.isInitialized()) {
+    if (!sharedCache.isDatabaseCachePrewarmed()) {
       return rawStore.getAllDatabases();
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.listCachedDatabases();
   }
 
@@ -854,24 +708,13 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
     validateTableType(tbl);
-    try {
-      // Wait if background cache update is happening
-      tableCacheLock.readLock().lock();
-      isTableCacheDirty.set(true);
-      sharedCache.addTableToCache(dbName, tblName, tbl);
-    } finally {
-      tableCacheLock.readLock().unlock();
-    }
+    sharedCache.addTableToCache(dbName, tblName, tbl);
   }
 
   @Override
-  public boolean dropTable(String dbName, String tblName) throws MetaException,
-      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+  public boolean dropTable(String dbName, String tblName)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropTable(dbName, tblName);
     if (succ) {
       dbName = StringUtils.normalizeIdentifier(dbName);
@@ -879,28 +722,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      // Remove table
-      try {
-        // Wait if background table cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.removeTableFromCache(dbName, tblName);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Remove table col stats
-      try {
-        // Wait if background table col stats cache update is happening
-        tableColStatsCacheLock.readLock().lock();
-        isTableColStatsCacheDirty.set(true);
-        sharedCache.removeTableColStatsFromCache(dbName, tblName);
-      } finally {
-        tableColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removeTableFromCache(dbName, tblName);
     }
     return succ;
   }
@@ -909,11 +731,14 @@ public class CachedStore implements RawStore, Configurable {
   public Table getTable(String dbName, String tblName) throws MetaException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getTable(dbName, tblName);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // This table is not yet loaded in cache
+      return rawStore.getTable(dbName, tblName);
+    }
     if (tbl != null) {
       tbl.unsetPrivileges();
       tbl.setRewriteEnabled(tbl.isRewriteEnabled());
@@ -930,27 +755,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.addPartitionToCache(dbName, tblName, part);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.addPartitionToCache(dbName, tblName, part);
     }
     return succ;
   }
@@ -965,29 +770,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        for (Partition part : parts) {
-          sharedCache.addPartitionToCache(dbName, tblName, part);
-        }
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.addPartitionsToCache(dbName, tblName, parts);
     }
     return succ;
   }
@@ -1002,30 +785,10 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
-        while (iterator.hasNext()) {
-          Partition part = iterator.next();
-          sharedCache.addPartitionToCache(dbName, tblName, part);
-        }
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
+      PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+      while (iterator.hasNext()) {
+        Partition part = iterator.next();
+        sharedCache.addPartitionToCache(dbName, tblName, part);
       }
     }
     return succ;
@@ -1036,16 +799,13 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getPartition(dbName, tblName, part_vals);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    Partition part =
-        sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
+    Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
     if (part == null) {
-      // TODO Manage privileges
-      throw new NoSuchObjectException("partition values=" + part_vals.toString());
+      // The table containing the partition is not yet loaded in cache
+      return rawStore.getPartition(dbName, tblName, part_vals);
     }
     return part;
   }
@@ -1055,10 +815,14 @@ public class CachedStore implements RawStore, Configurable {
       List<String> part_vals) throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.doesPartitionExist(dbName, tblName, part_vals);
+    }
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table containing the partition is not yet loaded in cache
       return rawStore.doesPartitionExist(dbName, tblName, part_vals);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.existPartitionFromCache(dbName, tblName, part_vals);
   }
 
@@ -1072,50 +836,40 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      // Remove partition
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove partition col stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
     }
     return succ;
   }
 
   @Override
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
+    rawStore.dropPartitions(dbName, tblName, partNames);
+    dbName = StringUtils.normalizeIdentifier(dbName);
+    tblName = StringUtils.normalizeIdentifier(tblName);
+    if (!shouldCacheTable(dbName, tblName)) {
+      return;
+    }
+    List<List<String>> partVals = new ArrayList<List<String>>();
+    for (String partName : partNames) {
+      partVals.add(partNameToVals(partName));
+    }
+    sharedCache.removePartitionsFromCache(dbName, tblName, partVals);
+  }
+
+  @Override
   public List<Partition> getPartitions(String dbName, String tblName, int max)
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitions(dbName, tblName, max);
+    }
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table containing the partitions is not yet loaded in cache
       return rawStore.getPartitions(dbName, tblName, max);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> parts = sharedCache.listCachedPartitions(dbName, tblName, max);
     return parts;
   }
@@ -1130,73 +884,20 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table is not yet loaded in cache
       return;
     }
-
-    if (shouldCacheTable(dbName, newTblName)) {
-      validateTableType(newTable);
-      // Update table cache
-      try {
-        // Wait if background cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName),
-            StringUtils.normalizeIdentifier(tblName), newTable);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Update partition cache (key might have changed since table name is a
-      // component of key)
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName),
-            StringUtils.normalizeIdentifier(tblName), newTable);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-    } else {
-      // Remove the table and its cached partitions, stats etc,
-      // since it does not pass the whitelist/blacklist filter.
-      // Remove table
-      try {
-        // Wait if background cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.removeTableFromCache(dbName, tblName);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Remove partitions
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.removePartitionsFromCache(dbName, tblName);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove partition col stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Update aggregate partition col stats keys wherever applicable
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+    if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+      // If old table is in the cache and the new table can also be cached
+      sharedCache.alterTableInCache(dbName, tblName, newTable);
+    } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+      // If old table is *not* in the cache but the new table can be cached
+      sharedCache.addTableToCache(dbName, newTblName, newTable);
+    } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
+      // If old table is in the cache but the new table *cannot* be cached
+      sharedCache.removeTableFromCache(dbName, tblName);
     }
   }
 
@@ -1208,34 +909,21 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getTables(String dbName, String pattern) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getTables(dbName, pattern);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> tableNames = new ArrayList<>();
-    for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      if (CacheUtils.matches(table.getTableName(), pattern)) {
-        tableNames.add(table.getTableName());
-      }
-    }
-    return tableNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+        (short) -1);
   }
 
   @Override
   public List<String> getTables(String dbName, String pattern, TableType tableType)
       throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getTables(dbName, pattern, tableType);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> tableNames = new ArrayList<>();
-    for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      if (CacheUtils.matches(table.getTableName(), pattern) &&
-          table.getTableType().equals(tableType.toString())) {
-        tableNames.add(table.getTableName());
-      }
-    }
-    return tableNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+        tableType);
   }
 
   @Override
@@ -1248,10 +936,9 @@ public class CachedStore implements RawStore, Configurable {
   public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes)
       throws MetaException {
     // TODO Check if all required tables are allowed, if so, get it from cache
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getTableMeta(dbNames, tableNames, tableTypes);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames),
         StringUtils.normalizeIdentifier(tableNames), tableTypes);
   }
@@ -1268,10 +955,9 @@ public class CachedStore implements RawStore, Configurable {
         break;
       }
     }
-    if (!sharedCacheWrapper.isInitialized() || missSomeInCache) {
+    if (!isCachePrewarmed.get() || missSomeInCache) {
       return rawStore.getTableObjectsByName(dbName, tblNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Table> tables = new ArrayList<>();
     for (String tblName : tblNames) {
       tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1286,38 +972,20 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getAllTables(String dbName) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getAllTables(dbName);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    return getAllTablesInternal(dbName, sharedCache);
-  }
-
-  private static List<String> getAllTablesInternal(String dbName, SharedCache sharedCache) {
-    List<String> tblNames = new ArrayList<>();
-    for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName()));
-    }
-    return tblNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName));
   }
 
   @Override
   public List<String> listTableNamesByFilter(String dbName, String filter, short max_tables)
       throws MetaException, UnknownDBException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.listTableNamesByFilter(dbName, filter, max_tables);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> tableNames = new ArrayList<>();
-    int count = 0;
-    for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      if (CacheUtils.matches(table.getTableName(), filter)
-          && (max_tables == -1 || count < max_tables)) {
-        tableNames.add(table.getTableName());
-        count++;
-      }
-    }
-    return tableNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter,
+        max_tables);
   }
 
   @Override
@@ -1325,16 +993,21 @@ public class CachedStore implements RawStore, Configurable {
       short max_parts) throws MetaException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.listPartitionNames(dbName, tblName, max_parts);
+    }
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table is not yet loaded in cache
       return rawStore.listPartitionNames(dbName, tblName, max_parts);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<String> partitionNames = new ArrayList<>();
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     int count = 0;
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) {
       if (max_parts == -1 || count < max_parts) {
-        partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+        partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
+        // Count every emitted name; without this the max_parts guard above never trips
+        count++;
       }
     }
     return partitionNames;
@@ -1363,37 +1034,7 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    // Update partition cache
-    try {
-      // Wait if background cache update is happening
-      partitionCacheLock.readLock().lock();
-      isPartitionCacheDirty.set(true);
-      sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
-    } finally {
-      partitionCacheLock.readLock().unlock();
-    }
-    // Update partition column stats cache
-    try {
-      // Wait if background cache update is happening
-      partitionColStatsCacheLock.readLock().lock();
-      isPartitionColStatsCacheDirty.set(true);
-      sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
-    } finally {
-      partitionColStatsCacheLock.readLock().unlock();
-    }
-    // Remove aggregate partition col stats for this table
-    try {
-      // Wait if background cache update is happening
-      partitionAggrColStatsCacheLock.readLock().lock();
-      isPartitionAggrColStatsCacheDirty.set(true);
-      sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-    } finally {
-      partitionAggrColStatsCacheLock.readLock().unlock();
-    }
+    sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
   }
 
   @Override
@@ -1405,61 +1046,23 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    // Update partition cache
-    try {
-      // Wait if background cache update is happening
-      partitionCacheLock.readLock().lock();
-      isPartitionCacheDirty.set(true);
-      for (int i = 0; i < partValsList.size(); i++) {
-        List<String> partVals = partValsList.get(i);
-        Partition newPart = newParts.get(i);
-        sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
-      }
-    } finally {
-      partitionCacheLock.readLock().unlock();
-    }
-    // Update partition column stats cache
-    try {
-      // Wait if background cache update is happening
-      partitionColStatsCacheLock.readLock().lock();
-      isPartitionColStatsCacheDirty.set(true);
-      for (int i = 0; i < partValsList.size(); i++) {
-        List<String> partVals = partValsList.get(i);
-        Partition newPart = newParts.get(i);
-        sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
-      }
-    } finally {
-      partitionColStatsCacheLock.readLock().unlock();
-    }
-    // Remove aggregate partition col stats for this table
-    try {
-      // Wait if background cache update is happening
-      partitionAggrColStatsCacheLock.readLock().lock();
-      isPartitionAggrColStatsCacheDirty.set(true);
-      sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-    } finally {
-      partitionAggrColStatsCacheLock.readLock().unlock();
-    }
+    sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts);
   }
 
   private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
       String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
-          throws MetaException, NoSuchObjectException {
-    List<Partition> parts = sharedCache.listCachedPartitions(
-        StringUtils.normalizeIdentifier(table.getDbName()),
-        StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+      throws MetaException, NoSuchObjectException {
+    List<Partition> parts =
+        sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()),
+            StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
     for (Partition part : parts) {
       result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
     }
     if (defaultPartName == null || defaultPartName.isEmpty()) {
       defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
     }
-    return expressionProxy.filterPartitionsByExpr(
-        table.getPartitionKeys(), expr, defaultPartName, result);
+    return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
+        result);
   }
 
   @Override
@@ -1474,13 +1077,25 @@ public class CachedStore implements RawStore, Configurable {
       String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
           result);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<String> partNames = new LinkedList<>();
     Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
+          result);
+    }
     boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr,
         defaultPartitionName, maxParts, partNames, sharedCache);
+    // 'result' is the list handed to rawStore on the fallback paths above; on the cached path
+    // it was left untouched, so resolve the pruned names to cached partitions and fill it here.
+    for (String partName : partNames) {
+      Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName));
+      if (part != null) {
+        result.add(part);
+      }
+    }
     return hasUnknownPartitions;
@@ -1497,13 +1104,16 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
     List<String> partNames = new LinkedList<>();
     Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
+    }
     getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
         sharedCache);
     return partNames.size();
@@ -1526,10 +1136,14 @@ public class CachedStore implements RawStore, Configurable {
       List<String> partNames) throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitionsByNames(dbName, tblName, partNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.getPartitionsByNames(dbName, tblName, partNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> partitions = new ArrayList<>();
     for (String partName : partNames) {
       Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName));
@@ -1702,14 +1316,17 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException, InvalidObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals);
-    if (p!=null) {
-      Table t = sharedCache.getTableFromCache(dbName, tblName);
-      String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals);
+    if (p != null) {
+      String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
       PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
           userName, groupNames);
       p.setPrivileges(privs);
@@ -1723,16 +1340,19 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException, InvalidObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     List<Partition> partitions = new ArrayList<>();
     int count = 0;
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
       if (maxParts == -1 || count < maxParts) {
-        String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+        String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
         PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
             userName, groupNames);
         part.setPrivileges(privs);
@@ -1749,13 +1369,16 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<String> partNames = new ArrayList<>();
     int count = 0;
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
       boolean psMatch = true;
       for (int i=0;i<partVals.size();i++) {
@@ -1770,7 +1393,7 @@ public class CachedStore implements RawStore, Configurable {
         continue;
       }
       if (maxParts == -1 || count < maxParts) {
-        partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+        partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
         count++;
       }
     }
@@ -1783,13 +1406,17 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, InvalidObjectException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
+          groupNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
           groupNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> partitions = new ArrayList<>();
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     int count = 0;
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
       boolean psMatch = true;
@@ -1805,7 +1432,7 @@ public class CachedStore implements RawStore, Configurable {
         continue;
       }
       if (maxParts == -1 || count < maxParts) {
-        String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+        String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
         PrincipalPrivilegeSet privs =
             getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames);
         part.setPrivileges(privs);
@@ -1825,35 +1452,19 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
+      Table table = sharedCache.getTableFromCache(dbName, tblName);
+      if (table == null) {
+        // The table is not yet loaded in cache
         return succ;
       }
       List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
-      Table tbl = getTable(dbName, tblName);
       List<String> colNames = new ArrayList<>();
       for (ColumnStatisticsObj statsObj : statsObjs) {
         colNames.add(statsObj.getColName());
       }
-      StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
-      // Update table
-      try {
-        // Wait if background cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.alterTableInCache(dbName, tblName, tbl);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Update table col stats
-      try {
-        // Wait if background cache update is happening
-        tableColStatsCacheLock.readLock().lock();
-        isTableColStatsCacheDirty.set(true);
-        sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
-      } finally {
-        tableColStatsCacheLock.readLock().unlock();
-      }
+      StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
+      sharedCache.alterTableInCache(dbName, tblName, table);
+      sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
     }
     return succ;
   }
@@ -1863,24 +1474,18 @@ public class CachedStore implements RawStore, Configurable {
       List<String> colNames) throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
-    List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-    for (String colName : colNames) {
-      String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName);
-      ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey);
-      if (colStat != null) {
-        colStatObjs.add(colStat);
-      }
-    }
-    if (colStatObjs.isEmpty()) {
-      return null;
-    } else {
-      return new ColumnStatistics(csd, colStatObjs);
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
     }
+    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
+    List<ColumnStatisticsObj> colStatObjs =
+        sharedCache.getTableColStatsFromCache(dbName, tblName, colNames);
+    return new ColumnStatistics(csd, colStatObjs);
   }
 
   @Override
@@ -1893,18 +1498,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        tableColStatsCacheLock.readLock().lock();
-        isTableColStatsCacheDirty.set(true);
-        sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
-      } finally {
-        tableColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
     }
     return succ;
   }
@@ -1919,10 +1513,6 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
       List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
       Partition part = getPartition(dbName, tblName, partVals);
       List<String> colNames = new ArrayList<>();
@@ -1930,34 +1520,8 @@ public class CachedStore implements RawStore, Configurable {
         colNames.add(statsObj.getColName());
       }
       StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
-      // Update partition
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Update partition column stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals,
-            colStats.getStatsObj());
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
+      sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj());
     }
     return succ;
   }
@@ -1981,27 +1545,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
     }
     return succ;
   }
@@ -2012,10 +1556,16 @@ public class CachedStore implements RawStore, Configurable {
     List<ColumnStatisticsObj> colStats;
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
-      rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+    if (!shouldCacheTable(dbName, tblName)) {
+      // Tables excluded from the cache must be answered by the raw store: return its result
+      // instead of discarding it and falling through to the cache lookup below.
+      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+    }
     List<String> allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
     if (partNames.size() == allPartNames.size()) {
       colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL);
@@ -2054,10 +1602,8 @@ public class CachedStore implements RawStore, Configurable {
       List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList =
           new ArrayList<ColStatsObjWithSourceInfo>();
       for (String partName : partNames) {
-        String colStatsCacheKey =
-            CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
         ColumnStatisticsObj colStatsForPart =
-            sharedCache.getCachedPartitionColStats(colStatsCacheKey);
+            sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName);
         if (colStatsForPart != null) {
           ColStatsObjWithSourceInfo colStatsWithPartInfo =
               new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName);
@@ -2173,54 +1719,6 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void dropPartitions(String dbName, String tblName, List<String> partNames)
-      throws MetaException, NoSuchObjectException {
-    rawStore.dropPartitions(dbName, tblName, partNames);
-    dbName = StringUtils.normalizeIdentifier(dbName);
-    tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(dbName, tblName)) {
-      return;
-    }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    // Remove partitions
-    try {
-      // Wait if background cache update is happening
-      partitionCacheLock.readLock().lock();
-      isPartitionCacheDirty.set(true);
-      for (String partName : partNames) {
-        List<String> vals = partNameToVals(partName);
-        sharedCache.removePartitionFromCache(dbName, tblName, vals);
-      }
-    } finally {
-      partitionCacheLock.readLock().unlock();
-    }
-    // Remove partition col stats
-    try {
-      // Wait if background cache update is happening
-      partitionColStatsCacheLock.readLock().lock();
-      isPartitionColStatsCacheDirty.set(true);
-      for (String partName : partNames) {
-        List<String> part_vals = partNameToVals(partName);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
-      }
-    } finally {
-      partitionColStatsCacheLock.readLock().unlock();
-    }
-   

<TRUNCATED>