Posted to commits@hive.apache.org by vg...@apache.org on 2018/03/19 17:54:31 UTC

[2/4] hive git commit: HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
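
For orientation: this patch collapses the previously flat, top-level partition and
column-statistics maps into per-table state owned by the table cache itself. A
minimal sketch of the resulting layout, with fields abbreviated from the diff
below (an illustration only, not part of the patch):

    public class SharedCache {
      // Guards the top-level maps; per-table state is guarded by each
      // TableWrapper's own lock, so readers of one table do not block others
      private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
      private Map<String, Database> databaseCache = new ConcurrentHashMap<>();
      // Key is an aggregate of database name and table name
      private Map<String, TableWrapper> tableCache = new ConcurrentHashMap<>();
      private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new ConcurrentHashMap<>();

      static class TableWrapper {
        Table t;
        ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
        // Partitions and all flavors of column stats now live inside the wrapper,
        // instead of in flat maps keyed by "dbName_tableName_..."
        private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<>();
        private Map<String, ColumnStatisticsObj> tableColStatsCache = new ConcurrentHashMap<>();
        private Map<String, ColumnStatisticsObj> partitionColStatsCache = new ConcurrentHashMap<>();
        private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache = new ConcurrentHashMap<>();
      }
    }
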
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 32ea174..cf92eda 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,15 +21,20 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeMap;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -38,11 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,15 +51,21 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 
 public class SharedCache {
-  private Map<String, Database> databaseCache = new TreeMap<>();
-  private Map<String, TableWrapper> tableCache = new TreeMap<>();
-  private Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
-  private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>();
-  private Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>();
-  private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
-  private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
-      new HashMap<String, List<ColumnStatisticsObj>>();
+  private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
+  // For caching Database objects. Key is database name
+  private Map<String, Database> databaseCache = new ConcurrentHashMap<String, Database>();
+  private boolean isDatabaseCachePrewarmed = false;
+  private HashSet<String> databasesDeletedDuringPrewarm = new HashSet<String>();
+  private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+  // For caching TableWrapper objects. Key is aggregate of database name and table name
+  private Map<String, TableWrapper> tableCache = new ConcurrentHashMap<String, TableWrapper>();
+  private boolean isTableCachePrewarmed = false;
+  private HashSet<String> tablesDeletedDuringPrewarm = new HashSet<String>();
+  private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+  private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new ConcurrentHashMap<>();
   private static MessageDigest md;
+  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+  private AtomicLong cacheUpdateCount = new AtomicLong(0);
 
   static enum StatsType {
     ALL(0), ALLBUTDEFAULT(1);
@@ -74,8 +81,6 @@ public class SharedCache {
     }
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);
-
   static {
     try {
       md = MessageDigest.getInstance("MD5");
@@ -84,43 +89,804 @@ public class SharedCache {
     }
   }
 
-  public synchronized Database getDatabaseFromCache(String name) {
-    return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
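+  /**
+   * Wrapper around a Table object that also owns the table's partition cache and
+   * its column statistics caches, so that all per-table state is stored and
+   * locked together.
+   */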
+  static class TableWrapper {
+    Table t;
+    String location;
+    Map<String, String> parameters;
+    byte[] sdHash;
+    ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
+    // For caching column stats for an unpartitioned table
+    // Key is column name and the value is the col stat object
+    private Map<String, ColumnStatisticsObj> tableColStatsCache =
+        new ConcurrentHashMap<String, ColumnStatisticsObj>();
+    private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+    // For caching partition objects
+    // Key is partition values and the value is a wrapper around the partition object
+    private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<String, PartitionWrapper>();
+    private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+    // For caching column stats for a partitioned table
+    // Key is aggregate of partition values, column name and the value is the col stat object
+    private Map<String, ColumnStatisticsObj> partitionColStatsCache =
+        new ConcurrentHashMap<String, ColumnStatisticsObj>();
+    private AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
+    // For caching aggregate column stats for all and all minus default partition
+    // Key is column name and the value is a list of 2 col stat objects
+    // (all partitions and all but default)
+    private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
+        new ConcurrentHashMap<String, List<ColumnStatisticsObj>>();
+    private AtomicBoolean isAggrPartitionColStatsCacheDirty = new AtomicBoolean(false);
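+    // All of the maps above are guarded by tableLock; the dirty flags let a
+    // background refresh detect, and skip, snapshots that raced with a writer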
+
+    TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
+      this.t = t;
+      this.sdHash = sdHash;
+      this.location = location;
+      this.parameters = parameters;
+    }
+
+    public Table getTable() {
+      return t;
+    }
+
+    public void setTable(Table t) {
+      this.t = t;
+    }
+
+    public byte[] getSdHash() {
+      return sdHash;
+    }
+
+    public void setSdHash(byte[] sdHash) {
+      this.sdHash = sdHash;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public Map<String, String> getParameters() {
+      return parameters;
+    }
+
+    public void setParameters(Map<String, String> parameters) {
+      this.parameters = parameters;
+    }
+
+    void cachePartition(Partition part, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+        partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+        isPartitionCacheDirty.set(true);
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    void cachePartitions(List<Partition> parts, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        for (Partition part : parts) {
+          PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+          partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+          isPartitionCacheDirty.set(true);
+        }
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public Partition getPartition(List<String> partVals, SharedCache sharedCache) {
+      Partition part = null;
+      try {
+        tableLock.readLock().lock();
+        PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+        if (wrapper == null) {
+          return null;
+        }
+        part = CacheUtils.assemble(wrapper, sharedCache);
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return part;
+    }
+
+    public List<Partition> listPartitions(int max, SharedCache sharedCache) {
+      List<Partition> parts = new ArrayList<>();
+      int count = 0;
+      try {
+        tableLock.readLock().lock();
+        for (PartitionWrapper wrapper : partitionCache.values()) {
+          if (max == -1 || count < max) {
+            parts.add(CacheUtils.assemble(wrapper, sharedCache));
+            count++;
+          }
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return parts;
+    }
+
+    public boolean containsPartition(List<String> partVals) {
+      boolean containsPart = false;
+      try {
+        tableLock.readLock().lock();
+        containsPart = partitionCache.containsKey(CacheUtils.buildPartitionCacheKey(partVals));
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return containsPart;
+    }
+
+    public Partition removePartition(List<String> partVal, SharedCache sharedCache) {
+      Partition part = null;
+      try {
+        tableLock.writeLock().lock();
+        PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+        if (wrapper == null) {
+          // Nothing was cached for these partition values; guard against an NPE below
+          return null;
+        }
+        isPartitionCacheDirty.set(true);
+        if (wrapper.getSdHash() != null) {
+          sharedCache.decrSd(wrapper.getSdHash());
+        }
+        part = CacheUtils.assemble(wrapper, sharedCache);
+        // Remove col stats
+        String partialKey = CacheUtils.buildPartitionCacheKey(partVal);
+        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+            partitionColStatsCache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Entry<String, ColumnStatisticsObj> entry = iterator.next();
+          String key = entry.getKey();
+          if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+            iterator.remove();
+          }
+        }
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+      return part;
+    }
+
+    public void removePartitions(List<List<String>> partVals, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        for (List<String> partVal : partVals) {
+          removePartition(partVal, sharedCache);
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void alterPartition(List<String> partVals, Partition newPart, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        removePartition(partVals, sharedCache);
+        cachePartition(newPart, sharedCache);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
+        SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        for (int i = 0; i < partValsList.size(); i++) {
+          List<String> partVals = partValsList.get(i);
+          Partition newPart = newParts.get(i);
+          alterPartition(partVals, newPart, sharedCache);
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void refreshPartitions(List<Partition> partitions, SharedCache sharedCache) {
+      Map<String, PartitionWrapper> newPartitionCache = new HashMap<String, PartitionWrapper>();
+      try {
+        tableLock.writeLock().lock();
+        for (Partition part : partitions) {
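+          // A writer dirtied the partition cache after this refresh snapshot was
+          // taken; drop the stale snapshot rather than overwrite newer data (the
+          // flag is reset so the next refresh cycle can try again)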
+          if (isPartitionCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition cache update for table: " + getTable().getTableName()
+                + "; the partition list we have is dirty.");
+            return;
+          }
+          String key = CacheUtils.buildPartitionCacheKey(part.getValues());
+          PartitionWrapper wrapper = partitionCache.get(key);
+          if (wrapper != null) {
+            if (wrapper.getSdHash() != null) {
+              sharedCache.decrSd(wrapper.getSdHash());
+            }
+          }
+          wrapper = makePartitionWrapper(part, sharedCache);
+          newPartitionCache.put(key, wrapper);
+        }
+        partitionCache = newPartitionCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void updateTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+      try {
+        tableLock.writeLock().lock();
+        for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+          // Get old stats object if present
+          String key = colStatObj.getColName();
+          ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+          if (oldStatsObj != null) {
+            // Update existing stat object's field
+            StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+          } else {
+            // No stats exist for this key; add a new object to the cache
+            // TODO: get rid of deepCopy after making sure callers don't use references
+            tableColStatsCache.put(key, colStatObj.deepCopy());
+          }
+        }
+        isTableColStatsCacheDirty.set(true);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void refreshTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+      Map<String, ColumnStatisticsObj> newTableColStatsCache =
+          new HashMap<String, ColumnStatisticsObj>();
+      try {
+        tableLock.writeLock().lock();
+        for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+          if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping table col stats cache update for table: "
+                + getTable().getTableName() + "; the table col stats list we have is dirty.");
+            return;
+          }
+          String key = colStatObj.getColName();
+          // TODO: get rid of deepCopy after making sure callers don't use references
+          newTableColStatsCache.put(key, colStatObj.deepCopy());
+        }
+        tableColStatsCache = newTableColStatsCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public List<ColumnStatisticsObj> getCachedTableColStats(List<String> colNames) {
+      List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+      try {
+        tableLock.readLock().lock();
+        for (String colName : colNames) {
+          ColumnStatisticsObj colStatObj = tableColStatsCache.get(colName);
+          if (colStatObj != null) {
+            colStatObjs.add(colStatObj);
+          }
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return colStatObjs;
+    }
+
+    public void removeTableColStats(String colName) {
+      try {
+        tableLock.writeLock().lock();
+        tableColStatsCache.remove(colName);
+        isTableColStatsCacheDirty.set(true);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public ColumnStatisticsObj getPartitionColStats(List<String> partVal, String colName) {
+      try {
+        tableLock.readLock().lock();
+        return partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+      } finally {
+        tableLock.readLock().unlock();
+      }
+    }
+
+    public void updatePartitionColStats(List<String> partVal,
+        List<ColumnStatisticsObj> colStatsObjs) {
+      try {
+        tableLock.writeLock().lock();
+        addPartitionColStatsToCache(partVal, colStatsObjs);
+        isPartitionColStatsCacheDirty.set(true);
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void removePartitionColStats(List<String> partVals, String colName) {
+      try {
+        tableLock.writeLock().lock();
+        partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+        isPartitionColStatsCacheDirty.set(true);
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    private void addPartitionColStatsToCache(List<String> partVal,
+        List<ColumnStatisticsObj> colStatsObjs) {
+      for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+        // Get old stats object if present
+        String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+        ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+        if (oldStatsObj != null) {
+          // Update existing stat object's field
+          StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+        } else {
+          // No stats exist for this key; add a new object to the cache
+          // TODO: get rid of deepCopy after making sure callers don't use references
+          partitionColStatsCache.put(key, colStatObj.deepCopy());
+        }
+      }
+    }
+
+    public void refreshPartitionColStats(List<ColumnStatistics> partitionColStats) {
+      Map<String, ColumnStatisticsObj> newPartitionColStatsCache =
+          new HashMap<String, ColumnStatisticsObj>();
+      try {
+        tableLock.writeLock().lock();
+        String tableName = StringUtils.normalizeIdentifier(getTable().getTableName());
+        for (ColumnStatistics cs : partitionColStats) {
+          if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition column stats cache update for table: "
+                + getTable().getTableName() + "; the partition column stats list we have is dirty");
+            return;
+          }
+          List<String> partVal;
+          try {
+            partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+            List<ColumnStatisticsObj> colStatsObjs = cs.getStatsObj();
+            for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+              if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+                LOG.debug("Skipping partition column stats cache update for table: "
+                    + getTable().getTableName() + "; the partition column list we have is dirty");
+                return;
+              }
+              String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+              newPartitionColStatsCache.put(key, colStatObj.deepCopy());
+            }
+          } catch (MetaException e) {
+            LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+          }
+        }
+        partitionColStatsCache = newPartitionColStatsCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames,
+        StatsType statsType) {
+      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
+      try {
+        tableLock.readLock().lock();
+        for (String colName : colNames) {
+          List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(colName);
+          // If unable to find stats for a column, return null so we can build stats
+          if (colStatList == null) {
+            return null;
+          }
+          ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
+          // If unable to find stats for this StatsType, return null so we can build stats
+          if (colStatObj == null) {
+            return null;
+          }
+          colStats.add(colStatObj);
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return colStats;
+    }
+
+    public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+        AggrStats aggrStatsAllButDefaultPartition) {
+      try {
+        tableLock.writeLock().lock();
+        if (aggrStatsAllPartitions != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
+            if (statObj != null) {
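+              // Index 0 (StatsType.ALL) holds the aggregate over all partitions;
+              // index 1 (StatsType.ALLBUTDEFAULT) is filled in the block below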
+              List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
+              aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
+              aggrColStatsCache.put(statObj.getColName(), aggrStats);
+            }
+          }
+        }
+        if (aggrStatsAllButDefaultPartition != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = aggrColStatsCache.get(statObj.getColName());
+              if (aggrStats == null) {
+                // No ALL entry was cached above: keep the new list reachable from the
+                // cache and pad position 0 so ALLBUTDEFAULT lands at its expected index
+                aggrStats = new ArrayList<ColumnStatisticsObj>();
+                aggrStats.add(StatsType.ALL.ordinal(), null);
+                aggrColStatsCache.put(statObj.getColName(), aggrStats);
+              }
+              aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+            }
+          }
+        }
+        isAggrPartitionColStatsCacheDirty.set(true);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+        AggrStats aggrStatsAllButDefaultPartition) {
+      Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
+          new HashMap<String, List<ColumnStatisticsObj>>();
+      try {
+        tableLock.writeLock().lock();
+        if (aggrStatsAllPartitions != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
+            if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+              LOG.debug("Skipping aggregate stats cache update for table: "
+                  + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+              return;
+            }
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
+              aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
+              newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+            }
+          }
+        }
+        if (aggrStatsAllButDefaultPartition != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
+            if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+              LOG.debug("Skipping aggregate stats cache update for table: "
+                  + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+              return;
+            }
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = newAggrColStatsCache.get(statObj.getColName());
+              if (aggrStats == null) {
+                // Same guard as in cacheAggrPartitionColStats: keep the list reachable
+                // and pad position 0 so ALLBUTDEFAULT lands at its expected index
+                aggrStats = new ArrayList<ColumnStatisticsObj>();
+                aggrStats.add(StatsType.ALL.ordinal(), null);
+                newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+              }
+              aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+            }
+          }
+        }
+        aggrColStatsCache = newAggrColStatsCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    private void updateTableObj(Table newTable, SharedCache sharedCache) {
+      byte[] sdHash = getSdHash();
+      // Remove old table object's sd hash
+      if (sdHash != null) {
+        sharedCache.decrSd(sdHash);
+      }
+      Table tblCopy = newTable.deepCopy();
+      if (tblCopy.getPartitionKeys() != null) {
+        for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+          fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
+        }
+      }
+      setTable(tblCopy);
+      if (tblCopy.getSd() != null) {
+        sdHash = MetaStoreUtils.hashStorageDescriptor(tblCopy.getSd(), md);
+        StorageDescriptor sd = tblCopy.getSd();
+        sharedCache.increSd(sd, sdHash);
+        tblCopy.setSd(null);
+        setSdHash(sdHash);
+        setLocation(sd.getLocation());
+        setParameters(sd.getParameters());
+      } else {
+        setSdHash(null);
+        setLocation(null);
+        setParameters(null);
+      }
+    }
+
+    private PartitionWrapper makePartitionWrapper(Partition part, SharedCache sharedCache) {
+      Partition partCopy = part.deepCopy();
+      PartitionWrapper wrapper;
+      if (part.getSd() != null) {
+        byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
+        StorageDescriptor sd = part.getSd();
+        sharedCache.increSd(sd, sdHash);
+        partCopy.setSd(null);
+        wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
+      } else {
+        wrapper = new PartitionWrapper(partCopy, null, null, null);
+      }
+      return wrapper;
+    }
+  }
+
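+  /**
+   * Wrapper around a Partition object. Its storage descriptor is deduplicated via
+   * the shared sdCache and referenced here only by its hash.
+   */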
+  static class PartitionWrapper {
+    Partition p;
+    String location;
+    Map<String, String> parameters;
+    byte[] sdHash;
+
+    PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
+      this.p = p;
+      this.sdHash = sdHash;
+      this.location = location;
+      this.parameters = parameters;
+    }
+
+    public Partition getPartition() {
+      return p;
+    }
+
+    public byte[] getSdHash() {
+      return sdHash;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+
+    public Map<String, String> getParameters() {
+      return parameters;
+    }
+  }
+
+  static class StorageDescriptorWrapper {
+    StorageDescriptor sd;
+    int refCount = 0;
+
+    StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
+      this.sd = sd;
+      this.refCount = refCount;
+    }
+
+    public StorageDescriptor getSd() {
+      return sd;
+    }
+
+    public int getRefCount() {
+      return refCount;
+    }
+  }
+
+  public Database getDatabaseFromCache(String name) {
+    Database db = null;
+    try {
+      cacheLock.readLock().lock();
+      if (databaseCache.get(name) != null) {
+        db = databaseCache.get(name).deepCopy();
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return db;
+  }
+
+  public void populateDatabasesInCache(List<Database> databases) {
+    for (Database db : databases) {
+      Database dbCopy = db.deepCopy();
+      // ObjectStore also stores db name in lowercase
+      dbCopy.setName(dbCopy.getName().toLowerCase());
+      try {
+        cacheLock.writeLock().lock();
+        // Since we allow write operations on cache while prewarm is happening:
+        // 1. Don't add databases that were deleted while we were preparing list for prewarm
+        // 2. Skip overwriting existing db object
+        // (which is present because it was added after prewarm started)
+        if (databasesDeletedDuringPrewarm.contains(dbCopy.getName().toLowerCase())) {
+          continue;
+        }
+        databaseCache.putIfAbsent(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+      } finally {
+        cacheLock.writeLock().unlock();
+      }
+    }
+    // Mark the prewarm complete only after the whole list has been processed, so
+    // that concurrent deletes keep being recorded in databasesDeletedDuringPrewarm
+    // (see removeDatabaseFromCache) until then
+    try {
+      cacheLock.writeLock().lock();
+      databasesDeletedDuringPrewarm.clear();
+      isDatabaseCachePrewarmed = true;
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
 
-  public synchronized void addDatabaseToCache(String dbName, Database db) {
-    Database dbCopy = db.deepCopy();
-    dbCopy.setName(StringUtils.normalizeIdentifier(dbName));
-    databaseCache.put(dbName, dbCopy);
+  public boolean isDatabaseCachePrewarmed() {
+    return isDatabaseCachePrewarmed;
   }
 
-  public synchronized void removeDatabaseFromCache(String dbName) {
-    databaseCache.remove(dbName);
+  public void addDatabaseToCache(Database db) {
+    try {
+      cacheLock.writeLock().lock();
+      Database dbCopy = db.deepCopy();
+      // ObjectStore also stores db name in lowercase
+      dbCopy.setName(dbCopy.getName().toLowerCase());
+      databaseCache.put(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+      isDatabaseCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
 
-  public synchronized List<String> listCachedDatabases() {
-    return new ArrayList<>(databaseCache.keySet());
+  public void removeDatabaseFromCache(String dbName) {
+    try {
+      cacheLock.writeLock().lock();
+      // If db cache is not yet prewarmed, add this to a set which the prewarm thread can check
+      // so that the prewarm thread does not add it back
+      if (!isDatabaseCachePrewarmed) {
+        databasesDeletedDuringPrewarm.add(dbName.toLowerCase());
+      }
+      if (databaseCache.remove(dbName) != null) {
+        isDatabaseCacheDirty.set(true);
+      }
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  public List<String> listCachedDatabases() {
+    List<String> results = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      results.addAll(databaseCache.keySet());
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return results;
+  }
+
+  public List<String> listCachedDatabases(String pattern) {
+    List<String> results = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      for (String dbName : databaseCache.keySet()) {
+        dbName = StringUtils.normalizeIdentifier(dbName);
+        if (CacheUtils.matches(dbName, pattern)) {
+          results.add(dbName);
+        }
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return results;
   }
 
-  public synchronized void alterDatabaseInCache(String dbName, Database newDb) {
-    removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
-    addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy());
+  /**
+   * Replaces the old db object with the new one. This will add the new database
+   * to the cache if it does not already exist.
+   * @param dbName name of the database being altered
+   * @param newDb the new database object
+   */
+  public void alterDatabaseInCache(String dbName, Database newDb) {
+    try {
+      cacheLock.writeLock().lock();
+      removeDatabaseFromCache(dbName);
+      addDatabaseToCache(newDb.deepCopy());
+      isDatabaseCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  public void refreshDatabasesInCache(List<Database> databases) {
+    try {
+      cacheLock.writeLock().lock();
+      if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+        LOG.debug("Skipping database cache update; the database list we have is dirty.");
+        return;
+      }
+      databaseCache.clear();
+      for (Database db : databases) {
+        addDatabaseToCache(db);
+      }
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  public int getCachedDatabaseCount() {
+    try {
+      cacheLock.readLock().lock();
+      return databaseCache.size();
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+  }
+
+  public void populateTableInCache(Table table, ColumnStatistics tableColStats,
+      List<Partition> partitions, List<ColumnStatistics> partitionColStats,
+      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+    String dbName = StringUtils.normalizeIdentifier(table.getDbName());
+    String tableName = StringUtils.normalizeIdentifier(table.getTableName());
+    // Since we allow write operations on cache while prewarm is happening:
+    // 1. Don't add tables that were deleted while we were preparing list for prewarm
+    if (tablesDeletedDuringPrewarm.contains(CacheUtils.buildTableCacheKey(dbName, tableName))) {
+      return;
+    }
+    TableWrapper tblWrapper = createTableWrapper(dbName, tableName, table);
+    if (!table.isSetPartitionKeys() && (tableColStats != null)) {
+      tblWrapper.updateTableColStats(tableColStats.getStatsObj());
+    } else {
+      if (partitions != null) {
+        tblWrapper.cachePartitions(partitions, this);
+      }
+      if (partitionColStats != null) {
+        for (ColumnStatistics cs : partitionColStats) {
+          List<String> partVal;
+          try {
+            partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+            List<ColumnStatisticsObj> colStats = cs.getStatsObj();
+            tblWrapper.updatePartitionColStats(partVal, colStats);
+          } catch (MetaException e) {
+            LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+          }
+        }
+      }
+      tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+          aggrStatsAllButDefaultPartition);
+    }
+    try {
+      cacheLock.writeLock().lock();
+      // 2. Skip overwriting existing table object
+      // (which is present because it was added after prewarm started)
+      tableCache.putIfAbsent(CacheUtils.buildTableCacheKey(dbName, tableName), tblWrapper);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
 
-  public synchronized int getCachedDatabaseCount() {
-    return databaseCache.size();
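+  /**
+   * Marks the table prewarm as complete and discards the deleted-during-prewarm
+   * bookkeeping (see removeTableFromCache).
+   */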
+  public void completeTableCachePrewarm() {
+    try {
+      cacheLock.writeLock().lock();
+      tablesDeletedDuringPrewarm.clear();
+      isTableCachePrewarmed = true;
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
 
-  public synchronized Table getTableFromCache(String dbName, String tableName) {
-    TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
-    if (tblWrapper == null) {
-      return null;
+  public Table getTableFromCache(String dbName, String tableName) {
+    Table t = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        t = CacheUtils.assemble(tblWrapper, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    Table t = CacheUtils.assemble(tblWrapper, this);
     return t;
   }
 
-  public synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
+  public TableWrapper addTableToCache(String dbName, String tblName, Table tbl) {
+    try {
+      cacheLock.writeLock().lock();
+      TableWrapper wrapper = createTableWrapper(dbName, tblName, tbl);
+      tableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), wrapper);
+      isTableCacheDirty.set(true);
+      return wrapper;
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  private TableWrapper createTableWrapper(String dbName, String tblName, Table tbl) {
+    TableWrapper wrapper;
     Table tblCopy = tbl.deepCopy();
     tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName));
     tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName));
@@ -129,7 +895,6 @@ public class SharedCache {
         fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
       }
     }
-    TableWrapper wrapper;
     if (tbl.getSd() != null) {
       byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md);
       StorageDescriptor sd = tbl.getSd();
@@ -139,482 +904,452 @@ public class SharedCache {
     } else {
       wrapper = new TableWrapper(tblCopy, null, null, null);
     }
-    tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+    return wrapper;
   }
 
-  public synchronized void removeTableFromCache(String dbName, String tblName) {
-    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
-    byte[] sdHash = tblWrapper.getSdHash();
-    if (sdHash!=null) {
-      decrSd(sdHash);
+  public void removeTableFromCache(String dbName, String tblName) {
+    try {
+      cacheLock.writeLock().lock();
+      // If table cache is not yet prewarmed, add this to a set which the prewarm thread can check
+      // so that the prewarm thread does not add it back
+      if (!isTableCachePrewarmed) {
+        tablesDeletedDuringPrewarm.add(CacheUtils.buildTableCacheKey(dbName, tblName));
+      }
+      TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        byte[] sdHash = tblWrapper.getSdHash();
+        if (sdHash != null) {
+          decrSd(sdHash);
+        }
+        isTableCacheDirty.set(true);
+      }
+    } finally {
+      cacheLock.writeLock().unlock();
     }
   }
 
-  public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
-    return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null;
-  }
-
-  public synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        tableColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public void alterTableInCache(String dbName, String tblName, Table newTable) {
+    try {
+      cacheLock.writeLock().lock();
+      TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.updateTableObj(newTable, this);
+        String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+        String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
+        tableCache.put(CacheUtils.buildTableCacheKey(newDbName, newTblName), tblWrapper);
+        isTableCacheDirty.set(true);
       }
+    } finally {
+      cacheLock.writeLock().unlock();
     }
   }
 
-  public synchronized void removeTableColStatsFromCache(String dbName, String tblName,
-      String colName) {
-    if (colName == null) {
-      removeTableColStatsFromCache(dbName, tblName);
-    } else {
-      tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+  public List<Table> listCachedTables(String dbName) {
+    List<Table> tables = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      for (TableWrapper wrapper : tableCache.values()) {
+        if (wrapper.getTable().getDbName().equals(dbName)) {
+          tables.add(CacheUtils.assemble(wrapper, this));
+        }
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return tables;
   }
 
-  public synchronized void updateTableColStatsInCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
-      // Get old stats object if present
-      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
-      ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
-      if (oldStatsObj != null) {
-        LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName()
-            + ", of table: " + tableName + " and database: " + dbName);
-        // Update existing stat object's field
-        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
-      } else {
-        // No stats exist for this key; add a new object to the cache
-        tableColStatsCache.put(key, colStatObj);
+  public List<String> listCachedTableNames(String dbName) {
+    List<String> tableNames = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      for (TableWrapper wrapper : tableCache.values()) {
+        if (wrapper.getTable().getDbName().equals(dbName)) {
+          tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+        }
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return tableNames;
   }
 
-  public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
-    removeTableFromCache(dbName, tblName);
-    addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
-        StringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+  public List<String> listCachedTableNames(String dbName, String pattern, short maxTables) {
+    List<String> tableNames = new ArrayList<String>();
+    try {
+      cacheLock.readLock().lock();
+      int count = 0;
+      for (TableWrapper wrapper : tableCache.values()) {
+        if ((wrapper.getTable().getDbName().equals(dbName))
+            && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+            && (maxTables == -1 || count < maxTables)) {
+          tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+          count++;
+        }
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return tableNames;
   }
 
-  public synchronized void alterTableInPartitionCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
-      for (Partition part : partitions) {
-        removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
-        part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName()));
-        part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName()));
-        addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
-            StringUtils.normalizeIdentifier(newTable.getTableName()), part);
+  public List<String> listCachedTableNames(String dbName, String pattern, TableType tableType) {
+    List<String> tableNames = new ArrayList<String>();
+    try {
+      cacheLock.readLock().lock();
+      for (TableWrapper wrapper : tableCache.values()) {
+        if ((wrapper.getTable().getDbName().equals(dbName))
+            && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+            && wrapper.getTable().getTableType().equals(tableType.toString())) {
+          tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+        }
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return tableNames;
   }
 
-  public synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-      Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-          tableColStatsCache.entrySet().iterator();
-      Map<String, ColumnStatisticsObj> newTableColStats =
-          new HashMap<>();
-      while (iterator.hasNext()) {
-        Entry<String, ColumnStatisticsObj> entry = iterator.next();
-        String key = entry.getKey();
-        ColumnStatisticsObj colStatObj = entry.getValue();
-        if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
-          String[] decomposedKey = CacheUtils.splitTableColStats(key);
-          String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]);
-          newTableColStats.put(newKey, colStatObj);
-          iterator.remove();
+  public void refreshTablesInCache(String dbName, List<Table> tables) {
+    try {
+      cacheLock.writeLock().lock();
+      if (isTableCacheDirty.compareAndSet(true, false)) {
+        LOG.debug("Skipping table cache update; the table list we have is dirty.");
+        return;
+      }
+      Map<String, TableWrapper> newTableCache = new HashMap<String, TableWrapper>();
+      for (Table tbl : tables) {
+        String tblName = StringUtils.normalizeIdentifier(tbl.getTableName());
+        TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+        if (tblWrapper != null) {
+          tblWrapper.updateTableObj(tbl, this);
+        } else {
+          tblWrapper = createTableWrapper(dbName, tblName, tbl);
         }
+        newTableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), tblWrapper);
       }
-      tableColStatsCache.putAll(newTableColStats);
+      tableCache.clear();
+      tableCache = newTableCache;
+    } finally {
+      cacheLock.writeLock().unlock();
     }
   }
 
-  public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
-      Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
-      for (Partition part : partitions) {
-        String oldPartialPartitionKey =
-            CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
-        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-            partitionColStatsCache.entrySet().iterator();
-        while (iterator.hasNext()) {
-          Entry<String, ColumnStatisticsObj> entry = iterator.next();
-          String key = entry.getKey();
-          ColumnStatisticsObj colStatObj = entry.getValue();
-          if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
-            Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
-            // New key has the new table name
-            String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
-                (List<String>) decomposedKey[2], (String) decomposedKey[3]);
-            newPartitionColStats.put(newKey, colStatObj);
-            iterator.remove();
-          }
-        }
+  public List<ColumnStatisticsObj> getTableColStatsFromCache(String dbName, String tblName,
+      List<String> colNames) {
+    List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        colStatObjs = tblWrapper.getCachedTableColStats(colNames);
       }
-      partitionColStatsCache.putAll(newPartitionColStats);
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return colStatObjs;
   }
 
-  public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
-          new HashMap<String, List<ColumnStatisticsObj>>();
-      String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-      Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
-          aggrColStatsCache.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
-        String key = entry.getKey();
-        List<ColumnStatisticsObj> value = entry.getValue();
-        if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) {
-          Object[] decomposedKey = CacheUtils.splitAggrColStats(key);
-          // New key has the new table name
-          String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
-              (String) decomposedKey[2]);
-          newAggrColStatsCache.put(newKey, value);
-          iterator.remove();
-        }
+  public void removeTableColStatsFromCache(String dbName, String tblName, String colName) {
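+    // Only the top-level read lock is needed here: it pins the tableCache mapping,
+    // while the actual stats mutation is serialized by the wrapper's table lock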
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.removeTableColStats(colName);
       }
-      aggrColStatsCache.putAll(newAggrColStatsCache);
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized int getCachedTableCount() {
-    return tableCache.size();
+  public void updateTableColStatsInCache(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        tblWrapper.updateTableColStats(colStatsForTable);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
 
-  public synchronized List<Table> listCachedTables(String dbName) {
-    List<Table> tables = new ArrayList<>();
-    for (TableWrapper wrapper : tableCache.values()) {
-      if (wrapper.getTable().getDbName().equals(dbName)) {
-        tables.add(CacheUtils.assemble(wrapper, this));
+  public void refreshTableColStatsInCache(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshTableColStats(colStatsForTable);
       }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+  }
+
+  public int getCachedTableCount() {
+    try {
+      cacheLock.readLock().lock();
+      return tableCache.size();
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return tables;
   }
 
-  public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
+  public List<TableMeta> getTableMeta(String dbNames, String tableNames,
+      List<String> tableTypes) {
     List<TableMeta> tableMetas = new ArrayList<>();
-    for (String dbName : listCachedDatabases()) {
-      if (CacheUtils.matches(dbName, dbNames)) {
-        for (Table table : listCachedTables(dbName)) {
-          if (CacheUtils.matches(table.getTableName(), tableNames)) {
-            if (tableTypes==null || tableTypes.contains(table.getTableType())) {
-              TableMeta metaData = new TableMeta(
-                  dbName, table.getTableName(), table.getTableType());
+    try {
+      cacheLock.readLock().lock();
+      for (String dbName : listCachedDatabases()) {
+        if (CacheUtils.matches(dbName, dbNames)) {
+          for (Table table : listCachedTables(dbName)) {
+            if (CacheUtils.matches(table.getTableName(), tableNames)) {
+              if (tableTypes == null || tableTypes.contains(table.getTableType())) {
+                TableMeta metaData =
+                    new TableMeta(dbName, table.getTableName(), table.getTableType());
                 metaData.setComments(table.getParameters().get("comment"));
                 tableMetas.add(metaData);
+              }
             }
           }
         }
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
     return tableMetas;
   }
 
-  public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
-    Partition partCopy = part.deepCopy();
-    PartitionWrapper wrapper;
-    if (part.getSd()!=null) {
-      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
-      StorageDescriptor sd = part.getSd();
-      increSd(sd, sdHash);
-      partCopy.setSd(null);
-      wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
-    } else {
-      wrapper = new PartitionWrapper(partCopy, null, null, null);
+  public void addPartitionToCache(String dbName, String tblName, Partition part) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.cachePartition(part, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper);
   }
 
-  public synchronized Partition getPartitionFromCache(String key) {
-    PartitionWrapper wrapper = partitionCache.get(key);
-    if (wrapper == null) {
-      return null;
+  public void addPartitionsToCache(String dbName, String tblName, List<Partition> parts) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.cachePartitions(parts, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    Partition p = CacheUtils.assemble(wrapper, this);
-    return p;
-  }
-
-  public synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
   }
 
-  public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
-  }
-
-  public synchronized Partition removePartitionFromCache(String dbName, String tblName,
-      List<String> part_vals) {
-    PartitionWrapper wrapper =
-        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
-    if (wrapper.getSdHash() != null) {
-      decrSd(wrapper.getSdHash());
+  public Partition getPartitionFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    Partition part = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        part = tblWrapper.getPartition(partVals, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return wrapper.getPartition();
+    return part;
   }
 
-  /**
-   * Given a db + table, remove all partitions for this table from the cache
-   * @param dbName
-   * @param tblName
-   * @return
-   */
-  public synchronized void removePartitionsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, PartitionWrapper> entry = iterator.next();
-      String key = entry.getKey();
-      PartitionWrapper wrapper = entry.getValue();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
-        if (wrapper.getSdHash() != null) {
-          decrSd(wrapper.getSdHash());
-        }
+  public boolean existPartitionFromCache(String dbName, String tblName, List<String> partVals) {
+    boolean existsPart = false;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        existsPart = tblWrapper.containsPartition(partVals);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return existsPart;
   }
 
-  // Remove cached column stats for all partitions of all tables in a db
-  public synchronized void removePartitionColStatsFromCache(String dbName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public Partition removePartitionFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    Partition part = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        part = tblWrapper.removePartition(partVals, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return part;
   }
 
-  // Remove cached column stats for all partitions of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
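+  /**
+   * Bulk variant of removePartitionFromCache: removes each partition identified
+   * by the given lists of partition values from the cached table.
+   */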
+  public void removePartitionsFromCache(String dbName, String tblName,
+      List<List<String>> partVals) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.removePartitions(partVals, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  // Remove cached column stats for a particular partition of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
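+  /**
+   * Lists up to max partitions of the given table from the cache; returns an
+   * empty list if the table is not cached.
+   */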
+  public List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+    List<Partition> parts = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        parts = tblWrapper.listPartitions(max, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return parts;
   }
 
-  // Remove cached column stats for a particular partition and a particular column of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals, String colName) {
-    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
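+  /**
+   * Replaces the cached partition identified by partVals with newPart. Note that
+   * only the global read lock is taken here: per-table mutations are delegated to
+   * TableWrapper, which is expected to synchronize its own state, while the write
+   * lock is reserved for changes to the set of cached tables itself.
+   */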
+  public void alterPartitionInCache(String dbName, String tblName, List<String> partVals,
+      Partition newPart) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.alterPartition(partVals, newPart, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
 
-  public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
-    List<Partition> partitions = new ArrayList<>();
-    int count = 0;
-    for (PartitionWrapper wrapper : partitionCache.values()) {
-      if (wrapper.getPartition().getDbName().equals(dbName)
-          && wrapper.getPartition().getTableName().equals(tblName)
-          && (max == -1 || count < max)) {
-        partitions.add(CacheUtils.assemble(wrapper, this));
-        count++;
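+  /**
+   * Bulk variant of alterPartitionInCache: the i-th entry of partValsList
+   * identifies the cached partition to be replaced by the i-th entry of newParts.
+   */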
+  public void alterPartitionsInCache(String dbName, String tblName, List<List<String>> partValsList,
+      List<Partition> newParts) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.alterPartitions(partValsList, newParts, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return partitions;
   }
 
-  public synchronized void alterPartitionInCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    removePartitionFromCache(dbName, tblName, partVals);
-    addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()),
-        StringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
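+  /**
+   * Replaces all cached partitions of the given table with the given list;
+   * typically invoked by the background cache refresh.
+   */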
+  public void refreshPartitionsInCache(String dbName, String tblName, List<Partition> partitions) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshPartitions(partitions, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
 
-  public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
-    Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      ColumnStatisticsObj colStatObj = entry.getValue();
-      if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
-        Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
-        String newKey =
-            CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()),
-                StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
-                (String) decomposedKey[3]);
-        newPartitionColStats.put(newKey, colStatObj);
-        iterator.remove();
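+  /**
+   * Removes the cached column statistics of a single column of a single partition
+   * of the given table.
+   */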
+  public void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals, String colName) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.removePartitionColStats(partVals, colName);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    partitionColStatsCache.putAll(newPartitionColStats);
   }
 
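+  /**
+   * Updates, or adds if absent, the cached column statistics objects for the
+   * given partition of the given table.
+   */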
-  public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+  public void updatePartitionColStatsInCache(String dbName, String tableName,
       List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
-    for (ColumnStatisticsObj colStatObj : colStatsObjs) {
-      // Get old stats object if present
-      String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
-      ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
-      if (oldStatsObj != null) {
-        // Update existing stat object's field
-        LOG.debug("CachedStore: updating partition column stats for column: "
-            + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
-        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
-      } else {
-        // No stats exist for this key; add a new object to the cache
-        partitionColStatsCache.put(key, colStatObj);
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized int getCachedPartitionCount() {
-    return partitionCache.size();
-  }
-
-  public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
-    return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
-  }
-
-  public synchronized void addPartitionColStatsToCache(
-      List<ColStatsObjWithSourceInfo> colStatsForDB) {
-    for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) {
-      List<String> partVals;
-      try {
-        partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName());
-        ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj();
-        String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(),
-            colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName());
-        partitionColStatsCache.put(key, colStatObj);
-      } catch (MetaException e) {
-        LOG.info("Unable to add partition stats for: {} to SharedCache",
-            colStatWithSourceInfo.getPartName(), e);
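+  /**
+   * Returns the cached column statistics object for one column of one partition,
+   * or null if the table or the statistics are not cached.
+   */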
+  public ColumnStatisticsObj getPartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals, String colName) {
+    ColumnStatisticsObj colStatObj = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        colStatObj = tblWrapper.getPartitionColStats(partVals, colName);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-
-  }
-
-  public synchronized void refreshPartitionColStats(String dbName,
-      List<ColStatsObjWithSourceInfo> colStatsForDB) {
-    LOG.debug("CachedStore: updating cached partition column stats objects for database: {}",
-        dbName);
-    removePartitionColStatsFromCache(dbName);
-    addPartitionColStatsToCache(colStatsForDB);
+    return colStatObj;
   }
 
-  public synchronized void addAggregateStatsToCache(String dbName, String tblName,
-      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
-    if (aggrStatsAllPartitions != null) {
-      for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) {
-        String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
-        List<ColumnStatisticsObj> value = new ArrayList<ColumnStatisticsObj>();
-        value.add(StatsType.ALL.getPosition(), colStatObj);
-        aggrColStatsCache.put(key, value);
-      }
-    }
-    if (aggrStatsAllButDefaultPartition != null) {
-      for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) {
-        String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
-        List<ColumnStatisticsObj> value = aggrColStatsCache.get(key);
-        if ((value != null) && (value.size() > 0)) {
-          value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj);
-        }
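+  /**
+   * Replaces the cached partition column statistics of the given table with the
+   * given list.
+   */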
+  public void refreshPartitionColStatsInCache(String dbName, String tblName,
+      List<ColumnStatistics> partitionColStats) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshPartitionColStats(partitionColStats);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
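+  /**
+   * Returns cached aggregate column statistics for the requested columns, or null
+   * if stats for any requested column (or for the table) are missing, so that the
+   * caller can fall back to computing them.
+   */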
   public List<ColumnStatisticsObj> getAggrStatsFromCache(String dbName, String tblName,
       List<String> colNames, StatsType statsType) {
-    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
-    for (String colName : colNames) {
-      String key = CacheUtils.buildKey(dbName, tblName, colName);
-      List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(key);
-      // If unable to find stats for a column, return null so we can build stats
-      if (colStatList == null) {
-        return null;
-      }
-      ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
-      // If unable to find stats for this StatsType, return null so we can build
-      // stats
-      if (colStatObj == null) {
-        return null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        return tblWrapper.getAggrPartitionColStats(colNames, statsType);
       }
-      colStats.add(colStatObj);
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return colStats;
+    return null;
   }
 
-  public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
-        aggrColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
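+  /**
+   * Caches the pre-computed aggregate column statistics, both for all partitions
+   * and for all partitions except the default one.
+   */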
+  public void addAggregateStatsToCache(String dbName, String tblName,
+      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+            aggrStatsAllButDefaultPartition);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
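+  /** Replaces the cached aggregate column statistics of the given table. */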
-  public synchronized void refreshAggregateStatsCache(String dbName, String tblName,
+  public void refreshAggregateStatsInCache(String dbName, String tblName,
       AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
-    LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName,
-        tblName);
-    removeAggrPartitionColStatsFromCache(dbName, tblName);
-    addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
-        aggrStatsAllButDefaultPartition);
-  }
-
-  public synchronized void addTableColStatsToCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
-      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
-      tableColStatsCache.put(key, colStatObj);
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
+            aggrStatsAllButDefaultPartition);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized void refreshTableColStats(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
-        + " and table: " + tableName);
-    // Remove all old cache entries for this table
-    removeTableColStatsFromCache(dbName, tableName);
-    // Add new entries to cache
-    addTableColStatsToCache(dbName, tableName, colStatsForTable);
-  }
-
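+  /**
+   * Increments the reference count of the given storage descriptor, adding it to
+   * the shared SD cache on first use; synchronized since the SD cache is shared
+   * across all cached tables and may be updated concurrently.
+   */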
-  public void increSd(StorageDescriptor sd, byte[] sdHash) {
+  public synchronized void increSd(StorageDescriptor sd, byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
     if (sdCache.containsKey(byteArray)) {
       sdCache.get(byteArray).refCount++;
@@ -626,7 +1361,7 @@ public class SharedCache {
     }
   }
 
-  public void decrSd(byte[] sdHash) {
+  public synchronized void decrSd(byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
     StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
     sdWrapper.refCount--;
@@ -635,50 +1370,11 @@ public class SharedCache {
     }
   }
 
-  public StorageDescriptor getSdFromCache(byte[] sdHash) {
+  public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) {
     StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
     return sdWrapper.getSd();
   }
 
-  // Replace databases in databaseCache with the new list
-  public synchronized void refreshDatabases(List<Database> databases) {
-    LOG.debug("CachedStore: updating cached database objects");
-    for (String dbName : listCachedDatabases()) {
-      removeDatabaseFromCache(dbName);
-    }
-    for (Database db : databases) {
-      addDatabaseToCache(db.getName(), db);
-    }
-  }
-
-  // Replace tables in tableCache with the new list
-  public synchronized void refreshTables(String dbName, List<Table> tables) {
-    LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
-    for (Table tbl : listCachedTables(dbName)) {
-      removeTableFromCache(dbName, tbl.getTableName());
-    }
-    for (Table tbl : tables) {
-      addTableToCache(dbName, tbl.getTableName(), tbl);
-    }
-  }
-
-  public synchronized void refreshPartitions(String dbName, String tblName,
-      List<Partition> partitions) {
-    LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
-        + " and table: " + tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      PartitionWrapper partitionWrapper = iterator.next().getValue();
-      if (partitionWrapper.getPartition().getDbName().equals(dbName)
-          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
-        iterator.remove();
-      }
-    }
-    for (Partition part : partitions) {
-      addPartitionToCache(dbName, tblName, part);
-    }
-  }
-
   @VisibleForTesting
   Map<String, Database> getDatabaseCache() {
     return databaseCache;
@@ -690,17 +1386,15 @@ public class SharedCache {
   }
 
   @VisibleForTesting
-  Map<String, PartitionWrapper> getPartitionCache() {
-    return partitionCache;
-  }
-
-  @VisibleForTesting
   Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
     return sdCache;
   }
 
-  @VisibleForTesting
-  Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
-    return partitionColStatsCache;
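+  /**
+   * Monotonically increasing counter, bumped via incrementUpdateCount() on cache
+   * updates; readers can compare values to detect concurrent cache changes.
+   */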
+  public long getUpdateCount() {
+    return cacheUpdateCount.get();
+  }
+
+  public void incrementUpdateCount() {
+    cacheUpdateCount.incrementAndGet();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 5aee2e3..e373753 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -339,7 +339,7 @@ public class MetaStoreUtils {
   * @param md message digest to use to generate the hash
    * @return the hash as a byte array
    */
-  public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md)  {
+  public static synchronized byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
     // Note all maps and lists have to be absolutely sorted.  Otherwise we'll produce different
     // results for hashes based on the OS or JVM being used.
     md.reset();
@@ -434,6 +434,15 @@ public class MetaStoreUtils {
     return colNames;
   }
 
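+  /** Returns the column names from the partition's storage descriptor, in order. */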
+  public static List<String> getColumnNamesForPartition(Partition partition) {
+    List<String> colNames = new ArrayList<>();
+    Iterator<FieldSchema> colsIterator = partition.getSd().getColsIterator();
+    while (colsIterator.hasNext()) {
+      colNames.add(colsIterator.next().getName());
+    }
+    return colNames;
+  }
+
   /**
    * validateName
    *

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 5edf8b3..e9527c7 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -1046,13 +1046,6 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
     objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
   }
 
-  @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
       NoSuchObjectException {
     objectStore.createISchema(schema);

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 132cdc3..8fc0c83 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -1033,13 +1033,6 @@ public class DummyRawStoreForJdoConnection implements RawStore {
       String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
   }
 
-  @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException {
 
   }