You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/03/19 17:54:30 UTC
[1/4] hive git commit: HIVE-18264: CachedStore: Store cached
partitions/col stats within the table cache and make prewarm non-blocking
(Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
Repository: hive
Updated Branches:
refs/heads/master 79e88695c -> 26c0ab6ad
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index 0006815..a72fc0b 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -22,7 +22,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -66,19 +69,13 @@ public class TestCachedStore {
objectStore = new ObjectStore();
objectStore.setConf(conf);
cachedStore = new CachedStore();
- cachedStore.setConf(conf);
- // Stop the CachedStore cache update service. We'll start it explicitly to control the test
- CachedStore.stopCacheUpdateService(1);
- cachedStore.setInitializedForTest();
-
+ cachedStore.setConfForTest(conf);
// Stop the CachedStore cache update service. We'll start it explicitly to control the test
CachedStore.stopCacheUpdateService(1);
sharedCache = new SharedCache();
sharedCache.getDatabaseCache().clear();
sharedCache.getTableCache().clear();
- sharedCache.getPartitionCache().clear();
sharedCache.getSdCache().clear();
- sharedCache.getPartitionColStatsCache().clear();
}
/**********************************************************************************************
@@ -89,61 +86,49 @@ public class TestCachedStore {
public void testDatabaseOps() throws Exception {
// Add a db via ObjectStore
String dbName = "testDatabaseOps";
- String dbDescription = "testDatabaseOps";
- String dbLocation = "file:/tmp";
- Map<String, String> dbParams = new HashMap<>();
String dbOwner = "user1";
- Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
- db.setOwnerName(dbOwner);
- db.setOwnerType(PrincipalType.USER);
+ Database db = createTestDb(dbName, dbOwner);
objectStore.createDatabase(db);
db = objectStore.getDatabase(dbName);
// Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
CachedStore.prewarm(objectStore);
// Read database via CachedStore
- Database dbNew = cachedStore.getDatabase(dbName);
- Assert.assertEquals(db, dbNew);
+ Database dbRead = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbRead);
// Add another db via CachedStore
final String dbName1 = "testDatabaseOps1";
- final String dbDescription1 = "testDatabaseOps1";
- Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams);
- db1.setOwnerName(dbOwner);
- db1.setOwnerType(PrincipalType.USER);
+ Database db1 = createTestDb(dbName1, dbOwner);
cachedStore.createDatabase(db1);
db1 = cachedStore.getDatabase(dbName1);
// Read db via ObjectStore
- dbNew = objectStore.getDatabase(dbName1);
- Assert.assertEquals(db1, dbNew);
+ dbRead = objectStore.getDatabase(dbName1);
+ Assert.assertEquals(db1, dbRead);
// Alter the db via CachedStore (can only alter owner or parameters)
- db = new Database(dbName, dbDescription, dbLocation, dbParams);
dbOwner = "user2";
+ db = new Database(db);
db.setOwnerName(dbOwner);
- db.setOwnerType(PrincipalType.USER);
cachedStore.alterDatabase(dbName, db);
db = cachedStore.getDatabase(dbName);
// Read db via ObjectStore
- dbNew = objectStore.getDatabase(dbName);
- Assert.assertEquals(db, dbNew);
+ dbRead = objectStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbRead);
// Add another db via ObjectStore
final String dbName2 = "testDatabaseOps2";
- final String dbDescription2 = "testDatabaseOps2";
- Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams);
- db2.setOwnerName(dbOwner);
- db2.setOwnerType(PrincipalType.USER);
+ Database db2 = createTestDb(dbName2, dbOwner);
objectStore.createDatabase(db2);
db2 = objectStore.getDatabase(dbName2);
// Alter db "testDatabaseOps" via ObjectStore
dbOwner = "user1";
- db = new Database(dbName, dbDescription, dbLocation, dbParams);
+ db = new Database(db);
db.setOwnerName(dbOwner);
- db.setOwnerType(PrincipalType.USER);
objectStore.alterDatabase(dbName, db);
db = objectStore.getDatabase(dbName);
@@ -151,20 +136,20 @@ public class TestCachedStore {
objectStore.dropDatabase(dbName1);
// We update twice to accurately detect if cache is dirty or not
- updateCache(cachedStore, 100, 500, 100);
- updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore);
+ updateCache(cachedStore);
// Read the newly added db via CachedStore
- dbNew = cachedStore.getDatabase(dbName2);
- Assert.assertEquals(db2, dbNew);
+ dbRead = cachedStore.getDatabase(dbName2);
+ Assert.assertEquals(db2, dbRead);
// Read the altered db via CachedStore (altered user from "user2" to "user1")
- dbNew = cachedStore.getDatabase(dbName);
- Assert.assertEquals(db, dbNew);
+ dbRead = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbRead);
// Try to read the dropped db after cache update
try {
- dbNew = cachedStore.getDatabase(dbName1);
+ dbRead = cachedStore.getDatabase(dbName1);
Assert.fail("The database: " + dbName1
+ " should have been removed from the cache after running the update service");
} catch (NoSuchObjectException e) {
@@ -174,78 +159,64 @@ public class TestCachedStore {
// Clean up
objectStore.dropDatabase(dbName);
objectStore.dropDatabase(dbName2);
+ sharedCache.getDatabaseCache().clear();
+ sharedCache.getTableCache().clear();
+ sharedCache.getSdCache().clear();
}
@Test
public void testTableOps() throws Exception {
// Add a db via ObjectStore
String dbName = "testTableOps";
- String dbDescription = "testTableOps";
- String dbLocation = "file:/tmp";
- Map<String, String> dbParams = new HashMap<>();
String dbOwner = "user1";
- Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
- db.setOwnerName(dbOwner);
- db.setOwnerType(PrincipalType.USER);
+ Database db = createTestDb(dbName, dbOwner);
objectStore.createDatabase(db);
db = objectStore.getDatabase(dbName);
// Add a table via ObjectStore
String tblName = "tbl";
String tblOwner = "user1";
- String serdeLocation = "file:/tmp";
FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
FieldSchema col2 = new FieldSchema("col2", "string", "string column");
- List<FieldSchema> cols = new ArrayList<>();
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(col1);
cols.add(col2);
- Map<String, String> serdeParams = new HashMap<>();
- Map<String, String> tblParams = new HashMap<>();
- SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>());
- StorageDescriptor sd =
- new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
- null, serdeParams);
- sd.setStoredAsSubDirectories(false);
- Table tbl =
- new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
- null, null, TableType.MANAGED_TABLE.toString());
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
objectStore.createTable(tbl);
tbl = objectStore.getTable(dbName, tblName);
// Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
CachedStore.prewarm(objectStore);
// Read database, table via CachedStore
- Database dbNew = cachedStore.getDatabase(dbName);
- Assert.assertEquals(db, dbNew);
- Table tblNew = cachedStore.getTable(dbName, tblName);
- Assert.assertEquals(tbl, tblNew);
+ Database dbRead= cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbRead);
+ Table tblRead = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblRead);
// Add a new table via CachedStore
String tblName1 = "tbl1";
- Table tbl1 =
- new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
- null, null, TableType.MANAGED_TABLE.toString());
+ Table tbl1 = new Table(tbl);
+ tbl1.setTableName(tblName1);
cachedStore.createTable(tbl1);
tbl1 = cachedStore.getTable(dbName, tblName1);
// Read via object store
- tblNew = objectStore.getTable(dbName, tblName1);
- Assert.assertEquals(tbl1, tblNew);
+ tblRead = objectStore.getTable(dbName, tblName1);
+ Assert.assertEquals(tbl1, tblRead);
// Add a new table via ObjectStore
String tblName2 = "tbl2";
- Table tbl2 =
- new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
- null, null, TableType.MANAGED_TABLE.toString());
+ Table tbl2 = new Table(tbl);
+ tbl2.setTableName(tblName2);
objectStore.createTable(tbl2);
tbl2 = objectStore.getTable(dbName, tblName2);
// Alter table "tbl" via ObjectStore
tblOwner = "user2";
- tbl =
- new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
- null, null, TableType.MANAGED_TABLE.toString());
+ tbl.setOwner(tblOwner);
objectStore.alterTable(dbName, tblName, tbl);
tbl = objectStore.getTable(dbName, tblName);
@@ -253,20 +224,20 @@ public class TestCachedStore {
objectStore.dropTable(dbName, tblName1);
// We update twice to accurately detect if cache is dirty or not
- updateCache(cachedStore, 100, 500, 100);
- updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore);
+ updateCache(cachedStore);
// Read "tbl2" via CachedStore
- tblNew = cachedStore.getTable(dbName, tblName2);
- Assert.assertEquals(tbl2, tblNew);
+ tblRead = cachedStore.getTable(dbName, tblName2);
+ Assert.assertEquals(tbl2, tblRead);
// Read the altered "tbl" via CachedStore
- tblNew = cachedStore.getTable(dbName, tblName);
- Assert.assertEquals(tbl, tblNew);
+ tblRead = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblRead);
// Try to read the dropped "tbl1" via CachedStore (should throw exception)
- tblNew = cachedStore.getTable(dbName, tblName1);
- Assert.assertNull(tblNew);
+ tblRead = cachedStore.getTable(dbName, tblName1);
+ Assert.assertNull(tblRead);
// Should return "tbl" and "tbl2"
List<String> tblNames = cachedStore.getTables(dbName, "*");
@@ -278,81 +249,72 @@ public class TestCachedStore {
objectStore.dropTable(dbName, tblName);
objectStore.dropTable(dbName, tblName2);
objectStore.dropDatabase(dbName);
+ sharedCache.getDatabaseCache().clear();
+ sharedCache.getTableCache().clear();
+ sharedCache.getSdCache().clear();
}
@Test
public void testPartitionOps() throws Exception {
// Add a db via ObjectStore
String dbName = "testPartitionOps";
- String dbDescription = "testPartitionOps";
- String dbLocation = "file:/tmp";
- Map<String, String> dbParams = new HashMap<>();
String dbOwner = "user1";
- Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
- db.setOwnerName(dbOwner);
- db.setOwnerType(PrincipalType.USER);
+ Database db = createTestDb(dbName, dbOwner);
objectStore.createDatabase(db);
db = objectStore.getDatabase(dbName);
// Add a table via ObjectStore
String tblName = "tbl";
String tblOwner = "user1";
- String serdeLocation = "file:/tmp";
FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
FieldSchema col2 = new FieldSchema("col2", "string", "string column");
- List<FieldSchema> cols = new ArrayList<>();
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(col1);
cols.add(col2);
- Map<String, String> serdeParams = new HashMap<>();
- Map<String, String> tblParams = new HashMap<>();
- SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd =
- new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
- null, serdeParams);
FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
- List<FieldSchema> ptnCols = new ArrayList<>();
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
ptnCols.add(ptnCol1);
- Table tbl =
- new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
- TableType.MANAGED_TABLE.toString());
+ Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
objectStore.createTable(tbl);
tbl = objectStore.getTable(dbName, tblName);
+
final String ptnColVal1 = "aaa";
- Map<String, String> partParams = new HashMap<>();
+ Map<String, String> partParams = new HashMap<String, String>();
Partition ptn1 =
- new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams);
+ new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams);
objectStore.addPartition(ptn1);
ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
final String ptnColVal2 = "bbb";
Partition ptn2 =
- new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams);
+ new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams);
objectStore.addPartition(ptn2);
ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
// Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
CachedStore.prewarm(objectStore);
// Read database, table, partition via CachedStore
- Database dbNew = cachedStore.getDatabase(dbName);
- Assert.assertEquals(db, dbNew);
- Table tblNew = cachedStore.getTable(dbName, tblName);
- Assert.assertEquals(tbl, tblNew);
- Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
- Assert.assertEquals(ptn1, newPtn1);
- Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
- Assert.assertEquals(ptn2, newPtn2);
+ Database dbRead = cachedStore.getDatabase(dbName);
+ Assert.assertEquals(db, dbRead);
+ Table tblRead = cachedStore.getTable(dbName, tblName);
+ Assert.assertEquals(tbl, tblRead);
+ Partition ptn1Read = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1));
+ Assert.assertEquals(ptn1, ptn1Read);
+ Partition ptn2Read = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+ Assert.assertEquals(ptn2, ptn2Read);
// Add a new partition via ObjectStore
final String ptnColVal3 = "ccc";
Partition ptn3 =
- new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams);
+ new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams);
objectStore.addPartition(ptn3);
ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
// Alter an existing partition ("aaa") via ObjectStore
final String ptnColVal1Alt = "aaaAlt";
Partition ptn1Atl =
- new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams);
+ new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams);
objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl);
ptn1Atl = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
@@ -360,45 +322,47 @@ public class TestCachedStore {
objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2));
// We update twice to accurately detect if cache is dirty or not
- updateCache(cachedStore, 100, 500, 100);
- updateCache(cachedStore, 100, 500, 100);
+ updateCache(cachedStore);
+ updateCache(cachedStore);
// Read the newly added partition via CachedStore
- Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
- Assert.assertEquals(ptn3, newPtn);
+ Partition ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+ Assert.assertEquals(ptn3, ptnRead);
// Read the altered partition via CachedStore
- newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
- Assert.assertEquals(ptn1Atl, newPtn);
+ ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+ Assert.assertEquals(ptn1Atl, ptnRead);
// Try to read the dropped partition via CachedStore
try {
- newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
+ ptnRead = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2));
Assert.fail("The partition: " + ptnColVal2
+ " should have been removed from the cache after running the update service");
} catch (NoSuchObjectException e) {
// Expected
}
+ // Clean up
+ objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt));
+ objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal3));
+ objectStore.dropTable(dbName, tblName);
+ objectStore.dropDatabase(dbName);
+ sharedCache.getDatabaseCache().clear();
+ sharedCache.getTableCache().clear();
+ sharedCache.getSdCache().clear();
}
//@Test
public void testTableColStatsOps() throws Exception {
// Add a db via ObjectStore
String dbName = "testTableColStatsOps";
- String dbDescription = "testTableColStatsOps";
- String dbLocation = "file:/tmp";
- Map<String, String> dbParams = new HashMap<>();
String dbOwner = "user1";
- Database db = new Database(dbName, dbDescription, dbLocation, dbParams);
- db.setOwnerName(dbOwner);
- db.setOwnerType(PrincipalType.USER);
+ Database db = createTestDb(dbName, dbOwner);
objectStore.createDatabase(db);
db = objectStore.getDatabase(dbName);
// Add a table via ObjectStore
final String tblName = "tbl";
final String tblOwner = "user1";
- final String serdeLocation = "file:/tmp";
final FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
// Stats values for col1
long col1LowVal = 5;
@@ -420,15 +384,10 @@ public class TestCachedStore {
cols.add(col1);
cols.add(col2);
cols.add(col3);
- Map<String, String> serdeParams = new HashMap<>();
- Map<String, String> tblParams = new HashMap<>();
- final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd =
- new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null,
- null, serdeParams);
- Table tbl =
- new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<>(), tblParams,
- null, null, TableType.MANAGED_TABLE.toString());
+ FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ ptnCols.add(ptnCol1);
+ Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols);
objectStore.createTable(tbl);
tbl = objectStore.getTable(dbName, tblName);
@@ -476,6 +435,7 @@ public class TestCachedStore {
objectStore.updateTableColumnStatistics(stats);
// Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
CachedStore.prewarm(objectStore);
// Read table stats via CachedStore
@@ -483,18 +443,13 @@ public class TestCachedStore {
cachedStore.getTableColumnStatistics(dbName, tblName,
Arrays.asList(col1.getName(), col2.getName(), col3.getName()));
Assert.assertEquals(stats, newStats);
- }
- private void updateCache(CachedStore cachedStore, long frequency, long sleepTime,
- long shutdownTimeout) throws InterruptedException {
- // Set cache refresh period to 100 milliseconds
- CachedStore.setCacheRefreshPeriod(100);
- // Start the CachedStore update service
- CachedStore.startCacheUpdateService(cachedStore.getConf());
- // Sleep for 500 ms so that cache update is complete
- Thread.sleep(500);
- // Stop cache update service
- CachedStore.stopCacheUpdateService(100);
+ // Clean up
+ objectStore.dropTable(dbName, tblName);
+ objectStore.dropDatabase(dbName);
+ sharedCache.getDatabaseCache().clear();
+ sharedCache.getTableCache().clear();
+ sharedCache.getSdCache().clear();
}
/**********************************************************************************************
@@ -503,29 +458,21 @@ public class TestCachedStore {
@Test
public void testSharedStoreDb() {
- Database db1 = new Database();
- Database db2 = new Database();
- Database db3 = new Database();
- Database newDb1 = new Database();
- newDb1.setName("db1");
-
- sharedCache.addDatabaseToCache("db1", db1);
- sharedCache.addDatabaseToCache("db2", db2);
- sharedCache.addDatabaseToCache("db3", db3);
-
+ Database db1 = createTestDb("db1", "user1");
+ Database db2 = createTestDb("db2", "user1");
+ Database db3 = createTestDb("db3", "user1");
+ Database newDb1 = createTestDb("newdb1", "user1");
+ sharedCache.addDatabaseToCache(db1);
+ sharedCache.addDatabaseToCache(db2);
+ sharedCache.addDatabaseToCache(db3);
Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
-
sharedCache.alterDatabaseInCache("db1", newDb1);
-
Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
-
sharedCache.removeDatabaseFromCache("db2");
-
Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 2);
-
List<String> dbs = sharedCache.listCachedDatabases();
Assert.assertEquals(dbs.size(), 2);
- Assert.assertTrue(dbs.contains("db1"));
+ Assert.assertTrue(dbs.contains("newdb1"));
Assert.assertTrue(dbs.contains("db3"));
}
@@ -608,6 +555,23 @@ public class TestCachedStore {
@Test
public void testSharedStorePartition() {
+ String dbName = "db1";
+ String tbl1Name = "tbl1";
+ String tbl2Name = "tbl2";
+ String owner = "user1";
+ Database db = createTestDb(dbName, owner);
+ sharedCache.addDatabaseToCache(db);
+ FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ Table tbl1 = createTestTbl(dbName, tbl1Name, owner, cols, ptnCols);
+ sharedCache.addTableToCache(dbName, tbl1Name, tbl1);
+ Table tbl2 = createTestTbl(dbName, tbl2Name, owner, cols, ptnCols);
+ sharedCache.addTableToCache(dbName, tbl2Name, tbl2);
+
Partition part1 = new Partition();
StorageDescriptor sd1 = new StorageDescriptor();
List<FieldSchema> cols1 = new ArrayList<>();
@@ -645,8 +609,8 @@ public class TestCachedStore {
part3.setValues(Arrays.asList("201703"));
Partition newPart1 = new Partition();
- newPart1.setDbName("db1");
- newPart1.setTableName("tbl1");
+ newPart1.setDbName(dbName);
+ newPart1.setTableName(tbl1Name);
StorageDescriptor newSd1 = new StorageDescriptor();
List<FieldSchema> newCols1 = new ArrayList<>();
newCols1.add(new FieldSchema("newcol1", "int", ""));
@@ -654,32 +618,25 @@ public class TestCachedStore {
newParams1.put("key", "value");
newSd1.setCols(newCols1);
newSd1.setParameters(params1);
- newSd1.setLocation("loc1");
+ newSd1.setLocation("loc1new");
newPart1.setSd(newSd1);
newPart1.setValues(Arrays.asList("201701"));
- sharedCache.addPartitionToCache("db1", "tbl1", part1);
- sharedCache.addPartitionToCache("db1", "tbl1", part2);
- sharedCache.addPartitionToCache("db1", "tbl1", part3);
- sharedCache.addPartitionToCache("db1", "tbl2", part1);
-
- Assert.assertEquals(sharedCache.getCachedPartitionCount(), 4);
- Assert.assertEquals(sharedCache.getSdCache().size(), 2);
+ sharedCache.addPartitionToCache(dbName, tbl1Name, part1);
+ sharedCache.addPartitionToCache(dbName, tbl1Name, part2);
+ sharedCache.addPartitionToCache(dbName, tbl1Name, part3);
+ sharedCache.addPartitionToCache(dbName, tbl2Name, part1);
- Partition t = sharedCache.getPartitionFromCache("db1", "tbl1", Arrays.asList("201701"));
+ Partition t = sharedCache.getPartitionFromCache(dbName, tbl1Name, Arrays.asList("201701"));
Assert.assertEquals(t.getSd().getLocation(), "loc1");
- sharedCache.removePartitionFromCache("db1", "tbl2", Arrays.asList("201701"));
- Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
- Assert.assertEquals(sharedCache.getSdCache().size(), 2);
-
- sharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), newPart1);
- Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
- Assert.assertEquals(sharedCache.getSdCache().size(), 3);
+ sharedCache.removePartitionFromCache(dbName, tbl2Name, Arrays.asList("201701"));
+ t = sharedCache.getPartitionFromCache(dbName, tbl2Name, Arrays.asList("201701"));
+ Assert.assertNull(t);
- sharedCache.removePartitionFromCache("db1", "tbl1", Arrays.asList("201702"));
- Assert.assertEquals(sharedCache.getCachedPartitionCount(), 2);
- Assert.assertEquals(sharedCache.getSdCache().size(), 2);
+ sharedCache.alterPartitionInCache(dbName, tbl1Name, Arrays.asList("201701"), newPart1);
+ t = sharedCache.getPartitionFromCache(dbName, tbl1Name, Arrays.asList("201701"));
+ Assert.assertEquals(t.getSd().getLocation(), "loc1new");
}
@Test
@@ -753,10 +710,10 @@ public class TestCachedStore {
String dbName = "testTableColStatsOps1";
String tblName = "tbl1";
String colName = "f1";
-
+
Database db = new Database(dbName, null, "some_location", null);
cachedStore.createDatabase(db);
-
+
List<FieldSchema> cols = new ArrayList<>();
cols.add(new FieldSchema(colName, "int", null));
List<FieldSchema> partCols = new ArrayList<>();
@@ -764,29 +721,29 @@ public class TestCachedStore {
StorageDescriptor sd =
new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
null, null, null);
-
+
Table tbl =
new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
null, null, TableType.MANAGED_TABLE.toString());
cachedStore.createTable(tbl);
-
+
List<String> partVals1 = new ArrayList<>();
partVals1.add("1");
List<String> partVals2 = new ArrayList<>();
partVals2.add("2");
-
+
Partition ptn1 =
new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
cachedStore.addPartition(ptn1);
Partition ptn2 =
new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
cachedStore.addPartition(ptn2);
-
+
ColumnStatistics stats = new ColumnStatistics();
ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
statsDesc.setPartName("col");
List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-
+
ColumnStatisticsData data = new ColumnStatisticsData();
ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data);
LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
@@ -796,15 +753,15 @@ public class TestCachedStore {
longStats.setNumDVs(30);
data.setLongStats(longStats);
colStatObjs.add(colStats);
-
+
stats.setStatsDesc(statsDesc);
stats.setStatsObj(colStatObjs);
-
+
cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-
+
longStats.setNumDVs(40);
cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-
+
List<String> colNames = new ArrayList<>();
colNames.add(colName);
List<String> aggrPartVals = new ArrayList<>();
@@ -823,10 +780,10 @@ public class TestCachedStore {
String dbName = "testTableColStatsOps2";
String tblName = "tbl2";
String colName = "f1";
-
+
Database db = new Database(dbName, null, "some_location", null);
cachedStore.createDatabase(db);
-
+
List<FieldSchema> cols = new ArrayList<>();
cols.add(new FieldSchema(colName, "int", null));
List<FieldSchema> partCols = new ArrayList<>();
@@ -834,29 +791,29 @@ public class TestCachedStore {
StorageDescriptor sd =
new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
null, null, null);
-
+
Table tbl =
new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
null, null, TableType.MANAGED_TABLE.toString());
cachedStore.createTable(tbl);
-
+
List<String> partVals1 = new ArrayList<>();
partVals1.add("1");
List<String> partVals2 = new ArrayList<>();
partVals2.add("2");
-
+
Partition ptn1 =
new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
cachedStore.addPartition(ptn1);
Partition ptn2 =
new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
cachedStore.addPartition(ptn2);
-
+
ColumnStatistics stats = new ColumnStatistics();
ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
statsDesc.setPartName("col");
List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-
+
ColumnStatisticsData data = new ColumnStatisticsData();
ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data);
LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
@@ -864,21 +821,21 @@ public class TestCachedStore {
longStats.setHighValue(100);
longStats.setNumNulls(50);
longStats.setNumDVs(30);
-
+
HyperLogLog hll = HyperLogLog.builder().build();
hll.addLong(1);
hll.addLong(2);
hll.addLong(3);
longStats.setBitVectors(hll.serialize());
-
+
data.setLongStats(longStats);
colStatObjs.add(colStats);
-
+
stats.setStatsDesc(statsDesc);
stats.setStatsObj(colStatObjs);
-
+
cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-
+
longStats.setNumDVs(40);
hll = HyperLogLog.builder().build();
hll.addLong(2);
@@ -886,9 +843,9 @@ public class TestCachedStore {
hll.addLong(4);
hll.addLong(5);
longStats.setBitVectors(hll.serialize());
-
+
cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-
+
List<String> colNames = new ArrayList<>();
colNames.add(colName);
List<String> aggrPartVals = new ArrayList<>();
@@ -901,4 +858,179 @@ public class TestCachedStore {
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100);
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 5);
}
+
+  @Test
+  public void testMultiThreadedSharedCacheOps() throws Exception {
+    List<String> dbNames = new ArrayList<String>(Arrays.asList("db1", "db2", "db3", "db4", "db5"));
+    List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
+    ExecutorService executor = Executors.newFixedThreadPool(50, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = Executors.defaultThreadFactory().newThread(r);
+        t.setDaemon(true);
+        return t;
+      }
+    });
+    try {
+      // Create 5 dbs concurrently
+      for (String dbName : dbNames) {
+        tasks.add(() -> {
+          Database db = createTestDb(dbName, "user1");
+          sharedCache.addDatabaseToCache(db);
+          return null;
+        });
+      }
+      invokeAllAndCheck(executor, tasks);
+      for (String dbName : dbNames) {
+        Database db = sharedCache.getDatabaseFromCache(dbName);
+        Assert.assertNotNull(db);
+        Assert.assertEquals(dbName, db.getName());
+      }
+
+      // Create 5 tables under "db1" concurrently
+      List<String> tblNames =
+          new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3", "tbl4", "tbl5"));
+      tasks.clear();
+      for (String tblName : tblNames) {
+        FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+        FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+        List<FieldSchema> cols = new ArrayList<FieldSchema>();
+        cols.add(col1);
+        cols.add(col2);
+        FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column");
+        List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+        ptnCols.add(ptnCol1);
+        tasks.add(() -> {
+          Table tbl = createTestTbl(dbNames.get(0), tblName, "user1", cols, ptnCols);
+          sharedCache.addTableToCache(dbNames.get(0), tblName, tbl);
+          return null;
+        });
+      }
+      invokeAllAndCheck(executor, tasks);
+      for (String tblName : tblNames) {
+        Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName);
+        Assert.assertNotNull(tbl);
+        Assert.assertEquals(tblName, tbl.getTableName());
+      }
+
+      // Add 5 partitions to all tables concurrently
+      List<String> ptnVals = new ArrayList<String>(Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee"));
+      tasks.clear();
+      for (String tblName : tblNames) {
+        Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName);
+        for (String ptnVal : ptnVals) {
+          Map<String, String> partParams = new HashMap<String, String>();
+          tasks.add(() -> {
+            Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0,
+                tbl.getSd(), partParams);
+            sharedCache.addPartitionToCache(dbNames.get(0), tblName, ptn);
+            return null;
+          });
+        }
+      }
+      invokeAllAndCheck(executor, tasks);
+      for (String tblName : tblNames) {
+        for (String ptnVal : ptnVals) {
+          Partition ptn =
+              sharedCache.getPartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal));
+          Assert.assertNotNull(ptn);
+          Assert.assertEquals(dbNames.get(0), ptn.getDbName());
+          Assert.assertEquals(tblName, ptn.getTableName());
+          Assert.assertEquals(Arrays.asList(ptnVal), ptn.getValues());
+        }
+      }
+
+      // Concurrently drop all partitions of tbl1-tbl3 and add 2 new partitions to tbl4, tbl5
+      List<String> newPtnVals = new ArrayList<String>(Arrays.asList("fff", "ggg"));
+      List<String> dropPtnTblNames = new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3"));
+      List<String> addPtnTblNames = new ArrayList<String>(Arrays.asList("tbl4", "tbl5"));
+      tasks.clear();
+      for (String tblName : dropPtnTblNames) {
+        for (String ptnVal : ptnVals) {
+          tasks.add(() -> {
+            sharedCache.removePartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal));
+            return null;
+          });
+        }
+      }
+      for (String tblName : addPtnTblNames) {
+        Table tbl = sharedCache.getTableFromCache(dbNames.get(0), tblName);
+        for (String ptnVal : newPtnVals) {
+          Map<String, String> partParams = new HashMap<String, String>();
+          tasks.add(() -> {
+            Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0,
+                tbl.getSd(), partParams);
+            sharedCache.addPartitionToCache(dbNames.get(0), tblName, ptn);
+            return null;
+          });
+        }
+      }
+      invokeAllAndCheck(executor, tasks);
+      for (String tblName : addPtnTblNames) {
+        for (String ptnVal : newPtnVals) {
+          Partition ptn =
+              sharedCache.getPartitionFromCache(dbNames.get(0), tblName, Arrays.asList(ptnVal));
+          Assert.assertNotNull(ptn);
+          Assert.assertEquals(dbNames.get(0), ptn.getDbName());
+          Assert.assertEquals(tblName, ptn.getTableName());
+          Assert.assertEquals(Arrays.asList(ptnVal), ptn.getValues());
+        }
+      }
+      for (String tblName : dropPtnTblNames) {
+        List<Partition> ptns = sharedCache.listCachedPartitions(dbNames.get(0), tblName, 100);
+        Assert.assertEquals(0, ptns.size());
+      }
+    } finally {
+      // Release the pool and reset the shared cache even when an assertion above fails
+      executor.shutdownNow();
+      sharedCache.getDatabaseCache().clear();
+      sharedCache.getTableCache().clear();
+      sharedCache.getSdCache().clear();
+    }
+  }
+
+  // Runs all tasks, waits for them, and rethrows the first task failure so the test reports
+  // the real cause instead of a later "missing cache entry" assertion.
+  private static void invokeAllAndCheck(ExecutorService executor,
+      List<Callable<Object>> tasks) throws Exception {
+    for (java.util.concurrent.Future<Object> future : executor.invokeAll(tasks)) {
+      future.get();
+    }
+  }
+
+  // Builds a standalone Database object for cache tests; it is not written to any store.
+  private Database createTestDb(String dbName, String dbOwner) {
+    Map<String, String> params = new HashMap<>();
+    // The description simply reuses the database name; the location is a local tmp dir
+    Database result = new Database(dbName, dbName, "file:/tmp", params);
+    result.setOwnerType(PrincipalType.USER);
+    result.setOwnerName(dbOwner);
+    return result;
+  }
+
+  // Builds a standalone MANAGED_TABLE Table object for cache tests (not written to any store).
+  private Table createTestTbl(String dbName, String tblName, String tblOwner,
+      List<FieldSchema> cols, List<FieldSchema> ptnCols) {
+    SerDeInfo serde = new SerDeInfo("serde", "seriallib", new HashMap<>());
+    // The trailing map belongs to the StorageDescriptor itself; the SerDe's map is the one above
+    Map<String, String> sdParams = new HashMap<>();
+    StorageDescriptor storage = new StorageDescriptor(cols, "file:/tmp", "input", "output",
+        false, 0, serde, null, null, sdParams);
+    storage.setStoredAsSubDirectories(false);
+    Map<String, String> tblParams = new HashMap<>();
+    return new Table(tblName, dbName, tblOwner, 0, 0, 0, storage, ptnCols, tblParams, null,
+        null, TableType.MANAGED_TABLE.toString());
+  }
+
+  // Triggers one cache update (runOnlyOnce, no prewarm) and waits, bounded by maxTries, for it
+  private void updateCache(CachedStore cachedStore) throws InterruptedException {
+    int maxTries = 100000;
+    long updateCountBefore = cachedStore.getCacheUpdateCount();
+    CachedStore.startCacheUpdateService(cachedStore.getConf(), true, false);
+    // '<=' rather than '!= before + 1': a count that overshoots must not keep this loop spinning
+    while (cachedStore.getCacheUpdateCount() <= updateCountBefore && maxTries-- > 0) {
+      Thread.sleep(1000);
+    }
+    CachedStore.stopCacheUpdateService(100);
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/resources/log4j2.properties b/standalone-metastore/src/test/resources/log4j2.properties
index 365687e..db8a550 100644
--- a/standalone-metastore/src/test/resources/log4j2.properties
+++ b/standalone-metastore/src/test/resources/log4j2.properties
@@ -8,28 +8,64 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
-name=PropertiesConfig
-property.filename = logs
-appenders = console
+status = INFO
+name = MetastoreLog4j2
+packages = org.apache.hadoop.hive.metastore
+# list of properties
+property.metastore.log.level = INFO
+property.metastore.root.logger = DRFA
+property.metastore.log.dir = ${sys:java.io.tmpdir}/${sys:user.name}
+property.metastore.log.file = metastore.log
+property.hive.perflogger.log.level = INFO
+
+# list of all appenders
+appenders = console, DRFA
+
+# console appender
appender.console.type = Console
-appender.console.name = STDOUT
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+
+# daily rolling file appender
+appender.DRFA.type = RollingRandomAccessFile
+appender.DRFA.name = DRFA
+appender.DRFA.fileName = ${sys:metastore.log.dir}/${sys:metastore.log.file}
# Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI sessions
+appender.DRFA.filePattern = ${sys:metastore.log.dir}/${sys:metastore.log.file}.%d{yyyy-MM-dd}
+appender.DRFA.layout.type = PatternLayout
+appender.DRFA.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+appender.DRFA.policies.type = Policies
+appender.DRFA.policies.time.type = TimeBasedTriggeringPolicy
+appender.DRFA.policies.time.interval = 1
+appender.DRFA.policies.time.modulate = true
+appender.DRFA.strategy.type = DefaultRolloverStrategy
+appender.DRFA.strategy.max = 30
+
+# list of all loggers
+loggers = DataNucleus, Datastore, JPOX, PerfLogger
+
+logger.DataNucleus.name = DataNucleus
+logger.DataNucleus.level = INFO
+
+logger.Datastore.name = Datastore
+logger.Datastore.level = INFO
+
+logger.JPOX.name = JPOX
+logger.JPOX.level = INFO
-loggers=file
-logger.file.name=guru.springframework.blog.log4j2properties
-logger.file.level = debug
-logger.file.appenderRefs = file
-logger.file.appenderRef.file.ref = LOGFILE
+logger.PerfLogger.name = org.apache.hadoop.hive.ql.log.PerfLogger
+logger.PerfLogger.level = ${sys:hive.perflogger.log.level}
-rootLogger.level = debug
-rootLogger.appenderRefs = stdout
-rootLogger.appenderRef.stdout.ref = STDOUT
+# root logger
+rootLogger.level = ${sys:metastore.log.level}
+rootLogger.appenderRefs = root
+rootLogger.appenderRef.root.ref = ${sys:metastore.root.logger}
[3/4] hive git commit: HIVE-18264: CachedStore: Store cached
partitions/col stats within the table cache and make prewarm non-blocking
(Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
Posted by vg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index d28b196..d37b201 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hive.metastore.cache;
import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.hadoop.hive.metastore.api.ISchemaName;
import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -35,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -95,8 +94,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.SchemaVersion;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.Type;
@@ -124,130 +121,50 @@ import com.google.common.annotations.VisibleForTesting;
// TODO constraintCache
// TODO need sd nested copy?
// TODO String intern
-// TODO restructure HBaseStore
// TODO monitor event queue
// TODO initial load slow?
// TODO size estimation
-// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
public class CachedStore implements RawStore, Configurable {
private static ScheduledExecutorService cacheUpdateMaster = null;
- private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
- true);
- private static ReentrantReadWriteLock partitionAggrColStatsCacheLock =
- new ReentrantReadWriteLock(true);
- private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false);
- private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
private static List<Pattern> whitelistPatterns = null;
private static List<Pattern> blacklistPatterns = null;
+  // Default refresh period (100 ms) used in tests; outside tests it is read from the config
+ private static long DEFAULT_CACHE_REFRESH_PERIOD = 100;
+ // Time after which metastore cache is updated from metastore DB by the background update thread
+ private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
+ private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
private RawStore rawStore = null;
private Configuration conf;
private PartitionExpressionProxy expressionProxy = null;
- // Default value set to 100 milliseconds for test purpose
- private static long cacheRefreshPeriod = 100;
-
- /** A wrapper over SharedCache. Allows one to get SharedCache safely; should be merged
- * into SharedCache itself (see the TODO on the class). */
- private static final SharedCacheWrapper sharedCacheWrapper = new SharedCacheWrapper();
+ private static final SharedCache sharedCache = new SharedCache();
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
- static class TableWrapper {
- Table t;
- String location;
- Map<String, String> parameters;
- byte[] sdHash;
- TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
- this.t = t;
- this.sdHash = sdHash;
- this.location = location;
- this.parameters = parameters;
- }
- public Table getTable() {
- return t;
- }
- public byte[] getSdHash() {
- return sdHash;
- }
- public String getLocation() {
- return location;
- }
- public Map<String, String> getParameters() {
- return parameters;
- }
- }
-
- static class PartitionWrapper {
- Partition p;
- String location;
- Map<String, String> parameters;
- byte[] sdHash;
- PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
- this.p = p;
- this.sdHash = sdHash;
- this.location = location;
- this.parameters = parameters;
- }
- public Partition getPartition() {
- return p;
- }
- public byte[] getSdHash() {
- return sdHash;
- }
- public String getLocation() {
- return location;
- }
- public Map<String, String> getParameters() {
- return parameters;
- }
- }
+ public CachedStore() {
- static class StorageDescriptorWrapper {
- StorageDescriptor sd;
- int refCount = 0;
- StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
- this.sd = sd;
- this.refCount = refCount;
- }
- public StorageDescriptor getSd() {
- return sd;
- }
- public int getRefCount() {
- return refCount;
- }
}
- public CachedStore() {
+  @Override
+  public void setConf(Configuration configuration) {
+    setConfInternal(configuration);
+    initBlackListWhiteList(configuration);
+    startCacheUpdateService(configuration, /* runOnlyOnce */ false, /* shouldRunPrewarm */ true);
   }
- public static void initSharedCacheAsync(Configuration conf) {
- String clazzName = null;
- boolean isEnabled = false;
- try {
- clazzName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL);
- isEnabled = JavaUtils.getClass(clazzName, RawStore.class).isAssignableFrom(CachedStore.class);
- } catch (MetaException e) {
- LOG.error("Cannot instantiate metastore class", e);
- }
- if (!isEnabled) {
- LOG.debug("CachedStore is not enabled; using " + clazzName);
- return;
- }
- sharedCacheWrapper.startInit(conf);
+  /**
+   * Like setConf, but for tests: it does NOT start the background prewarm/update thread, so a
+   * test can start it explicitly (e.g. via startCacheUpdateService) to control its timing.
+   * @param conf the configuration to apply
+   */
+  void setConfForTest(Configuration conf) {
+    setConfInternal(conf);
+    initBlackListWhiteList(conf);
   }
- @Override
- public void setConf(Configuration conf) {
- String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
- ObjectStore.class.getName());
+ private void setConfInternal(Configuration conf) {
+ String rawStoreClassName =
+ MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
if (rawStore == null) {
try {
rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
@@ -260,94 +177,145 @@ public class CachedStore implements RawStore, Configurable {
this.conf = conf;
if (expressionProxy != null && conf != oldConf) {
LOG.warn("Unexpected setConf when we were already configured");
- }
- if (expressionProxy == null || conf != oldConf) {
+ } else {
expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
}
- initBlackListWhiteList(conf);
}
@VisibleForTesting
- static void prewarm(RawStore rawStore) throws Exception {
- // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
- Deadline.registerIfNot(1000000);
- List<String> dbNames = rawStore.getAllDatabases();
- LOG.info("Number of databases to prewarm: " + dbNames.size());
- SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
- for (int i = 0; i < dbNames.size(); i++) {
- String dbName = StringUtils.normalizeIdentifier(dbNames.get(i));
- // Cache partition column stats
- Deadline.startTimer("getColStatsForDatabase");
- List<ColStatsObjWithSourceInfo> colStatsForDB =
- rawStore.getPartitionColStatsForDatabase(dbName);
- Deadline.stopTimer();
- if (colStatsForDB != null) {
- sharedCache.addPartitionColStatsToCache(colStatsForDB);
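+  /**
+   * Initializes the SharedCache with databases, tables, partitions and their column statistics
+   * read through the given RawStore. Returns at once if the cache is already prewarmed; objects
+   * that fail to load are skipped, and a failure to list databases immediately retries the pass.
+   *
+   * @param rawStore store used to read the objects from the metastore DB
+   */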
+ static void prewarm(RawStore rawStore) {
+ if (isCachePrewarmed.get()) {
+ return;
+ }
+ long startTime = System.nanoTime();
+ LOG.info("Prewarming CachedStore");
+ while (!isCachePrewarmed.get()) {
+ // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+ Deadline.registerIfNot(1000000);
+ List<String> dbNames;
+ try {
+ dbNames = rawStore.getAllDatabases();
+ } catch (MetaException e) {
+ // Try again
+ continue;
}
- LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size());
- Database db = rawStore.getDatabase(dbName);
- sharedCache.addDatabaseToCache(dbName, db);
- List<String> tblNames = rawStore.getAllTables(dbName);
- LOG.debug("Tables in database: {} : {}", dbName, tblNames);
- for (int j = 0; j < tblNames.size(); j++) {
- String tblName = StringUtils.normalizeIdentifier(tblNames.get(j));
- if (!shouldCacheTable(dbName, tblName)) {
- LOG.info("Not caching database: {}'s table: {}", dbName, tblName);
+ LOG.info("Number of databases to prewarm: {}", dbNames.size());
+ List<Database> databases = new ArrayList<>(dbNames.size());
+ for (String dbName : dbNames) {
+ try {
+ databases.add(rawStore.getDatabase(dbName));
+ } catch (NoSuchObjectException e) {
+ // Continue with next database
continue;
}
- LOG.info("Caching database: {}'s table: {}. Cached {} / {} tables so far.", dbName,
- tblName, j, tblNames.size());
- Table table = null;
- table = rawStore.getTable(dbName, tblName);
- // It is possible the table is deleted during fetching tables of the database,
- // in that case, continue with the next table
- if (table == null) {
+ }
+ sharedCache.populateDatabasesInCache(databases);
+ LOG.debug(
+ "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache");
+ int numberOfDatabasesCachedSoFar = 0;
+ for (String dbName : dbNames) {
+ dbName = StringUtils.normalizeIdentifier(dbName);
+ List<String> tblNames;
+ try {
+ tblNames = rawStore.getAllTables(dbName);
+ } catch (MetaException e) {
+ // Continue with next database
continue;
}
- sharedCache.addTableToCache(dbName, tblName, table);
- if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
- Deadline.startTimer("getPartitions");
- List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
- Deadline.stopTimer();
- for (Partition partition : partitions) {
- sharedCache.addPartitionToCache(dbName, tblName, partition);
+ int numberOfTablesCachedSoFar = 0;
+ for (String tblName : tblNames) {
+ tblName = StringUtils.normalizeIdentifier(tblName);
+ if (!shouldCacheTable(dbName, tblName)) {
+ continue;
}
- }
- // Cache table column stats
- List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- Deadline.startTimer("getTableColumnStatistics");
- ColumnStatistics tableColStats =
- rawStore.getTableColumnStatistics(dbName, tblName, colNames);
- Deadline.stopTimer();
- if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
- sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj());
- }
- // Cache aggregate stats for all partitions of a table and for all but default partition
- List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
- if ((partNames != null) && (partNames.size() > 0)) {
- AggrStats aggrStatsAllPartitions =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- // Remove default partition from partition names and get aggregate
- // stats again
- List<FieldSchema> partKeys = table.getPartitionKeys();
- String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
- List<String> partCols = new ArrayList<String>();
- List<String> partVals = new ArrayList<String>();
- for (FieldSchema fs : partKeys) {
- partCols.add(fs.getName());
- partVals.add(defaultPartitionValue);
+ Table table;
+ try {
+ table = rawStore.getTable(dbName, tblName);
+ } catch (MetaException e) {
+ // It is possible the table is deleted during fetching tables of the database,
+ // in that case, continue with the next table
+ continue;
}
- String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
- partNames.remove(defaultPartitionName);
- AggrStats aggrStatsAllButDefaultPartition =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ try {
+ ColumnStatistics tableColStats = null;
+ List<Partition> partitions = null;
+ List<ColumnStatistics> partitionColStats = null;
+ AggrStats aggrStatsAllPartitions = null;
+ AggrStats aggrStatsAllButDefaultPartition = null;
+ if (table.isSetPartitionKeys()) {
+ Deadline.startTimer("getPartitions");
+ partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+ Deadline.stopTimer();
+ List<String> partNames = new ArrayList<>(partitions.size());
+ for (Partition p : partitions) {
+ partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+ }
+ if (!partNames.isEmpty()) {
+ // Get partition column stats for this table
+ Deadline.startTimer("getPartitionColumnStatistics");
+ partitionColStats =
+ rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Get aggregate stats for all partitions of a table and for all but default
+ // partition
+ Deadline.startTimer("getAggrPartitionColumnStatistics");
+ aggrStatsAllPartitions =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Remove default partition from partition names and get aggregate
+ // stats again
+ List<FieldSchema> partKeys = table.getPartitionKeys();
+ String defaultPartitionValue =
+ MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+ List<String> partCols = new ArrayList<>();
+ List<String> partVals = new ArrayList<>();
+ for (FieldSchema fs : partKeys) {
+ partCols.add(fs.getName());
+ partVals.add(defaultPartitionValue);
+ }
+ String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+ partNames.remove(defaultPartitionName);
+ Deadline.startTimer("getAggrPartitionColumnStatistics");
+ aggrStatsAllButDefaultPartition =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ }
+ } else {
+ Deadline.startTimer("getTableColumnStatistics");
+ tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ }
+ sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
+ aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+ } catch (MetaException | NoSuchObjectException e) {
+ // Continue with next table
+ continue;
+ }
+ LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName,
+ tblName, ++numberOfTablesCachedSoFar, tblNames.size());
}
+ LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
+ ++numberOfDatabasesCachedSoFar, dbNames.size());
}
+ isCachePrewarmed.set(true);
}
- // Notify all blocked threads that prewarm is complete now
- sharedCacheWrapper.notifyAllBlocked();
+ LOG.info("CachedStore initialized");
+ long endTime = System.nanoTime();
+ LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
+ sharedCache.completeTableCachePrewarm();
+ }
+
+  @VisibleForTesting
+  static void setCachePrewarmedState(boolean prewarmed) {
+    isCachePrewarmed.set(prewarmed); // prewarm() returns early while this flag is true
   }
private static void initBlackListWhiteList(Configuration conf) {
@@ -356,20 +324,27 @@ public class CachedStore implements RawStore, Configurable {
MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
- // The last specified blacklist pattern gets precedence
- Collections.reverse(blacklistPatterns);
}
}
@VisibleForTesting
- synchronized static void startCacheUpdateService(Configuration conf) {
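+  /**
+   * Starts the background thread that populates the SharedCache and refreshes it from the DB.
+   *
+   * @param conf used for the cached-objects filter, the refresh period and the updater's RawStore
+   * @param runOnlyOnce if true, schedule one immediate run instead of a periodic one
+   * @param shouldRunPrewarm if true, every scheduled run calls prewarm() rather than update();
+   *        NOTE(review): prewarm() is a no-op once prewarmed, so periodic refresh never happens
+   */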
+ static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+ boolean shouldRunPrewarm) {
if (cacheUpdateMaster == null) {
initBlackListWhiteList(conf);
if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
- cacheRefreshPeriod = MetastoreConf.getTimeVar(conf,
+ cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
}
- LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriod);
+ LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS);
cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -379,13 +354,20 @@ public class CachedStore implements RawStore, Configurable {
return t;
}
});
- cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod,
+ if (!runOnlyOnce) {
+ cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
+ cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+ }
+ }
+ if (runOnlyOnce) {
+ // Some tests control the execution of the background update thread
+ cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
TimeUnit.MILLISECONDS);
}
}
@VisibleForTesting
- synchronized static boolean stopCacheUpdateService(long timeout) {
+ static synchronized boolean stopCacheUpdateService(long timeout) {
boolean tasksStoppedBeforeShutdown = false;
if (cacheUpdateMaster != null) {
LOG.info("CachedStore: shutting down cache update service");
@@ -404,167 +386,83 @@ public class CachedStore implements RawStore, Configurable {
@VisibleForTesting
static void setCacheRefreshPeriod(long time) {
- cacheRefreshPeriod = time;
+ cacheRefreshPeriodMS = time;
}
static class CacheUpdateMasterWork implements Runnable {
- private boolean isFirstRun = true;
+ private boolean shouldRunPrewarm = true;
private final RawStore rawStore;
- public CacheUpdateMasterWork(Configuration conf) {
- String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
- ObjectStore.class.getName());
+ CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
+ this.shouldRunPrewarm = shouldRunPrewarm;
+ String rawStoreClassName =
+ MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
try {
rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance();
rawStore.setConf(conf);
} catch (InstantiationException | IllegalAccessException | MetaException e) {
// MetaException here really means ClassNotFound (see the utility method).
// So, if any of these happen, that means we can never succeed.
- sharedCacheWrapper.updateInitState(e, true);
throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
}
}
@Override
public void run() {
- if (isFirstRun) {
- while (isFirstRun) {
- try {
- long startTime = System.nanoTime();
- LOG.info("Prewarming CachedStore");
- prewarm(rawStore);
- LOG.info("CachedStore initialized");
- long endTime = System.nanoTime();
- LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
- } catch (Exception e) {
- LOG.error("Prewarm failure", e);
- sharedCacheWrapper.updateInitState(e, false);
- return;
- }
- sharedCacheWrapper.updateInitState(null, false);
- isFirstRun = false;
- }
- } else {
+ if (!shouldRunPrewarm) {
// TODO: prewarm and update can probably be merged.
update();
+ } else {
+ try {
+ prewarm(rawStore);
+ } catch (Exception e) {
+ LOG.error("Prewarm failure", e);
+ return;
+ }
}
}
- public void update() {
+ void update() {
Deadline.registerIfNot(1000000);
LOG.debug("CachedStore: updating cached objects");
+ List<String> dbNames;
try {
- List<String> dbNames = rawStore.getAllDatabases();
- if (dbNames != null) {
- // Update the database in cache
- updateDatabases(rawStore, dbNames);
- for (String dbName : dbNames) {
- updateDatabasePartitionColStats(rawStore, dbName);
- // Update the tables in cache
- updateTables(rawStore, dbName);
- List<String> tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe());
- for (String tblName : tblNames) {
- if (!shouldCacheTable(dbName, tblName)) {
- continue;
- }
- // Update the partitions for a table in cache
- updateTablePartitions(rawStore, dbName, tblName);
- // Update the table column stats for a table in cache
- updateTableColStats(rawStore, dbName, tblName);
- // Update aggregate column stats cache
- updateAggregateStatsCache(rawStore, dbName, tblName);
- }
- }
- }
- } catch (Exception e) {
- LOG.error("Updating CachedStore: error happen when refresh; ignoring", e);
+ dbNames = rawStore.getAllDatabases();
+ } catch (MetaException e) {
+ LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e);
+ return;
}
- }
-
- private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) {
- try {
- Deadline.startTimer("getColStatsForDatabasePartitions");
- List<ColStatsObjWithSourceInfo> colStatsForDB =
- rawStore.getPartitionColStatsForDatabase(dbName);
- Deadline.stopTimer();
- if (colStatsForDB != null) {
- if (partitionColStatsCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping partition column stats cache update; the partition column stats "
- + "list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe()
- .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB);
- }
- }
- } catch (MetaException | NoSuchObjectException e) {
- LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}",
- dbName, e);
- } finally {
- if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
- partitionColStatsCacheLock.writeLock().unlock();
+ // Update the database in cache
+ updateDatabases(rawStore, dbNames);
+ for (String dbName : dbNames) {
+ // Update the tables in cache
+ updateTables(rawStore, dbName);
+ List<String> tblNames;
+ try {
+ tblNames = rawStore.getAllTables(dbName);
+ } catch (MetaException e) {
+ // Continue with next database
+ continue;
}
- }
- }
-
- // Update cached aggregate stats for all partitions of a table and for all
- // but default partition
- private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) {
- try {
- Table table = rawStore.getTable(dbName, tblName);
- List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
- List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- if ((partNames != null) && (partNames.size() > 0)) {
- Deadline.startTimer("getAggregareStatsForAllPartitions");
- AggrStats aggrStatsAllPartitions =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- Deadline.stopTimer();
- // Remove default partition from partition names and get aggregate stats again
- List<FieldSchema> partKeys = table.getPartitionKeys();
- String defaultPartitionValue =
- MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
- List<String> partCols = new ArrayList<String>();
- List<String> partVals = new ArrayList<String>();
- for (FieldSchema fs : partKeys) {
- partCols.add(fs.getName());
- partVals.add(defaultPartitionValue);
- }
- String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
- partNames.remove(defaultPartitionName);
- Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
- AggrStats aggrStatsAllButDefaultPartition =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- Deadline.stopTimer();
- if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) {
- if (partitionAggrColStatsCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug(
- "Skipping aggregate column stats cache update; the aggregate column stats we "
- + "have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache(
- StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName),
- aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
- }
+ for (String tblName : tblNames) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ continue;
}
- }
- } catch (MetaException | NoSuchObjectException e) {
- LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
- e);
- } finally {
- if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) {
- partitionAggrColStatsCacheLock.writeLock().unlock();
+ // Update the table column stats for a table in cache
+ updateTableColStats(rawStore, dbName, tblName);
+ // Update the partitions for a table in cache
+ updateTablePartitions(rawStore, dbName, tblName);
+ // Update the partition col stats for a table in cache
+ updateTablePartitionColStats(rawStore, dbName, tblName);
+ // Update aggregate partition column stats for a table in cache
+ updateTableAggregatePartitionColStats(rawStore, dbName, tblName);
}
}
+ sharedCache.incrementUpdateCount();
}
private void updateDatabases(RawStore rawStore, List<String> dbNames) {
- // Prepare the list of databases
- List<Database> databases = new ArrayList<>();
+ List<Database> databases = new ArrayList<>(dbNames.size());
for (String dbName : dbNames) {
Database db;
try {
@@ -574,24 +472,9 @@ public class CachedStore implements RawStore, Configurable {
LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
}
}
- // Update the cached database objects
- try {
- if (databaseCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isDatabaseCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping database cache update; the database list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshDatabases(databases);
- }
- } finally {
- if (databaseCacheLock.isWriteLockedByCurrentThread()) {
- databaseCacheLock.writeLock().unlock();
- }
- }
+ sharedCache.refreshDatabasesInCache(databases);
}
- // Update the cached table objects
private void updateTables(RawStore rawStore, String dbName) {
List<Table> tables = new ArrayList<>();
try {
@@ -600,81 +483,99 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
continue;
}
- Table table =
- rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName));
+ Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName));
tables.add(table);
}
- if (tableCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isTableCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping table cache update; the table list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables);
- }
+ sharedCache.refreshTablesInCache(dbName, tables);
} catch (MetaException e) {
- LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
- } finally {
- if (tableCacheLock.isWriteLockedByCurrentThread()) {
- tableCacheLock.writeLock().unlock();
+ LOG.debug("Unable to refresh cached tables for database: " + dbName, e);
+ }
+ }
+
+ private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Table table = rawStore.getTable(dbName, tblName);
+ if (!table.isSetPartitionKeys()) {
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ Deadline.startTimer("getTableColumnStatistics");
+ ColumnStatistics tableColStats =
+ rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ if (tableColStats != null) {
+ sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ }
}
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Unable to refresh table column stats for table: " + tblName, e);
}
}
- // Update the cached partition objects for a table
private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
try {
Deadline.startTimer("getPartitions");
List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
Deadline.stopTimer();
- if (partitionCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isPartitionCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshPartitions(
- StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), partitions);
- }
+ sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), partitions);
} catch (MetaException | NoSuchObjectException e) {
LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
- } finally {
- if (partitionCacheLock.isWriteLockedByCurrentThread()) {
- partitionCacheLock.writeLock().unlock();
- }
}
}
- // Update the cached col stats for this table
- private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+ private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
try {
Table table = rawStore.getTable(dbName, tblName);
List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- Deadline.startTimer("getTableColumnStatistics");
- ColumnStatistics tableColStats =
- rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+ // Get partition column stats for this table
+ Deadline.startTimer("getPartitionColumnStatistics");
+ List<ColumnStatistics> partitionColStats =
+ rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
Deadline.stopTimer();
- if (tableColStats != null) {
- if (tableColStatsCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping table column stats cache update; the table column stats list we "
- + "have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshTableColStats(
- StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ sharedCache.refreshPartitionColStatsInCache(dbName, tblName, partitionColStats);
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+ }
+ }
+
+ // Update cached aggregate stats for all partitions of a table and for all
+ // but default partition
+ private void updateTableAggregatePartitionColStats(RawStore rawStore, String dbName,
+ String tblName) {
+ try {
+ Table table = rawStore.getTable(dbName, tblName);
+ List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ if ((partNames != null) && (partNames.size() > 0)) {
+ Deadline.startTimer("getAggregareStatsForAllPartitions");
+ AggrStats aggrStatsAllPartitions =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Remove default partition from partition names and get aggregate stats again
+ List<FieldSchema> partKeys = table.getPartitionKeys();
+ String defaultPartitionValue =
+ MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+ List<String> partCols = new ArrayList<String>();
+ List<String> partVals = new ArrayList<String>();
+ for (FieldSchema fs : partKeys) {
+ partCols.add(fs.getName());
+ partVals.add(defaultPartitionValue);
}
+ String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+ partNames.remove(defaultPartitionName);
+ Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
+ AggrStats aggrStatsAllButDefaultPartition =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
}
} catch (MetaException | NoSuchObjectException e) {
- LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
- } finally {
- if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
- tableColStatsCacheLock.writeLock().unlock();
- }
+ LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
+ e);
}
}
}
@@ -712,35 +613,17 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void createDatabase(Database db) throws InvalidObjectException, MetaException {
rawStore.createDatabase(db);
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- try {
- // Wait if background cache update is happening
- databaseCacheLock.readLock().lock();
- isDatabaseCacheDirty.set(true);
- sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()),
- db.deepCopy());
- } finally {
- databaseCacheLock.readLock().unlock();
- }
+ sharedCache.addDatabaseToCache(db);
}
@Override
public Database getDatabase(String dbName) throws NoSuchObjectException {
- SharedCache sharedCache;
-
- if (!sharedCacheWrapper.isInitialized()) {
+ if (!sharedCache.isDatabaseCachePrewarmed()) {
return rawStore.getDatabase(dbName);
}
-
- try {
- sharedCache = sharedCacheWrapper.get();
- } catch (MetaException e) {
- throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx?
- }
- Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
+ dbName = dbName.toLowerCase();
+ Database db =
+ sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
if (db == null) {
throw new NoSuchObjectException();
}
@@ -748,68 +631,39 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
- boolean succ = rawStore.dropDatabase(dbname);
+ public boolean dropDatabase(String dbName) throws NoSuchObjectException, MetaException {
+ boolean succ = rawStore.dropDatabase(dbName);
if (succ) {
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- databaseCacheLock.readLock().lock();
- isDatabaseCacheDirty.set(true);
- sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname));
- } finally {
- databaseCacheLock.readLock().unlock();
- }
+ dbName = dbName.toLowerCase();
+ sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
}
return succ;
}
@Override
- public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
- MetaException {
+ public boolean alterDatabase(String dbName, Database db)
+ throws NoSuchObjectException, MetaException {
boolean succ = rawStore.alterDatabase(dbName, db);
if (succ) {
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- databaseCacheLock.readLock().lock();
- isDatabaseCacheDirty.set(true);
- sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
- } finally {
- databaseCacheLock.readLock().unlock();
- }
+ dbName = dbName.toLowerCase();
+ sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
}
return succ;
}
@Override
public List<String> getDatabases(String pattern) throws MetaException {
- if (!sharedCacheWrapper.isInitialized()) {
+ if (!sharedCache.isDatabaseCachePrewarmed()) {
return rawStore.getDatabases(pattern);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> results = new ArrayList<>();
- for (String dbName : sharedCache.listCachedDatabases()) {
- dbName = StringUtils.normalizeIdentifier(dbName);
- if (CacheUtils.matches(dbName, pattern)) {
- results.add(dbName);
- }
- }
- return results;
+ return sharedCache.listCachedDatabases(pattern);
}
@Override
public List<String> getAllDatabases() throws MetaException {
- if (!sharedCacheWrapper.isInitialized()) {
+ if (!sharedCache.isDatabaseCachePrewarmed()) {
return rawStore.getAllDatabases();
}
- SharedCache sharedCache = sharedCacheWrapper.get();
return sharedCache.listCachedDatabases();
}
@@ -854,24 +708,13 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
validateTableType(tbl);
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.addTableToCache(dbName, tblName, tbl);
- } finally {
- tableCacheLock.readLock().unlock();
- }
+ sharedCache.addTableToCache(dbName, tblName, tbl);
}
@Override
- public boolean dropTable(String dbName, String tblName) throws MetaException,
- NoSuchObjectException, InvalidObjectException, InvalidInputException {
+ public boolean dropTable(String dbName, String tblName)
+ throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropTable(dbName, tblName);
if (succ) {
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -879,28 +722,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- // Remove table
- try {
- // Wait if background table cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.removeTableFromCache(dbName, tblName);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Remove table col stats
- try {
- // Wait if background table col stats cache update is happening
- tableColStatsCacheLock.readLock().lock();
- isTableColStatsCacheDirty.set(true);
- sharedCache.removeTableColStatsFromCache(dbName, tblName);
- } finally {
- tableColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removeTableFromCache(dbName, tblName);
}
return succ;
}
@@ -909,11 +731,14 @@ public class CachedStore implements RawStore, Configurable {
public Table getTable(String dbName, String tblName) throws MetaException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getTable(dbName, tblName);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // This table is not yet loaded in cache
+ return rawStore.getTable(dbName, tblName);
+ }
if (tbl != null) {
tbl.unsetPrivileges();
tbl.setRewriteEnabled(tbl.isRewriteEnabled());
@@ -930,27 +755,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.addPartitionToCache(dbName, tblName, part);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.addPartitionToCache(dbName, tblName, part);
}
return succ;
}
@@ -965,29 +770,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- for (Partition part : parts) {
- sharedCache.addPartitionToCache(dbName, tblName, part);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.addPartitionsToCache(dbName, tblName, parts);
}
return succ;
}
@@ -1002,30 +785,10 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
- while (iterator.hasNext()) {
- Partition part = iterator.next();
- sharedCache.addPartitionToCache(dbName, tblName, part);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
+ PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+ while (iterator.hasNext()) {
+ Partition part = iterator.next();
+ sharedCache.addPartitionToCache(dbName, tblName, part);
}
}
return succ;
@@ -1036,16 +799,13 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
-
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getPartition(dbName, tblName, part_vals);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- Partition part =
- sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
+ Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
if (part == null) {
- // TODO Manage privileges
- throw new NoSuchObjectException("partition values=" + part_vals.toString());
+ // The table containing the partition is not yet loaded in cache
+ return rawStore.getPartition(dbName, tblName, part_vals);
}
return part;
}
@@ -1055,10 +815,14 @@ public class CachedStore implements RawStore, Configurable {
List<String> part_vals) throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.doesPartitionExist(dbName, tblName, part_vals);
+ }
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table containing the partition is not yet loaded in cache
return rawStore.doesPartitionExist(dbName, tblName, part_vals);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
return sharedCache.existPartitionFromCache(dbName, tblName, part_vals);
}
@@ -1072,50 +836,40 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- // Remove partition
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove partition col stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
}
return succ;
}
@Override
+ public void dropPartitions(String dbName, String tblName, List<String> partNames)
+ throws MetaException, NoSuchObjectException {
+ rawStore.dropPartitions(dbName, tblName, partNames);
+ dbName = StringUtils.normalizeIdentifier(dbName);
+ tblName = StringUtils.normalizeIdentifier(tblName);
+ if (!shouldCacheTable(dbName, tblName)) {
+ return;
+ }
+ List<List<String>> partVals = new ArrayList<List<String>>();
+ for (String partName : partNames) {
+ partVals.add(partNameToVals(partName));
+ }
+ sharedCache.removePartitionsFromCache(dbName, tblName, partVals);
+ }
+
+ @Override
public List<Partition> getPartitions(String dbName, String tblName, int max)
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitions(dbName, tblName, max);
+ }
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table containing the partitions is not yet loaded in cache
return rawStore.getPartitions(dbName, tblName, max);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Partition> parts = sharedCache.listCachedPartitions(dbName, tblName, max);
return parts;
}
@@ -1130,73 +884,20 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table is not yet loaded in cache
return;
}
-
- if (shouldCacheTable(dbName, newTblName)) {
- validateTableType(newTable);
- // Update table cache
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), newTable);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Update partition cache (key might have changed since table name is a
- // component of key)
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), newTable);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- } else {
- // Remove the table and its cached partitions, stats etc,
- // since it does not pass the whitelist/blacklist filter.
- // Remove table
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.removeTableFromCache(dbName, tblName);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Remove partitions
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.removePartitionsFromCache(dbName, tblName);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove partition col stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Update aggregate partition col stats keys wherever applicable
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+ // If old table is in the cache and the new table can also be cached
+ sharedCache.alterTableInCache(dbName, tblName, newTable);
+ } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+ // If old table is *not* in the cache but the new table can be cached
+ sharedCache.addTableToCache(dbName, newTblName, newTable);
+ } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
+ // If old table is in the cache but the new table *cannot* be cached
+ sharedCache.removeTableFromCache(dbName, tblName);
}
}
@@ -1208,34 +909,21 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getTables(String dbName, String pattern) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getTables(dbName, pattern);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> tableNames = new ArrayList<>();
- for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- if (CacheUtils.matches(table.getTableName(), pattern)) {
- tableNames.add(table.getTableName());
- }
- }
- return tableNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+ (short) -1);
}
@Override
public List<String> getTables(String dbName, String pattern, TableType tableType)
throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getTables(dbName, pattern, tableType);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> tableNames = new ArrayList<>();
- for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- if (CacheUtils.matches(table.getTableName(), pattern) &&
- table.getTableType().equals(tableType.toString())) {
- tableNames.add(table.getTableName());
- }
- }
- return tableNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+ tableType);
}
@Override
@@ -1248,10 +936,9 @@ public class CachedStore implements RawStore, Configurable {
public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes)
throws MetaException {
// TODO Check if all required tables are allowed, if so, get it from cache
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getTableMeta(dbNames, tableNames, tableTypes);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames),
StringUtils.normalizeIdentifier(tableNames), tableTypes);
}
@@ -1268,10 +955,9 @@ public class CachedStore implements RawStore, Configurable {
break;
}
}
- if (!sharedCacheWrapper.isInitialized() || missSomeInCache) {
+ if (!isCachePrewarmed.get() || missSomeInCache) {
return rawStore.getTableObjectsByName(dbName, tblNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Table> tables = new ArrayList<>();
for (String tblName : tblNames) {
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1286,38 +972,20 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getAllTables(String dbName) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getAllTables(dbName);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- return getAllTablesInternal(dbName, sharedCache);
- }
-
- private static List<String> getAllTablesInternal(String dbName, SharedCache sharedCache) {
- List<String> tblNames = new ArrayList<>();
- for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName()));
- }
- return tblNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName));
}
@Override
public List<String> listTableNamesByFilter(String dbName, String filter, short max_tables)
throws MetaException, UnknownDBException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.listTableNamesByFilter(dbName, filter, max_tables);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> tableNames = new ArrayList<>();
- int count = 0;
- for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- if (CacheUtils.matches(table.getTableName(), filter)
- && (max_tables == -1 || count < max_tables)) {
- tableNames.add(table.getTableName());
- count++;
- }
- }
- return tableNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter,
+ max_tables);
}
@Override
@@ -1325,16 +993,19 @@ public class CachedStore implements RawStore, Configurable {
short max_parts) throws MetaException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.listPartitionNames(dbName, tblName, max_parts);
+ }
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table is not yet loaded in cache
return rawStore.listPartitionNames(dbName, tblName, max_parts);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<String> partitionNames = new ArrayList<>();
- Table t = sharedCache.getTableFromCache(dbName, tblName);
int count = 0;
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) {
if (max_parts == -1 || count < max_parts) {
- partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+ partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
}
}
return partitionNames;
@@ -1363,37 +1034,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- // Update partition cache
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Update partition column stats cache
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
}
@Override
@@ -1405,61 +1046,23 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- // Update partition cache
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- for (int i = 0; i < partValsList.size(); i++) {
- List<String> partVals = partValsList.get(i);
- Partition newPart = newParts.get(i);
- sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Update partition column stats cache
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- for (int i = 0; i < partValsList.size(); i++) {
- List<String> partVals = partValsList.get(i);
- Partition newPart = newParts.get(i);
- sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
- }
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts);
}
private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
- throws MetaException, NoSuchObjectException {
- List<Partition> parts = sharedCache.listCachedPartitions(
- StringUtils.normalizeIdentifier(table.getDbName()),
- StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+ throws MetaException, NoSuchObjectException {
+ List<Partition> parts =
+ sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()),
+ StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
for (Partition part : parts) {
result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
}
if (defaultPartName == null || defaultPartName.isEmpty()) {
defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
}
- return expressionProxy.filterPartitionsByExpr(
- table.getPartitionKeys(), expr, defaultPartName, result);
+ return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
+ result);
}
@Override
@@ -1474,13 +1077,17 @@ public class CachedStore implements RawStore, Configurable {
String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
result);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<String> partNames = new LinkedList<>();
Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
+ result);
+ }
boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr,
defaultPartitionName, maxParts, partNames, sharedCache);
return hasUnknownPartitions;
@@ -1497,13 +1104,16 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
List<String> partNames = new LinkedList<>();
Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
+ }
getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
sharedCache);
return partNames.size();
@@ -1526,10 +1136,14 @@ public class CachedStore implements RawStore, Configurable {
List<String> partNames) throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitionsByNames(dbName, tblName, partNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.getPartitionsByNames(dbName, tblName, partNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Partition> partitions = new ArrayList<>();
for (String partName : partNames) {
Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName));
@@ -1702,14 +1316,17 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException, InvalidObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals);
- if (p!=null) {
- Table t = sharedCache.getTableFromCache(dbName, tblName);
- String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals);
+ if (p != null) {
+ String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
userName, groupNames);
p.setPrivileges(privs);
@@ -1723,16 +1340,19 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException, InvalidObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- Table t = sharedCache.getTableFromCache(dbName, tblName);
List<Partition> partitions = new ArrayList<>();
int count = 0;
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
if (maxParts == -1 || count < maxParts) {
- String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+ String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
userName, groupNames);
part.setPrivileges(privs);
@@ -1749,13 +1369,16 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<String> partNames = new ArrayList<>();
int count = 0;
- Table t = sharedCache.getTableFromCache(dbName, tblName);
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
boolean psMatch = true;
for (int i=0;i<partVals.size();i++) {
@@ -1770,7 +1393,7 @@ public class CachedStore implements RawStore, Configurable {
continue;
}
if (maxParts == -1 || count < maxParts) {
- partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+ partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
count++;
}
}
@@ -1783,13 +1406,17 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, InvalidObjectException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
+ groupNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
groupNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Partition> partitions = new ArrayList<>();
- Table t = sharedCache.getTableFromCache(dbName, tblName);
int count = 0;
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
boolean psMatch = true;
@@ -1805,7 +1432,7 @@ public class CachedStore implements RawStore, Configurable {
continue;
}
if (maxParts == -1 || count < maxParts) {
- String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+ String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
PrincipalPrivilegeSet privs =
getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames);
part.setPrivileges(privs);
@@ -1825,35 +1452,19 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return succ;
}
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
- Table tbl = getTable(dbName, tblName);
List<String> colNames = new ArrayList<>();
for (ColumnStatisticsObj statsObj : statsObjs) {
colNames.add(statsObj.getColName());
}
- StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
- // Update table
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.alterTableInCache(dbName, tblName, tbl);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Update table col stats
- try {
- // Wait if background cache update is happening
- tableColStatsCacheLock.readLock().lock();
- isTableColStatsCacheDirty.set(true);
- sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
- } finally {
- tableColStatsCacheLock.readLock().unlock();
- }
+ StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
+ sharedCache.alterTableInCache(dbName, tblName, table);
+ sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
}
return succ;
}
@@ -1863,24 +1474,18 @@ public class CachedStore implements RawStore, Configurable {
List<String> colNames) throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
- List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
- for (String colName : colNames) {
- String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName);
- ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey);
- if (colStat != null) {
- colStatObjs.add(colStat);
- }
- }
- if (colStatObjs.isEmpty()) {
- return null;
- } else {
- return new ColumnStatistics(csd, colStatObjs);
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
}
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
+ List<ColumnStatisticsObj> colStatObjs =
+ sharedCache.getTableColStatsFromCache(dbName, tblName, colNames);
+ return new ColumnStatistics(csd, colStatObjs);
}
@Override
@@ -1893,18 +1498,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- tableColStatsCacheLock.readLock().lock();
- isTableColStatsCacheDirty.set(true);
- sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
- } finally {
- tableColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
}
return succ;
}
@@ -1919,10 +1513,6 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
Partition part = getPartition(dbName, tblName, partVals);
List<String> colNames = new ArrayList<>();
@@ -1930,34 +1520,8 @@ public class CachedStore implements RawStore, Configurable {
colNames.add(statsObj.getColName());
}
StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
- // Update partition
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Update partition column stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals,
- colStats.getStatsObj());
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
+ sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj());
}
return succ;
}
@@ -1981,27 +1545,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
}
return succ;
}
@@ -2012,10 +1556,14 @@ public class CachedStore implements RawStore, Configurable {
List<ColumnStatisticsObj> colStats;
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ }
List<String> allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
if (partNames.size() == allPartNames.size()) {
colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL);
@@ -2054,10 +1602,8 @@ public class CachedStore implements RawStore, Configurable {
List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList =
new ArrayList<ColStatsObjWithSourceInfo>();
for (String partName : partNames) {
- String colStatsCacheKey =
- CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
ColumnStatisticsObj colStatsForPart =
- sharedCache.getCachedPartitionColStats(colStatsCacheKey);
+ sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName);
if (colStatsForPart != null) {
ColStatsObjWithSourceInfo colStatsWithPartInfo =
new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName);
@@ -2173,54 +1719,6 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void dropPartitions(String dbName, String tblName, List<String> partNames)
- throws MetaException, NoSuchObjectException {
- rawStore.dropPartitions(dbName, tblName, partNames);
- dbName = StringUtils.normalizeIdentifier(dbName);
- tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(dbName, tblName)) {
- return;
- }
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- // Remove partitions
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- for (String partName : partNames) {
- List<String> vals = partNameToVals(partName);
- sharedCache.removePartitionFromCache(dbName, tblName, vals);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove partition col stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- for (String partName : partNames) {
- List<String> part_vals = partNameToVals(partName);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
- }
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
-
<TRUNCATED>
[2/4] hive git commit: HIVE-18264: CachedStore: Store cached
partitions/col stats within the table cache and make prewarm non-blocking
(Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
Posted by vg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 32ea174..cf92eda 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,15 +21,20 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -38,11 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,15 +51,21 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class SharedCache {
- private Map<String, Database> databaseCache = new TreeMap<>();
- private Map<String, TableWrapper> tableCache = new TreeMap<>();
- private Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
- private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>();
- private Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>();
- private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
- private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
- new HashMap<String, List<ColumnStatisticsObj>>();
+ private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
+ // For caching Database objects. Key is database name
+ private Map<String, Database> databaseCache = new ConcurrentHashMap<String, Database>();
+ private boolean isDatabaseCachePrewarmed = false;
+ private HashSet<String> databasesDeletedDuringPrewarm = new HashSet<String>();
+ private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+ // For caching TableWrapper objects. Key is aggregate of database name and table name
+ private Map<String, TableWrapper> tableCache = new ConcurrentHashMap<String, TableWrapper>();
+ private boolean isTableCachePrewarmed = false;
+ private HashSet<String> tablesDeletedDuringPrewarm = new HashSet<String>();
+ private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+ private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new ConcurrentHashMap<>();
private static MessageDigest md;
+ static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+ private AtomicLong cacheUpdateCount = new AtomicLong(0);
static enum StatsType {
ALL(0), ALLBUTDEFAULT(1);
@@ -74,8 +81,6 @@ public class SharedCache {
}
}
- private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);
-
static {
try {
md = MessageDigest.getInstance("MD5");
@@ -84,43 +89,804 @@ public class SharedCache {
}
}
- public synchronized Database getDatabaseFromCache(String name) {
- return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
+ /**
+ * Cache entry for one table: the table object (its StorageDescriptor is shared through the
+ * SharedCache sd cache and referenced by sdHash) plus the table's cached partitions, table
+ * column stats, partition column stats and aggregate partition column stats.
+ * The cache-manipulating methods below guard this per-table state with tableLock.
+ */
+ static class TableWrapper {
+ Table t;
+ String location;
+ Map<String, String> parameters;
+ byte[] sdHash;
+ ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
+ // For caching column stats for an unpartitioned table
+ // Key is column name and the value is the col stat object
+ private Map<String, ColumnStatisticsObj> tableColStatsCache =
+ new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+ // For caching partition objects
+ // Key is partition values and the value is a wrapper around the partition object
+ private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<String, PartitionWrapper>();
+ private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+ // For caching column stats for a partitioned table
+ // Key is aggregate of partition values, column name and the value is the col stat object
+ private Map<String, ColumnStatisticsObj> partitionColStatsCache =
+ new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ private AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
+ // For caching aggregate column stats for all and all minus default partition
+ // Key is column name and the value is a list of exactly 2 col stat objects indexed by
+ // StatsType.getPosition() (all partitions, all but default); a slot is null when that
+ // flavor of aggregate has not been cached
+ private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
+ new ConcurrentHashMap<String, List<ColumnStatisticsObj>>();
+ private AtomicBoolean isAggrPartitionColStatsCacheDirty = new AtomicBoolean(false);
+
+ TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
+ this.t = t;
+ this.sdHash = sdHash;
+ this.location = location;
+ this.parameters = parameters;
+ }
+
+ public Table getTable() {
+ return t;
+ }
+
+ public void setTable(Table t) {
+ this.t = t;
+ }
+
+ public byte[] getSdHash() {
+ return sdHash;
+ }
+
+ public void setSdHash(byte[] sdHash) {
+ this.sdHash = sdHash;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Map<String, String> parameters) {
+ this.parameters = parameters;
+ }
+
+ /**
+ * Caches a copy of the partition (replacing any entry with the same values, whose sd
+ * reference is released) and invalidates the cached aggregate stats.
+ */
+ void cachePartition(Partition part, SharedCache sharedCache) {
+ tableLock.writeLock().lock();
+ try {
+ PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+ releaseSd(partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper),
+ sharedCache);
+ isPartitionCacheDirty.set(true);
+ invalidateAggrColStats();
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Bulk variant of {@link #cachePartition}; aggregate stats are invalidated once. */
+ void cachePartitions(List<Partition> parts, SharedCache sharedCache) {
+ tableLock.writeLock().lock();
+ try {
+ for (Partition part : parts) {
+ PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+ releaseSd(partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper),
+ sharedCache);
+ isPartitionCacheDirty.set(true);
+ }
+ invalidateAggrColStats();
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Returns the assembled partition for the given values, or null if it is not cached. */
+ public Partition getPartition(List<String> partVals, SharedCache sharedCache) {
+ tableLock.readLock().lock();
+ try {
+ PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+ return wrapper == null ? null : CacheUtils.assemble(wrapper, sharedCache);
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Returns up to max assembled partitions; max == -1 means no limit, and any other
+ * value below 1 yields an empty list.
+ */
+ public List<Partition> listPartitions(int max, SharedCache sharedCache) {
+ List<Partition> parts = new ArrayList<>();
+ tableLock.readLock().lock();
+ try {
+ for (PartitionWrapper wrapper : partitionCache.values()) {
+ if (max != -1 && parts.size() >= max) {
+ break;
+ }
+ parts.add(CacheUtils.assemble(wrapper, sharedCache));
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return parts;
+ }
+
+ public boolean containsPartition(List<String> partVals) {
+ tableLock.readLock().lock();
+ try {
+ return partitionCache.containsKey(CacheUtils.buildPartitionCacheKey(partVals));
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Removes the partition with the given values together with its cached column stats and
+ * invalidates the cached aggregate stats.
+ * @return the removed partition, or null if no partition was cached for these values
+ */
+ public Partition removePartition(List<String> partVal, SharedCache sharedCache) {
+ Partition part = null;
+ tableLock.writeLock().lock();
+ try {
+ String partialKey = CacheUtils.buildPartitionCacheKey(partVal);
+ PartitionWrapper wrapper = partitionCache.remove(partialKey);
+ // The partition may not be cached; its column stats are still removed below
+ if (wrapper != null) {
+ isPartitionCacheDirty.set(true);
+ // Assemble before releasing the sd reference, since releasing may drop the last
+ // reference to the shared sd (assumes assemble resolves the sd via sdHash - confirm)
+ part = CacheUtils.assemble(wrapper, sharedCache);
+ releaseSd(wrapper, sharedCache);
+ }
+ // Remove col stats
+ String lowerPartialKey = partialKey.toLowerCase();
+ Iterator<String> keys = partitionColStatsCache.keySet().iterator();
+ while (keys.hasNext()) {
+ if (keys.next().toLowerCase().startsWith(lowerPartialKey)) {
+ keys.remove();
+ isPartitionColStatsCacheDirty.set(true);
+ }
+ }
+ invalidateAggrColStats();
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ return part;
+ }
+
+ public void removePartitions(List<List<String>> partVals, SharedCache sharedCache) {
+ tableLock.writeLock().lock();
+ try {
+ for (List<String> partVal : partVals) {
+ removePartition(partVal, sharedCache);
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Replaces the partition cached under partVals (if any) with newPart. */
+ public void alterPartition(List<String> partVals, Partition newPart, SharedCache sharedCache) {
+ tableLock.writeLock().lock();
+ try {
+ removePartition(partVals, sharedCache);
+ cachePartition(newPart, sharedCache);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Pairwise {@link #alterPartition}; partValsList.get(i) is replaced by newParts.get(i). */
+ public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
+ SharedCache sharedCache) {
+ tableLock.writeLock().lock();
+ try {
+ for (int i = 0; i < partValsList.size(); i++) {
+ alterPartition(partValsList.get(i), newParts.get(i), sharedCache);
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Replaces the whole partition cache with the given partitions, unless the cache was
+ * modified since the last refresh (dirty), in which case the refresh is skipped once.
+ */
+ public void refreshPartitions(List<Partition> partitions, SharedCache sharedCache) {
+ tableLock.writeLock().lock();
+ try {
+ // Every writer of isPartitionCacheDirty holds tableLock's write lock, so the flag cannot
+ // change while we hold it; a single up-front check suffices
+ if (isPartitionCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition cache update for table: " + getTable().getTableName()
+ + "; the partition list we have is dirty.");
+ return;
+ }
+ Map<String, PartitionWrapper> newPartitionCache =
+ new ConcurrentHashMap<String, PartitionWrapper>();
+ for (Partition part : partitions) {
+ String key = CacheUtils.buildPartitionCacheKey(part.getValues());
+ // A duplicate in the input must not leak the sd reference of the entry it replaces
+ releaseSd(newPartitionCache.put(key, makePartitionWrapper(part, sharedCache)), sharedCache);
+ }
+ // Release the sd references of every partition being replaced, including partitions
+ // that are absent from the new list. New references were taken above first, so shared
+ // sds never transiently drop to zero.
+ for (PartitionWrapper oldWrapper : partitionCache.values()) {
+ releaseSd(oldWrapper, sharedCache);
+ }
+ partitionCache = newPartitionCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Merges the given table column stats into the cache (updating existing objects in place). */
+ public void updateTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+ tableLock.writeLock().lock();
+ try {
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ // Get old stats object if present
+ String key = colStatObj.getColName();
+ ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ // Update existing stat object's field
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ // TODO: get rid of deepCopy after making sure callers don't use references
+ tableColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ }
+ isTableColStatsCacheDirty.set(true);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Replaces all cached table column stats, unless they were modified since the last refresh. */
+ public void refreshTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+ tableLock.writeLock().lock();
+ try {
+ if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table col stats cache update for table: "
+ + getTable().getTableName() + "; the table col stats list we have is dirty.");
+ return;
+ }
+ Map<String, ColumnStatisticsObj> newTableColStatsCache =
+ new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ // TODO: get rid of deepCopy after making sure callers don't use references
+ newTableColStatsCache.put(colStatObj.getColName(), colStatObj.deepCopy());
+ }
+ tableColStatsCache = newTableColStatsCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Returns the cached stats objects (not copies) for those of colNames that are cached. */
+ public List<ColumnStatisticsObj> getCachedTableColStats(List<String> colNames) {
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+ tableLock.readLock().lock();
+ try {
+ for (String colName : colNames) {
+ ColumnStatisticsObj colStatObj = tableColStatsCache.get(colName);
+ if (colStatObj != null) {
+ colStatObjs.add(colStatObj);
+ }
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return colStatObjs;
+ }
+
+ public void removeTableColStats(String colName) {
+ tableLock.writeLock().lock();
+ try {
+ tableColStatsCache.remove(colName);
+ isTableColStatsCacheDirty.set(true);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public ColumnStatisticsObj getPartitionColStats(List<String> partVal, String colName) {
+ tableLock.readLock().lock();
+ try {
+ return partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ }
+
+ public void updatePartitionColStats(List<String> partVal,
+ List<ColumnStatisticsObj> colStatsObjs) {
+ tableLock.writeLock().lock();
+ try {
+ addPartitionColStatsToCache(partVal, colStatsObjs);
+ isPartitionColStatsCacheDirty.set(true);
+ invalidateAggrColStats();
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void removePartitionColStats(List<String> partVals, String colName) {
+ tableLock.writeLock().lock();
+ try {
+ partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+ isPartitionColStatsCacheDirty.set(true);
+ invalidateAggrColStats();
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ // Caller must hold tableLock's write lock
+ private void addPartitionColStatsToCache(List<String> partVal,
+ List<ColumnStatisticsObj> colStatsObjs) {
+ for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+ // Get old stats object if present
+ String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+ ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ // Update existing stat object's field
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ // TODO: get rid of deepCopy after making sure callers don't use references
+ partitionColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ }
+ }
+
+ /** Replaces all cached partition column stats, unless they were modified since the last refresh. */
+ public void refreshPartitionColStats(List<ColumnStatistics> partitionColStats) {
+ tableLock.writeLock().lock();
+ try {
+ if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition column stats cache update for table: "
+ + getTable().getTableName() + "; the partition column stats list we have is dirty");
+ return;
+ }
+ String tableName = StringUtils.normalizeIdentifier(getTable().getTableName());
+ Map<String, ColumnStatisticsObj> newPartitionColStatsCache =
+ new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ for (ColumnStatistics cs : partitionColStats) {
+ try {
+ List<String> partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+ for (ColumnStatisticsObj colStatObj : cs.getStatsObj()) {
+ String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+ newPartitionColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ } catch (MetaException e) {
+ LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+ }
+ }
+ partitionColStatsCache = newPartitionColStatsCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Returns the cached aggregate stats of the given type for every column in colNames, or
+ * null if any of them is missing (so the caller can build the stats).
+ */
+ public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames,
+ StatsType statsType) {
+ List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
+ int position = statsType.getPosition();
+ tableLock.readLock().lock();
+ try {
+ for (String colName : colNames) {
+ List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(colName);
+ // If unable to find stats for this column / StatsType, return null so we can build stats
+ if (colStatList == null || colStatList.size() <= position) {
+ return null;
+ }
+ ColumnStatisticsObj colStatObj = colStatList.get(position);
+ if (colStatObj == null) {
+ return null;
+ }
+ colStats.add(colStatObj);
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return colStats;
+ }
+
+ /** Caches copies of the given aggregate stats; either argument may be null. */
+ public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+ AggrStats aggrStatsAllButDefaultPartition) {
+ tableLock.writeLock().lock();
+ try {
+ fillAggrColStats(aggrColStatsCache, aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+ isAggrPartitionColStatsCacheDirty.set(true);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ /** Replaces the cached aggregate stats, unless they were modified since the last refresh. */
+ public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+ AggrStats aggrStatsAllButDefaultPartition) {
+ tableLock.writeLock().lock();
+ try {
+ if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping aggregate stats cache update for table: "
+ + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+ return;
+ }
+ Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
+ new ConcurrentHashMap<String, List<ColumnStatisticsObj>>();
+ fillAggrColStats(newAggrColStatsCache, aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
+ aggrColStatsCache = newAggrColStatsCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ // Stores deep copies of the aggregate stats into target. An ALL entry starts a fresh
+ // 2-slot list for its column; an ALLBUTDEFAULT entry fills the second slot of the
+ // existing list, or of a new list that is put into target.
+ private static void fillAggrColStats(Map<String, List<ColumnStatisticsObj>> target,
+ AggrStats allPartitions, AggrStats allButDefaultPartition) {
+ if (allPartitions != null) {
+ for (ColumnStatisticsObj statObj : allPartitions.getColStats()) {
+ if (statObj != null) {
+ List<ColumnStatisticsObj> aggrStats = newAggrStatsList();
+ aggrStats.set(StatsType.ALL.getPosition(), statObj.deepCopy());
+ target.put(statObj.getColName(), aggrStats);
+ }
+ }
+ }
+ if (allButDefaultPartition != null) {
+ for (ColumnStatisticsObj statObj : allButDefaultPartition.getColStats()) {
+ if (statObj != null) {
+ List<ColumnStatisticsObj> aggrStats = target.get(statObj.getColName());
+ if (aggrStats == null) {
+ aggrStats = newAggrStatsList();
+ target.put(statObj.getColName(), aggrStats);
+ }
+ aggrStats.set(StatsType.ALLBUTDEFAULT.getPosition(), statObj.deepCopy());
+ }
+ }
+ }
+ }
+
+ // A 2-slot list (one slot per StatsType) with both slots initially empty (null)
+ private static List<ColumnStatisticsObj> newAggrStatsList() {
+ List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>(2);
+ aggrStats.add(null);
+ aggrStats.add(null);
+ return aggrStats;
+ }
+
+ // Cached aggregate stats are derived from partition data/stats; drop them on any change
+ private void invalidateAggrColStats() {
+ aggrColStatsCache.clear();
+ }
+
+ // Drops the shared sd reference held by a partition wrapper leaving the cache (null-safe)
+ private static void releaseSd(PartitionWrapper wrapper, SharedCache sharedCache) {
+ if (wrapper != null && wrapper.getSdHash() != null) {
+ sharedCache.decrSd(wrapper.getSdHash());
+ }
+ }
+
+ // Swaps in a copy of newTable, moving its sd into the shared sd cache
+ private void updateTableObj(Table newTable, SharedCache sharedCache) {
+ byte[] sdHash = getSdHash();
+ // Remove old table object's sd hash
+ if (sdHash != null) {
+ sharedCache.decrSd(sdHash);
+ }
+ Table tblCopy = newTable.deepCopy();
+ if (tblCopy.getPartitionKeys() != null) {
+ for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+ fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
+ }
+ }
+ setTable(tblCopy);
+ if (tblCopy.getSd() != null) {
+ sdHash = MetaStoreUtils.hashStorageDescriptor(tblCopy.getSd(), md);
+ StorageDescriptor sd = tblCopy.getSd();
+ sharedCache.increSd(sd, sdHash);
+ tblCopy.setSd(null);
+ setSdHash(sdHash);
+ setLocation(sd.getLocation());
+ setParameters(sd.getParameters());
+ } else {
+ setSdHash(null);
+ setLocation(null);
+ setParameters(null);
+ }
+ }
+
+ // Wraps a copy of part with its sd removed; the sd is registered in the shared sd cache
+ private PartitionWrapper makePartitionWrapper(Partition part, SharedCache sharedCache) {
+ Partition partCopy = part.deepCopy();
+ PartitionWrapper wrapper;
+ if (part.getSd() != null) {
+ byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
+ StorageDescriptor sd = part.getSd();
+ sharedCache.increSd(sd, sdHash);
+ partCopy.setSd(null);
+ wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
+ } else {
+ wrapper = new PartitionWrapper(partCopy, null, null, null);
+ }
+ return wrapper;
+ }
+ }
+
+ /**
+ * Holds a cached partition together with the location, parameters and hash of its
+ * storage descriptor.
+ */
+ static class PartitionWrapper {
+ Partition p;
+ String location;
+ Map<String, String> parameters;
+ byte[] sdHash;
+
+ PartitionWrapper(Partition partition, byte[] hash, String loc, Map<String, String> params) {
+ p = partition;
+ location = loc;
+ parameters = params;
+ sdHash = hash;
+ }
+
+ public Partition getPartition() {
+ return p;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ public byte[] getSdHash() {
+ return sdHash;
+ }
+ }
+
+ /** A shared StorageDescriptor paired with its reference count. */
+ static class StorageDescriptorWrapper {
+ StorageDescriptor sd;
+ int refCount;
+
+ StorageDescriptorWrapper(StorageDescriptor descriptor, int count) {
+ sd = descriptor;
+ refCount = count;
+ }
+
+ public int getRefCount() {
+ return refCount;
+ }
+
+ public StorageDescriptor getSd() {
+ return sd;
+ }
+ }
+
+ /** Returns a deep copy of the database cached under name, or null if there is none. */
+ public Database getDatabaseFromCache(String name) {
+ cacheLock.readLock().lock();
+ try {
+ Database cached = databaseCache.get(name);
+ return cached == null ? null : cached.deepCopy();
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Prewarms the database cache with copies of the given databases, then marks the cache
+ * prewarmed. Databases dropped while prewarm was running are not added, and databases
+ * already cached (added after prewarm started) are not overwritten.
+ */
+ public void populateDatabasesInCache(List<Database> databases) {
+ for (Database db : databases) {
+ Database dbCopy = db.deepCopy();
+ // ObjectStore also stores db name in lowercase
+ dbCopy.setName(dbCopy.getName().toLowerCase());
+ // The lock is taken per database so concurrent writers are not blocked for the whole prewarm
+ cacheLock.writeLock().lock();
+ try {
+ // Since we allow write operations on cache while prewarm is happening:
+ // 1. Don't add databases that were deleted while we were preparing list for prewarm
+ // 2. Skip overwriting exisiting db object
+ // (which is present because it was added after prewarm started)
+ if (!databasesDeletedDuringPrewarm.contains(dbCopy.getName())) {
+ databaseCache.putIfAbsent(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+ }
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+ // Deletions may only stop being tracked once every database in the prewarm list has been
+ // considered; clearing earlier would let later entries resurrect dropped databases.
+ cacheLock.writeLock().lock();
+ try {
+ databasesDeletedDuringPrewarm.clear();
+ isDatabaseCachePrewarmed = true;
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized void addDatabaseToCache(String dbName, Database db) {
- Database dbCopy = db.deepCopy();
- dbCopy.setName(StringUtils.normalizeIdentifier(dbName));
- databaseCache.put(dbName, dbCopy);
+ /** @return whether the database cache has been marked prewarmed by populateDatabasesInCache */
+ public boolean isDatabaseCachePrewarmed() {
+ return this.isDatabaseCachePrewarmed;
}
- public synchronized void removeDatabaseFromCache(String dbName) {
- databaseCache.remove(dbName);
+ /** Caches a copy of db under its lowercased, normalized name and marks the db cache dirty. */
+ public void addDatabaseToCache(Database db) {
+ cacheLock.writeLock().lock();
+ try {
+ Database copy = db.deepCopy();
+ // ObjectStore also stores db name in lowercase
+ String lowerName = copy.getName().toLowerCase();
+ copy.setName(lowerName);
+ databaseCache.put(StringUtils.normalizeIdentifier(lowerName), copy);
+ isDatabaseCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized List<String> listCachedDatabases() {
- return new ArrayList<>(databaseCache.keySet());
+ /**
+ * Removes the named database from the cache. While prewarm is in progress the name is also
+ * recorded so that the prewarm thread does not add the database back.
+ */
+ public void removeDatabaseFromCache(String dbName) {
+ // addDatabaseToCache/populateDatabasesInCache store keys via normalizeIdentifier, so the
+ // lookup key must be normalized the same way
+ String normalizedDbName = StringUtils.normalizeIdentifier(dbName);
+ cacheLock.writeLock().lock();
+ try {
+ // If db cache is not yet prewarmed, add this to a set which the prewarm thread can check
+ // so that the prewarm thread does not add it back
+ if (!isDatabaseCachePrewarmed) {
+ databasesDeletedDuringPrewarm.add(normalizedDbName);
+ }
+ if (databaseCache.remove(normalizedDbName) != null) {
+ isDatabaseCacheDirty.set(true);
+ }
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ /** Returns a snapshot of the names of all cached databases. */
+ public List<String> listCachedDatabases() {
+ cacheLock.readLock().lock();
+ try {
+ return new ArrayList<>(databaseCache.keySet());
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ }
+
+ /** Returns the normalized names of cached databases that match pattern (per CacheUtils.matches). */
+ public List<String> listCachedDatabases(String pattern) {
+ List<String> matching = new ArrayList<>();
+ cacheLock.readLock().lock();
+ try {
+ for (String key : databaseCache.keySet()) {
+ String normalized = StringUtils.normalizeIdentifier(key);
+ if (CacheUtils.matches(normalized, pattern)) {
+ matching.add(normalized);
+ }
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return matching;
}
- public synchronized void alterDatabaseInCache(String dbName, Database newDb) {
- removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
- addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy());
+ /**
+ * Swaps the database cached under dbName for a copy of newDb; if nothing is cached under
+ * dbName, newDb is simply added.
+ * @param dbName name the database is currently cached under
+ * @param newDb replacement database object
+ */
+ public void alterDatabaseInCache(String dbName, Database newDb) {
+ cacheLock.writeLock().lock();
+ try {
+ removeDatabaseFromCache(dbName);
+ addDatabaseToCache(newDb.deepCopy());
+ isDatabaseCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Replaces the database cache contents with the given databases, unless the cache was
+ * modified since the last refresh (dirty), in which case this refresh is skipped once.
+ */
+ public void refreshDatabasesInCache(List<Database> databases) {
+ cacheLock.writeLock().lock();
+ try {
+ if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping database cache update; the database list we have is dirty.");
+ return;
+ }
+ databaseCache.clear();
+ for (Database db : databases) {
+ addDatabaseToCache(db);
+ }
+ // addDatabaseToCache marks the cache dirty, but this bulk reload is not a concurrent
+ // modification (we hold the write lock); clear the flag or the next refresh is skipped.
+ isDatabaseCacheDirty.set(false);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ /** Returns the number of cached databases. */
+ public int getCachedDatabaseCount() {
+ int count;
+ cacheLock.readLock().lock();
+ try {
+ count = databaseCache.size();
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return count;
+ }
+
+ /**
+ * Prewarms the cache with one table and its column stats (unpartitioned tables), or its
+ * partitions, partition column stats and aggregate stats (otherwise). Tables dropped while
+ * prewarm was running are skipped, and an already-cached table is not overwritten.
+ */
+ public void populateTableInCache(Table table, ColumnStatistics tableColStats,
+ List<Partition> partitions, List<ColumnStatistics> partitionColStats,
+ AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+ String dbName = StringUtils.normalizeIdentifier(table.getDbName());
+ String tableName = StringUtils.normalizeIdentifier(table.getTableName());
+ String tblKey = CacheUtils.buildTableCacheKey(dbName, tableName);
+ // Since we allow write operations on cache while prewarm is happening:
+ // 1. Don't add tables that were deleted while we were preparing list for prewarm.
+ // tablesDeletedDuringPrewarm is a plain HashSet that removeTableFromCache mutates under
+ // cacheLock's write lock, so it has to be read under cacheLock as well.
+ cacheLock.readLock().lock();
+ try {
+ if (tablesDeletedDuringPrewarm.contains(tblKey)) {
+ return;
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ TableWrapper tblWrapper = createTableWrapper(dbName, tableName, table);
+ if (!table.isSetPartitionKeys() && (tableColStats != null)) {
+ tblWrapper.updateTableColStats(tableColStats.getStatsObj());
+ } else {
+ if (partitions != null) {
+ tblWrapper.cachePartitions(partitions, this);
+ }
+ if (partitionColStats != null) {
+ for (ColumnStatistics cs : partitionColStats) {
+ try {
+ List<String> partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+ tblWrapper.updatePartitionColStats(partVal, cs.getStatsObj());
+ } catch (MetaException e) {
+ LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+ }
+ }
+ }
+ tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
+ }
+ cacheLock.writeLock().lock();
+ try {
+ // 2. Skip overwriting exisiting table object
+ // (which is present because it was added after prewarm started)
+ tableCache.putIfAbsent(tblKey, tblWrapper);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized int getCachedDatabaseCount() {
- return databaseCache.size();
+ /** Marks the table cache as prewarmed and stops tracking tables dropped during prewarm. */
+ public void completeTableCachePrewarm() {
+ cacheLock.writeLock().lock();
+ try {
+ isTableCachePrewarmed = true;
+ tablesDeletedDuringPrewarm.clear();
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized Table getTableFromCache(String dbName, String tableName) {
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
- if (tblWrapper == null) {
- return null;
+ /**
+ * Returns the table assembled (via CacheUtils.assemble) from its cached wrapper, or null
+ * if no table is cached under the given database and table name.
+ */
+ public Table getTableFromCache(String dbName, String tableName) {
+ Table t = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ t = CacheUtils.assemble(tblWrapper, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- Table t = CacheUtils.assemble(tblWrapper, this);
return t;
}
- public synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
+ /**
+ * Caches a copy of tbl under the given names, replacing any existing entry, and marks the
+ * table cache dirty.
+ * @return the newly cached wrapper
+ */
+ public TableWrapper addTableToCache(String dbName, String tblName, Table tbl) {
+ cacheLock.writeLock().lock();
+ try {
+ TableWrapper wrapper = createTableWrapper(dbName, tblName, tbl);
+ TableWrapper replaced = tableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), wrapper);
+ // Release the sd reference held by a table object we are overwriting (the new wrapper
+ // took its own reference above, so a shared sd never transiently drops to zero)
+ if (replaced != null && replaced.getSdHash() != null) {
+ decrSd(replaced.getSdHash());
+ }
+ isTableCacheDirty.set(true);
+ return wrapper;
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ private TableWrapper createTableWrapper(String dbName, String tblName, Table tbl) {
+ TableWrapper wrapper;
Table tblCopy = tbl.deepCopy();
tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName));
tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName));
@@ -129,7 +895,6 @@ public class SharedCache {
fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
}
}
- TableWrapper wrapper;
if (tbl.getSd() != null) {
byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md);
StorageDescriptor sd = tbl.getSd();
@@ -139,482 +904,452 @@ public class SharedCache {
} else {
wrapper = new TableWrapper(tblCopy, null, null, null);
}
- tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+ return wrapper;
}
- public synchronized void removeTableFromCache(String dbName, String tblName) {
- TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
- byte[] sdHash = tblWrapper.getSdHash();
- if (sdHash!=null) {
- decrSd(sdHash);
+ public void removeTableFromCache(String dbName, String tblName) {
+ try {
+ cacheLock.writeLock().lock();
+ // If table cache is not yet prewarmed, add this to a set which the prewarm thread can check
+ // so that the prewarm thread does not add it back
+ if (!isTableCachePrewarmed) {
+ tablesDeletedDuringPrewarm.add(CacheUtils.buildTableCacheKey(dbName, tblName));
+ }
+ TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+ byte[] sdHash = tblWrapper.getSdHash();
+ if (sdHash != null) {
+ decrSd(sdHash);
+ }
+ isTableCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
}
}
- public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
- return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null;
- }
-
- public synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- tableColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public void alterTableInCache(String dbName, String tblName, Table newTable) {
+ try {
+ cacheLock.writeLock().lock();
+ TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.updateTableObj(newTable, this);
+ String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+ String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
+ tableCache.put(CacheUtils.buildTableCacheKey(newDbName, newTblName), tblWrapper);
+ isTableCacheDirty.set(true);
}
+ } finally {
+ cacheLock.writeLock().unlock();
}
}
- public synchronized void removeTableColStatsFromCache(String dbName, String tblName,
- String colName) {
- if (colName == null) {
- removeTableColStatsFromCache(dbName, tblName);
- } else {
- tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+ public List<Table> listCachedTables(String dbName) {
+ List<Table> tables = new ArrayList<>();
+ try {
+ cacheLock.readLock().lock();
+ for (TableWrapper wrapper : tableCache.values()) {
+ if (wrapper.getTable().getDbName().equals(dbName)) {
+ tables.add(CacheUtils.assemble(wrapper, this));
+ }
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return tables;
}
- public synchronized void updateTableColStatsInCache(String dbName, String tableName,
- List<ColumnStatisticsObj> colStatsForTable) {
- for (ColumnStatisticsObj colStatObj : colStatsForTable) {
- // Get old stats object if present
- String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
- ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
- if (oldStatsObj != null) {
- LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName()
- + ", of table: " + tableName + " and database: " + dbName);
- // Update existing stat object's field
- StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
- } else {
- // No stats exist for this key; add a new object to the cache
- tableColStatsCache.put(key, colStatObj);
+ public List<String> listCachedTableNames(String dbName) {
+ List<String> tableNames = new ArrayList<>();
+ try {
+ cacheLock.readLock().lock();
+ for (TableWrapper wrapper : tableCache.values()) {
+ if (wrapper.getTable().getDbName().equals(dbName)) {
+ tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+ }
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return tableNames;
}
- public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
- removeTableFromCache(dbName, tblName);
- addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
- StringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+ public List<String> listCachedTableNames(String dbName, String pattern, short maxTables) {
+ List<String> tableNames = new ArrayList<String>();
+ try {
+ cacheLock.readLock().lock();
+ int count = 0;
+ for (TableWrapper wrapper : tableCache.values()) {
+ if ((wrapper.getTable().getDbName().equals(dbName))
+ && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+ && (maxTables == -1 || count < maxTables)) {
+ tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+ count++;
+ }
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return tableNames;
}
- public synchronized void alterTableInPartitionCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
- for (Partition part : partitions) {
- removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
- part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName()));
- part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName()));
- addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
- StringUtils.normalizeIdentifier(newTable.getTableName()), part);
+ public List<String> listCachedTableNames(String dbName, String pattern, TableType tableType) {
+ List<String> tableNames = new ArrayList<String>();
+ try {
+ cacheLock.readLock().lock();
+ for (TableWrapper wrapper : tableCache.values()) {
+ if ((wrapper.getTable().getDbName().equals(dbName))
+ && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+ && wrapper.getTable().getTableType().equals(tableType.toString())) {
+ tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+ }
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return tableNames;
}
- public synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- tableColStatsCache.entrySet().iterator();
- Map<String, ColumnStatisticsObj> newTableColStats =
- new HashMap<>();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- ColumnStatisticsObj colStatObj = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
- String[] decomposedKey = CacheUtils.splitTableColStats(key);
- String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]);
- newTableColStats.put(newKey, colStatObj);
- iterator.remove();
+ public void refreshTablesInCache(String dbName, List<Table> tables) {
+ try {
+ cacheLock.writeLock().lock();
+ if (isTableCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table cache update; the table list we have is dirty.");
+ return;
+ }
+ Map<String, TableWrapper> newTableCache = new HashMap<String, TableWrapper>();
+ for (Table tbl : tables) {
+ String tblName = StringUtils.normalizeIdentifier(tbl.getTableName());
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.updateTableObj(tbl, this);
+ } else {
+ tblWrapper = createTableWrapper(dbName, tblName, tbl);
}
+ newTableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), tblWrapper);
}
- tableColStatsCache.putAll(newTableColStats);
+ tableCache.clear();
+ tableCache = newTableCache;
+ } finally {
+ cacheLock.writeLock().unlock();
}
}
- public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
- Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
- for (Partition part : partitions) {
- String oldPartialPartitionKey =
- CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- ColumnStatisticsObj colStatObj = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
- Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
- // New key has the new table name
- String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
- (List<String>) decomposedKey[2], (String) decomposedKey[3]);
- newPartitionColStats.put(newKey, colStatObj);
- iterator.remove();
- }
- }
+ public List<ColumnStatisticsObj> getTableColStatsFromCache(String dbName, String tblName,
+ List<String> colNames) {
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ colStatObjs = tblWrapper.getCachedTableColStats(colNames);
}
- partitionColStatsCache.putAll(newPartitionColStats);
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return colStatObjs;
}
- public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
- new HashMap<String, List<ColumnStatisticsObj>>();
- String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
- aggrColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
- String key = entry.getKey();
- List<ColumnStatisticsObj> value = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) {
- Object[] decomposedKey = CacheUtils.splitAggrColStats(key);
- // New key has the new table name
- String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
- (String) decomposedKey[2]);
- newAggrColStatsCache.put(newKey, value);
- iterator.remove();
- }
+ public void removeTableColStatsFromCache(String dbName, String tblName, String colName) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.removeTableColStats(colName);
}
- aggrColStatsCache.putAll(newAggrColStatsCache);
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized int getCachedTableCount() {
- return tableCache.size();
+ public void updateTableColStatsInCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ tblWrapper.updateTableColStats(colStatsForTable);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
}
- public synchronized List<Table> listCachedTables(String dbName) {
- List<Table> tables = new ArrayList<>();
- for (TableWrapper wrapper : tableCache.values()) {
- if (wrapper.getTable().getDbName().equals(dbName)) {
- tables.add(CacheUtils.assemble(wrapper, this));
+ public void refreshTableColStatsInCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshTableColStats(colStatsForTable);
}
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ }
+
+ public int getCachedTableCount() {
+ try {
+ cacheLock.readLock().lock();
+ return tableCache.size();
+ } finally {
+ cacheLock.readLock().unlock();
}
- return tables;
}
- public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
+ public List<TableMeta> getTableMeta(String dbNames, String tableNames,
+ List<String> tableTypes) {
List<TableMeta> tableMetas = new ArrayList<>();
- for (String dbName : listCachedDatabases()) {
- if (CacheUtils.matches(dbName, dbNames)) {
- for (Table table : listCachedTables(dbName)) {
- if (CacheUtils.matches(table.getTableName(), tableNames)) {
- if (tableTypes==null || tableTypes.contains(table.getTableType())) {
- TableMeta metaData = new TableMeta(
- dbName, table.getTableName(), table.getTableType());
+ try {
+ cacheLock.readLock().lock();
+ for (String dbName : listCachedDatabases()) {
+ if (CacheUtils.matches(dbName, dbNames)) {
+ for (Table table : listCachedTables(dbName)) {
+ if (CacheUtils.matches(table.getTableName(), tableNames)) {
+ if (tableTypes == null || tableTypes.contains(table.getTableType())) {
+ TableMeta metaData =
+ new TableMeta(dbName, table.getTableName(), table.getTableType());
metaData.setComments(table.getParameters().get("comment"));
tableMetas.add(metaData);
+ }
}
}
}
}
+ } finally {
+ cacheLock.readLock().unlock();
}
return tableMetas;
}
- public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
- Partition partCopy = part.deepCopy();
- PartitionWrapper wrapper;
- if (part.getSd()!=null) {
- byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
- StorageDescriptor sd = part.getSd();
- increSd(sd, sdHash);
- partCopy.setSd(null);
- wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
- } else {
- wrapper = new PartitionWrapper(partCopy, null, null, null);
+ public void addPartitionToCache(String dbName, String tblName, Partition part) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.cachePartition(part, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper);
}
- public synchronized Partition getPartitionFromCache(String key) {
- PartitionWrapper wrapper = partitionCache.get(key);
- if (wrapper == null) {
- return null;
+ public void addPartitionsToCache(String dbName, String tblName, List<Partition> parts) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.cachePartitions(parts, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- Partition p = CacheUtils.assemble(wrapper, this);
- return p;
- }
-
- public synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
- return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
}
- public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
- return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
- }
-
- public synchronized Partition removePartitionFromCache(String dbName, String tblName,
- List<String> part_vals) {
- PartitionWrapper wrapper =
- partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
- if (wrapper.getSdHash() != null) {
- decrSd(wrapper.getSdHash());
+ public Partition getPartitionFromCache(String dbName, String tblName,
+ List<String> partVals) {
+ Partition part = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ part = tblWrapper.getPartition(partVals, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- return wrapper.getPartition();
+ return part;
}
- /**
- * Given a db + table, remove all partitions for this table from the cache
- * @param dbName
- * @param tblName
- * @return
- */
- public synchronized void removePartitionsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, PartitionWrapper> entry = iterator.next();
- String key = entry.getKey();
- PartitionWrapper wrapper = entry.getValue();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
- if (wrapper.getSdHash() != null) {
- decrSd(wrapper.getSdHash());
- }
+ public boolean existPartitionFromCache(String dbName, String tblName, List<String> partVals) {
+ boolean existsPart = false;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ existsPart = tblWrapper.containsPartition(partVals);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return existsPart;
}
- // Remove cached column stats for all partitions of all tables in a db
- public synchronized void removePartitionColStatsFromCache(String dbName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public Partition removePartitionFromCache(String dbName, String tblName,
+ List<String> partVals) {
+ Partition part = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ part = tblWrapper.removePartition(partVals, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return part;
}
- // Remove cached column stats for all partitions of a table
- public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public void removePartitionsFromCache(String dbName, String tblName,
+ List<List<String>> partVals) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.removePartitions(partVals, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- // Remove cached column stats for a particular partition of a table
- public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
- List<String> partVals) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+ List<Partition> parts = new ArrayList<Partition>();
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ parts = tblWrapper.listPartitions(max, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return parts;
}
- // Remove cached column stats for a particular partition and a particular column of a table
- public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
- List<String> partVals, String colName) {
- partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+ public void alterPartitionInCache(String dbName, String tblName, List<String> partVals,
+ Partition newPart) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.alterPartition(partVals, newPart, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
}
- public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
- List<Partition> partitions = new ArrayList<>();
- int count = 0;
- for (PartitionWrapper wrapper : partitionCache.values()) {
- if (wrapper.getPartition().getDbName().equals(dbName)
- && wrapper.getPartition().getTableName().equals(tblName)
- && (max == -1 || count < max)) {
- partitions.add(CacheUtils.assemble(wrapper, this));
- count++;
+ public void alterPartitionsInCache(String dbName, String tblName, List<List<String>> partValsList,
+ List<Partition> newParts) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.alterPartitions(partValsList, newParts, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
- return partitions;
}
- public synchronized void alterPartitionInCache(String dbName, String tblName,
- List<String> partVals, Partition newPart) {
- removePartitionFromCache(dbName, tblName, partVals);
- addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()),
- StringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+ public void refreshPartitionsInCache(String dbName, String tblName, List<Partition> partitions) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshPartitions(partitions, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
}
- public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
- List<String> partVals, Partition newPart) {
- String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
- Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- ColumnStatisticsObj colStatObj = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
- Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
- String newKey =
- CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()),
- StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
- (String) decomposedKey[3]);
- newPartitionColStats.put(newKey, colStatObj);
- iterator.remove();
+ public void removePartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVals, String colName) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.removePartitionColStats(partVals, colName);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
- partitionColStatsCache.putAll(newPartitionColStats);
}
- public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+ public void updatePartitionColStatsInCache(String dbName, String tableName,
List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
- for (ColumnStatisticsObj colStatObj : colStatsObjs) {
- // Get old stats object if present
- String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
- ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
- if (oldStatsObj != null) {
- // Update existing stat object's field
- LOG.debug("CachedStore: updating partition column stats for column: "
- + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
- StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
- } else {
- // No stats exist for this key; add a new object to the cache
- partitionColStatsCache.put(key, colStatObj);
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized int getCachedPartitionCount() {
- return partitionCache.size();
- }
-
- public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
- return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
- }
-
- public synchronized void addPartitionColStatsToCache(
- List<ColStatsObjWithSourceInfo> colStatsForDB) {
- for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) {
- List<String> partVals;
- try {
- partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName());
- ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj();
- String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(),
- colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName());
- partitionColStatsCache.put(key, colStatObj);
- } catch (MetaException e) {
- LOG.info("Unable to add partition stats for: {} to SharedCache",
- colStatWithSourceInfo.getPartName(), e);
+ public ColumnStatisticsObj getPartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVal, String colName) {
+ ColumnStatisticsObj colStatObj = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null){
+ colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
-
- }
-
- public synchronized void refreshPartitionColStats(String dbName,
- List<ColStatsObjWithSourceInfo> colStatsForDB) {
- LOG.debug("CachedStore: updating cached partition column stats objects for database: {}",
- dbName);
- removePartitionColStatsFromCache(dbName);
- addPartitionColStatsToCache(colStatsForDB);
+ return colStatObj;
}
- public synchronized void addAggregateStatsToCache(String dbName, String tblName,
- AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
- if (aggrStatsAllPartitions != null) {
- for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) {
- String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
- List<ColumnStatisticsObj> value = new ArrayList<ColumnStatisticsObj>();
- value.add(StatsType.ALL.getPosition(), colStatObj);
- aggrColStatsCache.put(key, value);
- }
- }
- if (aggrStatsAllButDefaultPartition != null) {
- for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) {
- String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
- List<ColumnStatisticsObj> value = aggrColStatsCache.get(key);
- if ((value != null) && (value.size() > 0)) {
- value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj);
- }
+ public void refreshPartitionColStatsInCache(String dbName, String tblName,
+ List<ColumnStatistics> partitionColStats) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshPartitionColStats(partitionColStats);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
public List<ColumnStatisticsObj> getAggrStatsFromCache(String dbName, String tblName,
List<String> colNames, StatsType statsType) {
- List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
- for (String colName : colNames) {
- String key = CacheUtils.buildKey(dbName, tblName, colName);
- List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(key);
- // If unable to find stats for a column, return null so we can build stats
- if (colStatList == null) {
- return null;
- }
- ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
- // If unable to find stats for this StatsType, return null so we can build
- // stats
- if (colStatObj == null) {
- return null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ return tblWrapper.getAggrPartitionColStats(colNames, statsType);
}
- colStats.add(colStatObj);
+ } finally {
+ cacheLock.readLock().unlock();
}
- return colStats;
+ return null;
}
- public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
- aggrColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public void addAggregateStatsToCache(String dbName, String tblName,
+ AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null){
+ tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized void refreshAggregateStatsCache(String dbName, String tblName,
+ public void refreshAggregateStatsInCache(String dbName, String tblName,
AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
- LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName,
- tblName);
- removeAggrPartitionColStatsFromCache(dbName, tblName);
- addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
- }
-
- public synchronized void addTableColStatsToCache(String dbName, String tableName,
- List<ColumnStatisticsObj> colStatsForTable) {
- for (ColumnStatisticsObj colStatObj : colStatsForTable) {
- String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
- tableColStatsCache.put(key, colStatObj);
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized void refreshTableColStats(String dbName, String tableName,
- List<ColumnStatisticsObj> colStatsForTable) {
- LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
- + " and table: " + tableName);
- // Remove all old cache entries for this table
- removeTableColStatsFromCache(dbName, tableName);
- // Add new entries to cache
- addTableColStatsToCache(dbName, tableName, colStatsForTable);
- }
-
- public void increSd(StorageDescriptor sd, byte[] sdHash) {
+ public synchronized void increSd(StorageDescriptor sd, byte[] sdHash) {
ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
if (sdCache.containsKey(byteArray)) {
sdCache.get(byteArray).refCount++;
@@ -626,7 +1361,7 @@ public class SharedCache {
}
}
- public void decrSd(byte[] sdHash) {
+ public synchronized void decrSd(byte[] sdHash) {
ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
sdWrapper.refCount--;
@@ -635,50 +1370,11 @@ public class SharedCache {
}
}
- public StorageDescriptor getSdFromCache(byte[] sdHash) {
+ public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) {
StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
return sdWrapper.getSd();
}
- // Replace databases in databaseCache with the new list
- public synchronized void refreshDatabases(List<Database> databases) {
- LOG.debug("CachedStore: updating cached database objects");
- for (String dbName : listCachedDatabases()) {
- removeDatabaseFromCache(dbName);
- }
- for (Database db : databases) {
- addDatabaseToCache(db.getName(), db);
- }
- }
-
- // Replace tables in tableCache with the new list
- public synchronized void refreshTables(String dbName, List<Table> tables) {
- LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
- for (Table tbl : listCachedTables(dbName)) {
- removeTableFromCache(dbName, tbl.getTableName());
- }
- for (Table tbl : tables) {
- addTableToCache(dbName, tbl.getTableName(), tbl);
- }
- }
-
- public synchronized void refreshPartitions(String dbName, String tblName,
- List<Partition> partitions) {
- LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
- + " and table: " + tblName);
- Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
- while (iterator.hasNext()) {
- PartitionWrapper partitionWrapper = iterator.next().getValue();
- if (partitionWrapper.getPartition().getDbName().equals(dbName)
- && partitionWrapper.getPartition().getTableName().equals(tblName)) {
- iterator.remove();
- }
- }
- for (Partition part : partitions) {
- addPartitionToCache(dbName, tblName, part);
- }
- }
-
@VisibleForTesting
Map<String, Database> getDatabaseCache() {
return databaseCache;
@@ -690,17 +1386,15 @@ public class SharedCache {
}
@VisibleForTesting
- Map<String, PartitionWrapper> getPartitionCache() {
- return partitionCache;
- }
-
- @VisibleForTesting
Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
return sdCache;
}
- @VisibleForTesting
- Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
- return partitionColStatsCache;
+ public long getUpdateCount() {
+ return cacheUpdateCount.get();
+ }
+
+ public void incrementUpdateCount() {
+ cacheUpdateCount.incrementAndGet();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 5aee2e3..e373753 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -339,7 +339,7 @@ public class MetaStoreUtils {
* @param md message descriptor to use to generate the hash
* @return the hash as a byte array
*/
- public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
+ public static synchronized byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
// Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different
// results for hashes based on the OS or JVM being used.
md.reset();
@@ -434,6 +434,15 @@ public class MetaStoreUtils {
return colNames;
}
+ public static List<String> getColumnNamesForPartition(Partition partition) {
+ List<String> colNames = new ArrayList<>();
+ Iterator<FieldSchema> colsIterator = partition.getSd().getColsIterator();
+ while (colsIterator.hasNext()) {
+ colNames.add(colsIterator.next().getName());
+ }
+ return colNames;
+ }
+
/**
* validateName
*
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 5edf8b3..e9527c7 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -1046,13 +1046,6 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
}
- @Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException {
- // TODO Auto-generated method stub
- return null;
- }
-
public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
NoSuchObjectException {
objectStore.createISchema(schema);
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 132cdc3..8fc0c83 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -1033,13 +1033,6 @@ public class DummyRawStoreForJdoConnection implements RawStore {
String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
}
- @Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException {
- // TODO Auto-generated method stub
- return null;
- }
-
public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException {
}
[4/4] hive git commit: HIVE-18264: CachedStore: Store cached
partitions/col stats within the table cache and make prewarm non-blocking
(Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
Posted by vg...@apache.org.
HIVE-18264: CachedStore: Store cached partitions/col stats within the table cache and make prewarm non-blocking (Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26c0ab6a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26c0ab6a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26c0ab6a
Branch: refs/heads/master
Commit: 26c0ab6adb48755ef2f5cff2ec9c4b0e9a431821
Parents: 79e8869
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon Mar 19 10:47:37 2018 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon Mar 19 10:47:37 2018 -0700
----------------------------------------------------------------------
.../listener/DummyRawStoreFailEvent.java | 9 +-
.../apache/hive/service/server/HiveServer2.java | 6 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 4 -
.../hadoop/hive/metastore/ObjectStore.java | 30 -
.../apache/hadoop/hive/metastore/RawStore.java | 11 -
.../hadoop/hive/metastore/cache/CacheUtils.java | 85 +-
.../hive/metastore/cache/CachedStore.java | 1552 +++++------------
.../hive/metastore/cache/SharedCache.java | 1588 +++++++++++++-----
.../hive/metastore/utils/MetaStoreUtils.java | 11 +-
.../DummyRawStoreControlledCommit.java | 7 -
.../DummyRawStoreForJdoConnection.java | 7 -
.../hive/metastore/cache/TestCachedStore.java | 546 +++---
.../src/test/resources/log4j2.properties | 74 +-
13 files changed, 2043 insertions(+), 1887 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 6144b61..e2244a1 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -976,7 +976,7 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
public List<WMResourcePlan> getAllResourcePlans() throws MetaException {
return objectStore.getAllResourcePlans();
}
-
+
@Override
public WMFullResourcePlan alterResourcePlan(String name, WMNullableResourcePlan resourcePlan,
boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
@@ -1069,13 +1069,6 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
}
- @Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException {
- // TODO Auto-generated method stub
- return null;
- }
-
public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
NoSuchObjectException {
objectStore.createISchema(schema);
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 5b792ac..bb92c44 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
@@ -163,9 +162,6 @@ public class HiveServer2 extends CompositeService {
LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
}
- // Initialize cachedstore with background prewarm. The prewarm will only start if configured.
- CachedStore.initSharedCacheAsync(hiveConf);
-
cliService = new CLIService(this);
addService(cliService);
final HiveServer2 hiveServer2 = this;
@@ -570,7 +566,7 @@ public class HiveServer2 extends CompositeService {
private void removeServerInstanceFromZooKeeper() throws Exception {
setDeregisteredWithZooKeeper(true);
-
+
if (znode != null) {
znode.close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 66353e7..5285570 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -73,7 +73,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
-import org.apache.hadoop.hive.metastore.cache.CachedStore;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.events.AddNotNullConstraintEvent;
@@ -7962,9 +7961,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ThreadPool.shutdown();
}, 10);
- // This will only initialize the cache if configured.
- CachedStore.initSharedCacheAsync(conf);
-
//Start Metrics for Standalone (Remote) Mode
if (MetastoreConf.getBoolVar(conf, ConfVars.METRICS_ENABLED)) {
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 1f75105..88d88ed 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -201,7 +201,6 @@ import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
import org.apache.thrift.TException;
import org.datanucleus.AbstractNucleusContext;
@@ -7906,35 +7905,6 @@ public class ObjectStore implements RawStore, Configurable {
}
@Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException {
- final boolean enableBitVector =
- MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_FETCH_BITVECTOR);
- return new GetHelper<List<ColStatsObjWithSourceInfo>>(dbName, null, true, false) {
- @Override
- protected List<ColStatsObjWithSourceInfo> getSqlResult(
- GetHelper<List<ColStatsObjWithSourceInfo>> ctx) throws MetaException {
- return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector);
- }
-
- @Override
- protected List<ColStatsObjWithSourceInfo> getJdoResult(
- GetHelper<List<ColStatsObjWithSourceInfo>> ctx)
- throws MetaException, NoSuchObjectException {
- // This is fast path for query optimizations, if we can find this info
- // quickly using directSql, do it. No point in failing back to slow path
- // here.
- throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase.");
- }
-
- @Override
- protected String describeResult() {
- return null;
- }
- }.run(true);
- }
-
- @Override
public void flushCache() {
// NOP as there's no caching
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index b079f8b..ad4af1a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -596,17 +596,6 @@ public interface RawStore extends Configurable {
List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
/**
- * Get column stats for all partitions of all tables in the database
- *
- * @param dbName
- * @return List of column stats objects for all partitions of all tables in the database
- * @throws MetaException
- * @throws NoSuchObjectException
- */
- List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException;
-
- /**
* Get the next notification event.
* @param rqst Request containing information on the last processed notification.
* @return list of notifications, sorted by eventId
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index f0f650d..97d8af6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -17,78 +17,57 @@
*/
package org.apache.hadoop.hive.metastore.cache;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.cache.SharedCache.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.SharedCache.TableWrapper;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
public class CacheUtils {
private static final String delimit = "\u0001";
- public static String buildKey(String dbName) {
- return dbName;
- }
-
- public static String buildKeyWithDelimit(String dbName) {
- return buildKey(dbName) + delimit;
- }
-
- public static String buildKey(String dbName, String tableName) {
+ /**
+ * Builds a key for the table cache by concatenating the database name and the table name,
+ * separated by a delimiter.
+ *
+ * @param dbName database name
+ * @param tableName table name
+ * @return cache key for the table cache
+ */
+ public static String buildTableCacheKey(String dbName, String tableName) {
return dbName + delimit + tableName;
}
- public static String buildKeyWithDelimit(String dbName, String tableName) {
- return buildKey(dbName, tableName) + delimit;
- }
-
- public static String buildKey(String dbName, String tableName, List<String> partVals) {
- String key = buildKey(dbName, tableName);
- if (CollectionUtils.isNotEmpty(partVals)) {
- key += delimit;
- key += String.join(delimit, partVals);
+ /**
+ * Builds a key for the partition cache by concatenating the partition values, each value
+ * separated by a delimiter.
+ *
+ * @param partVals list of partition values
+ * @return cache key for partitions cache; an empty string if partVals is null or empty
+ */
+ public static String buildPartitionCacheKey(List<String> partVals) {
+ if (partVals == null || partVals.isEmpty()) {
+ return "";
}
- return key;
- }
-
- public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
- return buildKey(dbName, tableName, partVals) + delimit;
- }
-
- public static String buildKey(String dbName, String tableName, List<String> partVals, String colName) {
- String key = buildKey(dbName, tableName, partVals);
- return key + delimit + colName;
- }
-
- public static String buildKey(String dbName, String tableName, String colName) {
- String key = buildKey(dbName, tableName);
- return key + delimit + colName;
- }
-
- public static String[] splitTableColStats(String key) {
- return key.split(delimit);
- }
-
- public static Object[] splitPartitionColStats(String key) {
- Object[] result = new Object[4];
- String[] comps = key.split(delimit);
- result[0] = comps[0];
- result[1] = comps[1];
- result[2] = Arrays.asList((Arrays.copyOfRange(comps, 2, comps.length - 1)));
- result[3] = comps[comps.length-1];
- return result;
+ return String.join(delimit, partVals);
}
- public static Object[] splitAggrColStats(String key) {
- return key.split(delimit);
+ /**
+ * Builds a key for the partition column stats cache: the partition values joined by a
+ * delimiter (as built by buildPartitionCacheKey), then the delimiter and the column name.
+ *
+ * @param partVals list of partition values
+ * @param colName column name
+ * @return cache key for partitions column stats cache
+ */
+ public static String buildPartitonColStatsCacheKey(List<String> partVals, String colName) {
+ return buildPartitionCacheKey(partVals) + delimit + colName;
}
static Table assemble(TableWrapper wrapper, SharedCache sharedCache) {