Posted to commits@hive.apache.org by vg...@apache.org on 2018/03/19 17:54:30 UTC

[1/4] hive git commit: HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)

Repository: hive
Updated Branches:
  refs/heads/master 79e88695c -> 26c0ab6ad
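
The test-side changes in this part follow the commit's headline change: partitions and
partition column statistics now live inside each table's entry in the table cache,
rather than in separate top-level partition and partition-col-stats maps. A minimal
usage sketch distilled from the tests below (db, tbl and ptn stand for metastore API
objects built as in TestCachedStore; this is an illustration, not part of the patch):

    SharedCache cache = new SharedCache();
    cache.addDatabaseToCache(db);                 // now takes the Database object itself
    cache.addTableToCache(dbName, tblName, tbl);
    // Partitions are stored under their table's cache entry, not in a global map:
    cache.addPartitionToCache(dbName, tblName, ptn);
    Partition read = cache.getPartitionFromCache(dbName, tblName, ptn.getValues());
    // Consequently the tests below clear only the database, table and SD caches;
    // getPartitionCache() and getPartitionColStatsCache() are gone.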


http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index 0006815..a72fc0b 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -22,7 +22,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -66,19 +69,13 @@ public class TestCachedStore {
     objectStore = new ObjectStore();
     objectStore.setConf(conf);
     cachedStore = new CachedStore();
-    cachedStore.setConf(conf);
-    // Stop the CachedStore cache update service. We'll start it explicitly to control the test
-    CachedStore.stopCacheUpdateService(1);
-    cachedStore.setInitializedForTest();
-
+    cachedStore.setConfForTest(conf);
     // Stop the CachedStore cache update service. We'll start it explicitly to control the test
     CachedStore.stopCacheUpdateService(1);
     sharedCache = new SharedCache();
     sharedCache.getDatabaseCache().clear();
     sharedCache.getTableCache().clear();
-    sharedCache.getPartitionCache().clear();
     sharedCache.getSdCache().clear();
-    sharedCache.getPartitionColStatsCache().clear();
   }
 
   /**********************************************************************************************
@@ -89,61 +86,49 @@ public class TestCachedStore {
   public void testDatabaseOps() throws Exception {
     // Add a db via ObjectStore
     String dbName = "testDatabaseOps";
-    String dbDescription = "testDatabaseOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<>();
     String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
+    Database db = createTestDb(dbName, dbOwner);
     objectStore.createDatabase(db);
     db = objectStore.getDatabase(dbName);
     // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
     CachedStore.prewarm(objectStore);
 
     // Read database via CachedStore
-    Database dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
+    Database dbRead = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbRead);
 
     // Add another db via CachedStore
     final String dbName1 = "testDatabaseOps1";
-    final String dbDescription1 = "testDatabaseOps1";
-    Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams);
-    db1.setOwnerName(dbOwner);
-    db1.setOwnerType(PrincipalType.USER);
+    Database db1 = createTestDb(dbName1, dbOwner);
     cachedStore.createDatabase(db1);
     db1 = cachedStore.getDatabase(dbName1);
 
     // Read db via ObjectStore
-    dbNew = objectStore.getDatabase(dbName1);
-    Assert.assertEquals(db1, dbNew);
+    dbRead = objectStore.getDatabase(dbName1);
+    Assert.assertEquals(db1, dbRead);
 
     // Alter the db via CachedStore (can only alter owner or parameters)
-    db = new Database(dbName, dbDescription, dbLocation, dbParams);
     dbOwner = "user2";
+    db = new Database(db);
     db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
     cachedStore.alterDatabase(dbName, db);
     db = cachedStore.getDatabase(dbName);
 
     // Read db via ObjectStore
-    dbNew = objectStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
+    dbRead = objectStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbRead);
 
     // Add another db via ObjectStore
     final String dbName2 = "testDatabaseOps2";
-    final String dbDescription2 = "testDatabaseOps2";
-    Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams);
-    db2.setOwnerName(dbOwner);
-    db2.setOwnerType(PrincipalType.USER);
+    Database db2 = createTestDb(dbName2, dbOwner);
     objectStore.createDatabase(db2);
     db2 = objectStore.getDatabase(dbName2);
 
     // Alter db "testDatabaseOps" via ObjectStore
     dbOwner = "user1";
-    db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db = new Database(db);
     db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
     objectStore.alterDatabase(dbName, db);
     db = objectStore.getDatabase(dbName);
 
@@ -151,20 +136,20 @@ public class TestCachedStore {
     objectStore.dropDatabase(dbName1);
 
     // We update twice to accurately detect if cache is dirty or not
-    updateCache(cachedStore, 100, 500, 100);
-    updateCache(cachedStore, 100, 500, 100);
+    updateCache(cachedStore);
+    updateCache(cachedStore);
 
     // Read the newly added db via CachedStore
-    dbNew = cachedStore.getDatabase(dbName2);
-    Assert.assertEquals(db2, dbNew);
+    dbRead = cachedStore.getDatabase(dbName2);
+    Assert.assertEquals(db2, dbRead);
 
     // Read the altered db via CachedStore (altered user from "user2" to "user1")
-    dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
+    dbRead = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbRead);
 
     // Try to read the dropped db after cache update
     try {
-      dbNew = cachedStore.getDatabase(dbName1);
+      dbRead = cachedStore.getDatabase(dbName1);
       Assert.fail("The database: " + dbName1
           + " should have been removed from the cache after running the update service");
     } catch (NoSuchObjectException e) {
@@ -174,78 +159,64 @@ public class TestCachedStore {
     // Clean up
     objectStore.dropDatabase(dbName);
     objectStore.dropDatabase(dbName2);
+    sharedCache.getDatabaseCache().clear();
+    sharedCache.getTableCache().clear();
+    sharedCache.getSdCache().clear();
   }
 
   @Test
   public void testTableOps() throws Exception {
     // Add a db via ObjectStore
     String dbName = "testTableOps";
-    String dbDescription = "testTableOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<>();
     String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
+    Database db = createTestDb(dbName, dbOwner);
     objectStore.createDatabase(db);
     db = objectStore.getDatabase(dbName);
 
     // Add a table via ObjectStore
     String tblName = "tbl";
     String tblOwner = "user1";
-    String serdeLocation = "file:/tmp";
     FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
     FieldSchema col2 = new FieldSchema("col2", "string", "string column");
-    List<FieldSchema> cols = new ArrayList<>();
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
     cols.add(col1);
     cols.add(col2);
-    Map<String, String> serdeParams = new HashMap<>();
-    Map<String, String> tblParams = new HashMap<>();
-    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>());
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
-            null, serdeParams);
-    sd.setStoredAsSubDirectories(false);
-    Table tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
     objectStore.createTable(tbl);
     tbl = objectStore.getTable(dbName, tblName);
 
     // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
     CachedStore.prewarm(objectStore);
 
     // Read database, table via CachedStore
-    Database dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-    Table tblNew = cachedStore.getTable(dbName, tblName);
-    Assert.assertEquals(tbl, tblNew);
+    Database dbRead = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbRead);
+    Table tblRead = cachedStore.getTable(dbName, tblName);
+    Assert.assertEquals(tbl, tblRead);
 
     // Add a new table via CachedStore
     String tblName1 = "tbl1";
-    Table tbl1 =
-        new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
+    Table tbl1 = new Table(tbl);
+    tbl1.setTableName(tblName1);
     cachedStore.createTable(tbl1);
     tbl1 = cachedStore.getTable(dbName, tblName1);
 
     // Read via object store
-    tblNew = objectStore.getTable(dbName, tblName1);
-    Assert.assertEquals(tbl1, tblNew);
+    tblRead = objectStore.getTable(dbName, tblName1);
+    Assert.assertEquals(tbl1, tblRead);
 
     // Add a new table via ObjectStore
     String tblName2 = "tbl2";
-    Table tbl2 =
-        new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
+    Table tbl2 = new Table(tbl);
+    tbl2.setTableName(tblName2);
     objectStore.createTable(tbl2);
     tbl2 = objectStore.getTable(dbName, tblName2);
 
     // Alter table "tbl" via ObjectStore
     tblOwner = "user2";
-    tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
+    tbl.setOwner(tblOwner);
     objectStore.alterTable(dbName, tblName, tbl);
     tbl = objectStore.getTable(dbName, tblName);
 
@@ -253,20 +224,20 @@ public class TestCachedStore {
     objectStore.dropTable(dbName, tblName1);
 
     // We update twice to accurately detect if cache is dirty or not
-    updateCache(cachedStore, 100, 500, 100);
-    updateCache(cachedStore, 100, 500, 100);
+    updateCache(cachedStore);
+    updateCache(cachedStore);
 
     // Read "tbl2" via CachedStore
-    tblNew = cachedStore.getTable(dbName, tblName2);
-    Assert.assertEquals(tbl2, tblNew);
+    tblRead = cachedStore.getTable(dbName, tblName2);
+    Assert.assertEquals(tbl2, tblRead);
 
     // Read the altered "tbl" via CachedStore
-    tblNew = cachedStore.getTable(dbName, tblName);
-    Assert.assertEquals(tbl, tblNew);
+    tblRead = cachedStore.getTable(dbName, tblName);
+    Assert.assertEquals(tbl, tblRead);
 
     // Try to read the dropped "tbl1" via CachedStore (should throw exception)
-    tblNew = cachedStore.getTable(dbName, tblName1);
-    Assert.assertNull(tblNew);
+    tblRead = cachedStore.getTable(dbName, tblName1);
+    Assert.assertNull(tblRead);
 
     // Should return "tbl" and "tbl2"
     List<String> tblNames = cachedStore.getTables(dbName, "*");
@@ -278,81 +249,72 @@ public class TestCachedStore {
     objectStore.dropTable(dbName, tblName);
     objectStore.dropTable(dbName, tblName2);
     objectStore.dropDatabase(dbName);
+    sharedCache.getDatabaseCache().clear();
+    sharedCache.getTableCache().clear();
+    sharedCache.getSdCache().clear();
   }
 
   @Test
   public void testPartitionOps() throws Exception {
     // Add a db via ObjectStore
     String dbName = "testPartitionOps";
-    String dbDescription = "testPartitionOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<>();
     String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
+    Database db = createTestDb(dbName, dbOwner);
     objectStore.createDatabase(db);
     db = objectStore.getDatabase(dbName);
 
     // Add a table via ObjectStore
     String tblName = "tbl";
     String tblOwner = "user1";
-    String serdeLocation = "file:/tmp";
     FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
     FieldSchema col2 = new FieldSchema("col2", "string", "string column");
-    List<FieldSchema> cols = new ArrayList<>();
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
     cols.add(col1);
     cols.add(col2);
-    Map<String, String> serdeParams = new HashMap<>();
-    Map<String, String> tblParams = new HashMap<>();
-    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
-            null, serdeParams);
     FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
-    List<FieldSchema> ptnCols = new ArrayList<>();
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
     ptnCols.add(ptnCol1);
-    Table tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
-            TableType.MANAGED_TABLE.toString());
+    Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
     objectStore.createTable(tbl);
     tbl = objectStore.getTable(dbName, tblName);
+
     final String ptnColVal1 = "aaa";
-    Map<String, String> partParams = new HashMap<>();
+    Map<String, String> partParams = new HashMap<String, String>();
     Partition ptn1 =
-        new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams);
+        new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams);
     objectStore.addPartition(ptn1);
     ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
     final String ptnColVal2 = "bbb";
     Partition ptn2 =
-        new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams);
+        new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams);
     objectStore.addPartition(ptn2);
     ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
 
     // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
     CachedStore.prewarm(objectStore);
 
     // Read database, table, partition via CachedStore
-    Database dbNew = cachedStore.getDatabase(dbName);
-    Assert.assertEquals(db, dbNew);
-    Table tblNew = cachedStore.getTable(dbName, tblName);
-    Assert.assertEquals(tbl, tblNew);
-    Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
-    Assert.assertEquals(ptn1, newPtn1);
-    Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
-    Assert.assertEquals(ptn2, newPtn2);
+    Database dbRead = cachedStore.getDatabase(dbName);
+    Assert.assertEquals(db, dbRead);
+    Table tblRead = cachedStore.getTable(dbName, tblName);
+    Assert.assertEquals(tbl, tblRead);
+    Partition ptn1Read = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+    Assert.assertEquals(ptn1, ptn1Read);
+    Partition ptn2Read = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+    Assert.assertEquals(ptn2, ptn2Read);
 
     // Add a new partition via ObjectStore
     final String ptnColVal3 = "ccc";
     Partition ptn3 =
-        new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams);
+        new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams);
     objectStore.addPartition(ptn3);
     ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
 
     // Alter an existing partition ("aaa") via ObjectStore
     final String ptnColVal1Alt = "aaaAlt";
     Partition ptn1Atl =
-        new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams);
+        new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams);
     objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl);
     ptn1Atl = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
 
@@ -360,45 +322,47 @@ public class TestCachedStore {
     objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2));
 
     // We update twice to accurately detect if cache is dirty or not
-    updateCache(cachedStore, 100, 500, 100);
-    updateCache(cachedStore, 100, 500, 100);
+    updateCache(cachedStore);
+    updateCache(cachedStore);
 
     // Read the newly added partition via CachedStore
-    Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
-    Assert.assertEquals(ptn3, newPtn);
+    Partition ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+    Assert.assertEquals(ptn3, ptnRead);
 
     // Read the altered partition via CachedStore
-    newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
-    Assert.assertEquals(ptn1Atl, newPtn);
+    ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+    Assert.assertEquals(ptn1Atl, ptnRead);
 
     // Try to read the dropped partition via CachedStore
     try {
-      newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+      ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
       Assert.fail("The partition: " + ptnColVal2
           + " should have been removed from the cache after running the update service");
     } catch (NoSuchObjectException e) {
       // Expected
     }
+    // Clean up
+    objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+    objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+    objectStore.dropTable(dbName, tblName);
+    objectStore.dropDatabase(dbName);
+    sharedCache.getDatabaseCache().clear();
+    sharedCache.getTableCache().clear();
+    sharedCache.getSdCache().clear();
   }
 
   //@Test
   public void testTableColStatsOps() throws Exception {
     // Add a db via ObjectStore
     String dbName = "testTableColStatsOps";
-    String dbDescription = "testTableColStatsOps";
-    String dbLocation = "file:/tmp";
-    Map<String, String> dbParams = new HashMap<>();
     String dbOwner = "user1";
-    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
-    db.setOwnerName(dbOwner);
-    db.setOwnerType(PrincipalType.USER);
+    Database db = createTestDb(dbName, dbOwner);
     objectStore.createDatabase(db);
     db = objectStore.getDatabase(dbName);
 
     // Add a table via ObjectStore
     final String tblName = "tbl";
     final String tblOwner = "user1";
-    final String serdeLocation = "file:/tmp";
     final FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
     // Stats values for col1
     long col1LowVal = 5;
@@ -420,15 +384,10 @@ public class TestCachedStore {
     cols.add(col1);
     cols.add(col2);
     cols.add(col3);
-    Map<String, String> serdeParams = new HashMap<>();
-    Map<String, String> tblParams = new HashMap<>();
-    final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
-    StorageDescriptor sd =
-        new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
-            null, serdeParams);
-    Table tbl =
-        new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
-            null, null, TableType.MANAGED_TABLE.toString());
+    FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    ptnCols.add(ptnCol1);
+    Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
     objectStore.createTable(tbl);
     tbl = objectStore.getTable(dbName, tblName);
 
@@ -476,6 +435,7 @@ public class TestCachedStore {
     objectStore.updateTableColumnStatistics(stats);
 
     // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
     CachedStore.prewarm(objectStore);
 
     // Read table stats via CachedStore
@@ -483,18 +443,13 @@ public class TestCachedStore {
         cachedStore.getTableColumnStatistics(dbName, tblName,
             Arrays.asList(col1.getName(), col2.getName(), col3.getName()));
     Assert.assertEquals(stats, newStats);
-  }
 
-  private void updateCache(CachedStore cachedStore, long frequency, long sleepTime,
-      long shutdownTimeout) throws InterruptedException {
-    // Set cache refresh period to 100 milliseconds
-    CachedStore.setCacheRefreshPeriod(100);
-    // Start the CachedStore update service
-    CachedStore.startCacheUpdateService(cachedStore.getConf());
-    // Sleep for 500 ms so that cache update is complete
-    Thread.sleep(500);
-    // Stop cache update service
-    CachedStore.stopCacheUpdateService(100);
+    // Clean up
+    objectStore.dropTable(dbName, tblName);
+    objectStore.dropDatabase(dbName);
+    sharedCache.getDatabaseCache().clear();
+    sharedCache.getTableCache().clear();
+    sharedCache.getSdCache().clear();
   }
 
   /**********************************************************************************************
@@ -503,29 +458,21 @@ public class TestCachedStore {
 
   @Test
   public void testSharedStoreDb() {
-    Database db1 = new Database();
-    Database db2 = new Database();
-    Database db3 = new Database();
-    Database newDb1 = new Database();
-    newDb1.setName("db1");
-
-    sharedCache.addDatabaseToCache("db1", db1);
-    sharedCache.addDatabaseToCache("db2", db2);
-    sharedCache.addDatabaseToCache("db3", db3);
-
+    Database db1 = createTestDb("db1", "user1");
+    Database db2 = createTestDb("db2", "user1");
+    Database db3 = createTestDb("db3", "user1");
+    Database newDb1 = createTestDb("newdb1", "user1");
+    sharedCache.addDatabaseToCache(db1);
+    sharedCache.addDatabaseToCache(db2);
+    sharedCache.addDatabaseToCache(db3);
     Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
-
     sharedCache.alterDatabaseInCache("db1", newDb1);
-
     Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
-
     sharedCache.removeDatabaseFromCache("db2");
-
     Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 2);
-
     List<String> dbs = sharedCache.listCachedDatabases();
     Assert.assertEquals(dbs.size(), 2);
-    Assert.assertTrue(dbs.contains("db1"));
+    Assert.assertTrue(dbs.contains("newdb1"));
     Assert.assertTrue(dbs.contains("db3"));
   }
 
@@ -608,6 +555,23 @@ public class TestCachedStore {
 
   @Test
   public void testSharedStorePartition() {
+    String dbName = "db1";
+    String tbl1Name = "tbl1";
+    String tbl2Name = "tbl2";
+    String owner = "user1";
+    Database db = createTestDb(dbName, owner);
+    sharedCache.addDatabaseToCache(db);
+    FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+    FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(col1);
+    cols.add(col2);
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    Table tbl1 = createTestTbl(dbName, tbl1Name, owner, cols, ptnCols);
+    sharedCache.addTableToCache(dbName, tbl1Name, tbl1);
+    Table tbl2 = createTestTbl(dbName, tbl2Name, owner, cols, ptnCols);
+    sharedCache.addTableToCache(dbName, tbl2Name, tbl2);
+
     Partition part1 = new Partition();
     StorageDescriptor sd1 = new StorageDescriptor();
     List<FieldSchema> cols1 = new ArrayList<>();
@@ -645,8 +609,8 @@ public class TestCachedStore {
     part3.setValues(Arrays.asList("201703"));
 
     Partition newPart1 = new Partition();
-    newPart1.setDbName("db1");
-    newPart1.setTableName("tbl1");
+    newPart1.setDbName(dbName);
+    newPart1.setTableName(tbl1Name);
     StorageDescriptor newSd1 = new StorageDescriptor();
     List<FieldSchema> newCols1 = new ArrayList<>();
     newCols1.add(new FieldSchema("newcol1", "int", ""));
@@ -654,32 +618,25 @@ public class TestCachedStore {
     newParams1.put("key", "value");
     newSd1.setCols(newCols1);
     newSd1.setParameters(params1);
-    newSd1.setLocation("loc1");
+    newSd1.setLocation("loc1new");
     newPart1.setSd(newSd1);
     newPart1.setValues(Arrays.asList("201701"));
 
-    sharedCache.addPartitionToCache("db1", "tbl1", part1);
-    sharedCache.addPartitionToCache("db1", "tbl1", part2);
-    sharedCache.addPartitionToCache("db1", "tbl1", part3);
-    sharedCache.addPartitionToCache("db1", "tbl2", part1);
-
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 4);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
+    sharedCache.addPartitionToCache(dbName, tbl1Name, part1);
+    sharedCache.addPartitionToCache(dbName, tbl1Name, part2);
+    sharedCache.addPartitionToCache(dbName, tbl1Name, part3);
+    sharedCache.addPartitionToCache(dbName, tbl2Name, part1);
 
-    Partition t = sharedCache.getPartitionFromCache("db1", "tbl1", Arrays.asList("201701"));
+    Partition t = sharedCache.getPartitionFromCache(dbName, tbl1Name, Arrays.asList("201701"));
     Assert.assertEquals(t.getSd().getLocation(), "loc1");
 
-    sharedCache.removePartitionFromCache("db1", "tbl2", Arrays.asList("201701"));
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-
-    sharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), newPart1);
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 3);
+    sharedCache.removePartitionFromCache(dbName, tbl2Name, Arrays.asList("201701"));
+    t = sharedCache.getPartitionFromCache(dbName, tbl2Name, Arrays.asList("201701"));
+    Assert.assertNull(t);
 
-    sharedCache.removePartitionFromCache("db1", "tbl1", Arrays.asList("201702"));
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 2);
-    Assert.assertEquals(sharedCache.getSdCache().size(), 2);
+    sharedCache.alterPartitionInCache(dbName, tbl1Name, Arrays.asList("201701"), newPart1);
+    t = sharedCache.getPartitionFromCache(dbName, tbl1Name, Arrays.asList("201701"));
+    Assert.assertEquals(t.getSd().getLocation(), "loc1new");
   }
 
   @Test
@@ -753,10 +710,10 @@ public class TestCachedStore {
     String dbName = "testTableColStatsOps1";
     String tblName = "tbl1";
     String colName = "f1";
-    
+
     Database db = new Database(dbName, null, "some_location", null);
     cachedStore.createDatabase(db);
-    
+
     List<FieldSchema> cols = new ArrayList<>();
     cols.add(new FieldSchema(colName, "int", null));
     List<FieldSchema> partCols = new ArrayList<>();
@@ -764,29 +721,29 @@ public class TestCachedStore {
     StorageDescriptor sd =
         new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
             null, null, null);
-    
+
     Table tbl =
         new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
             null, null, TableType.MANAGED_TABLE.toString());
     cachedStore.createTable(tbl);
-    
+
     List<String> partVals1 = new ArrayList<>();
     partVals1.add("1");
     List<String> partVals2 = new ArrayList<>();
     partVals2.add("2");
-    
+
     Partition ptn1 =
         new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn1);
     Partition ptn2 =
         new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn2);
-    
+
     ColumnStatistics stats = new ColumnStatistics();
     ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
     statsDesc.setPartName("col");
     List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-    
+
     ColumnStatisticsData data = new ColumnStatisticsData();
     ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data);
     LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
@@ -796,15 +753,15 @@ public class TestCachedStore {
     longStats.setNumDVs(30);
     data.setLongStats(longStats);
     colStatObjs.add(colStats);
-    
+
     stats.setStatsDesc(statsDesc);
     stats.setStatsObj(colStatObjs);
-    
+
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    
+
     longStats.setNumDVs(40);
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-    
+
     List<String> colNames = new ArrayList<>();
     colNames.add(colName);
     List<String> aggrPartVals = new ArrayList<>();
@@ -823,10 +780,10 @@ public class TestCachedStore {
     String dbName = "testTableColStatsOps2";
     String tblName = "tbl2";
     String colName = "f1";
-    
+
     Database db = new Database(dbName, null, "some_location", null);
     cachedStore.createDatabase(db);
-    
+
     List<FieldSchema> cols = new ArrayList<>();
     cols.add(new FieldSchema(colName, "int", null));
     List<FieldSchema> partCols = new ArrayList<>();
@@ -834,29 +791,29 @@ public class TestCachedStore {
     StorageDescriptor sd =
         new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
             null, null, null);
-    
+
     Table tbl =
         new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
             null, null, TableType.MANAGED_TABLE.toString());
     cachedStore.createTable(tbl);
-    
+
     List<String> partVals1 = new ArrayList<>();
     partVals1.add("1");
     List<String> partVals2 = new ArrayList<>();
     partVals2.add("2");
-    
+
     Partition ptn1 =
         new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn1);
     Partition ptn2 =
         new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn2);
-    
+
     ColumnStatistics stats = new ColumnStatistics();
     ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
     statsDesc.setPartName("col");
     List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-    
+
     ColumnStatisticsData data = new ColumnStatisticsData();
     ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data);
     LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
@@ -864,21 +821,21 @@ public class TestCachedStore {
     longStats.setHighValue(100);
     longStats.setNumNulls(50);
     longStats.setNumDVs(30);
-    
+
     HyperLogLog hll = HyperLogLog.builder().build();
     hll.addLong(1);
     hll.addLong(2);
     hll.addLong(3);
     longStats.setBitVectors(hll.serialize());
-    
+
     data.setLongStats(longStats);
     colStatObjs.add(colStats);
-    
+
     stats.setStatsDesc(statsDesc);
     stats.setStatsObj(colStatObjs);
-    
+
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    
+
     longStats.setNumDVs(40);
     hll = HyperLogLog.builder().build();
     hll.addLong(2);
@@ -886,9 +843,9 @@ public class TestCachedStore {
     hll.addLong(4);
     hll.addLong(5);
     longStats.setBitVectors(hll.serialize());
-    
+
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-    
+
     List<String> colNames = new ArrayList<>();
     colNames.add(colName);
     List<String> aggrPartVals = new ArrayList<>();
@@ -901,4 +858,179 @@ public class TestCachedStore {
     Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100);
     Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 5);
   }
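+
+  // Why the aggregate-stats assertions just above expect NumNulls == 100 and
+  // NumDVs == 5: null counts add across the two partitions (50 + 50), while
+  // the NDV is estimated from the merged HyperLogLog sketches rather than by
+  // adding 30 + 40; the values fed to the two sketches above span five
+  // distinct longs in total, hence the merged estimate of 5.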
+
+  @Test
+  public void testMultiThreadedSharedCacheOps() throws Exception {
+    List<String> dbNames = new ArrayList<String>(Arrays.asList("db1", "db2", "db3", "db4", "db5"));
+    List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
+    ExecutorService executor = Executors.newFixedThreadPool(50, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = Executors.defaultThreadFactory().newThread(r);
+        t.setDaemon(true);
+        return t;
+      }
+    });
+
+    // Create 5 dbs
+    for (String dbName : dbNames) {
+      Callable<Object> c = new Callable<Object>() {
+        public Object call() {
+          Database db = createTestDb(dbName, "user1");
+          sharedCache.addDatabaseToCache(db);
+          return null;
+        }
+      };
+      tasks.add(c);
+    }
+    executor.invokeAll(tasks);
+    for (String dbName : dbNames) {
+      Database db = sharedCache.getDatabaseFromCache(dbName);
+      Assert.assertNotNull(db);
+      Assert.assertEquals(dbName, db.getName());
+    }
+
+    // Create 5 tables under "db1"
+    List<String> tblNames =
+        new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3", "tbl4", "tbl5"));
+    tasks.clear();
+    for (String tblName : tblNames) {
+      FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+      FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+      List<FieldSchema> cols = new ArrayList<FieldSchema>();
+      cols.add(col1);
+      cols.add(col2);
+      FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+      List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+      ptnCols.add(ptnCol1);
+      Callable<Object> c = new Callable<Object>() {
+        public Object call() {
+          Table tbl = createTestTbl(dbNames.get(0), tblName, "user1", cols, ptnCols);
+          sharedCache.addTableToCache(dbNames.get(0), tblName, tbl);
+          return null;
+        }
+      };
+      tasks.add(c);
+    }
+    executor.invokeAll(tasks);
+    for (String tblName : tblNames) {
+      Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName);
+      Assert.assertNotNull(tbl);
+      Assert.assertEquals(tblName, tbl.getTableName());
+    }
+
+    // Add 5 partitions to all tables
+    List<String> ptnVals = new ArrayList<String>(Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee"));
+    tasks.clear();
+    for (String tblName : tblNames) {
+      Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName);
+      for (String ptnVal : ptnVals) {
+        Map<String, String> partParams = new HashMap<String, String>();
+        Callable<Object> c = new Callable<Object>() {
+          public Object call() {
+            Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0,
+                tbl.getSd(), partParams);
+            sharedCache.addPartitionToCache(dbNames.get(0), tblName, ptn);
+            return null;
+          }
+        };
+        tasks.add(c);
+      }
+    }
+    executor.invokeAll(tasks);
+    for (String tblName : tblNames) {
+      for (String ptnVal : ptnVals) {
+        Partition ptn = sharedCache.getPartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal));
+        Assert.assertNotNull(ptn);
+        Assert.assertEquals(tblName, ptn.getTableName());
+        Assert.assertEquals(Arrays.asList(ptnVal), ptn.getValues());
+      }
+    }
+
+    // Drop all partitions from "tbl1", "tbl2", "tbl3" and add 2 new partitions to "tbl4" and "tbl5"
+    List<String> newPtnVals = new ArrayList<String>(Arrays.asList("fff", "ggg"));
+    List<String> dropPtnTblNames = new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3"));
+    List<String> addPtnTblNames = new ArrayList<String>(Arrays.asList("tbl4", "tbl5"));
+    tasks.clear();
+    for (String tblName : dropPtnTblNames) {
+      for (String ptnVal : ptnVals) {
+        Callable<Object> c = new Callable<Object>() {
+          public Object call() {
+            sharedCache.removePartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal));
+            return null;
+          }
+        };
+        tasks.add(c);
+      }
+    }
+    for (String tblName : addPtnTblNames) {
+      Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName);
+      for (String ptnVal : newPtnVals) {
+        Map<String, String> partParams = new HashMap<String, String>();
+        Callable<Object> c = new Callable<Object>() {
+          public Object call() {
+            Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0,
+                tbl.getSd(), partParams);
+            sharedCache.addPartitionToCache(dbNames.get(0), tblName, ptn);
+            return null;
+          }
+        };
+        tasks.add(c);
+      }
+    }
+    executor.invokeAll(tasks);
+    for (String tblName : addPtnTblNames) {
+      for (String ptnVal : newPtnVals) {
+        Partition ptn = sharedCache.getPartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal));
+        Assert.assertNotNull(ptn);
+        Assert.assertEquals(tblName, ptn.getTableName());
+        Assert.assertEquals(Arrays.asList(ptnVal), ptn.getValues());
+      }
+    }
+    for (String tblName : dropPtnTblNames) {
+      List<Partition> ptns = sharedCache.listCachedPartitions(dbNames.get(0), tblName, 100);
+      Assert.assertEquals(0, ptns.size());
+    }
+    sharedCache.getDatabaseCache().clear();
+    sharedCache.getTableCache().clear();
+    sharedCache.getSdCache().clear();
+  }
+
+  private Database createTestDb(String dbName, String dbOwner) {
+    String dbDescription = dbName;
+    String dbLocation = "file:/tmp";
+    Map<String, String> dbParams = new HashMap<>();
+    Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
+    db.setOwnerName(dbOwner);
+    db.setOwnerType(PrincipalType.USER);
+    return db;
+  }
+
+  private Table createTestTbl(String dbName, String tblName, String tblOwner,
+      List<FieldSchema> cols, List<FieldSchema> ptnCols) {
+    String serdeLocation = "file:/tmp";
+    Map<String, String> serdeParams = new HashMap<>();
+    Map<String, String> tblParams = new HashMap<>();
+    SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>());
+    StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0,
+        serdeInfo, null, null, serdeParams);
+    sd.setStoredAsSubDirectories(false);
+    Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
+        TableType.MANAGED_TABLE.toString());
+    return tbl;
+  }
+
+  // This method will return only after the cache has updated once
+  private void updateCache(CachedStore cachedStore) throws InterruptedException {
+    int maxTries = 100000;
+    long updateCountBefore = cachedStore.getCacheUpdateCount();
+    // Start the CachedStore update service
+    CachedStore.startCacheUpdateService(cachedStore.getConf(), true, false);
+    while ((cachedStore.getCacheUpdateCount() != (updateCountBefore + 1)) && (maxTries-- > 0)) {
+      Thread.sleep(1000);
+    }
+    CachedStore.stopCacheUpdateService(100);
+  }
 }
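
A recurring idiom in the tests above, pulled out for clarity: each test resets the
prewarm flag and then prewarms synchronously from the ObjectStore before reading
through the CachedStore, and waits for refreshes via the count-based updateCache
helper instead of a fixed sleep. A sketch (names as in the test class):

    // Force a fresh prewarm for this test, then warm the cache from the raw store.
    CachedStore.setCachePrewarmedState(false);
    CachedStore.prewarm(objectStore);
    // Reads are now served from the SharedCache.
    Database viaCache = cachedStore.getDatabase(dbName);
    // After out-of-band changes via objectStore, wait for a full refresh cycle
    // (called twice in the tests to reliably detect a dirty cache):
    updateCache(cachedStore);
    updateCache(cachedStore);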

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/resources/log4j2.properties b/standalone-metastore/src/test/resources/log4j2.properties
index 365687e..db8a550 100644
--- a/standalone-metastore/src/test/resources/log4j2.properties
+++ b/standalone-metastore/src/test/resources/log4j2.properties
@@ -8,28 +8,64 @@
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
-name=PropertiesConfig
-property.filename = logs
-appenders = console
+status = INFO
+name = MetastoreLog4j2
+packages = org.apache.hadoop.hive.metastore
 
+# list of properties
+property.metastore.log.level = INFO
+property.metastore.root.logger = DRFA
+property.metastore.log.dir = ${sys:java.io.tmpdir}/${sys:user.name}
+property.metastore.log.file = metastore.log
+property.hive.perflogger.log.level = INFO
+
+# list of all appenders
+appenders = console, DRFA
+
+# console appender
 appender.console.type = Console
-appender.console.name = STDOUT
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+
+# daily rolling file appender
+appender.DRFA.type = RollingRandomAccessFile
+appender.DRFA.name = DRFA
+appender.DRFA.fileName = ${sys:metastore.log.dir}/${sys:metastore.log.file}
+# Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI sessions
+appender.DRFA.filePattern = ${sys:metastore.log.dir}/${sys:metastore.log.file}.%d{yyyy-MM-dd}
+appender.DRFA.layout.type = PatternLayout
+appender.DRFA.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+appender.DRFA.policies.type = Policies
+appender.DRFA.policies.time.type = TimeBasedTriggeringPolicy
+appender.DRFA.policies.time.interval = 1
+appender.DRFA.policies.time.modulate = true
+appender.DRFA.strategy.type = DefaultRolloverStrategy
+appender.DRFA.strategy.max = 30
+
+# list of all loggers
+loggers = DataNucleus, Datastore, JPOX, PerfLogger
+
+logger.DataNucleus.name = DataNucleus
+logger.DataNucleus.level = INFO
+
+logger.Datastore.name = Datastore
+logger.Datastore.level = INFO
+
+logger.JPOX.name = JPOX
+logger.JPOX.level = INFO
 
-loggers=file
-logger.file.name=guru.springframework.blog.log4j2properties
-logger.file.level = debug
-logger.file.appenderRefs = file
-logger.file.appenderRef.file.ref = LOGFILE
+logger.PerfLogger.name = org.apache.hadoop.hive.ql.log.PerfLogger
+logger.PerfLogger.level = ${sys:hive.perflogger.log.level}
 
-rootLogger.level = debug
-rootLogger.appenderRefs = stdout
-rootLogger.appenderRef.stdout.ref = STDOUT
+# root logger
+rootLogger.level = ${sys:metastore.log.level}
+rootLogger.appenderRefs = root
+rootLogger.appenderRef.root.ref = ${sys:metastore.root.logger}
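
The settings above are wired through ${sys:...} lookups, so they are intended to be
overridden per run with Java system properties rather than by editing this file. For
example (an illustrative invocation, not part of the patch):
mvn test -Dtest=TestCachedStore -Dmetastore.log.level=DEBUG -Dmetastore.log.dir=/tmp/metastore-logs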


[3/4] hive git commit: HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)

Posted by vg...@apache.org.
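
This part carries the core CachedStore changes: the per-cache locks, dirty flags and
wrapper classes are removed from CachedStore, and prewarm becomes non-blocking,
guarded by an AtomicBoolean and retried on transient failures. The distilled shape of
the new prewarm, simplified from the hunk below (the tail of the method, where the
flag is presumably set after a successful pass, is truncated in this excerpt):

    static void prewarm(RawStore rawStore) {
      if (isCachePrewarmed.get()) {
        return;                              // a previous pass already warmed the cache
      }
      while (!isCachePrewarmed.get()) {
        List<String> dbNames;
        try {
          dbNames = rawStore.getAllDatabases();
        } catch (MetaException e) {
          continue;                          // transient failure: retry the pass
        }
        // ... fetch tables, partitions and column statistics per database and
        // populate the SharedCache (full body in the diff below)
      }
    }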
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index d28b196..d37b201 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hive.metastore.cache;
 import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.api.ISchemaName;
 import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -95,8 +94,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.SchemaVersion;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.hadoop.hive.metastore.api.Type;
@@ -124,130 +121,50 @@ import com.google.common.annotations.VisibleForTesting;
 // TODO constraintCache
 // TODO need sd nested copy?
 // TODO String intern
-// TODO restructure HBaseStore
 // TODO monitor event queue
 // TODO initial load slow?
 // TODO size estimation
-// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
 
 public class CachedStore implements RawStore, Configurable {
   private static ScheduledExecutorService cacheUpdateMaster = null;
-  private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
-  private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
-      true);
-  private static ReentrantReadWriteLock partitionAggrColStatsCacheLock =
-      new ReentrantReadWriteLock(true);
-  private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false);
-  private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
   private static List<Pattern> whitelistPatterns = null;
   private static List<Pattern> blacklistPatterns = null;
+  // Default value set to 100 milliseconds for testing purposes
+  private static long DEFAULT_CACHE_REFRESH_PERIOD = 100;
+  // Time after which metastore cache is updated from metastore DB by the background update thread
+  private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
+  private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
   private RawStore rawStore = null;
   private Configuration conf;
   private PartitionExpressionProxy expressionProxy = null;
-  // Default value set to 100 milliseconds for test purpose
-  private static long cacheRefreshPeriod = 100;
-
-  /** A wrapper over SharedCache. Allows one to get SharedCache safely; should be merged
-   *  into SharedCache itself (see the TODO on the class). */
-  private static final SharedCacheWrapper sharedCacheWrapper = new SharedCacheWrapper();
+  private static final SharedCache sharedCache = new SharedCache();
 
   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
 
-  static class TableWrapper {
-    Table t;
-    String location;
-    Map<String, String> parameters;
-    byte[] sdHash;
-    TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
-      this.t = t;
-      this.sdHash = sdHash;
-      this.location = location;
-      this.parameters = parameters;
-    }
-    public Table getTable() {
-      return t;
-    }
-    public byte[] getSdHash() {
-      return sdHash;
-    }
-    public String getLocation() {
-      return location;
-    }
-    public Map<String, String> getParameters() {
-      return parameters;
-    }
-  }
-
-  static class PartitionWrapper {
-    Partition p;
-    String location;
-    Map<String, String> parameters;
-    byte[] sdHash;
-    PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
-      this.p = p;
-      this.sdHash = sdHash;
-      this.location = location;
-      this.parameters = parameters;
-    }
-    public Partition getPartition() {
-      return p;
-    }
-    public byte[] getSdHash() {
-      return sdHash;
-    }
-    public String getLocation() {
-      return location;
-    }
-    public Map<String, String> getParameters() {
-      return parameters;
-    }
-  }
+  public CachedStore() {
 
-  static class StorageDescriptorWrapper {
-    StorageDescriptor sd;
-    int refCount = 0;
-    StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
-      this.sd = sd;
-      this.refCount = refCount;
-    }
-    public StorageDescriptor getSd() {
-      return sd;
-    }
-    public int getRefCount() {
-      return refCount;
-    }
   }
 
-  public CachedStore() {
+  @Override
+  public void setConf(Configuration conf) {
+    setConfInternal(conf);
+    initBlackListWhiteList(conf);
+    startCacheUpdateService(conf, false, true);
   }
 
-  public static void initSharedCacheAsync(Configuration conf) {
-    String clazzName = null;
-    boolean isEnabled = false;
-    try {
-      clazzName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL);
-      isEnabled = JavaUtils.getClass(clazzName, RawStore.class).isAssignableFrom(CachedStore.class);
-    } catch (MetaException e) {
-      LOG.error("Cannot instantiate metastore class", e);
-    }
-    if (!isEnabled) {
-      LOG.debug("CachedStore is not enabled; using " + clazzName);
-      return;
-    }
-    sharedCacheWrapper.startInit(conf);
+  /**
+   * Similar to setConf but used from within the tests.
+   * This does not start the background thread for prewarm and update.
+   * @param conf
+   */
+  void setConfForTest(Configuration conf) {
+    setConfInternal(conf);
+    initBlackListWhiteList(conf);
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
-        ObjectStore.class.getName());
+  private void setConfInternal(Configuration conf) {
+    String rawStoreClassName =
+        MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
     if (rawStore == null) {
       try {
         rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
@@ -260,94 +177,145 @@ public class CachedStore implements RawStore, Configurable {
     this.conf = conf;
     if (expressionProxy != null && conf != oldConf) {
       LOG.warn("Unexpected setConf when we were already configured");
-    }
-    if (expressionProxy == null || conf != oldConf) {
+    } else {
       expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
     }
-    initBlackListWhiteList(conf);
   }
 
   @VisibleForTesting
-  static void prewarm(RawStore rawStore) throws Exception {
-    // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
-    Deadline.registerIfNot(1000000);
-    List<String> dbNames = rawStore.getAllDatabases();
-    LOG.info("Number of databases to prewarm: " + dbNames.size());
-    SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
-    for (int i = 0; i < dbNames.size(); i++) {
-      String dbName = StringUtils.normalizeIdentifier(dbNames.get(i));
-      // Cache partition column stats
-      Deadline.startTimer("getColStatsForDatabase");
-      List<ColStatsObjWithSourceInfo> colStatsForDB =
-          rawStore.getPartitionColStatsForDatabase(dbName);
-      Deadline.stopTimer();
-      if (colStatsForDB != null) {
-        sharedCache.addPartitionColStatsToCache(colStatsForDB);
+  /**
+   * This initializes the caches in SharedCache by getting the objects from Metastore DB via
+   * ObjectStore and populating the respective caches
+   *
+   * @param rawStore
+   * @throws Exception
+   */
+  static void prewarm(RawStore rawStore) {
+    if (isCachePrewarmed.get()) {
+      return;
+    }
+    long startTime = System.nanoTime();
+    LOG.info("Prewarming CachedStore");
+    while (!isCachePrewarmed.get()) {
+      // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+      Deadline.registerIfNot(1000000);
+      List<String> dbNames;
+      try {
+        dbNames = rawStore.getAllDatabases();
+      } catch (MetaException e) {
+        // Try again
+        continue;
       }
-      LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size());
-      Database db = rawStore.getDatabase(dbName);
-      sharedCache.addDatabaseToCache(dbName, db);
-      List<String> tblNames = rawStore.getAllTables(dbName);
-      LOG.debug("Tables in database: {} : {}", dbName, tblNames);
-      for (int j = 0; j < tblNames.size(); j++) {
-        String tblName = StringUtils.normalizeIdentifier(tblNames.get(j));
-        if (!shouldCacheTable(dbName, tblName)) {
-          LOG.info("Not caching database: {}'s table: {}", dbName, tblName);
+      LOG.info("Number of databases to prewarm: {}", dbNames.size());
+      List<Database> databases = new ArrayList<>(dbNames.size());
+      for (String dbName : dbNames) {
+        try {
+          databases.add(rawStore.getDatabase(dbName));
+        } catch (NoSuchObjectException e) {
+          // Continue with next database
           continue;
         }
-        LOG.info("Caching database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
-            tblName, j, tblNames.size());
-        Table table = null;
-        table = rawStore.getTable(dbName, tblName);
-        // It is possible the table is deleted during fetching tables of the database,
-        // in that case, continue with the next table
-        if (table == null) {
+      }
+      sharedCache.populateDatabasesInCache(databases);
+      LOG.debug(
+          "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache");
+      int numberOfDatabasesCachedSoFar = 0;
+      for (String dbName : dbNames) {
+        dbName = StringUtils.normalizeIdentifier(dbName);
+        List<String> tblNames;
+        try {
+          tblNames = rawStore.getAllTables(dbName);
+        } catch (MetaException e) {
+          // Continue with next database
           continue;
         }
-        sharedCache.addTableToCache(dbName, tblName, table);
-        if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
-          Deadline.startTimer("getPartitions");
-          List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
-          Deadline.stopTimer();
-          for (Partition partition : partitions) {
-            sharedCache.addPartitionToCache(dbName, tblName, partition);
+        int numberOfTablesCachedSoFar = 0;
+        for (String tblName : tblNames) {
+          tblName = StringUtils.normalizeIdentifier(tblName);
+          if (!shouldCacheTable(dbName, tblName)) {
+            continue;
           }
-        }
-        // Cache table column stats
-        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-        Deadline.startTimer("getTableColumnStatistics");
-        ColumnStatistics tableColStats =
-            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
-        Deadline.stopTimer();
-        if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
-          sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj());
-        }
-        // Cache aggregate stats for all partitions of a table and for all but default partition
-        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
-        if ((partNames != null) && (partNames.size() > 0)) {
-          AggrStats aggrStatsAllPartitions =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          // Remove default partition from partition names and get aggregate
-          // stats again
-          List<FieldSchema> partKeys = table.getPartitionKeys();
-          String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
-          List<String> partCols = new ArrayList<String>();
-          List<String> partVals = new ArrayList<String>();
-          for (FieldSchema fs : partKeys) {
-            partCols.add(fs.getName());
-            partVals.add(defaultPartitionValue);
+          Table table;
+          try {
+            table = rawStore.getTable(dbName, tblName);
+          } catch (MetaException e) {
+            // It is possible the table is deleted during fetching tables of the database,
+            // in that case, continue with the next table
+            continue;
           }
-          String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
-          partNames.remove(defaultPartitionName);
-          AggrStats aggrStatsAllButDefaultPartition =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
-              aggrStatsAllButDefaultPartition);
+          List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+          try {
+            ColumnStatistics tableColStats = null;
+            List<Partition> partitions = null;
+            List<ColumnStatistics> partitionColStats = null;
+            AggrStats aggrStatsAllPartitions = null;
+            AggrStats aggrStatsAllButDefaultPartition = null;
+            if (table.isSetPartitionKeys()) {
+              Deadline.startTimer("getPartitions");
+              partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+              Deadline.stopTimer();
+              List<String> partNames = new ArrayList<>(partitions.size());
+              for (Partition p : partitions) {
+                partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+              }
+              if (!partNames.isEmpty()) {
+                // Get partition column stats for this table
+                Deadline.startTimer("getPartitionColumnStatistics");
+                partitionColStats =
+                    rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
+                Deadline.stopTimer();
+                // Get aggregate stats for all partitions of a table and for all but default
+                // partition
+                Deadline.startTimer("getAggrPartitionColumnStatistics");
+                aggrStatsAllPartitions =
+                    rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+                Deadline.stopTimer();
+                // Remove default partition from partition names and get aggregate
+                // stats again
+                List<FieldSchema> partKeys = table.getPartitionKeys();
+                String defaultPartitionValue =
+                    MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+                List<String> partCols = new ArrayList<>();
+                List<String> partVals = new ArrayList<>();
+                for (FieldSchema fs : partKeys) {
+                  partCols.add(fs.getName());
+                  partVals.add(defaultPartitionValue);
+                }
+                String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+                partNames.remove(defaultPartitionName);
+                Deadline.startTimer("getAggrPartitionColumnStatistics");
+                aggrStatsAllButDefaultPartition =
+                    rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+                Deadline.stopTimer();
+              }
+            } else {
+              Deadline.startTimer("getTableColumnStatistics");
+              tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+              Deadline.stopTimer();
+            }
+            sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
+                aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+          } catch (MetaException | NoSuchObjectException e) {
+            // Continue with next table
+            continue;
+          }
+          LOG.debug("Processed database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
+              tblName, ++numberOfTablesCachedSoFar, tblNames.size());
         }
+        LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
+            ++numberOfDatabasesCachedSoFar, dbNames.size());
       }
+      isCachePrewarmed.set(true);
     }
-    // Notify all blocked threads that prewarm is complete now
-    sharedCacheWrapper.notifyAllBlocked();
+    LOG.info("CachedStore initialized");
+    long endTime = System.nanoTime();
+    LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
+    sharedCache.completeTableCachePrewarm();
+  }
+
+  @VisibleForTesting
+  static void setCachePrewarmedState(boolean state) {
+    isCachePrewarmed.set(state);
   }
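
Taken together, prewarm() and the setCachePrewarmedState() hook let a test populate the
cache synchronously instead of waiting on the background update thread. A minimal sketch,
assuming a Configuration and a metastore db that an ObjectStore can talk to:

    // Sketch only, not part of this patch: force a fresh prewarm against a raw store.
    ObjectStore objectStore = new ObjectStore();
    objectStore.setConf(conf);
    // Reset the flag so prewarm() does not return early if an earlier run already set it
    CachedStore.setCachePrewarmedState(false);
    CachedStore.prewarm(objectStore);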
 
   private static void initBlackListWhiteList(Configuration conf) {
@@ -356,20 +324,27 @@ public class CachedStore implements RawStore, Configurable {
           MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
       blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
           MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
-      // The last specified blacklist pattern gets precedence
-      Collections.reverse(blacklistPatterns);
     }
   }
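
For reference, the whitelist/blacklist values read below are comma-separated patterns taken
from the metastore configuration (compiled by createPatterns() and consulted by
shouldCacheTable()). A hedged sketch with made-up pattern values:

    // Sketch only, not part of this patch: illustrative filter values.
    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setVar(conf,
        MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST, "db1\\..*");
    MetastoreConf.setVar(conf,
        MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST, "db1\\.tmp.*");
    initBlackListWhiteList(conf);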
 
+  /**
+   * Starts a background thread that initially populates the SharedCache and then periodically
+   * refreshes it from the metastore db.
+   *
+   * @param conf metastore configuration
+   * @param runOnlyOnce schedule the update work exactly once instead of at a fixed rate
+   *          (used by tests that control the background thread explicitly)
+   * @param shouldRunPrewarm whether the scheduled work should prewarm the cache before updating
+   */
   @VisibleForTesting
-  synchronized static void startCacheUpdateService(Configuration conf) {
+  static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+      boolean shouldRunPrewarm) {
     if (cacheUpdateMaster == null) {
       initBlackListWhiteList(conf);
       if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
-        cacheRefreshPeriod = MetastoreConf.getTimeVar(conf,
+        cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
             ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
       }
-      LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriod);
+      LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS);
       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
@@ -379,13 +354,20 @@ public class CachedStore implements RawStore, Configurable {
           return t;
         }
       });
-      cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod,
+      if (!runOnlyOnce) {
+        cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
+            cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+      }
+    }
+    if (runOnlyOnce) {
+      // Some tests control the execution of the background update thread
+      cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
           TimeUnit.MILLISECONDS);
     }
   }
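
With the new runOnlyOnce flag, a test can trigger exactly one pass of the update work rather
than a periodic schedule. A minimal sketch under that assumption:

    // Sketch only, not part of this patch: one update pass, no prewarm, then shut down.
    CachedStore.startCacheUpdateService(conf, /* runOnlyOnce */ true, /* shouldRunPrewarm */ false);
    // Give the one-shot task a chance to finish; the timeout (ms) is arbitrary
    CachedStore.stopCacheUpdateService(10000);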
 
   @VisibleForTesting
-  synchronized static boolean stopCacheUpdateService(long timeout) {
+  static synchronized boolean stopCacheUpdateService(long timeout) {
     boolean tasksStoppedBeforeShutdown = false;
     if (cacheUpdateMaster != null) {
       LOG.info("CachedStore: shutting down cache update service");
@@ -404,167 +386,83 @@ public class CachedStore implements RawStore, Configurable {
 
   @VisibleForTesting
   static void setCacheRefreshPeriod(long time) {
-    cacheRefreshPeriod = time;
+    cacheRefreshPeriodMS = time;
   }
 
   static class CacheUpdateMasterWork implements Runnable {
-    private boolean isFirstRun = true;
+    private boolean shouldRunPrewarm = true;
     private final RawStore rawStore;
 
-    public CacheUpdateMasterWork(Configuration conf) {
-      String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
-          ObjectStore.class.getName());
+    CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
+      this.shouldRunPrewarm = shouldRunPrewarm;
+      String rawStoreClassName =
+          MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
       try {
         rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance();
         rawStore.setConf(conf);
       } catch (InstantiationException | IllegalAccessException | MetaException e) {
         // MetaException here really means ClassNotFound (see the utility method).
         // So, if any of these happen, that means we can never succeed.
-        sharedCacheWrapper.updateInitState(e, true);
         throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
       }
     }
 
     @Override
     public void run() {
-      if (isFirstRun) {
-        while (isFirstRun) {
-          try {
-            long startTime = System.nanoTime();
-            LOG.info("Prewarming CachedStore");
-            prewarm(rawStore);
-            LOG.info("CachedStore initialized");
-            long endTime = System.nanoTime();
-            LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
-          } catch (Exception e) {
-            LOG.error("Prewarm failure", e);
-            sharedCacheWrapper.updateInitState(e, false);
-            return;
-          }
-          sharedCacheWrapper.updateInitState(null, false);
-          isFirstRun = false;
-        }
-      } else {
+      if (!shouldRunPrewarm) {
         // TODO: prewarm and update can probably be merged.
         update();
+      } else {
+        try {
+          prewarm(rawStore);
+        } catch (Exception e) {
+          LOG.error("Prewarm failure", e);
+          return;
+        }
+        // Prewarm is complete; subsequent scheduled runs should refresh the cache
+        shouldRunPrewarm = false;
+      }
     }
 
-    public void update() {
+    void update() {
       Deadline.registerIfNot(1000000);
       LOG.debug("CachedStore: updating cached objects");
+      List<String> dbNames;
       try {
-        List<String> dbNames = rawStore.getAllDatabases();
-        if (dbNames != null) {
-          // Update the database in cache
-          updateDatabases(rawStore, dbNames);
-          for (String dbName : dbNames) {
-            updateDatabasePartitionColStats(rawStore, dbName);
-            // Update the tables in cache
-            updateTables(rawStore, dbName);
-            List<String> tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe());
-            for (String tblName : tblNames) {
-              if (!shouldCacheTable(dbName, tblName)) {
-                continue;
-              }
-              // Update the partitions for a table in cache
-              updateTablePartitions(rawStore, dbName, tblName);
-              // Update the table column stats for a table in cache
-              updateTableColStats(rawStore, dbName, tblName);
-              // Update aggregate column stats cache
-              updateAggregateStatsCache(rawStore, dbName, tblName);
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOG.error("Updating CachedStore: error happen when refresh; ignoring", e);
+        dbNames = rawStore.getAllDatabases();
+      } catch (MetaException e) {
+        LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e);
+        return;
       }
-    }
-
-    private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) {
-      try {
-        Deadline.startTimer("getColStatsForDatabasePartitions");
-        List<ColStatsObjWithSourceInfo> colStatsForDB =
-            rawStore.getPartitionColStatsForDatabase(dbName);
-        Deadline.stopTimer();
-        if (colStatsForDB != null) {
-          if (partitionColStatsCacheLock.writeLock().tryLock()) {
-            // Skip background updates if we detect change
-            if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
-              LOG.debug("Skipping partition column stats cache update; the partition column stats "
-                  + "list we have is dirty.");
-              return;
-            }
-            sharedCacheWrapper.getUnsafe()
-                .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB);
-          }
-        }
-      } catch (MetaException | NoSuchObjectException e) {
-        LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}",
-            dbName, e);
-      } finally {
-        if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
-          partitionColStatsCacheLock.writeLock().unlock();
+      // Update the database in cache
+      updateDatabases(rawStore, dbNames);
+      for (String dbName : dbNames) {
+        // Update the tables in cache
+        updateTables(rawStore, dbName);
+        List<String> tblNames;
+        try {
+          tblNames = rawStore.getAllTables(dbName);
+        } catch (MetaException e) {
+          // Continue with next database
+          continue;
         }
-      }
-    }
-
-    // Update cached aggregate stats for all partitions of a table and for all
-    // but default partition
-    private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) {
-      try {
-        Table table = rawStore.getTable(dbName, tblName);
-        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
-        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-        if ((partNames != null) && (partNames.size() > 0)) {
-          Deadline.startTimer("getAggregareStatsForAllPartitions");
-          AggrStats aggrStatsAllPartitions =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          Deadline.stopTimer();
-          // Remove default partition from partition names and get aggregate stats again
-          List<FieldSchema> partKeys = table.getPartitionKeys();
-          String defaultPartitionValue =
-              MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
-          List<String> partCols = new ArrayList<String>();
-          List<String> partVals = new ArrayList<String>();
-          for (FieldSchema fs : partKeys) {
-            partCols.add(fs.getName());
-            partVals.add(defaultPartitionValue);
-          }
-          String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
-          partNames.remove(defaultPartitionName);
-          Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
-          AggrStats aggrStatsAllButDefaultPartition =
-              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
-          Deadline.stopTimer();
-          if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) {
-            if (partitionAggrColStatsCacheLock.writeLock().tryLock()) {
-              // Skip background updates if we detect change
-              if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) {
-                LOG.debug(
-                    "Skipping aggregate column stats cache update; the aggregate column stats we "
-                        + "have is dirty.");
-                return;
-              }
-              sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache(
-                  StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName),
-                  aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
-            }
+        for (String tblName : tblNames) {
+          if (!shouldCacheTable(dbName, tblName)) {
+            continue;
           }
-        }
-      } catch (MetaException | NoSuchObjectException e) {
-        LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
-            e);
-      } finally {
-        if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) {
-          partitionAggrColStatsCacheLock.writeLock().unlock();
+          // Update the table column stats for a table in cache
+          updateTableColStats(rawStore, dbName, tblName);
+          // Update the partitions for a table in cache
+          updateTablePartitions(rawStore, dbName, tblName);
+          // Update the partition col stats for a table in cache
+          updateTablePartitionColStats(rawStore, dbName, tblName);
+          // Update aggregate partition column stats for a table in cache
+          updateTableAggregatePartitionColStats(rawStore, dbName, tblName);
         }
       }
+      sharedCache.incrementUpdateCount();
     }
 
     private void updateDatabases(RawStore rawStore, List<String> dbNames) {
-      // Prepare the list of databases
-      List<Database> databases = new ArrayList<>();
+      List<Database> databases = new ArrayList<>(dbNames.size());
       for (String dbName : dbNames) {
         Database db;
         try {
@@ -574,24 +472,9 @@ public class CachedStore implements RawStore, Configurable {
           LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
         }
       }
-      // Update the cached database objects
-      try {
-        if (databaseCacheLock.writeLock().tryLock()) {
-          // Skip background updates if we detect change
-          if (isDatabaseCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping database cache update; the database list we have is dirty.");
-            return;
-          }
-          sharedCacheWrapper.getUnsafe().refreshDatabases(databases);
-        }
-      } finally {
-        if (databaseCacheLock.isWriteLockedByCurrentThread()) {
-          databaseCacheLock.writeLock().unlock();
-        }
-      }
+      sharedCache.refreshDatabasesInCache(databases);
     }
 
-    // Update the cached table objects
     private void updateTables(RawStore rawStore, String dbName) {
       List<Table> tables = new ArrayList<>();
       try {
@@ -600,81 +483,99 @@ public class CachedStore implements RawStore, Configurable {
           if (!shouldCacheTable(dbName, tblName)) {
             continue;
           }
-          Table table =
-              rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
-                  StringUtils.normalizeIdentifier(tblName));
+          Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
+              StringUtils.normalizeIdentifier(tblName));
           tables.add(table);
         }
-        if (tableCacheLock.writeLock().tryLock()) {
-          // Skip background updates if we detect change
-          if (isTableCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping table cache update; the table list we have is dirty.");
-            return;
-          }
-          sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables);
-        }
+        sharedCache.refreshTablesInCache(dbName, tables);
       } catch (MetaException e) {
-        LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
-      } finally {
-        if (tableCacheLock.isWriteLockedByCurrentThread()) {
-          tableCacheLock.writeLock().unlock();
+        LOG.debug("Unable to refresh cached tables for database: " + dbName, e);
+      }
+    }
+
+    private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+      try {
+        Table table = rawStore.getTable(dbName, tblName);
+        if (!table.isSetPartitionKeys()) {
+          List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+          Deadline.startTimer("getTableColumnStatistics");
+          ColumnStatistics tableColStats =
+              rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+          Deadline.stopTimer();
+          if (tableColStats != null) {
+            sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+          }
         }
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Unable to refresh table column stats for table: " + tblName, e);
       }
     }
 
-    // Update the cached partition objects for a table
     private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
       try {
         Deadline.startTimer("getPartitions");
         List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
         Deadline.stopTimer();
-        if (partitionCacheLock.writeLock().tryLock()) {
-          // Skip background updates if we detect change
-          if (isPartitionCacheDirty.compareAndSet(true, false)) {
-            LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
-            return;
-          }
-          sharedCacheWrapper.getUnsafe().refreshPartitions(
-              StringUtils.normalizeIdentifier(dbName),
-              StringUtils.normalizeIdentifier(tblName), partitions);
-        }
+        sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(dbName),
+            StringUtils.normalizeIdentifier(tblName), partitions);
       } catch (MetaException | NoSuchObjectException e) {
         LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
-      } finally {
-        if (partitionCacheLock.isWriteLockedByCurrentThread()) {
-          partitionCacheLock.writeLock().unlock();
-        }
       }
     }
 
-    // Update the cached col stats for this table
-    private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+    private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
       try {
         Table table = rawStore.getTable(dbName, tblName);
         List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
-        Deadline.startTimer("getTableColumnStatistics");
-        ColumnStatistics tableColStats =
-            rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+        // Get partition column stats for this table
+        Deadline.startTimer("getPartitionColumnStatistics");
+        List<ColumnStatistics> partitionColStats =
+            rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
         Deadline.stopTimer();
-        if (tableColStats != null) {
-          if (tableColStatsCacheLock.writeLock().tryLock()) {
-            // Skip background updates if we detect change
-            if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
-              LOG.debug("Skipping table column stats cache update; the table column stats list we "
-                  + "have is dirty.");
-              return;
-            }
-            sharedCacheWrapper.getUnsafe().refreshTableColStats(
-                StringUtils.normalizeIdentifier(dbName),
-                StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+        sharedCache.refreshPartitionColStatsInCache(dbName, tblName, partitionColStats);
+      } catch (MetaException | NoSuchObjectException e) {
+        LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+      }
+    }
+
+    // Update cached aggregate stats for all partitions of a table and for all
+    // but default partition
+    private void updateTableAggregatePartitionColStats(RawStore rawStore, String dbName,
+        String tblName) {
+      try {
+        Table table = rawStore.getTable(dbName, tblName);
+        List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+        List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+        if ((partNames != null) && (partNames.size() > 0)) {
+          Deadline.startTimer("getAggregareStatsForAllPartitions");
+          AggrStats aggrStatsAllPartitions =
+              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+          Deadline.stopTimer();
+          // Remove default partition from partition names and get aggregate stats again
+          List<FieldSchema> partKeys = table.getPartitionKeys();
+          String defaultPartitionValue =
+              MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+          List<String> partCols = new ArrayList<>();
+          List<String> partVals = new ArrayList<>();
+          for (FieldSchema fs : partKeys) {
+            partCols.add(fs.getName());
+            partVals.add(defaultPartitionValue);
           }
+          String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+          partNames.remove(defaultPartitionName);
+          Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
+          AggrStats aggrStatsAllButDefaultPartition =
+              rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+          Deadline.stopTimer();
+          sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(dbName),
+              StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+              aggrStatsAllButDefaultPartition);
         }
       } catch (MetaException | NoSuchObjectException e) {
-        LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
-      } finally {
-        if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
-          tableColStatsCacheLock.writeLock().unlock();
-        }
+        LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
+            e);
       }
     }
   }
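
The default-partition name dropped from partNames above is simply every partition column
paired with the configured default value. A worked example, assuming the stock default
"__HIVE_DEFAULT_PARTITION__" and hypothetical partition columns ds/hr:

    // Sketch only, not part of this patch (assumes java.util.Arrays is imported).
    List<String> partCols = Arrays.asList("ds", "hr");
    List<String> partVals =
        Arrays.asList("__HIVE_DEFAULT_PARTITION__", "__HIVE_DEFAULT_PARTITION__");
    // Yields "ds=__HIVE_DEFAULT_PARTITION__/hr=__HIVE_DEFAULT_PARTITION__"
    String defaultPartitionName = FileUtils.makePartName(partCols, partVals);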
@@ -712,35 +613,17 @@ public class CachedStore implements RawStore, Configurable {
   @Override
   public void createDatabase(Database db) throws InvalidObjectException, MetaException {
     rawStore.createDatabase(db);
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    try {
-      // Wait if background cache update is happening
-      databaseCacheLock.readLock().lock();
-      isDatabaseCacheDirty.set(true);
-      sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()),
-          db.deepCopy());
-    } finally {
-      databaseCacheLock.readLock().unlock();
-    }
+    sharedCache.addDatabaseToCache(db);
   }
 
   @Override
   public Database getDatabase(String dbName) throws NoSuchObjectException {
-    SharedCache sharedCache;
-
-    if (!sharedCacheWrapper.isInitialized()) {
+    if (!sharedCache.isDatabaseCachePrewarmed()) {
       return rawStore.getDatabase(dbName);
     }
-
-    try {
-      sharedCache = sharedCacheWrapper.get();
-    } catch (MetaException e) {
-      throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx?
-    }
-    Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
+    dbName = dbName.toLowerCase();
+    Database db =
+        sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
     if (db == null) {
       throw new NoSuchObjectException();
     }
@@ -748,68 +631,39 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
-    boolean succ = rawStore.dropDatabase(dbname);
+  public boolean dropDatabase(String dbName) throws NoSuchObjectException, MetaException {
+    boolean succ = rawStore.dropDatabase(dbName);
     if (succ) {
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        databaseCacheLock.readLock().lock();
-        isDatabaseCacheDirty.set(true);
-        sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname));
-      } finally {
-        databaseCacheLock.readLock().unlock();
-      }
+      dbName = dbName.toLowerCase();
+      sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
     }
     return succ;
   }
 
   @Override
-  public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
-      MetaException {
+  public boolean alterDatabase(String dbName, Database db)
+      throws NoSuchObjectException, MetaException {
     boolean succ = rawStore.alterDatabase(dbName, db);
     if (succ) {
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        databaseCacheLock.readLock().lock();
-        isDatabaseCacheDirty.set(true);
-        sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
-      } finally {
-        databaseCacheLock.readLock().unlock();
-      }
+      dbName = dbName.toLowerCase();
+      sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
     }
     return succ;
   }
 
   @Override
   public List<String> getDatabases(String pattern) throws MetaException {
-    if (!sharedCacheWrapper.isInitialized()) {
+    if (!sharedCache.isDatabaseCachePrewarmed()) {
       return rawStore.getDatabases(pattern);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> results = new ArrayList<>();
-    for (String dbName : sharedCache.listCachedDatabases()) {
-      dbName = StringUtils.normalizeIdentifier(dbName);
-      if (CacheUtils.matches(dbName, pattern)) {
-        results.add(dbName);
-      }
-    }
-    return results;
+    return sharedCache.listCachedDatabases(pattern);
   }
 
   @Override
   public List<String> getAllDatabases() throws MetaException {
-    if (!sharedCacheWrapper.isInitialized()) {
+    if (!sharedCache.isDatabaseCachePrewarmed()) {
       return rawStore.getAllDatabases();
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.listCachedDatabases();
   }
 
@@ -854,24 +708,13 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
     validateTableType(tbl);
-    try {
-      // Wait if background cache update is happening
-      tableCacheLock.readLock().lock();
-      isTableCacheDirty.set(true);
-      sharedCache.addTableToCache(dbName, tblName, tbl);
-    } finally {
-      tableCacheLock.readLock().unlock();
-    }
+    sharedCache.addTableToCache(dbName, tblName, tbl);
   }
 
   @Override
-  public boolean dropTable(String dbName, String tblName) throws MetaException,
-      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+  public boolean dropTable(String dbName, String tblName)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
     boolean succ = rawStore.dropTable(dbName, tblName);
     if (succ) {
       dbName = StringUtils.normalizeIdentifier(dbName);
@@ -879,28 +722,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      // Remove table
-      try {
-        // Wait if background table cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.removeTableFromCache(dbName, tblName);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Remove table col stats
-      try {
-        // Wait if background table col stats cache update is happening
-        tableColStatsCacheLock.readLock().lock();
-        isTableColStatsCacheDirty.set(true);
-        sharedCache.removeTableColStatsFromCache(dbName, tblName);
-      } finally {
-        tableColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removeTableFromCache(dbName, tblName);
     }
     return succ;
   }
@@ -909,11 +731,14 @@ public class CachedStore implements RawStore, Configurable {
   public Table getTable(String dbName, String tblName) throws MetaException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getTable(dbName, tblName);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // This table is not yet loaded in cache
+      return rawStore.getTable(dbName, tblName);
+    }
     if (tbl != null) {
       tbl.unsetPrivileges();
       tbl.setRewriteEnabled(tbl.isRewriteEnabled());
@@ -930,27 +755,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.addPartitionToCache(dbName, tblName, part);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.addPartitionToCache(dbName, tblName, part);
     }
     return succ;
   }
@@ -965,29 +770,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        for (Partition part : parts) {
-          sharedCache.addPartitionToCache(dbName, tblName, part);
-        }
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.addPartitionsToCache(dbName, tblName, parts);
     }
     return succ;
   }
@@ -1002,30 +785,10 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
-        while (iterator.hasNext()) {
-          Partition part = iterator.next();
-          sharedCache.addPartitionToCache(dbName, tblName, part);
-        }
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
+      PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+      while (iterator.hasNext()) {
+        Partition part = iterator.next();
+        sharedCache.addPartitionToCache(dbName, tblName, part);
       }
     }
     return succ;
@@ -1036,16 +799,13 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getPartition(dbName, tblName, part_vals);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    Partition part =
-        sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
+    Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
     if (part == null) {
-      // TODO Manage privileges
-      throw new NoSuchObjectException("partition values=" + part_vals.toString());
+      // The table containing the partition is not yet loaded in cache
+      return rawStore.getPartition(dbName, tblName, part_vals);
     }
     return part;
   }
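
getTable() and getPartition() above now share the same non-blocking shape, which replaces the
old sharedCacheWrapper gating: tables filtered out by the whitelist/blacklist always go to the
raw store, tables the prewarm has not reached yet fall back to it, and only cache hits are
served from memory. Condensed, the recurring read path looks like:

    // Sketch only, not part of this patch: the read-through pattern used in this class.
    if (!shouldCacheTable(dbName, tblName)) {
      return rawStore.getTable(dbName, tblName);   // filtered by whitelist/blacklist
    }
    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
    if (tbl == null) {
      return rawStore.getTable(dbName, tblName);   // prewarm has not loaded it yet
    }
    return tbl;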
@@ -1055,10 +815,14 @@ public class CachedStore implements RawStore, Configurable {
       List<String> part_vals) throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.doesPartitionExist(dbName, tblName, part_vals);
+    }
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table containing the partition is not yet loaded in cache
       return rawStore.doesPartitionExist(dbName, tblName, part_vals);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.existPartitionFromCache(dbName, tblName, part_vals);
   }
 
@@ -1072,50 +836,40 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      // Remove partition
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove partition col stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
     }
     return succ;
   }
 
   @Override
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
+    rawStore.dropPartitions(dbName, tblName, partNames);
+    dbName = StringUtils.normalizeIdentifier(dbName);
+    tblName = StringUtils.normalizeIdentifier(tblName);
+    if (!shouldCacheTable(dbName, tblName)) {
+      return;
+    }
+    List<List<String>> partVals = new ArrayList<>();
+    for (String partName : partNames) {
+      partVals.add(partNameToVals(partName));
+    }
+    sharedCache.removePartitionsFromCache(dbName, tblName, partVals);
+  }
+
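
dropPartitions() above converts each partition name back into its value list via
partNameToVals() before evicting cache entries. For a hypothetical two-key table:

    // Sketch only, not part of this patch: hypothetical partition name and values.
    // "ds=2018-03-19/hr=12" -> ["2018-03-19", "12"]
    List<String> vals = partNameToVals("ds=2018-03-19/hr=12");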
+  @Override
   public List<Partition> getPartitions(String dbName, String tblName, int max)
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitions(dbName, tblName, max);
+    }
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table containing the partitions is not yet loaded in cache
       return rawStore.getPartitions(dbName, tblName, max);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> parts = sharedCache.listCachedPartitions(dbName, tblName, max);
     return parts;
   }
@@ -1130,73 +884,20 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table is not yet loaded in cache
       return;
     }
-
-    if (shouldCacheTable(dbName, newTblName)) {
-      validateTableType(newTable);
-      // Update table cache
-      try {
-        // Wait if background cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName),
-            StringUtils.normalizeIdentifier(tblName), newTable);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Update partition cache (key might have changed since table name is a
-      // component of key)
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName),
-            StringUtils.normalizeIdentifier(tblName), newTable);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-    } else {
-      // Remove the table and its cached partitions, stats etc,
-      // since it does not pass the whitelist/blacklist filter.
-      // Remove table
-      try {
-        // Wait if background cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.removeTableFromCache(dbName, tblName);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Remove partitions
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.removePartitionsFromCache(dbName, tblName);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove partition col stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Update aggregate partition col stats keys wherever applicable
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+    if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+      // If old table is in the cache and the new table can also be cached
+      sharedCache.alterTableInCache(dbName, tblName, newTable);
+    } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+      // If old table is *not* in the cache but the new table can be cached
+      sharedCache.addTableToCache(dbName, newTblName, newTable);
+    } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
+      // If old table is in the cache but the new table *cannot* be cached
+      sharedCache.removeTableFromCache(dbName, tblName);
     }
   }
 
@@ -1208,34 +909,21 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getTables(String dbName, String pattern) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getTables(dbName, pattern);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> tableNames = new ArrayList<>();
-    for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      if (CacheUtils.matches(table.getTableName(), pattern)) {
-        tableNames.add(table.getTableName());
-      }
-    }
-    return tableNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+        (short) -1);
   }
 
   @Override
   public List<String> getTables(String dbName, String pattern, TableType tableType)
       throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getTables(dbName, pattern, tableType);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> tableNames = new ArrayList<>();
-    for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      if (CacheUtils.matches(table.getTableName(), pattern) &&
-          table.getTableType().equals(tableType.toString())) {
-        tableNames.add(table.getTableName());
-      }
-    }
-    return tableNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+        tableType);
   }
 
   @Override
@@ -1248,10 +936,9 @@ public class CachedStore implements RawStore, Configurable {
   public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes)
       throws MetaException {
     // TODO Check if all required tables are allowed, if so, get it from cache
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getTableMeta(dbNames, tableNames, tableTypes);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames),
         StringUtils.normalizeIdentifier(tableNames), tableTypes);
   }
@@ -1268,10 +955,9 @@ public class CachedStore implements RawStore, Configurable {
         break;
       }
     }
-    if (!sharedCacheWrapper.isInitialized() || missSomeInCache) {
+    if (!isCachePrewarmed.get() || missSomeInCache) {
       return rawStore.getTableObjectsByName(dbName, tblNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Table> tables = new ArrayList<>();
     for (String tblName : tblNames) {
       tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1286,38 +972,20 @@ public class CachedStore implements RawStore, Configurable {
 
   @Override
   public List<String> getAllTables(String dbName) throws MetaException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.getAllTables(dbName);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    return getAllTablesInternal(dbName, sharedCache);
-  }
-
-  private static List<String> getAllTablesInternal(String dbName, SharedCache sharedCache) {
-    List<String> tblNames = new ArrayList<>();
-    for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName()));
-    }
-    return tblNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName));
   }
 
   @Override
   public List<String> listTableNamesByFilter(String dbName, String filter, short max_tables)
       throws MetaException, UnknownDBException {
-    if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+    if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
       return rawStore.listTableNamesByFilter(dbName, filter, max_tables);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    List<String> tableNames = new ArrayList<>();
-    int count = 0;
-    for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
-      if (CacheUtils.matches(table.getTableName(), filter)
-          && (max_tables == -1 || count < max_tables)) {
-        tableNames.add(table.getTableName());
-        count++;
-      }
-    }
-    return tableNames;
+    return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter,
+        max_tables);
   }
 
   @Override
@@ -1325,16 +993,19 @@ public class CachedStore implements RawStore, Configurable {
       short max_parts) throws MetaException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.listPartitionNames(dbName, tblName, max_parts);
+    }
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table is not yet loaded in cache
       return rawStore.listPartitionNames(dbName, tblName, max_parts);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<String> partitionNames = new ArrayList<>();
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     int count = 0;
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) {
       if (max_parts == -1 || count < max_parts) {
-        partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+        partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
+        count++;
       }
     }
     return partitionNames;
@@ -1363,37 +1034,7 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    // Update partition cache
-    try {
-      // Wait if background cache update is happening
-      partitionCacheLock.readLock().lock();
-      isPartitionCacheDirty.set(true);
-      sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
-    } finally {
-      partitionCacheLock.readLock().unlock();
-    }
-    // Update partition column stats cache
-    try {
-      // Wait if background cache update is happening
-      partitionColStatsCacheLock.readLock().lock();
-      isPartitionColStatsCacheDirty.set(true);
-      sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
-    } finally {
-      partitionColStatsCacheLock.readLock().unlock();
-    }
-    // Remove aggregate partition col stats for this table
-    try {
-      // Wait if background cache update is happening
-      partitionAggrColStatsCacheLock.readLock().lock();
-      isPartitionAggrColStatsCacheDirty.set(true);
-      sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-    } finally {
-      partitionAggrColStatsCacheLock.readLock().unlock();
-    }
+    sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
   }
 
   @Override
@@ -1405,61 +1046,23 @@ public class CachedStore implements RawStore, Configurable {
     if (!shouldCacheTable(dbName, tblName)) {
       return;
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    // Update partition cache
-    try {
-      // Wait if background cache update is happening
-      partitionCacheLock.readLock().lock();
-      isPartitionCacheDirty.set(true);
-      for (int i = 0; i < partValsList.size(); i++) {
-        List<String> partVals = partValsList.get(i);
-        Partition newPart = newParts.get(i);
-        sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
-      }
-    } finally {
-      partitionCacheLock.readLock().unlock();
-    }
-    // Update partition column stats cache
-    try {
-      // Wait if background cache update is happening
-      partitionColStatsCacheLock.readLock().lock();
-      isPartitionColStatsCacheDirty.set(true);
-      for (int i = 0; i < partValsList.size(); i++) {
-        List<String> partVals = partValsList.get(i);
-        Partition newPart = newParts.get(i);
-        sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
-      }
-    } finally {
-      partitionColStatsCacheLock.readLock().unlock();
-    }
-    // Remove aggregate partition col stats for this table
-    try {
-      // Wait if background cache update is happening
-      partitionAggrColStatsCacheLock.readLock().lock();
-      isPartitionAggrColStatsCacheDirty.set(true);
-      sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-    } finally {
-      partitionAggrColStatsCacheLock.readLock().unlock();
-    }
+    sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts);
   }
 
   private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
       String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
-          throws MetaException, NoSuchObjectException {
-    List<Partition> parts = sharedCache.listCachedPartitions(
-        StringUtils.normalizeIdentifier(table.getDbName()),
-        StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+      throws MetaException, NoSuchObjectException {
+    List<Partition> parts =
+        sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()),
+            StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
     for (Partition part : parts) {
       result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
     }
     if (defaultPartName == null || defaultPartName.isEmpty()) {
       defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
     }
-    return expressionProxy.filterPartitionsByExpr(
-        table.getPartitionKeys(), expr, defaultPartName, result);
+    return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
+        result);
   }
 
   @Override
@@ -1474,13 +1077,17 @@ public class CachedStore implements RawStore, Configurable {
       String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
           result);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<String> partNames = new LinkedList<>();
     Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
+          result);
+    }
     boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr,
         defaultPartitionName, maxParts, partNames, sharedCache);
     return hasUnknownPartitions;
@@ -1497,13 +1104,16 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
     List<String> partNames = new LinkedList<>();
     Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
+    }
     getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
         sharedCache);
     return partNames.size();
@@ -1526,10 +1136,14 @@ public class CachedStore implements RawStore, Configurable {
       List<String> partNames) throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitionsByNames(dbName, tblName, partNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.getPartitionsByNames(dbName, tblName, partNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> partitions = new ArrayList<>();
     for (String partName : partNames) {
       Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName));
@@ -1702,14 +1316,17 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException, InvalidObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals);
-    if (p!=null) {
-      Table t = sharedCache.getTableFromCache(dbName, tblName);
-      String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals);
+    if (p != null) {
+      String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
       PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
           userName, groupNames);
       p.setPrivileges(privs);
@@ -1723,16 +1340,19 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException, InvalidObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     List<Partition> partitions = new ArrayList<>();
     int count = 0;
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
       if (maxParts == -1 || count < maxParts) {
-        String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+        String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
         PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
             userName, groupNames);
         part.setPrivileges(privs);
@@ -1749,13 +1369,16 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<String> partNames = new ArrayList<>();
     int count = 0;
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
       boolean psMatch = true;
       for (int i=0;i<partVals.size();i++) {
@@ -1770,7 +1393,7 @@ public class CachedStore implements RawStore, Configurable {
         continue;
       }
       if (maxParts == -1 || count < maxParts) {
-        partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+        partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
         count++;
       }
     }
@@ -1783,13 +1406,17 @@ public class CachedStore implements RawStore, Configurable {
       throws MetaException, InvalidObjectException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
+          groupNames);
+    }
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
       return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
           groupNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> partitions = new ArrayList<>();
-    Table t = sharedCache.getTableFromCache(dbName, tblName);
     int count = 0;
     for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
       boolean psMatch = true;
@@ -1805,7 +1432,7 @@ public class CachedStore implements RawStore, Configurable {
         continue;
       }
       if (maxParts == -1 || count < maxParts) {
-        String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+        String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
         PrincipalPrivilegeSet privs =
             getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames);
         part.setPrivileges(privs);
@@ -1825,35 +1452,19 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
+      Table table = sharedCache.getTableFromCache(dbName, tblName);
+      if (table == null) {
+        // The table is not yet loaded in cache
         return succ;
       }
       List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
-      Table tbl = getTable(dbName, tblName);
       List<String> colNames = new ArrayList<>();
       for (ColumnStatisticsObj statsObj : statsObjs) {
         colNames.add(statsObj.getColName());
       }
-      StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
-      // Update table
-      try {
-        // Wait if background cache update is happening
-        tableCacheLock.readLock().lock();
-        isTableCacheDirty.set(true);
-        sharedCache.alterTableInCache(dbName, tblName, tbl);
-      } finally {
-        tableCacheLock.readLock().unlock();
-      }
-      // Update table col stats
-      try {
-        // Wait if background cache update is happening
-        tableColStatsCacheLock.readLock().lock();
-        isTableColStatsCacheDirty.set(true);
-        sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
-      } finally {
-        tableColStatsCacheLock.readLock().unlock();
-      }
+      StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
+      sharedCache.alterTableInCache(dbName, tblName, table);
+      sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
     }
     return succ;
   }
@@ -1863,24 +1474,18 @@ public class CachedStore implements RawStore, Configurable {
       List<String> colNames) throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
       return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
-    List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-    for (String colName : colNames) {
-      String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName);
-      ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey);
-      if (colStat != null) {
-        colStatObjs.add(colStat);
-      }
-    }
-    if (colStatObjs.isEmpty()) {
-      return null;
-    } else {
-      return new ColumnStatistics(csd, colStatObjs);
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
     }
+    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
+    List<ColumnStatisticsObj> colStatObjs =
+        sharedCache.getTableColStatsFromCache(dbName, tblName, colNames);
+    return new ColumnStatistics(csd, colStatObjs);
   }
 
   @Override
@@ -1893,18 +1498,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        tableColStatsCacheLock.readLock().lock();
-        isTableColStatsCacheDirty.set(true);
-        sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
-      } finally {
-        tableColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
     }
     return succ;
   }
@@ -1919,10 +1513,6 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
       List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
       Partition part = getPartition(dbName, tblName, partVals);
       List<String> colNames = new ArrayList<>();
@@ -1930,34 +1520,8 @@ public class CachedStore implements RawStore, Configurable {
         colNames.add(statsObj.getColName());
       }
       StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
-      // Update partition
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Update partition column stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals,
-            colStats.getStatsObj());
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
+      sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj());
     }
     return succ;
   }
@@ -1981,27 +1545,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) {
-        return succ;
-      }
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
-      }
+      sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
     }
     return succ;
   }
@@ -2012,10 +1556,14 @@ public class CachedStore implements RawStore, Configurable {
     List<ColumnStatisticsObj> colStats;
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
+    Table table = sharedCache.getTableFromCache(dbName, tblName);
+    if (table == null) {
+      // The table is not yet loaded in cache
+      return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+    }
     List<String> allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
     if (partNames.size() == allPartNames.size()) {
       colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL);
@@ -2054,10 +1602,8 @@ public class CachedStore implements RawStore, Configurable {
       List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList =
           new ArrayList<ColStatsObjWithSourceInfo>();
       for (String partName : partNames) {
-        String colStatsCacheKey =
-            CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
         ColumnStatisticsObj colStatsForPart =
-            sharedCache.getCachedPartitionColStats(colStatsCacheKey);
+            sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName);
         if (colStatsForPart != null) {
           ColStatsObjWithSourceInfo colStatsWithPartInfo =
               new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName);
@@ -2173,54 +1719,6 @@ public class CachedStore implements RawStore, Configurable {
   }
 
   @Override
-  public void dropPartitions(String dbName, String tblName, List<String> partNames)
-      throws MetaException, NoSuchObjectException {
-    rawStore.dropPartitions(dbName, tblName, partNames);
-    dbName = StringUtils.normalizeIdentifier(dbName);
-    tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!shouldCacheTable(dbName, tblName)) {
-      return;
-    }
-    SharedCache sharedCache = sharedCacheWrapper.get();
-    if (sharedCache == null) {
-      return;
-    }
-    // Remove partitions
-    try {
-      // Wait if background cache update is happening
-      partitionCacheLock.readLock().lock();
-      isPartitionCacheDirty.set(true);
-      for (String partName : partNames) {
-        List<String> vals = partNameToVals(partName);
-        sharedCache.removePartitionFromCache(dbName, tblName, vals);
-      }
-    } finally {
-      partitionCacheLock.readLock().unlock();
-    }
-    // Remove partition col stats
-    try {
-      // Wait if background cache update is happening
-      partitionColStatsCacheLock.readLock().lock();
-      isPartitionColStatsCacheDirty.set(true);
-      for (String partName : partNames) {
-        List<String> part_vals = partNameToVals(partName);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
-      }
-    } finally {
-      partitionColStatsCacheLock.readLock().unlock();
-    }
-   

<TRUNCATED>
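
The CachedStore hunks above all converge on one read-through pattern: serve a
request from the SharedCache only when the table is cacheable and already
loaded, and fall back to the backing ObjectStore otherwise. The following is a
condensed sketch of that pattern, not a method body copied from the commit;
rawStore and sharedCache are the CachedStore fields used throughout the diff:

    @Override
    public Partition getPartition(String dbName, String tblName, List<String> partVals)
        throws MetaException, NoSuchObjectException {
      dbName = StringUtils.normalizeIdentifier(dbName);
      tblName = StringUtils.normalizeIdentifier(tblName);
      if (!shouldCacheTable(dbName, tblName)) {
        // Table is excluded from caching by configuration
        return rawStore.getPartition(dbName, tblName, partVals);
      }
      Table table = sharedCache.getTableFromCache(dbName, tblName);
      if (table == null) {
        // The table is not yet loaded in cache (prewarm still in progress)
        return rawStore.getPartition(dbName, tblName, partVals);
      }
      Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partVals);
      if (part == null) {
        throw new NoSuchObjectException("Partition not found, values: " + partVals);
      }
      return part;
    }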

[2/4] hive git commit: HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)

Posted by vg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 32ea174..cf92eda 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,15 +21,20 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.TreeMap;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -38,11 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,15 +51,21 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 
 public class SharedCache {
-  private Map<String, Database> databaseCache = new TreeMap<>();
-  private Map<String, TableWrapper> tableCache = new TreeMap<>();
-  private Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
-  private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>();
-  private Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>();
-  private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
-  private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
-      new HashMap<String, List<ColumnStatisticsObj>>();
+  private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
+  // For caching Database objects. Key is database name
+  private Map<String, Database> databaseCache = new ConcurrentHashMap<String, Database>();
+  private boolean isDatabaseCachePrewarmed = false;
+  private HashSet<String> databasesDeletedDuringPrewarm = new HashSet<String>();
+  private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+  // For caching TableWrapper objects. Key is a concatenation of the database name and table name
+  private Map<String, TableWrapper> tableCache = new ConcurrentHashMap<String, TableWrapper>();
+  private boolean isTableCachePrewarmed = false;
+  private HashSet<String> tablesDeletedDuringPrewarm = new HashSet<String>();
+  private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+  private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new ConcurrentHashMap<>();
   private static MessageDigest md;
+  static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+  private AtomicLong cacheUpdateCount = new AtomicLong(0);
 
   static enum StatsType {
     ALL(0), ALLBUTDEFAULT(1);
@@ -74,8 +81,6 @@ public class SharedCache {
     }
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);
-
   static {
     try {
       md = MessageDigest.getInstance("MD5");
@@ -84,43 +89,804 @@ public class SharedCache {
     }
   }
 
-  public synchronized Database getDatabaseFromCache(String name) {
-    return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
+  static class TableWrapper {
+    Table t;
+    String location;
+    Map<String, String> parameters;
+    byte[] sdHash;
+    ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
+    // For caching column stats for an unpartitioned table
+    // Key is column name and the value is the col stat object
+    private Map<String, ColumnStatisticsObj> tableColStatsCache =
+        new ConcurrentHashMap<String, ColumnStatisticsObj>();
+    private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+    // For caching partition objects
+    // Key is partition values and the value is a wrapper around the partition object
+    private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<String, PartitionWrapper>();
+    private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+    // For caching column stats for a partitioned table
+    // Key is a concatenation of the partition values and column name; the value is the col stat object
+    private Map<String, ColumnStatisticsObj> partitionColStatsCache =
+        new ConcurrentHashMap<String, ColumnStatisticsObj>();
+    private AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
+    // For caching aggregate column stats for all and all minus default partition
+    // Key is column name and the value is a list of 2 col stat objects
+    // (all partitions and all but default)
+    private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
+        new ConcurrentHashMap<String, List<ColumnStatisticsObj>>();
+    private AtomicBoolean isAggrPartitionColStatsCacheDirty = new AtomicBoolean(false);
+
+    TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
+      this.t = t;
+      this.sdHash = sdHash;
+      this.location = location;
+      this.parameters = parameters;
+    }
+
+    public Table getTable() {
+      return t;
+    }
+
+    public void setTable(Table t) {
+      this.t = t;
+    }
+
+    public byte[] getSdHash() {
+      return sdHash;
+    }
+
+    public void setSdHash(byte[] sdHash) {
+      this.sdHash = sdHash;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public Map<String, String> getParameters() {
+      return parameters;
+    }
+
+    public void setParameters(Map<String, String> parameters) {
+      this.parameters = parameters;
+    }
+
+    void cachePartition(Partition part, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+        partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+        isPartitionCacheDirty.set(true);
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    void cachePartitions(List<Partition> parts, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        for (Partition part : parts) {
+          PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+          partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+          isPartitionCacheDirty.set(true);
+        }
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
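+
+    // Note: mutators in this wrapper follow the same protocol: take the
+    // per-table write lock, apply the change, set the matching dirty flag so a
+    // concurrent background refresh knows its snapshot is stale, and, where
+    // partition data changed, clear the cached aggregate stats it invalidates.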
+
+    public Partition getPartition(List<String> partVals, SharedCache sharedCache) {
+      Partition part = null;
+      try {
+        tableLock.readLock().lock();
+        PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+        if (wrapper == null) {
+          return null;
+        }
+        part = CacheUtils.assemble(wrapper, sharedCache);
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return part;
+    }
+
+    public List<Partition> listPartitions(int max, SharedCache sharedCache) {
+      List<Partition> parts = new ArrayList<>();
+      int count = 0;
+      try {
+        tableLock.readLock().lock();
+        for (PartitionWrapper wrapper : partitionCache.values()) {
+          if (max == -1 || count < max) {
+            parts.add(CacheUtils.assemble(wrapper, sharedCache));
+            count++;
+          }
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return parts;
+    }
+
+    public boolean containsPartition(List<String> partVals) {
+      boolean containsPart = false;
+      try {
+        tableLock.readLock().lock();
+        containsPart = partitionCache.containsKey(CacheUtils.buildPartitionCacheKey(partVals));
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return containsPart;
+    }
+
+    public Partition removePartition(List<String> partVal, SharedCache sharedCache) {
+      Partition part = null;
+      try {
+        tableLock.writeLock().lock();
+        PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+        if (wrapper == null) {
+          // Nothing was cached for these values; bail out before dereferencing
+          return null;
+        }
+        isPartitionCacheDirty.set(true);
+        if (wrapper.getSdHash() != null) {
+          sharedCache.decrSd(wrapper.getSdHash());
+        }
+        part = CacheUtils.assemble(wrapper, sharedCache);
+        // Remove col stats
+        String partialKey = CacheUtils.buildPartitionCacheKey(partVal);
+        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+            partitionColStatsCache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Entry<String, ColumnStatisticsObj> entry = iterator.next();
+          String key = entry.getKey();
+          if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+            iterator.remove();
+          }
+        }
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+      return part;
+    }
+
+    public void removePartitions(List<List<String>> partVals, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        for (List<String> partVal : partVals) {
+          removePartition(partVal, sharedCache);
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void alterPartition(List<String> partVals, Partition newPart, SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        removePartition(partVals, sharedCache);
+        cachePartition(newPart, sharedCache);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
+        SharedCache sharedCache) {
+      try {
+        tableLock.writeLock().lock();
+        for (int i = 0; i < partValsList.size(); i++) {
+          List<String> partVals = partValsList.get(i);
+          Partition newPart = newParts.get(i);
+          alterPartition(partVals, newPart, sharedCache);
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void refreshPartitions(List<Partition> partitions, SharedCache sharedCache) {
+      Map<String, PartitionWrapper> newPartitionCache = new HashMap<String, PartitionWrapper>();
+      try {
+        tableLock.writeLock().lock();
+        for (Partition part : partitions) {
+          if (isPartitionCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition cache update for table: " + getTable().getTableName()
+                + "; the partition list we have is dirty.");
+            return;
+          }
+          String key = CacheUtils.buildPartitionCacheKey(part.getValues());
+          PartitionWrapper wrapper = partitionCache.get(key);
+          if (wrapper != null) {
+            if (wrapper.getSdHash() != null) {
+              sharedCache.decrSd(wrapper.getSdHash());
+            }
+          }
+          wrapper = makePartitionWrapper(part, sharedCache);
+          newPartitionCache.put(key, wrapper);
+        }
+        partitionCache = newPartitionCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
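+
+    // Note: the refresh* methods are driven by the background cache-update
+    // thread. compareAndSet(true, false) on the dirty flag both tests and
+    // clears it: if a writer dirtied this table's cache after the refresh
+    // snapshot was read from the ObjectStore, the stale snapshot is abandoned
+    // rather than allowed to overwrite the newer cache entries.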
+
+    public void updateTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+      try {
+        tableLock.writeLock().lock();
+        for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+          // Get old stats object if present
+          String key = colStatObj.getColName();
+          ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+          if (oldStatsObj != null) {
+            // Update existing stat object's field
+            StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+          } else {
+            // No stats exist for this key; add a new object to the cache
+            // TODO: get rid of deepCopy after making sure callers don't use references
+            tableColStatsCache.put(key, colStatObj.deepCopy());
+          }
+        }
+        isTableColStatsCacheDirty.set(true);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void refreshTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+      Map<String, ColumnStatisticsObj> newTableColStatsCache =
+          new HashMap<String, ColumnStatisticsObj>();
+      try {
+        tableLock.writeLock().lock();
+        for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+          if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping table col stats cache update for table: "
+                + getTable().getTableName() + "; the table col stats list we have is dirty.");
+            return;
+          }
+          String key = colStatObj.getColName();
+          // TODO: get rid of deepCopy after making sure callers don't use references
+          newTableColStatsCache.put(key, colStatObj.deepCopy());
+        }
+        tableColStatsCache = newTableColStatsCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public List<ColumnStatisticsObj> getCachedTableColStats(List<String> colNames) {
+      List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+      try {
+        tableLock.readLock().lock();
+        for (String colName : colNames) {
+          ColumnStatisticsObj colStatObj = tableColStatsCache.get(colName);
+          if (colStatObj != null) {
+            colStatObjs.add(colStatObj);
+          }
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return colStatObjs;
+    }
+
+    public void removeTableColStats(String colName) {
+      try {
+        tableLock.writeLock().lock();
+        tableColStatsCache.remove(colName);
+        isTableColStatsCacheDirty.set(true);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public ColumnStatisticsObj getPartitionColStats(List<String> partVal, String colName) {
+      try {
+        tableLock.readLock().lock();
+        return partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+      } finally {
+        tableLock.readLock().unlock();
+      }
+    }
+
+    public void updatePartitionColStats(List<String> partVal,
+        List<ColumnStatisticsObj> colStatsObjs) {
+      try {
+        tableLock.writeLock().lock();
+        addPartitionColStatsToCache(partVal, colStatsObjs);
+        isPartitionColStatsCacheDirty.set(true);
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public void removePartitionColStats(List<String> partVals, String colName) {
+      try {
+        tableLock.writeLock().lock();
+        partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+        isPartitionColStatsCacheDirty.set(true);
+        // Invalidate cached aggregate stats
+        if (!aggrColStatsCache.isEmpty()) {
+          aggrColStatsCache.clear();
+        }
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    private void addPartitionColStatsToCache(List<String> partVal,
+        List<ColumnStatisticsObj> colStatsObjs) {
+      for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+        // Get old stats object if present
+        String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+        ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+        if (oldStatsObj != null) {
+          // Update existing stat object's field
+          StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+        } else {
+          // No stats exist for this key; add a new object to the cache
+          // TODO: get rid of deepCopy after making sure callers don't use references
+          partitionColStatsCache.put(key, colStatObj.deepCopy());
+        }
+      }
+    }
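+
+    // Note: when stats already exist for a key, the cached ColumnStatisticsObj
+    // is updated in place via StatObjectConverter.setFieldsIntoOldStats rather
+    // than replaced, so the map keeps a single object identity across updates.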
+
+    public void refreshPartitionColStats(List<ColumnStatistics> partitionColStats) {
+      Map<String, ColumnStatisticsObj> newPartitionColStatsCache =
+          new HashMap<String, ColumnStatisticsObj>();
+      try {
+        tableLock.writeLock().lock();
+        String tableName = StringUtils.normalizeIdentifier(getTable().getTableName());
+        for (ColumnStatistics cs : partitionColStats) {
+          if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+            LOG.debug("Skipping partition column stats cache update for table: "
+                + getTable().getTableName() + "; the partition column stats list we have is dirty");
+            return;
+          }
+          List<String> partVal;
+          try {
+            partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+            List<ColumnStatisticsObj> colStatsObjs = cs.getStatsObj();
+            for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+              if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+                LOG.debug("Skipping partition column stats cache update for table: "
+                    + getTable().getTableName() + "; the partition column list we have is dirty");
+                return;
+              }
+              String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+              newPartitionColStatsCache.put(key, colStatObj.deepCopy());
+            }
+          } catch (MetaException e) {
+            LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+          }
+        }
+        partitionColStatsCache = newPartitionColStatsCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames,
+        StatsType statsType) {
+      List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
+      try {
+        tableLock.readLock().lock();
+        for (String colName : colNames) {
+          List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(colName);
+          // If unable to find stats for a column, return null so we can build stats
+          if (colStatList == null) {
+            return null;
+          }
+          ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
+          // If unable to find stats for this StatsType, return null so we can build stats
+          if (colStatObj == null) {
+            return null;
+          }
+          colStats.add(colStatObj);
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return colStats;
+    }
+
+    public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+        AggrStats aggrStatsAllButDefaultPartition) {
+      try {
+        tableLock.writeLock().lock();
+        if (aggrStatsAllPartitions != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
+              aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
+              aggrColStatsCache.put(statObj.getColName(), aggrStats);
+            }
+          }
+        }
+        if (aggrStatsAllButDefaultPartition != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = aggrColStatsCache.get(statObj.getColName());
+              if (aggrStats == null) {
+                aggrStats = new ArrayList<ColumnStatisticsObj>();
+                aggrColStatsCache.put(statObj.getColName(), aggrStats);
+              }
+              // ALLBUTDEFAULT occupies index 1; a positional add needs the ALL
+              // entry at index 0 to be present already
+              if (aggrStats.size() == StatsType.ALLBUTDEFAULT.ordinal()) {
+                aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+              }
+            }
+          }
+        }
+        isAggrPartitionColStatsCacheDirty.set(true);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
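+
+    // Note: each aggrColStatsCache value is a two-slot list indexed by
+    // StatsType.ordinal(): slot 0 holds the aggregate over all partitions and
+    // slot 1 the aggregate over all partitions except the default partition.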
+
+    public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+        AggrStats aggrStatsAllButDefaultPartition) {
+      Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
+          new HashMap<String, List<ColumnStatisticsObj>>();
+      try {
+        tableLock.writeLock().lock();
+        if (aggrStatsAllPartitions != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
+            if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+              LOG.debug("Skipping aggregate stats cache update for table: "
+                  + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+              return;
+            }
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
+              aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
+              newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+            }
+          }
+        }
+        if (aggrStatsAllButDefaultPartition != null) {
+          for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
+            if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+              LOG.debug("Skipping aggregate stats cache update for table: "
+                  + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+              return;
+            }
+            if (statObj != null) {
+              List<ColumnStatisticsObj> aggrStats = newAggrColStatsCache.get(statObj.getColName());
+              if (aggrStats == null) {
+                aggrStats = new ArrayList<ColumnStatisticsObj>();
+                newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+              }
+              // Same positional-add guard as in cacheAggrPartitionColStats
+              if (aggrStats.size() == StatsType.ALLBUTDEFAULT.ordinal()) {
+                aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+              }
+            }
+          }
+        }
+        aggrColStatsCache = newAggrColStatsCache;
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
+    private void updateTableObj(Table newTable, SharedCache sharedCache) {
+      byte[] sdHash = getSdHash();
+      // Remove old table object's sd hash
+      if (sdHash != null) {
+        sharedCache.decrSd(sdHash);
+      }
+      Table tblCopy = newTable.deepCopy();
+      if (tblCopy.getPartitionKeys() != null) {
+        for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+          fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
+        }
+      }
+      setTable(tblCopy);
+      if (tblCopy.getSd() != null) {
+        sdHash = MetaStoreUtils.hashStorageDescriptor(tblCopy.getSd(), md);
+        StorageDescriptor sd = tblCopy.getSd();
+        sharedCache.increSd(sd, sdHash);
+        tblCopy.setSd(null);
+        setSdHash(sdHash);
+        setLocation(sd.getLocation());
+        setParameters(sd.getParameters());
+      } else {
+        setSdHash(null);
+        setLocation(null);
+        setParameters(null);
+      }
+    }
+
+    private PartitionWrapper makePartitionWrapper(Partition part, SharedCache sharedCache) {
+      Partition partCopy = part.deepCopy();
+      PartitionWrapper wrapper;
+      if (part.getSd() != null) {
+        byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
+        StorageDescriptor sd = part.getSd();
+        sharedCache.increSd(sd, sdHash);
+        partCopy.setSd(null);
+        wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
+      } else {
+        wrapper = new PartitionWrapper(partCopy, null, null, null);
+      }
+      return wrapper;
+    }
+  }
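+
+  // Note: partitions and their column stats now live inside the owning
+  // TableWrapper above, guarded by its per-table tableLock, instead of the
+  // former global partition/col-stats maps; lock contention is therefore
+  // scoped to a single table rather than the whole cache.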
+
+  static class PartitionWrapper {
+    Partition p;
+    String location;
+    Map<String, String> parameters;
+    byte[] sdHash;
+
+    PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
+      this.p = p;
+      this.sdHash = sdHash;
+      this.location = location;
+      this.parameters = parameters;
+    }
+
+    public Partition getPartition() {
+      return p;
+    }
+
+    public byte[] getSdHash() {
+      return sdHash;
+    }
+
+    public String getLocation() {
+      return location;
+    }
+
+    public Map<String, String> getParameters() {
+      return parameters;
+    }
+  }
+
+  static class StorageDescriptorWrapper {
+    StorageDescriptor sd;
+    int refCount = 0;
+
+    StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
+      this.sd = sd;
+      this.refCount = refCount;
+    }
+
+    public StorageDescriptor getSd() {
+      return sd;
+    }
+
+    public int getRefCount() {
+      return refCount;
+    }
+  }
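+
+  // Note: storage descriptors are deduplicated across tables and partitions:
+  // each SD is hashed with the shared MD5 MessageDigest, stored once in
+  // sdCache, and reference-counted through increSd/decrSd so it can be dropped
+  // once the last table or partition using it is removed.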
+
+  public Database getDatabaseFromCache(String name) {
+    Database db = null;
+    try {
+      cacheLock.readLock().lock();
+      if (databaseCache.get(name) != null) {
+        db = databaseCache.get(name).deepCopy();
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return db;
+  }
+
+  public void populateDatabasesInCache(List<Database> databases) {
+    for (Database db : databases) {
+      Database dbCopy = db.deepCopy();
+      // ObjectStore also stores db name in lowercase
+      dbCopy.setName(dbCopy.getName().toLowerCase());
+      try {
+        cacheLock.writeLock().lock();
+        // Since we allow write operations on cache while prewarm is happening:
+        // 1. Don't add databases that were deleted while we were preparing list for prewarm
+        // 2. Skip overwriting existing db object
+        // (which is present because it was added after prewarm started)
+        if (databasesDeletedDuringPrewarm.contains(dbCopy.getName().toLowerCase())) {
+          continue;
+        }
+        databaseCache.putIfAbsent(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+        databasesDeletedDuringPrewarm.clear();
+        isDatabaseCachePrewarmed = true;
+      } finally {
+        cacheLock.writeLock().unlock();
+      }
+    }
   }
 
-  public synchronized void addDatabaseToCache(String dbName, Database db) {
-    Database dbCopy = db.deepCopy();
-    dbCopy.setName(StringUtils.normalizeIdentifier(dbName));
-    databaseCache.put(dbName, dbCopy);
+  public boolean isDatabaseCachePrewarmed() {
+    return isDatabaseCachePrewarmed;
   }
 
-  public synchronized void removeDatabaseFromCache(String dbName) {
-    databaseCache.remove(dbName);
+  public void addDatabaseToCache(Database db) {
+    try {
+      cacheLock.writeLock().lock();
+      Database dbCopy = db.deepCopy();
+      // ObjectStore also stores db name in lowercase
+      dbCopy.setName(dbCopy.getName().toLowerCase());
+      databaseCache.put(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+      isDatabaseCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
 
-  public synchronized List<String> listCachedDatabases() {
-    return new ArrayList<>(databaseCache.keySet());
+  public void removeDatabaseFromCache(String dbName) {
+    try {
+      cacheLock.writeLock().lock();
+      // If db cache is not yet prewarmed, add this to a set which the prewarm thread can check
+      // so that the prewarm thread does not add it back
+      if (!isDatabaseCachePrewarmed) {
+        databasesDeletedDuringPrewarm.add(dbName.toLowerCase());
+      }
+      if (databaseCache.remove(dbName) != null) {
+        isDatabaseCacheDirty.set(true);
+      }
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  public List<String> listCachedDatabases() {
+    List<String> results = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      results.addAll(databaseCache.keySet());
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return results;
+  }
+
+  public List<String> listCachedDatabases(String pattern) {
+    List<String> results = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      for (String dbName : databaseCache.keySet()) {
+        dbName = StringUtils.normalizeIdentifier(dbName);
+        if (CacheUtils.matches(dbName, pattern)) {
+          results.add(dbName);
+        }
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return results;
   }
 
-  public synchronized void alterDatabaseInCache(String dbName, Database newDb) {
-    removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
-    addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy());
+  /**
+   * Replaces the old db object with the new one.
+   * This will add the new database to cache if it does not exist.
+   * @param dbName name of the database being replaced
+   * @param newDb the new database object
+   */
+  public void alterDatabaseInCache(String dbName, Database newDb) {
+    try {
+      cacheLock.writeLock().lock();
+      removeDatabaseFromCache(dbName);
+      addDatabaseToCache(newDb.deepCopy());
+      isDatabaseCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  public void refreshDatabasesInCache(List<Database> databases) {
+    try {
+      cacheLock.writeLock().lock();
+      if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+        LOG.debug("Skipping database cache update; the database list we have is dirty.");
+        return;
+      }
+      databaseCache.clear();
+      for (Database db : databases) {
+        addDatabaseToCache(db);
+      }
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  public int getCachedDatabaseCount() {
+    try {
+      cacheLock.readLock().lock();
+      return databaseCache.size();
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+  }
+
+  public void populateTableInCache(Table table, ColumnStatistics tableColStats,
+      List<Partition> partitions, List<ColumnStatistics> partitionColStats,
+      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+    String dbName = StringUtils.normalizeIdentifier(table.getDbName());
+    String tableName = StringUtils.normalizeIdentifier(table.getTableName());
+    // Since we allow write operations on cache while prewarm is happening:
+    // 1. Don't add tables that were deleted while we were preparing list for prewarm
+    if (tablesDeletedDuringPrewarm.contains(CacheUtils.buildTableCacheKey(dbName, tableName))) {
+      return;
+    }
+    TableWrapper tblWrapper = createTableWrapper(dbName, tableName, table);
+    if (!table.isSetPartitionKeys() && (tableColStats != null)) {
+      tblWrapper.updateTableColStats(tableColStats.getStatsObj());
+    } else {
+      if (partitions != null) {
+        tblWrapper.cachePartitions(partitions, this);
+      }
+      if (partitionColStats != null) {
+        for (ColumnStatistics cs : partitionColStats) {
+          List<String> partVal;
+          try {
+            partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+            List<ColumnStatisticsObj> colStats = cs.getStatsObj();
+            tblWrapper.updatePartitionColStats(partVal, colStats);
+          } catch (MetaException e) {
+            LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+          }
+        }
+      }
+      tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+          aggrStatsAllButDefaultPartition);
+    }
+    try {
+      cacheLock.writeLock().lock();
+      // 2. Skip overwriting existing table object
+      // (which is present because it was added after prewarm started)
+      tableCache.putIfAbsent(CacheUtils.buildTableCacheKey(dbName, tableName), tblWrapper);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
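+
+  // Note: populateTableInCache is invoked once per table during prewarm with a
+  // fully assembled wrapper (partitions, column stats, aggregate stats), so
+  // already-loaded tables are served from the cache while prewarm continues;
+  // the table == null fallbacks in CachedStore cover tables not yet loaded.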
 
-  public synchronized int getCachedDatabaseCount() {
-    return databaseCache.size();
+  public void completeTableCachePrewarm() {
+    try {
+      cacheLock.writeLock().lock();
+      tablesDeletedDuringPrewarm.clear();
+      isTableCachePrewarmed = true;
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
   }
 
-  public synchronized Table getTableFromCache(String dbName, String tableName) {
-    TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
-    if (tblWrapper == null) {
-      return null;
+  public Table getTableFromCache(String dbName, String tableName) {
+    Table t = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        t = CacheUtils.assemble(tblWrapper, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    Table t = CacheUtils.assemble(tblWrapper, this);
     return t;
   }
 
-  public synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
+  public TableWrapper addTableToCache(String dbName, String tblName, Table tbl) {
+    try {
+      cacheLock.writeLock().lock();
+      TableWrapper wrapper = createTableWrapper(dbName, tblName, tbl);
+      tableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), wrapper);
+      isTableCacheDirty.set(true);
+      return wrapper;
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
+  private TableWrapper createTableWrapper(String dbName, String tblName, Table tbl) {
+    TableWrapper wrapper;
     Table tblCopy = tbl.deepCopy();
     tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName));
     tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName));
@@ -129,7 +895,6 @@ public class SharedCache {
         fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
       }
     }
-    TableWrapper wrapper;
     if (tbl.getSd() != null) {
       byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md);
       StorageDescriptor sd = tbl.getSd();
@@ -139,482 +904,452 @@ public class SharedCache {
     } else {
       wrapper = new TableWrapper(tblCopy, null, null, null);
     }
-    tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+    return wrapper;
   }
 
-  public synchronized void removeTableFromCache(String dbName, String tblName) {
-    TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
-    byte[] sdHash = tblWrapper.getSdHash();
-    if (sdHash!=null) {
-      decrSd(sdHash);
+  public void removeTableFromCache(String dbName, String tblName) {
+    try {
+      cacheLock.writeLock().lock();
+      // If table cache is not yet prewarmed, add this to a set which the prewarm thread can check
+      // so that the prewarm thread does not add it back
+      if (!isTableCachePrewarmed) {
+        tablesDeletedDuringPrewarm.add(CacheUtils.buildTableCacheKey(dbName, tblName));
+      }
+      TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+      // Guard against a table that was never cached (e.g. removed during prewarm)
+      if (tblWrapper != null) {
+        byte[] sdHash = tblWrapper.getSdHash();
+        if (sdHash != null) {
+          decrSd(sdHash);
+        }
+      }
+      isTableCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
     }
   }
 
-  public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
-    return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null;
-  }
-
-  public synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        tableColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public void alterTableInCache(String dbName, String tblName, Table newTable) {
+    try {
+      cacheLock.writeLock().lock();
+      TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.updateTableObj(newTable, this);
+        String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+        String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
+        tableCache.put(CacheUtils.buildTableCacheKey(newDbName, newTblName), tblWrapper);
+        isTableCacheDirty.set(true);
       }
+    } finally {
+      cacheLock.writeLock().unlock();
     }
   }
 
-  public synchronized void removeTableColStatsFromCache(String dbName, String tblName,
-      String colName) {
-    if (colName == null) {
-      removeTableColStatsFromCache(dbName, tblName);
-    } else {
-      tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+  public List<Table> listCachedTables(String dbName) {
+    List<Table> tables = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      for (TableWrapper wrapper : tableCache.values()) {
+        if (wrapper.getTable().getDbName().equals(dbName)) {
+          tables.add(CacheUtils.assemble(wrapper, this));
+        }
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return tables;
   }
 
-  public synchronized void updateTableColStatsInCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
-      // Get old stats object if present
-      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
-      ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
-      if (oldStatsObj != null) {
-        LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName()
-            + ", of table: " + tableName + " and database: " + dbName);
-        // Update existing stat object's field
-        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
-      } else {
-        // No stats exist for this key; add a new object to the cache
-        tableColStatsCache.put(key, colStatObj);
+  public List<String> listCachedTableNames(String dbName) {
+    List<String> tableNames = new ArrayList<>();
+    try {
+      cacheLock.readLock().lock();
+      for (TableWrapper wrapper : tableCache.values()) {
+        if (wrapper.getTable().getDbName().equals(dbName)) {
+          tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+        }
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return tableNames;
   }
 
-  public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
-    removeTableFromCache(dbName, tblName);
-    addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
-        StringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+  public List<String> listCachedTableNames(String dbName, String pattern, short maxTables) {
+    List<String> tableNames = new ArrayList<String>();
+    try {
+      cacheLock.readLock().lock();
+      int count = 0;
+      for (TableWrapper wrapper : tableCache.values()) {
+        if ((wrapper.getTable().getDbName().equals(dbName))
+            && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+            && (maxTables == -1 || count < maxTables)) {
+          tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+          count++;
+        }
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return tableNames;
   }
 
-  public synchronized void alterTableInPartitionCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
-      for (Partition part : partitions) {
-        removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
-        part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName()));
-        part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName()));
-        addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
-            StringUtils.normalizeIdentifier(newTable.getTableName()), part);
+  public List<String> listCachedTableNames(String dbName, String pattern, TableType tableType) {
+    List<String> tableNames = new ArrayList<String>();
+    try {
+      cacheLock.readLock().lock();
+      for (TableWrapper wrapper : tableCache.values()) {
+        if ((wrapper.getTable().getDbName().equals(dbName))
+            && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+            && wrapper.getTable().getTableType().equals(tableType.toString())) {
+          tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+        }
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return tableNames;
   }
 
-  public synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-      Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-          tableColStatsCache.entrySet().iterator();
-      Map<String, ColumnStatisticsObj> newTableColStats =
-          new HashMap<>();
-      while (iterator.hasNext()) {
-        Entry<String, ColumnStatisticsObj> entry = iterator.next();
-        String key = entry.getKey();
-        ColumnStatisticsObj colStatObj = entry.getValue();
-        if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
-          String[] decomposedKey = CacheUtils.splitTableColStats(key);
-          String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]);
-          newTableColStats.put(newKey, colStatObj);
-          iterator.remove();
+  public void refreshTablesInCache(String dbName, List<Table> tables) {
+    try {
+      cacheLock.writeLock().lock();
+      if (isTableCacheDirty.compareAndSet(true, false)) {
+        LOG.debug("Skipping table cache update; the table list we have is dirty.");
+        return;
+      }
+      Map<String, TableWrapper> newTableCache = new HashMap<String, TableWrapper>();
+      for (Table tbl : tables) {
+        String tblName = StringUtils.normalizeIdentifier(tbl.getTableName());
+        TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+        if (tblWrapper != null) {
+          tblWrapper.updateTableObj(tbl, this);
+        } else {
+          tblWrapper = createTableWrapper(dbName, tblName, tbl);
         }
+        newTableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), tblWrapper);
       }
-      tableColStatsCache.putAll(newTableColStats);
+      tableCache.clear();
+      tableCache = newTableCache;
+    } finally {
+      cacheLock.writeLock().unlock();
     }
   }
 
-  public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
-      Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
-      for (Partition part : partitions) {
-        String oldPartialPartitionKey =
-            CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
-        Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-            partitionColStatsCache.entrySet().iterator();
-        while (iterator.hasNext()) {
-          Entry<String, ColumnStatisticsObj> entry = iterator.next();
-          String key = entry.getKey();
-          ColumnStatisticsObj colStatObj = entry.getValue();
-          if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
-            Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
-            // New key has the new table name
-            String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
-                (List<String>) decomposedKey[2], (String) decomposedKey[3]);
-            newPartitionColStats.put(newKey, colStatObj);
-            iterator.remove();
-          }
-        }
+  public List<ColumnStatisticsObj> getTableColStatsFromCache(String dbName, String tblName,
+      List<String> colNames) {
+    List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        colStatObjs = tblWrapper.getCachedTableColStats(colNames);
       }
-      partitionColStatsCache.putAll(newPartitionColStats);
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return colStatObjs;
   }
 
-  public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName,
-      Table newTable) {
-    if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
-      Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
-          new HashMap<String, List<ColumnStatisticsObj>>();
-      String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-      Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
-          aggrColStatsCache.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
-        String key = entry.getKey();
-        List<ColumnStatisticsObj> value = entry.getValue();
-        if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) {
-          Object[] decomposedKey = CacheUtils.splitAggrColStats(key);
-          // New key has the new table name
-          String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
-              (String) decomposedKey[2]);
-          newAggrColStatsCache.put(newKey, value);
-          iterator.remove();
-        }
+  public void removeTableColStatsFromCache(String dbName, String tblName, String colName) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.removeTableColStats(colName);
       }
-      aggrColStatsCache.putAll(newAggrColStatsCache);
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized int getCachedTableCount() {
-    return tableCache.size();
+  public void updateTableColStatsInCache(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        tblWrapper.updateTableColStats(colStatsForTable);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
 
-  public synchronized List<Table> listCachedTables(String dbName) {
-    List<Table> tables = new ArrayList<>();
-    for (TableWrapper wrapper : tableCache.values()) {
-      if (wrapper.getTable().getDbName().equals(dbName)) {
-        tables.add(CacheUtils.assemble(wrapper, this));
+  public void refreshTableColStatsInCache(String dbName, String tableName,
+      List<ColumnStatisticsObj> colStatsForTable) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshTableColStats(colStatsForTable);
       }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+  }
+
+  public int getCachedTableCount() {
+    try {
+      cacheLock.readLock().lock();
+      return tableCache.size();
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return tables;
   }
 
-  public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
+  public List<TableMeta> getTableMeta(String dbNames, String tableNames,
+      List<String> tableTypes) {
     List<TableMeta> tableMetas = new ArrayList<>();
-    for (String dbName : listCachedDatabases()) {
-      if (CacheUtils.matches(dbName, dbNames)) {
-        for (Table table : listCachedTables(dbName)) {
-          if (CacheUtils.matches(table.getTableName(), tableNames)) {
-            if (tableTypes==null || tableTypes.contains(table.getTableType())) {
-              TableMeta metaData = new TableMeta(
-                  dbName, table.getTableName(), table.getTableType());
+    try {
+      cacheLock.readLock().lock();
+      for (String dbName : listCachedDatabases()) {
+        if (CacheUtils.matches(dbName, dbNames)) {
+          for (Table table : listCachedTables(dbName)) {
+            if (CacheUtils.matches(table.getTableName(), tableNames)) {
+              if (tableTypes == null || tableTypes.contains(table.getTableType())) {
+                TableMeta metaData =
+                    new TableMeta(dbName, table.getTableName(), table.getTableType());
                 metaData.setComments(table.getParameters().get("comment"));
                 tableMetas.add(metaData);
+              }
             }
           }
         }
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
     return tableMetas;
   }
 
-  public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
-    Partition partCopy = part.deepCopy();
-    PartitionWrapper wrapper;
-    if (part.getSd()!=null) {
-      byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
-      StorageDescriptor sd = part.getSd();
-      increSd(sd, sdHash);
-      partCopy.setSd(null);
-      wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
-    } else {
-      wrapper = new PartitionWrapper(partCopy, null, null, null);
+  public void addPartitionToCache(String dbName, String tblName, Partition part) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.cachePartition(part, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper);
   }
 
-  public synchronized Partition getPartitionFromCache(String key) {
-    PartitionWrapper wrapper = partitionCache.get(key);
-    if (wrapper == null) {
-      return null;
+  public void addPartitionsToCache(String dbName, String tblName, List<Partition> parts) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.cachePartitions(parts, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    Partition p = CacheUtils.assemble(wrapper, this);
-    return p;
-  }
-
-  public synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
   }
 
-  public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
-  }
-
-  public synchronized Partition removePartitionFromCache(String dbName, String tblName,
-      List<String> part_vals) {
-    PartitionWrapper wrapper =
-        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
-    if (wrapper.getSdHash() != null) {
-      decrSd(wrapper.getSdHash());
+  public Partition getPartitionFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    Partition part = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        part = tblWrapper.getPartition(partVals, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return wrapper.getPartition();
+    return part;
   }
 
-  /**
-   * Given a db + table, remove all partitions for this table from the cache
-   * @param dbName
-   * @param tblName
-   * @return
-   */
-  public synchronized void removePartitionsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, PartitionWrapper> entry = iterator.next();
-      String key = entry.getKey();
-      PartitionWrapper wrapper = entry.getValue();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
-        if (wrapper.getSdHash() != null) {
-          decrSd(wrapper.getSdHash());
-        }
+  public boolean existPartitionFromCache(String dbName, String tblName, List<String> partVals) {
+    boolean existsPart = false;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        existsPart = tblWrapper.containsPartition(partVals);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return existsPart;
   }
 
-  // Remove cached column stats for all partitions of all tables in a db
-  public synchronized void removePartitionColStatsFromCache(String dbName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public Partition removePartitionFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    Partition part = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        part = tblWrapper.removePartition(partVals, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return part;
   }
 
-  // Remove cached column stats for all partitions of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public void removePartitionsFromCache(String dbName, String tblName,
+      List<List<String>> partVals) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.removePartitions(partVals, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  // Remove cached column stats for a particular partition of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+    List<Partition> parts = new ArrayList<Partition>();
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        parts = tblWrapper.listPartitions(max, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return parts;
   }
 
-  // Remove cached column stats for a particular partition and a particular column of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals, String colName) {
-    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+  public void alterPartitionInCache(String dbName, String tblName, List<String> partVals,
+      Partition newPart) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.alterPartition(partVals, newPart, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
 
-  public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
-    List<Partition> partitions = new ArrayList<>();
-    int count = 0;
-    for (PartitionWrapper wrapper : partitionCache.values()) {
-      if (wrapper.getPartition().getDbName().equals(dbName)
-          && wrapper.getPartition().getTableName().equals(tblName)
-          && (max == -1 || count < max)) {
-        partitions.add(CacheUtils.assemble(wrapper, this));
-        count++;
+  public void alterPartitionsInCache(String dbName, String tblName, List<List<String>> partValsList,
+      List<Partition> newParts) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.alterPartitions(partValsList, newParts, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return partitions;
   }
 
-  public synchronized void alterPartitionInCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    removePartitionFromCache(dbName, tblName, partVals);
-    addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()),
-        StringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+  public void refreshPartitionsInCache(String dbName, String tblName, List<Partition> partitions) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshPartitions(partitions, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
 
-  public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
-    Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      ColumnStatisticsObj colStatObj = entry.getValue();
-      if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
-        Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
-        String newKey =
-            CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()),
-                StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
-                (String) decomposedKey[3]);
-        newPartitionColStats.put(newKey, colStatObj);
-        iterator.remove();
+  public void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals, String colName) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.removePartitionColStats(partVals, colName);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    partitionColStatsCache.putAll(newPartitionColStats);
   }
 
-  public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+  public void updatePartitionColStatsInCache(String dbName, String tableName,
       List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
-    for (ColumnStatisticsObj colStatObj : colStatsObjs) {
-      // Get old stats object if present
-      String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
-      ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
-      if (oldStatsObj != null) {
-        // Update existing stat object's field
-        LOG.debug("CachedStore: updating partition column stats for column: "
-            + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
-        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
-      } else {
-        // No stats exist for this key; add a new object to the cache
-        partitionColStatsCache.put(key, colStatObj);
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+      if (tblWrapper != null) {
+        tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized int getCachedPartitionCount() {
-    return partitionCache.size();
-  }
-
-  public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
-    return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
-  }
-
-  public synchronized void addPartitionColStatsToCache(
-      List<ColStatsObjWithSourceInfo> colStatsForDB) {
-    for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) {
-      List<String> partVals;
-      try {
-        partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName());
-        ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj();
-        String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(),
-            colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName());
-        partitionColStatsCache.put(key, colStatObj);
-      } catch (MetaException e) {
-        LOG.info("Unable to add partition stats for: {} to SharedCache",
-            colStatWithSourceInfo.getPartName(), e);
+  public ColumnStatisticsObj getPartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVal, String colName) {
+    ColumnStatisticsObj colStatObj = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-
-  }
-
-  public synchronized void refreshPartitionColStats(String dbName,
-      List<ColStatsObjWithSourceInfo> colStatsForDB) {
-    LOG.debug("CachedStore: updating cached partition column stats objects for database: {}",
-        dbName);
-    removePartitionColStatsFromCache(dbName);
-    addPartitionColStatsToCache(colStatsForDB);
+    return colStatObj;
   }
 
-  public synchronized void addAggregateStatsToCache(String dbName, String tblName,
-      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
-    if (aggrStatsAllPartitions != null) {
-      for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) {
-        String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
-        List<ColumnStatisticsObj> value = new ArrayList<ColumnStatisticsObj>();
-        value.add(StatsType.ALL.getPosition(), colStatObj);
-        aggrColStatsCache.put(key, value);
-      }
-    }
-    if (aggrStatsAllButDefaultPartition != null) {
-      for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) {
-        String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
-        List<ColumnStatisticsObj> value = aggrColStatsCache.get(key);
-        if ((value != null) && (value.size() > 0)) {
-          value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj);
-        }
+  public void refreshPartitionColStatsInCache(String dbName, String tblName,
+      List<ColumnStatistics> partitionColStats) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshPartitionColStats(partitionColStats);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
   public List<ColumnStatisticsObj> getAggrStatsFromCache(String dbName, String tblName,
       List<String> colNames, StatsType statsType) {
-    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
-    for (String colName : colNames) {
-      String key = CacheUtils.buildKey(dbName, tblName, colName);
-      List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(key);
-      // If unable to find stats for a column, return null so we can build stats
-      if (colStatList == null) {
-        return null;
-      }
-      ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
-      // If unable to find stats for this StatsType, return null so we can build
-      // stats
-      if (colStatObj == null) {
-        return null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        return tblWrapper.getAggrPartitionColStats(colNames, statsType);
       }
-      colStats.add(colStatObj);
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return colStats;
+    return null;
   }
 
-  public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
-        aggrColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public void addAggregateStatsToCache(String dbName, String tblName,
+      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+            aggrStatsAllButDefaultPartition);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized void refreshAggregateStatsCache(String dbName, String tblName,
+  public void refreshAggregateStatsInCache(String dbName, String tblName,
       AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
-    LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName,
-        tblName);
-    removeAggrPartitionColStatsFromCache(dbName, tblName);
-    addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
-        aggrStatsAllButDefaultPartition);
-  }
-
-  public synchronized void addTableColStatsToCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
-      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
-      tableColStatsCache.put(key, colStatObj);
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
+            aggrStatsAllButDefaultPartition);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
 
-  public synchronized void refreshTableColStats(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
-        + " and table: " + tableName);
-    // Remove all old cache entries for this table
-    removeTableColStatsFromCache(dbName, tableName);
-    // Add new entries to cache
-    addTableColStatsToCache(dbName, tableName, colStatsForTable);
-  }
-
-  public void increSd(StorageDescriptor sd, byte[] sdHash) {
+  public synchronized void increSd(StorageDescriptor sd, byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
     if (sdCache.containsKey(byteArray)) {
       sdCache.get(byteArray).refCount++;
@@ -626,7 +1361,7 @@ public class SharedCache {
     }
   }
 
-  public void decrSd(byte[] sdHash) {
+  public synchronized void decrSd(byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
     StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
     sdWrapper.refCount--;
@@ -635,50 +1370,11 @@ public class SharedCache {
     }
   }
 
-  public StorageDescriptor getSdFromCache(byte[] sdHash) {
+  public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) {
     StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
     return sdWrapper.getSd();
   }
 
-  // Replace databases in databaseCache with the new list
-  public synchronized void refreshDatabases(List<Database> databases) {
-    LOG.debug("CachedStore: updating cached database objects");
-    for (String dbName : listCachedDatabases()) {
-      removeDatabaseFromCache(dbName);
-    }
-    for (Database db : databases) {
-      addDatabaseToCache(db.getName(), db);
-    }
-  }
-
-  // Replace tables in tableCache with the new list
-  public synchronized void refreshTables(String dbName, List<Table> tables) {
-    LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
-    for (Table tbl : listCachedTables(dbName)) {
-      removeTableFromCache(dbName, tbl.getTableName());
-    }
-    for (Table tbl : tables) {
-      addTableToCache(dbName, tbl.getTableName(), tbl);
-    }
-  }
-
-  public synchronized void refreshPartitions(String dbName, String tblName,
-      List<Partition> partitions) {
-    LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
-        + " and table: " + tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      PartitionWrapper partitionWrapper = iterator.next().getValue();
-      if (partitionWrapper.getPartition().getDbName().equals(dbName)
-          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
-        iterator.remove();
-      }
-    }
-    for (Partition part : partitions) {
-      addPartitionToCache(dbName, tblName, part);
-    }
-  }
-
   @VisibleForTesting
   Map<String, Database> getDatabaseCache() {
     return databaseCache;
@@ -690,17 +1386,15 @@ public class SharedCache {
   }
 
   @VisibleForTesting
-  Map<String, PartitionWrapper> getPartitionCache() {
-    return partitionCache;
-  }
-
-  @VisibleForTesting
   Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
     return sdCache;
   }
 
-  @VisibleForTesting
-  Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
-    return partitionColStatsCache;
+  public long getUpdateCount() {
+    return cacheUpdateCount.get();
+  }
+
+  public void incrementUpdateCount() {
+    cacheUpdateCount.incrementAndGet();
   }
 }
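
The SharedCache rewrite above replaces the per-method synchronized keyword with a
ReentrantReadWriteLock (cacheLock), so lookups can run concurrently while writers get
exclusive access, and an AtomicBoolean dirty flag lets the background refresh detect
writes that raced with its snapshot. What follows is a minimal sketch of that locking
pattern, not metastore code; the class name and the string-valued cache are
placeholders for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReadWriteCacheSketch {
      private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
      private final AtomicBoolean dirty = new AtomicBoolean(false);
      private Map<String, String> cache = new HashMap<>();

      public String get(String key) {
        cacheLock.readLock().lock();   // shared: readers do not block one another
        try {
          return cache.get(key);
        } finally {
          cacheLock.readLock().unlock();
        }
      }

      public void put(String key, String value) {
        cacheLock.writeLock().lock();  // exclusive: blocks readers and other writers
        try {
          cache.put(key, value);
          dirty.set(true);             // any refresh snapshot taken earlier is now stale
        } finally {
          cacheLock.writeLock().unlock();
        }
      }

      // Same guard as refreshTablesInCache above: if a write raced with the
      // snapshot, skip this refresh rather than clobber the newer entries.
      public void refresh(Map<String, String> snapshot) {
        cacheLock.writeLock().lock();
        try {
          if (dirty.compareAndSet(true, false)) {
            return;
          }
          cache = new HashMap<>(snapshot);
        } finally {
          cacheLock.writeLock().unlock();
        }
      }
    }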

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 5aee2e3..e373753 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -339,7 +339,7 @@ public class MetaStoreUtils {
   * @param md message digest to use to generate the hash
    * @return the hash as a byte array
    */
-  public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md)  {
+  public static synchronized byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md)  {
     // Note all maps and lists have to be absolutely sorted.  Otherwise we'll produce different
     // results for hashes based on the OS or JVM being used.
     md.reset();
@@ -434,6 +434,15 @@ public class MetaStoreUtils {
     return colNames;
   }
 
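+  /**
+   * Gets the column names from the given partition's storage descriptor.
+   */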
+  public static List<String> getColumnNamesForPartition(Partition partition) {
+    List<String> colNames = new ArrayList<>();
+    Iterator<FieldSchema> colsIterator = partition.getSd().getColsIterator();
+    while (colsIterator.hasNext()) {
+      colNames.add(colsIterator.next().getName());
+    }
+    return colNames;
+  }
+
   /**
    * validateName
    *

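The synchronized added to hashStorageDescriptor above is presumably there because
SharedCache shares one MessageDigest field (the md passed in at the call sites in the
SharedCache diff) across threads that no longer serialize on the cache's monitor, and
MessageDigest is stateful: interleaved update() calls from two threads corrupt both
hashes. A sketch of the lock-free alternative, a fresh digest per call, follows; this
is illustrative only, not what the patch does:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class DigestSketch {
      // A local MessageDigest per call: nothing is shared, so no lock is needed.
      // This trades a small per-hash allocation for the class-wide lock that the
      // patch's synchronized modifier introduces.
      public static byte[] hash(String payload) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        md.update(payload.getBytes(StandardCharsets.UTF_8));
        return md.digest();
      }
    }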
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 5edf8b3..e9527c7 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -1046,13 +1046,6 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
     objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
   }
 
-  @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
       NoSuchObjectException {
     objectStore.createISchema(schema);

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 132cdc3..8fc0c83 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -1033,13 +1033,6 @@ public class DummyRawStoreForJdoConnection implements RawStore {
       String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
   }
 
-  @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException {
 
   }


[4/4] hive git commit: HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)

Posted by vg...@apache.org.
HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26c0ab6a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26c0ab6a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26c0ab6a

Branch: refs/heads/master
Commit: 26c0ab6adb48755ef2f5cff2ec9c4b0e9a431821
Parents: 79e8869
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon Mar 19 10:47:37 2018 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon Mar 19 10:47:37 2018 -0700

----------------------------------------------------------------------
 .../listener/DummyRawStoreFailEvent.java        |    9 +-
 .../apache/hive/service/server/HiveServer2.java |    6 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |    4 -
 .../hadoop/hive/metastore/ObjectStore.java      |   30 -
 .../apache/hadoop/hive/metastore/RawStore.java  |   11 -
 .../hadoop/hive/metastore/cache/CacheUtils.java |   85 +-
 .../hive/metastore/cache/CachedStore.java       | 1552 +++++------------
 .../hive/metastore/cache/SharedCache.java       | 1588 +++++++++++++-----
 .../hive/metastore/utils/MetaStoreUtils.java    |   11 +-
 .../DummyRawStoreControlledCommit.java          |    7 -
 .../DummyRawStoreForJdoConnection.java          |    7 -
 .../hive/metastore/cache/TestCachedStore.java   |  546 +++---
 .../src/test/resources/log4j2.properties        |   74 +-
 13 files changed, 2043 insertions(+), 1887 deletions(-)
----------------------------------------------------------------------
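
The non-blocking prewarm in this change is visible in SharedCache above:
removeTableFromCache records drops in tablesDeletedDuringPrewarm while the prewarm
thread is still loading, so the loader cannot resurrect a dropped table, and
completeTableCachePrewarm clears the set once loading finishes. A hedged sketch of
that idea follows; the string-valued cache and the store map are placeholders, and
the real patch guards each step with the cache's write lock, which is elided here:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;

    public class PrewarmSketch {
      private final Map<String, String> cache = new ConcurrentHashMap<>();
      private final Set<String> deletedDuringPrewarm = ConcurrentHashMap.newKeySet();
      private volatile boolean prewarmed = false;

      // Populate the cache in the background; callers are served immediately and
      // simply miss (falling through to the raw store) until prewarm completes.
      public void startPrewarm(Map<String, String> store) {
        Executors.newSingleThreadExecutor(r -> {
          Thread t = new Thread(r, "CachedStore-Prewarm");
          t.setDaemon(true);
          return t;
        }).submit(() -> {
          for (Map.Entry<String, String> e : store.entrySet()) {
            if (!deletedDuringPrewarm.contains(e.getKey())) { // dropped meanwhile: skip
              cache.put(e.getKey(), e.getValue());
            }
          }
          deletedDuringPrewarm.clear();
          prewarmed = true;
        });
      }

      public void remove(String key) {
        if (!prewarmed) {
          deletedDuringPrewarm.add(key); // the prewarm loader must not add this back
        }
        cache.remove(key);
      }
    }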


http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 6144b61..e2244a1 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -976,7 +976,7 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
   public List<WMResourcePlan> getAllResourcePlans() throws MetaException {
     return objectStore.getAllResourcePlans();
   }
- 
+
   @Override
   public WMFullResourcePlan alterResourcePlan(String name, WMNullableResourcePlan resourcePlan,
       boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
@@ -1069,13 +1069,6 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
     objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
   }
 
-  @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
       NoSuchObjectException {
     objectStore.createISchema(schema);

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 5b792ac..bb92c44 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.cache.CachedStore;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
@@ -163,9 +162,6 @@ public class HiveServer2 extends CompositeService {
       LOG.warn("Could not initiate the HiveServer2 Metrics system.  Metrics may not be reported.", t);
     }
 
-    // Initialize cachedstore with background prewarm. The prewarm will only start if configured.
-    CachedStore.initSharedCacheAsync(hiveConf);
-
     cliService = new CLIService(this);
     addService(cliService);
     final HiveServer2 hiveServer2 = this;
@@ -570,7 +566,7 @@ public class HiveServer2 extends CompositeService {
 
   private void removeServerInstanceFromZooKeeper() throws Exception {
     setDeregisteredWithZooKeeper(true);
-    
+
     if (znode != null) {
       znode.close();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 66353e7..5285570 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -73,7 +73,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
-import org.apache.hadoop.hive.metastore.cache.CachedStore;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
@@ -7962,9 +7961,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         ThreadPool.shutdown();
       }, 10);
 
-      // This will only initialize the cache if configured.
-      CachedStore.initSharedCacheAsync(conf);
-
       //Start Metrics for Standalone (Remote) Mode
       if (MetastoreConf.getBoolVar(conf, ConfVars.METRICS_ENABLED)) {
         try {

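Both HiveServer2 and HiveMetaStore above drop their explicit
CachedStore.initSharedCacheAsync calls, so the service entry points no longer carry
cache-specific wiring; presumably the cached store now starts its own prewarm when it
is configured. A hypothetical sketch of such one-shot, conf-driven initialization
follows (the names here are placeholders, not the actual CachedStore API):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class LazyPrewarmInitSketch {
      private static final AtomicBoolean prewarmStarted = new AtomicBoolean(false);

      // Called whenever the store receives its configuration; the compareAndSet
      // ensures the background prewarm starts exactly once regardless of how
      // many store instances are constructed.
      public void setConf(boolean cacheEnabled) {
        if (cacheEnabled && prewarmStarted.compareAndSet(false, true)) {
          Thread t = new Thread(() -> {
            // load databases, tables and partitions from the raw store here
          }, "CachedStore-InitPrewarm");
          t.setDaemon(true);
          t.start();
        }
      }
    }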
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 1f75105..88d88ed 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -201,7 +201,6 @@ import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.hadoop.hive.metastore.utils.ObjectPair;
 import org.apache.thrift.TException;
 import org.datanucleus.AbstractNucleusContext;
@@ -7906,35 +7905,6 @@ public class ObjectStore implements RawStore, Configurable {
   }
 
   @Override
-  public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    final boolean enableBitVector =
-        MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_FETCH_BITVECTOR);
-    return new GetHelper<List<ColStatsObjWithSourceInfo>>(dbName, null, true, false) {
-      @Override
-      protected List<ColStatsObjWithSourceInfo> getSqlResult(
-          GetHelper<List<ColStatsObjWithSourceInfo>> ctx) throws MetaException {
-        return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector);
-      }
-
-      @Override
-      protected List<ColStatsObjWithSourceInfo> getJdoResult(
-          GetHelper<List<ColStatsObjWithSourceInfo>> ctx)
-          throws MetaException, NoSuchObjectException {
-        // This is fast path for query optimizations, if we can find this info
-        // quickly using directSql, do it. No point in failing back to slow path
-        // here.
-        throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase.");
-      }
-
-      @Override
-      protected String describeResult() {
-        return null;
-      }
-    }.run(true);
-  }
-
-  @Override
   public void flushCache() {
     // NOP as there's no caching
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index b079f8b..ad4af1a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -596,17 +596,6 @@ public interface RawStore extends Configurable {
     List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
 
   /**
-   * Get column stats for all partitions of all tables in the database
-   *
-   * @param dbName
-   * @return List of column stats objects for all partitions of all tables in the database
-   * @throws MetaException
-   * @throws NoSuchObjectException
-   */
-  List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException;
-
-  /**
    * Get the next notification event.
    * @param rqst Request containing information on the last processed notification.
    * @return list of notifications, sorted by eventId

http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index f0f650d..97d8af6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -17,78 +17,57 @@
  */
 package org.apache.hadoop.hive.metastore.cache;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.cache.SharedCache.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.SharedCache.TableWrapper;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 
 public class CacheUtils {
   private static final String delimit = "\u0001";
 
-  public static String buildKey(String dbName) {
-    return dbName;
-  }
-
-  public static String buildKeyWithDelimit(String dbName) {
-    return buildKey(dbName) + delimit;
-  }
-
-  public static String buildKey(String dbName, String tableName) {
+  /**
+   * Builds a key for the table cache: the database name and the table name,
+   * separated by a delimiter.
+   *
+   * @param dbName database name
+   * @param tableName table name
+   * @return cache key for the table cache
+   */
+  public static String buildTableCacheKey(String dbName, String tableName) {
     return dbName + delimit + tableName;
   }
 
-  public static String buildKeyWithDelimit(String dbName, String tableName) {
-    return buildKey(dbName, tableName) + delimit;
-  }
-
-  public static String buildKey(String dbName, String tableName, List<String> partVals) {
-    String key = buildKey(dbName, tableName);
-    if (CollectionUtils.isNotEmpty(partVals)) {
-      key += delimit;
-      key += String.join(delimit, partVals);
+  /**
+   * Builds a key for the partition cache: the partition values, each separated
+   * by a delimiter.
+   *
+   * @param partVals list of partition values
+   * @return cache key for the partition cache
+   */
+  public static String buildPartitionCacheKey(List<String> partVals) {
+    if (partVals == null || partVals.isEmpty()) {
+      return "";
     }
-    return key;
-  }
-
-  public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
-    return buildKey(dbName, tableName, partVals) + delimit;
-  }
-
-  public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
-    String key = buildKey(dbName, tableName, partVals);
-    return key + delimit + colName;
-  }
-
-  public static String buildKey(String dbName, String tableName, String colName) {
-    String key = buildKey(dbName, tableName);
-    return key + delimit + colName;
-  }
-
-  public static String[] splitTableColStats(String key) {
-    return key.split(delimit);
-  }
-
-  public static Object[] splitPartitionColStats(String key) {
-    Object[] result = new Object[4];
-    String[] comps = key.split(delimit);
-    result[0] = comps[0];
-    result[1] = comps[1];
-    result[2] = Arrays.asList((Arrays.copyOfRange(comps, 2, comps.length - 1)));
-    result[3] = comps[comps.length-1];
-    return result;
+    return String.join(delimit, partVals);
   }
 
-  public static Object[] splitAggrColStats(String key) {
-    return key.split(delimit);
+  /**
+   * Builds a key for the partition column stats cache: the partition values,
+   * each separated by a delimiter, followed by the column name.
+   *
+   * @param partVals list of partition values
+   * @param colName column name
+   * @return cache key for the partition column stats cache
+   */
+  public static String buildPartitonColStatsCacheKey(List<String> partVals, String colName) {
+    return buildPartitionCacheKey(partVals) + delimit + colName;
   }
 
   static Table assemble(TableWrapper wrapper, SharedCache sharedCache) {