You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2019/06/20 06:17:28 UTC
[hive] branch master updated: HIVE-21787: Metastore table cache LRU
eviction (Sam An, reviewed by Daniel Dai)
This is an automated email from the ASF dual-hosted git repository.
daijy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b36b89c HIVE-21787: Metastore table cache LRU eviction (Sam An, reviewed by Daniel Dai)
b36b89c is described below
commit b36b89ce8eb542038990af0bd2cd8de57176acfa
Author: Daniel Dai <da...@cloudera.com>
AuthorDate: Wed Jun 19 23:17:17 2019 -0700
HIVE-21787: Metastore table cache LRU eviction (Sam An, reviewed by Daniel Dai)
---
.../cache/TestCachedStoreUpdateUsingEvents.java | 10 +-
.../hadoop/hive/metastore/cache/CacheUtils.java | 2 +-
.../hadoop/hive/metastore/cache/CachedStore.java | 1389 ++++++++------------
.../hadoop/hive/metastore/cache/SharedCache.java | 723 ++++++----
.../hive/metastore/cache/TestCachedStore.java | 316 ++++-
5 files changed, 1264 insertions(+), 1176 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
index cdfc60c..285f30b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java
@@ -53,6 +53,10 @@ public class TestCachedStoreUpdateUsingEvents {
rawStore = new ObjectStore();
rawStore.setConf(hmsHandler.getConf());
+
+ CachedStore cachedStore = new CachedStore();
+ CachedStore.clearSharedCache();
+ cachedStore.setConfForTest(conf);
sharedCache = CachedStore.getSharedCache();
// Stop the CachedStore cache update service. We'll start it explicitly to control the test
@@ -190,7 +194,7 @@ public class TestCachedStoreUpdateUsingEvents {
hmsHandler.drop_database(dbName, true, true);
hmsHandler.drop_database(dbName2, true, true);
sharedCache.getDatabaseCache().clear();
- sharedCache.getTableCache().clear();
+ sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
@@ -267,7 +271,7 @@ public class TestCachedStoreUpdateUsingEvents {
Assert.assertNull(tblRead);
sharedCache.getDatabaseCache().clear();
- sharedCache.getTableCache().clear();
+ sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
@@ -379,7 +383,7 @@ public class TestCachedStoreUpdateUsingEvents {
// Clean up
rawStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName);
sharedCache.getDatabaseCache().clear();
- sharedCache.getTableCache().clear();
+ sharedCache.clearTableCache();
sharedCache.getSdCache().clear();
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index d50fa13..bb673f4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -65,7 +65,7 @@ public class CacheUtils {
return buildKey(catName, dbName, tableName, colName);
}
- private static String buildKey(String... elements) {
+ public static String buildKey(String... elements) {
return org.apache.commons.lang.StringUtils.join(elements, delimit);
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 07f325d..511e6c1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.metastore.cache;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -52,7 +51,6 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.HiveAlterHandler;
-import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
@@ -120,14 +118,15 @@ public class CachedStore implements RawStore, Configurable {
private Configuration conf;
private static boolean areTxnStatsSupported;
private PartitionExpressionProxy expressionProxy = null;
+ private static String lock = "L";
+ private static boolean sharedCacheInited = false;
private static SharedCache sharedCache = new SharedCache();
- private static boolean canUseEvents = false;
+ private static boolean canUseEvents = false;
private static long lastEventId;
- static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
- @Override
- public void setConf(Configuration conf) {
+ @Override public void setConf(Configuration conf) {
setConfInternal(conf);
initBlackListWhiteList(conf);
initSharedCache(conf);
@@ -140,12 +139,16 @@ public class CachedStore implements RawStore, Configurable {
* @param conf
*/
void setConfForTest(Configuration conf) {
+ setConfForTestExceptSharedCache(conf);
+ initSharedCache(conf);
+ }
+
+ void setConfForTestExceptSharedCache(Configuration conf) {
setConfInternal(conf);
initBlackListWhiteList(conf);
- initSharedCache(conf);
}
- synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) {
+ private static synchronized void triggerUpdateUsingEvent(RawStore rawStore) {
if (!isCachePrewarmed.get()) {
LOG.error("cache update should be done only after prewarm");
throw new RuntimeException("cache update should be done only after prewarm");
@@ -159,12 +162,12 @@ public class CachedStore implements RawStore, Configurable {
throw new RuntimeException(e.getMessage());
} finally {
long endTime = System.nanoTime();
- LOG.info("Time taken in updateUsingNotificationEvents for num events : " + (lastEventId - preEventId) + " = " +
- (endTime - startTime) / 1000000 + "ms");
+ LOG.info("Time taken in updateUsingNotificationEvents for num events : " + (lastEventId - preEventId) + " = "
+ + (endTime - startTime) / 1000000 + "ms");
}
}
- synchronized private static void triggerPreWarm(RawStore rawStore) {
+ private static synchronized void triggerPreWarm(RawStore rawStore) {
lastEventId = rawStore.getCurrentNotificationEventId().getEventId();
prewarm(rawStore);
}
@@ -177,8 +180,7 @@ public class CachedStore implements RawStore, Configurable {
}
LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store");
- String rawStoreClassName =
- MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
+ String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
if (rawStore == null) {
try {
rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
@@ -198,38 +200,37 @@ public class CachedStore implements RawStore, Configurable {
}
private void initSharedCache(Configuration conf) {
- long maxSharedCacheSizeInBytes =
- MetastoreConf.getSizeVar(conf, ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY);
- sharedCache.initialize(maxSharedCacheSizeInBytes);
- if (maxSharedCacheSizeInBytes > 0) {
- LOG.info("Maximum memory that the cache will use: {} KB",
- maxSharedCacheSizeInBytes / (1024));
+ synchronized (lock) {
+ if (!sharedCacheInited) {
+ sharedCacheInited = true;
+ sharedCache.initialize(conf);
+ }
}
}
- @VisibleForTesting
- public static SharedCache getSharedCache() {
- return sharedCache;
+ @VisibleForTesting public static SharedCache getSharedCache() {
+ return sharedCache;
}
- static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName,
- String dbName, String tableName, Partition part) throws Exception {
+ private static ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName,
+ String dbName, String tableName, Partition part) throws Exception {
ColumnStatistics colStats;
List<String> deletedCols = new ArrayList<>();
- colStats = HiveAlterHandler.updateOrGetPartitionColumnStats(rawStore, catalogName, dbName, tableName,
- part.getValues(), part.getSd().getCols(), before, part, null, deletedCols);
+ colStats = HiveAlterHandler
+ .updateOrGetPartitionColumnStats(rawStore, catalogName, dbName, tableName, part.getValues(),
+ part.getSd().getCols(), before, part, null, deletedCols);
for (String column : deletedCols) {
sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), column);
}
if (colStats != null) {
- sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(),
- part.getValues(), part.getParameters(), colStats.getStatsObj());
+ sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(), part.getValues(),
+ part.getParameters(), colStats.getStatsObj());
}
return colStats;
}
- static private void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName,
- String dbName, String tableName) throws Exception {
+ private static void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName,
+ String dbName, String tableName) throws Exception {
ColumnStatistics colStats = null;
List<String> deletedCols = new ArrayList<>();
if (tblBefore.isSetPartitionKeys()) {
@@ -239,19 +240,19 @@ public class CachedStore implements RawStore, Configurable {
}
}
- List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, tblBefore,
- tblAfter,null, null, rawStore.getConf(), deletedCols);
+ List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler
+ .alterTableUpdateTableColumnStats(rawStore, tblBefore, tblAfter, null, null, rawStore.getConf(), deletedCols);
if (colStats != null) {
- sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(),
- statisticsObjs, tblAfter.getParameters());
+ sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(), statisticsObjs,
+ tblAfter.getParameters());
}
for (String column : deletedCols) {
sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column);
}
}
- @VisibleForTesting
- public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId) throws Exception {
+ @VisibleForTesting public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId)
+ throws Exception {
LOG.debug("updating cache using notification events starting from event id " + lastEventId);
NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
@@ -305,96 +306,95 @@ public class CachedStore implements RawStore, Configurable {
continue;
}
switch (event.getEventType()) {
- case MessageBuilder.ADD_PARTITION_EVENT:
- AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message);
- sharedCache.addPartitionsToCache(catalogName,
- dbName, tableName, addPartMessage.getPartitionObjs());
- break;
- case MessageBuilder.ALTER_PARTITION_EVENT:
- AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message);
- sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
- alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter());
- //TODO : Use the stat object stored in the alter table message to update the stats in cache.
- updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(),
- catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter());
- break;
- case MessageBuilder.DROP_PARTITION_EVENT:
- DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
- for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) {
- sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values()));
- }
- break;
- case MessageBuilder.CREATE_TABLE_EVENT:
- CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message);
- sharedCache.addTableToCache(catalogName, dbName,
- tableName, createTableMessage.getTableObj());
- break;
- case MessageBuilder.ALTER_TABLE_EVENT:
- AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
- sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
- //TODO : Use the stat object stored in the alter table message to update the stats in cache.
- updateStatsForAlterTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
- catalogName, dbName, tableName);
- break;
- case MessageBuilder.DROP_TABLE_EVENT:
- DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message);
- int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
- String tableDnsPath = null;
- Path tablePath = new Path(dropTableMessage.getTableObj().getSd().getLocation());
- if (tablePath != null) {
- tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(tablePath).toString();
- }
+ case MessageBuilder.ADD_PARTITION_EVENT:
+ AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message);
+ sharedCache.addPartitionsToCache(catalogName, dbName, tableName, addPartMessage.getPartitionObjs());
+ break;
+ case MessageBuilder.ALTER_PARTITION_EVENT:
+ AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message);
+ sharedCache
+ .alterPartitionInCache(catalogName, dbName, tableName, alterPartitionMessage.getPtnObjBefore().getValues(),
+ alterPartitionMessage.getPtnObjAfter());
+ //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+ updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(), catalogName, dbName, tableName,
+ alterPartitionMessage.getPtnObjAfter());
+ break;
+ case MessageBuilder.DROP_PARTITION_EVENT:
+ DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
+ for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) {
+ sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values()));
+ }
+ break;
+ case MessageBuilder.CREATE_TABLE_EVENT:
+ CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message);
+ sharedCache.addTableToCache(catalogName, dbName, tableName, createTableMessage.getTableObj());
+ break;
+ case MessageBuilder.ALTER_TABLE_EVENT:
+ AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
+ sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
+ //TODO : Use the stat object stored in the alter table message to update the stats in cache.
+ updateStatsForAlterTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
+ catalogName, dbName, tableName);
+ break;
+ case MessageBuilder.DROP_TABLE_EVENT:
+ DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message);
+ int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX);
+ String tableDnsPath = null;
+ Path tablePath = new Path(dropTableMessage.getTableObj().getSd().getLocation());
+ if (tablePath != null) {
+ tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(tablePath).toString();
+ }
- while (true) {
- Map<String, String> partitionLocations = rawStore.getPartitionLocations(catalogName, dbName, tableName,
- tableDnsPath, batchSize);
- if (partitionLocations == null || partitionLocations.isEmpty()) {
- break;
- }
- sharedCache.removePartitionFromCache(catalogName, dbName, tableName,
- new ArrayList<>(partitionLocations.values()));
+ while (true) {
+ Map<String, String> partitionLocations =
+ rawStore.getPartitionLocations(catalogName, dbName, tableName, tableDnsPath, batchSize);
+ if (partitionLocations == null || partitionLocations.isEmpty()) {
+ break;
}
- sharedCache.removeTableFromCache(catalogName, dbName, tableName);
- break;
- case MessageBuilder.CREATE_DATABASE_EVENT:
- CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message);
- sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject());
- break;
- case MessageBuilder.ALTER_DATABASE_EVENT:
- AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message);
- sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter());
- break;
- case MessageBuilder.DROP_DATABASE_EVENT:
- sharedCache.removeDatabaseFromCache(catalogName, dbName);
- break;
- case MessageBuilder.CREATE_CATALOG_EVENT:
- case MessageBuilder.DROP_CATALOG_EVENT:
- case MessageBuilder.ALTER_CATALOG_EVENT:
- // TODO : Need to add cache invalidation for catalog events
- LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType());
- break;
- case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
- UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
- sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(),
- msg.getColumnStatistics().getStatsObj(), msg.getParameters());
- break;
- case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
- DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
- sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName());
- break;
- case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
- UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
- sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(),
- msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
- msgPartUpdate.getColumnStatistics().getStatsObj());
- break;
- case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
- DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message);
- sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName,
- msgPart.getPartValues(), msgPart.getColName());
- break;
- default:
- LOG.error("Event is not supported for cache invalidation : " + event.getEventType());
+ sharedCache
+ .removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partitionLocations.values()));
+ }
+ sharedCache.removeTableFromCache(catalogName, dbName, tableName);
+ break;
+ case MessageBuilder.CREATE_DATABASE_EVENT:
+ CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message);
+ sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject());
+ break;
+ case MessageBuilder.ALTER_DATABASE_EVENT:
+ AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message);
+ sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter());
+ break;
+ case MessageBuilder.DROP_DATABASE_EVENT:
+ sharedCache.removeDatabaseFromCache(catalogName, dbName);
+ break;
+ case MessageBuilder.CREATE_CATALOG_EVENT:
+ case MessageBuilder.DROP_CATALOG_EVENT:
+ case MessageBuilder.ALTER_CATALOG_EVENT:
+ // TODO : Need to add cache invalidation for catalog events
+ LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType());
+ break;
+ case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
+ UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
+ sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(),
+ msg.getColumnStatistics().getStatsObj(), msg.getParameters());
+ break;
+ case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
+ DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
+ sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName());
+ break;
+ case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
+ UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
+ sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(),
+ msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
+ msgPartUpdate.getColumnStatistics().getStatsObj());
+ break;
+ case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
+ DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message);
+ sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, msgPart.getPartValues(),
+ msgPart.getColName());
+ break;
+ default:
+ LOG.error("Event is not supported for cache invalidation : " + event.getEventType());
}
}
return lastEventId;
@@ -535,8 +535,9 @@ public class CachedStore implements RawStore, Configurable {
Deadline.stopTimer();
}
// If the table could not cached due to memory limit, stop prewarm
- boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
- aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+ boolean isSuccess = sharedCache
+ .populateTableInCache(table, tableColStats, partitions, partitionColStats, aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
if (isSuccess) {
LOG.trace("Cached Database: {}'s Table: {}.", dbName, tblName);
} else {
@@ -565,8 +566,16 @@ public class CachedStore implements RawStore, Configurable {
}
}
+ /**
+ * This method is only used for testing. Test method will init a new cache and use the new handle to query the cache
+ * to get content in the cache. In production, no code would/should call this method, because SharedCache should be
+ * a singleton.
+ */
@VisibleForTesting
static void clearSharedCache() {
+ synchronized (lock) {
+ sharedCacheInited = false;
+ }
sharedCache = new SharedCache();
}
@@ -605,23 +614,20 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @VisibleForTesting
- static void setCachePrewarmedState(boolean state) {
+ @VisibleForTesting static void setCachePrewarmedState(boolean state) {
isCachePrewarmed.set(state);
}
private static void initBlackListWhiteList(Configuration conf) {
- whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf,
- MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
- blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
- MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
+ whitelistPatterns = createPatterns(
+ MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
+ blacklistPatterns = createPatterns(
+ MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
}
private static Collection<String> catalogsToCache(RawStore rs) throws MetaException {
- Collection<String> confValue =
- MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE);
- if (confValue == null || confValue.isEmpty() ||
- (confValue.size() == 1 && confValue.contains(""))) {
+ Collection<String> confValue = MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE);
+ if (confValue == null || confValue.isEmpty() || (confValue.size() == 1 && confValue.contains(""))) {
return rs.getCatalogs();
} else {
return confValue;
@@ -636,19 +642,17 @@ public class CachedStore implements RawStore, Configurable {
* @param conf
* @param runOnlyOnce
* @param shouldRunPrewarm
- */
- static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+ */ static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
boolean shouldRunPrewarm) {
if (cacheUpdateMaster == null) {
initBlackListWhiteList(conf);
if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
- cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
- ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
+ cacheRefreshPeriodMS =
+ MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
}
LOG.info("CachedStore: starting cache update service (run every {} ms)", cacheRefreshPeriodMS);
cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
+ @Override public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
t.setDaemon(true);
@@ -656,24 +660,23 @@ public class CachedStore implements RawStore, Configurable {
}
});
if (!runOnlyOnce) {
- cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
- cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+ cacheUpdateMaster
+ .scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, cacheRefreshPeriodMS,
+ TimeUnit.MILLISECONDS);
}
- }
+ }
if (runOnlyOnce) {
// Some tests control the execution of the background update thread
cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS);
}
}
- @VisibleForTesting
- static synchronized boolean stopCacheUpdateService(long timeout) {
+ @VisibleForTesting static synchronized boolean stopCacheUpdateService(long timeout) {
boolean tasksStoppedBeforeShutdown = false;
if (cacheUpdateMaster != null) {
LOG.info("CachedStore: shutting down cache update service");
try {
- tasksStoppedBeforeShutdown =
- cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ tasksStoppedBeforeShutdown = cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
+ "complete before shutting down. Will make a hard stop now.");
@@ -684,8 +687,7 @@ public class CachedStore implements RawStore, Configurable {
return tasksStoppedBeforeShutdown;
}
- @VisibleForTesting
- static void setCacheRefreshPeriod(long time) {
+ @VisibleForTesting static void setCacheRefreshPeriod(long time) {
cacheRefreshPeriodMS = time;
}
@@ -693,7 +695,6 @@ public class CachedStore implements RawStore, Configurable {
private boolean shouldRunPrewarm = true;
private final RawStore rawStore;
-
CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
this.shouldRunPrewarm = shouldRunPrewarm;
String rawStoreClassName =
@@ -708,8 +709,7 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public void run() {
+ @Override public void run() {
if (!shouldRunPrewarm) {
if (canUseEvents) {
try {
@@ -814,8 +814,9 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(catName, dbName, tblName)) {
continue;
}
- Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName));
+ Table table = rawStore
+ .getTable(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName));
tables.add(table);
}
success = sharedCache.refreshTablesInCache(catName, dbName, tables);
@@ -864,8 +865,9 @@ public class CachedStore implements RawStore, Configurable {
Deadline.startTimer("getPartitions");
List<Partition> partitions = rawStore.getPartitions(catName, dbName, tblName, -1);
Deadline.stopTimer();
- sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), partitions);
+ sharedCache
+ .refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), partitions);
LOG.debug("CachedStore: updated cached partition objects for catalog: {}, database: {}, table: {}", catName,
dbName, tblName);
} catch (MetaException | NoSuchObjectException e) {
@@ -886,7 +888,7 @@ public class CachedStore implements RawStore, Configurable {
// Get partition column stats for this table
Deadline.startTimer("getPartitionColumnStatistics");
List<ColumnStatistics> partitionColStats =
- rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
+ rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
Deadline.stopTimer();
sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats);
Deadline.startTimer("getPartitionsByNames");
@@ -914,7 +916,8 @@ public class CachedStore implements RawStore, Configurable {
// but default partition
private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
String tblName) {
- LOG.debug("CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}",
+ LOG.debug(
+ "CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}",
catName, dbName, tblName);
try {
Table table = rawStore.getTable(catName, dbName, tblName);
@@ -943,10 +946,10 @@ public class CachedStore implements RawStore, Configurable {
rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
Deadline.stopTimer();
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+ StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
aggrStatsAllButDefaultPartition, null);
- LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}",
- catName, dbName, tblName);
+ LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog:"
+ + " {}, database: {}, table: {}", catName, dbName, tblName);
}
} catch (MetaException | NoSuchObjectException e) {
LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, e);
@@ -954,23 +957,19 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public Configuration getConf() {
+ @Override public Configuration getConf() {
return rawStore.getConf();
}
- @Override
- public void shutdown() {
+ @Override public void shutdown() {
rawStore.shutdown();
}
- @Override
- public boolean openTransaction() {
+ @Override public boolean openTransaction() {
return rawStore.openTransaction();
}
- @Override
- public boolean commitTransaction() {
+ @Override public boolean commitTransaction() {
if (!rawStore.commitTransaction()) {
return false;
}
@@ -994,18 +993,15 @@ public class CachedStore implements RawStore, Configurable {
return true;
}
- @Override
- public boolean isActiveTransaction() {
+ @Override public boolean isActiveTransaction() {
return rawStore.isActiveTransaction();
}
- @Override
- public void rollbackTransaction() {
+ @Override public void rollbackTransaction() {
rawStore.rollbackTransaction();
}
- @Override
- public void createCatalog(Catalog cat) throws MetaException {
+ @Override public void createCatalog(Catalog cat) throws MetaException {
rawStore.createCatalog(cat);
// in case of event based cache update, cache will not be updated for catalog.
if (!canUseEvents) {
@@ -1013,9 +1009,7 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public void alterCatalog(String catName, Catalog cat) throws MetaException,
- InvalidOperationException {
+ @Override public void alterCatalog(String catName, Catalog cat) throws MetaException, InvalidOperationException {
rawStore.alterCatalog(catName, cat);
// in case of event based cache update, cache will not be updated for catalog.
if (!canUseEvents) {
@@ -1023,8 +1017,7 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+ @Override public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
// in case of event based cache update, cache will not be updated for catalog.
if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
return rawStore.getCatalog(catalogName);
@@ -1036,8 +1029,7 @@ public class CachedStore implements RawStore, Configurable {
return cat;
}
- @Override
- public List<String> getCatalogs() throws MetaException {
+ @Override public List<String> getCatalogs() throws MetaException {
// in case of event based cache update, cache will not be updated for catalog.
if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) {
return rawStore.getCatalogs();
@@ -1045,8 +1037,7 @@ public class CachedStore implements RawStore, Configurable {
return sharedCache.listCachedCatalogs();
}
- @Override
- public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+ @Override public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
rawStore.dropCatalog(catalogName);
// in case of event based cache update, cache will not be updated for catalog.
@@ -1056,8 +1047,7 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public void createDatabase(Database db) throws InvalidObjectException, MetaException {
+ @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException {
rawStore.createDatabase(db);
// in case of event based cache update, cache will be updated during commit.
if (!canUseEvents) {
@@ -1065,8 +1055,7 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
+ @Override public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
// in case of event based cache update, cache will be updated during commit. So within active transaction, read
// directly from rawStore to avoid reading stale data as the data updated during same transaction will not be
// updated in the cache.
@@ -1074,65 +1063,58 @@ public class CachedStore implements RawStore, Configurable {
return rawStore.getDatabase(catName, dbName);
}
dbName = dbName.toLowerCase();
- Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName));
+ Database db = sharedCache
+ .getDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName));
if (db == null) {
throw new NoSuchObjectException();
}
return db;
}
- @Override
- public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
+ @Override public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
boolean succ = rawStore.dropDatabase(catName, dbName);
if (succ && !canUseEvents) {
// in case of event based cache update, cache will be updated during commit.
- sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName));
+ sharedCache
+ .removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName));
}
return succ;
}
- @Override
- public boolean alterDatabase(String catName, String dbName, Database db)
+ @Override public boolean alterDatabase(String catName, String dbName, Database db)
throws NoSuchObjectException, MetaException {
boolean succ = rawStore.alterDatabase(catName, dbName, db);
if (succ && !canUseEvents) {
// in case of event based cache update, cache will be updated during commit.
- sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName), db);
+ sharedCache
+ .alterDatabaseInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), db);
}
return succ;
}
- @Override
- public List<String> getDatabases(String catName, String pattern) throws MetaException {
+ @Override public List<String> getDatabases(String catName, String pattern) throws MetaException {
if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getDatabases(catName, pattern);
}
return sharedCache.listCachedDatabases(catName, pattern);
}
- @Override
- public List<String> getAllDatabases(String catName) throws MetaException {
+ @Override public List<String> getAllDatabases(String catName) throws MetaException {
if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) {
return rawStore.getAllDatabases(catName);
}
return sharedCache.listCachedDatabases(catName);
}
- @Override
- public boolean createType(Type type) {
+ @Override public boolean createType(Type type) {
return rawStore.createType(type);
}
- @Override
- public Type getType(String typeName) {
+ @Override public Type getType(String typeName) {
return rawStore.getType(typeName);
}
- @Override
- public boolean dropType(String typeName) {
+ @Override public boolean dropType(String typeName) {
return rawStore.dropType(typeName);
}
@@ -1154,8 +1136,7 @@ public class CachedStore implements RawStore, Configurable {
tbl.setTableType(tableType);
}
- @Override
- public void createTable(Table tbl) throws InvalidObjectException, MetaException {
+ @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException {
rawStore.createTable(tbl);
// in case of event based cache update, cache will be updated during commit.
if (canUseEvents) {
@@ -1171,8 +1152,7 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.addTableToCache(catName, dbName, tblName, tbl);
}
- @Override
- public boolean dropTable(String catName, String dbName, String tblName)
+ @Override public boolean dropTable(String catName, String dbName, String tblName)
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropTable(catName, dbName, tblName);
// in case of event based cache update, cache will be updated during commit.
@@ -1188,13 +1168,12 @@ public class CachedStore implements RawStore, Configurable {
return succ;
}
- @Override
- public Table getTable(String catName, String dbName, String tblName) throws MetaException {
+ @Override public Table getTable(String catName, String dbName, String tblName) throws MetaException {
return getTable(catName, dbName, tblName, null);
}
- @Override
- public Table getTable(String catName, String dbName, String tblName, String validWriteIds) throws MetaException {
+ @Override public Table getTable(String catName, String dbName, String tblName, String validWriteIds)
+ throws MetaException {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1209,7 +1188,11 @@ public class CachedStore implements RawStore, Configurable {
// let's move this table to the top of tblNamesBeingPrewarmed stack,
// so that it gets loaded to the cache faster and is available for subsequent requests
tblsPendingPrewarm.prioritizeTableForPrewarm(tblName);
- return rawStore.getTable(catName, dbName, tblName, validWriteIds);
+ Table t = rawStore.getTable(catName, dbName, tblName, validWriteIds);
+ if (t != null) {
+ sharedCache.addTableToCache(catName, dbName, tblName, t);
+ }
+ return t;
}
if (validWriteIds != null) {
tbl.setParameters(
@@ -1237,8 +1220,7 @@ public class CachedStore implements RawStore, Configurable {
return tbl;
}
- @Override
- public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
+ @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartition(part);
// in case of event based cache update, cache will be updated during commit.
if (succ && !canUseEvents) {
@@ -1253,8 +1235,7 @@ public class CachedStore implements RawStore, Configurable {
return succ;
}
- @Override
- public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
+ @Override public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts);
// in case of event based cache update, cache will be updated during commit.
@@ -1270,9 +1251,8 @@ public class CachedStore implements RawStore, Configurable {
return succ;
}
- @Override
- public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec,
- boolean ifNotExists) throws InvalidObjectException, MetaException {
+ @Override public boolean addPartitions(String catName, String dbName, String tblName,
+ PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException {
boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists);
// in case of event based cache update, cache will be updated during commit.
if (succ && !canUseEvents) {
@@ -1291,65 +1271,56 @@ public class CachedStore implements RawStore, Configurable {
return succ;
}
- @Override
- public Partition getPartition(String catName, String dbName, String tblName, List<String> part_vals)
+ @Override public Partition getPartition(String catName, String dbName, String tblName, List<String> partVals)
throws MetaException, NoSuchObjectException {
- return getPartition(catName, dbName, tblName, part_vals, null);
+ return getPartition(catName, dbName, tblName, partVals, null);
}
- @Override
- public Partition getPartition(String catName, String dbName, String tblName,
- List<String> part_vals, String validWriteIds)
- throws MetaException, NoSuchObjectException {
+ @Override public Partition getPartition(String catName, String dbName, String tblName, List<String> partVals,
+ String validWriteIds) throws MetaException, NoSuchObjectException {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.getPartition(
- catName, dbName, tblName, part_vals, validWriteIds);
+ return rawStore.getPartition(catName, dbName, tblName, partVals, validWriteIds);
}
- Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, part_vals);
+ Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partVals);
if (part == null) {
// The table containing the partition is not yet loaded in cache
- return rawStore.getPartition(
- catName, dbName, tblName, part_vals, validWriteIds);
+ return rawStore.getPartition(catName, dbName, tblName, partVals, validWriteIds);
}
if (validWriteIds != null) {
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
if (table == null) {
// The table containing the partition is not yet loaded in cache
- return rawStore.getPartition(
- catName, dbName, tblName, part_vals, validWriteIds);
+ return rawStore.getPartition(catName, dbName, tblName, partVals, validWriteIds);
}
- part.setParameters(adjustStatsParamsForGet(table.getParameters(),
- part.getParameters(), part.getWriteId(), validWriteIds));
+ part.setParameters(
+ adjustStatsParamsForGet(table.getParameters(), part.getParameters(), part.getWriteId(), validWriteIds));
}
return part;
}
- @Override
- public boolean doesPartitionExist(String catName, String dbName, String tblName,
- List<FieldSchema> partKeys, List<String> part_vals)
- throws MetaException, NoSuchObjectException {
+ @Override public boolean doesPartitionExist(String catName, String dbName, String tblName, List<FieldSchema> partKeys,
+ List<String> partVals) throws MetaException, NoSuchObjectException {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
+ return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, partVals);
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
if (tbl == null) {
// The table containing the partition is not yet loaded in cache
- return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
+ return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, partVals);
}
- return sharedCache.existPartitionFromCache(catName, dbName, tblName, part_vals);
+ return sharedCache.existPartitionFromCache(catName, dbName, tblName, partVals);
}
- @Override
- public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals)
+ @Override public boolean dropPartition(String catName, String dbName, String tblName, List<String> partVals)
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
- boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals);
+ boolean succ = rawStore.dropPartition(catName, dbName, tblName, partVals);
// in case of event based cache update, cache will be updated during commit.
if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
@@ -1358,13 +1329,12 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(catName, dbName, tblName)) {
return succ;
}
- sharedCache.removePartitionFromCache(catName, dbName, tblName, part_vals);
+ sharedCache.removePartitionFromCache(catName, dbName, tblName, partVals);
}
return succ;
}
- @Override
- public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
+ @Override public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
throws MetaException, NoSuchObjectException {
rawStore.dropPartitions(catName, dbName, tblName, partNames);
// in case of event based cache update, cache will be updated during commit.
@@ -1384,8 +1354,7 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.removePartitionsFromCache(catName, dbName, tblName, partVals);
}
- @Override
- public List<Partition> getPartitions(String catName, String dbName, String tblName, int max)
+ @Override public List<Partition> getPartitions(String catName, String dbName, String tblName, int max)
throws MetaException, NoSuchObjectException {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1402,26 +1371,23 @@ public class CachedStore implements RawStore, Configurable {
return parts;
}
- @Override
- public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
+ @Override public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
String baseLocationToNotShow, int max) {
return rawStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max);
}
- @Override
- public Table alterTable(String catName, String dbName, String tblName, Table newTable,
- String validWriteIds) throws InvalidObjectException, MetaException {
+ @Override public Table alterTable(String catName, String dbName, String tblName, Table newTable, String validWriteIds)
+ throws InvalidObjectException, MetaException {
newTable = rawStore.alterTable(catName, dbName, tblName, newTable, validWriteIds);
// in case of event based cache update, cache will be updated during commit.
if (canUseEvents) {
- return newTable;
+ return newTable;
}
catName = normalizeIdentifier(catName);
dbName = normalizeIdentifier(dbName);
tblName = normalizeIdentifier(tblName);
String newTblName = normalizeIdentifier(newTable.getTableName());
- if (!shouldCacheTable(catName, dbName, tblName) &&
- !shouldCacheTable(catName, dbName, newTblName)) {
+ if (!shouldCacheTable(catName, dbName, tblName) && !shouldCacheTable(catName, dbName, newTblName)) {
return newTable;
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
@@ -1442,60 +1408,36 @@ public class CachedStore implements RawStore, Configurable {
return newTable;
}
- @Override
- public void updateCreationMetadata(String catName, String dbname, String tablename, CreationMetadata cm)
+ @Override public void updateCreationMetadata(String catName, String dbname, String tablename, CreationMetadata cm)
throws MetaException {
rawStore.updateCreationMetadata(catName, dbname, tablename, cm);
}
- @Override
- public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() ||
- (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.getTables(catName, dbName, pattern);
- }
- return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName), pattern, -1);
+ @Override public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
+ return rawStore.getTables(catName, dbName, pattern);
}
- @Override
- public List<String> getTables(String catName, String dbName, String pattern, TableType tableType, int limit)
+ @Override public List<String> getTables(String catName, String dbName, String pattern, TableType tableType, int limit)
throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get()
- || (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.getTables(catName, dbName, pattern, tableType, limit);
- }
- return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName), pattern, tableType, limit);
+ return rawStore.getTables(catName, dbName, pattern, tableType, limit);
}
- @Override
- public List<Table> getAllMaterializedViewObjectsForRewriting(String catName) throws MetaException {
+ @Override public List<Table> getAllMaterializedViewObjectsForRewriting(String catName) throws MetaException {
// TODO fucntionCache
return rawStore.getAllMaterializedViewObjectsForRewriting(catName);
}
- @Override
- public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
+ @Override public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
throws MetaException, NoSuchObjectException {
return rawStore.getMaterializedViewsForRewriting(catName, dbName);
}
- @Override
- public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes)
- throws MetaException {
- // TODO Check if all required tables are allowed, if so, get it from cache
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() ||
- (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
- }
- return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbNames),
- StringUtils.normalizeIdentifier(tableNames), tableTypes);
+ @Override public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
+ List<String> tableTypes) throws MetaException {
+ return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
}
- @Override
- public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
+ @Override public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
throws MetaException, UnknownDBException {
if (canUseEvents && rawStore.isActiveTransaction()) {
return rawStore.getTableObjectsByName(catName, dbName, tblNames);
@@ -1523,6 +1465,7 @@ public class CachedStore implements RawStore, Configurable {
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
if (tbl == null) {
tbl = rawStore.getTable(catName, dbName, tblName);
+ sharedCache.addTableToCache(catName, dbName, tblName, tbl);
}
if (tbl != null) {
tables.add(tbl);
@@ -1532,58 +1475,48 @@ public class CachedStore implements RawStore, Configurable {
return tables;
}
- @Override
- public List<String> getAllTables(String catName, String dbName) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() ||
- (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.getAllTables(catName, dbName);
- }
- return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName));
+ @Override public List<String> getAllTables(String catName, String dbName) throws MetaException {
+ return rawStore.getAllTables(catName, dbName);
}
@Override
// TODO: implement using SharedCache
- public List<String> listTableNamesByFilter(String catName, String dbName, String filter, short max_tables)
+ public List<String> listTableNamesByFilter(String catName, String dbName, String filter, short maxTables)
throws MetaException, UnknownDBException {
- return rawStore.listTableNamesByFilter(catName, dbName, filter, max_tables);
+ return rawStore.listTableNamesByFilter(catName, dbName, filter, maxTables);
}
- @Override
- public List<String> listPartitionNames(String catName, String dbName, String tblName,
- short max_parts) throws MetaException {
+ @Override public List<String> listPartitionNames(String catName, String dbName, String tblName, short maxParts)
+ throws MetaException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) {
- return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
+ return rawStore.listPartitionNames(catName, dbName, tblName, maxParts);
}
Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
if (tbl == null) {
// The table is not yet loaded in cache
- return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
+ return rawStore.listPartitionNames(catName, dbName, tblName, maxParts);
}
List<String> partitionNames = new ArrayList<>();
int count = 0;
- for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, max_parts)) {
- if (max_parts == -1 || count < max_parts) {
+ for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
+ if (maxParts == -1 || count < maxParts) {
partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
}
}
return partitionNames;
}
- @Override
- public PartitionValuesResponse listPartitionValues(String catName, String db_name, String tbl_name,
- List<FieldSchema> cols, boolean applyDistinct, String filter, boolean ascending,
- List<FieldSchema> order, long maxParts) throws MetaException {
+ @Override public PartitionValuesResponse listPartitionValues(String catName, String dbName, String tblName,
+ List<FieldSchema> cols, boolean applyDistinct, String filter, boolean ascending, List<FieldSchema> order,
+ long maxParts) throws MetaException {
throw new UnsupportedOperationException();
}
- @Override
- public Partition alterPartition(String catName, String dbName, String tblName,
- List<String> partVals, Partition newPart, String validWriteIds)
- throws InvalidObjectException, MetaException {
+ @Override public Partition alterPartition(String catName, String dbName, String tblName, List<String> partVals,
+ Partition newPart, String validWriteIds) throws InvalidObjectException, MetaException {
newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds);
// in case of event based cache update, cache will be updated during commit.
if (canUseEvents) {
@@ -1599,13 +1532,10 @@ public class CachedStore implements RawStore, Configurable {
return newPart;
}
- @Override
- public List<Partition> alterPartitions(String catName, String dbName, String tblName,
- List<List<String>> partValsList, List<Partition> newParts,
- long writeId, String validWriteIds)
+ @Override public List<Partition> alterPartitions(String catName, String dbName, String tblName,
+ List<List<String>> partValsList, List<Partition> newParts, long writeId, String validWriteIds)
throws InvalidObjectException, MetaException {
- newParts = rawStore.alterPartitions(
- catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds);
+ newParts = rawStore.alterPartitions(catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds);
// in case of event based cache update, cache will be updated during commit.
if (canUseEvents) {
return newParts;
@@ -1620,43 +1550,37 @@ public class CachedStore implements RawStore, Configurable {
return newParts;
}
- private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
- String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
- throws MetaException, NoSuchObjectException {
- List<Partition> parts =
- sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getCatName()),
- StringUtils.normalizeIdentifier(table.getDbName()),
- StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+ private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, String defaultPartName, short maxParts,
+ List<String> result, SharedCache sharedCache) throws MetaException, NoSuchObjectException {
+ List<Partition> parts = sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getCatName()),
+ StringUtils.normalizeIdentifier(table.getDbName()), StringUtils.normalizeIdentifier(table.getTableName()),
+ maxParts);
for (Partition part : parts) {
result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
}
if (defaultPartName == null || defaultPartName.isEmpty()) {
defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
}
- return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
- result);
+ return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, result);
}
@Override
// TODO: implement using SharedCache
- public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName,
- String filter, short maxParts)
- throws MetaException, NoSuchObjectException {
+ public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName, String filter,
+ short maxParts) throws MetaException, NoSuchObjectException {
return rawStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts);
}
@Override
/**
* getPartitionSpecsByFilterAndProjection interface is currently non-cacheable.
- */
- public List<Partition> getPartitionSpecsByFilterAndProjection(Table table,
+ */ public List<Partition> getPartitionSpecsByFilterAndProjection(Table table,
GetPartitionsProjectionSpec projectionSpec, GetPartitionsFilterSpec filterSpec)
throws MetaException, NoSuchObjectException {
return rawStore.getPartitionSpecsByFilterAndProjection(table, projectionSpec, filterSpec);
}
- @Override
- public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
+ @Override public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1680,14 +1604,12 @@ public class CachedStore implements RawStore, Configurable {
return hasUnknownPartitions;
}
- @Override
- public int getNumPartitionsByFilter(String catName, String dbName, String tblName, String filter)
+ @Override public int getNumPartitionsByFilter(String catName, String dbName, String tblName, String filter)
throws MetaException, NoSuchObjectException {
return rawStore.getNumPartitionsByFilter(catName, dbName, tblName, filter);
}
- @Override
- public int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr)
+ @Override public int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr)
throws MetaException, NoSuchObjectException {
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1702,13 +1624,11 @@ public class CachedStore implements RawStore, Configurable {
// The table is not yet loaded in cache
return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
}
- getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
- sharedCache);
+ getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache);
return partNames.size();
}
- @VisibleForTesting
- public static List<String> partNameToVals(String name) {
+ @VisibleForTesting public static List<String> partNameToVals(String name) {
if (name == null) {
return null;
}
@@ -1720,8 +1640,7 @@ public class CachedStore implements RawStore, Configurable {
return vals;
}
- @Override
- public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
+ @Override public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
List<String> partNames) throws MetaException, NoSuchObjectException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -1737,179 +1656,144 @@ public class CachedStore implements RawStore, Configurable {
List<Partition> partitions = new ArrayList<>();
for (String partName : partNames) {
Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partNameToVals(partName));
- if (part!=null) {
+ if (part != null) {
partitions.add(part);
}
}
return partitions;
}
- @Override
- public Table markPartitionForEvent(String catName, String dbName, String tblName,
+ @Override public Table markPartitionForEvent(String catName, String dbName, String tblName,
Map<String, String> partVals, PartitionEventType evtType)
- throws MetaException, UnknownTableException, InvalidPartitionException,
- UnknownPartitionException {
+ throws MetaException, UnknownTableException, InvalidPartitionException, UnknownPartitionException {
return rawStore.markPartitionForEvent(catName, dbName, tblName, partVals, evtType);
}
- @Override
- public boolean isPartitionMarkedForEvent(String catName, String dbName, String tblName,
+ @Override public boolean isPartitionMarkedForEvent(String catName, String dbName, String tblName,
Map<String, String> partName, PartitionEventType evtType)
- throws MetaException, UnknownTableException, InvalidPartitionException,
- UnknownPartitionException {
+ throws MetaException, UnknownTableException, InvalidPartitionException, UnknownPartitionException {
return rawStore.isPartitionMarkedForEvent(catName, dbName, tblName, partName, evtType);
}
- @Override
- public boolean addRole(String rowName, String ownerName)
+ @Override public boolean addRole(String rowName, String ownerName)
throws InvalidObjectException, MetaException, NoSuchObjectException {
return rawStore.addRole(rowName, ownerName);
}
- @Override
- public boolean removeRole(String roleName)
- throws MetaException, NoSuchObjectException {
+ @Override public boolean removeRole(String roleName) throws MetaException, NoSuchObjectException {
return rawStore.removeRole(roleName);
}
- @Override
- public boolean grantRole(Role role, String userName,
- PrincipalType principalType, String grantor, PrincipalType grantorType,
- boolean grantOption)
+ @Override public boolean grantRole(Role role, String userName, PrincipalType principalType, String grantor,
+ PrincipalType grantorType, boolean grantOption)
throws MetaException, NoSuchObjectException, InvalidObjectException {
return rawStore.grantRole(role, userName, principalType, grantor, grantorType, grantOption);
}
- @Override
- public boolean revokeRole(Role role, String userName,
- PrincipalType principalType, boolean grantOption)
+ @Override public boolean revokeRole(Role role, String userName, PrincipalType principalType, boolean grantOption)
throws MetaException, NoSuchObjectException {
return rawStore.revokeRole(role, userName, principalType, grantOption);
}
- @Override
- public PrincipalPrivilegeSet getUserPrivilegeSet(String userName,
- List<String> groupNames) throws InvalidObjectException, MetaException {
+ @Override public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, List<String> groupNames)
+ throws InvalidObjectException, MetaException {
return rawStore.getUserPrivilegeSet(userName, groupNames);
}
- @Override
- public PrincipalPrivilegeSet getDBPrivilegeSet(String catName, String dbName, String userName,
+ @Override public PrincipalPrivilegeSet getDBPrivilegeSet(String catName, String dbName, String userName,
List<String> groupNames) throws InvalidObjectException, MetaException {
return rawStore.getDBPrivilegeSet(catName, dbName, userName, groupNames);
}
- @Override
- public PrincipalPrivilegeSet getTablePrivilegeSet(String catName, String dbName,
- String tableName, String userName, List<String> groupNames)
- throws InvalidObjectException, MetaException {
+ @Override public PrincipalPrivilegeSet getTablePrivilegeSet(String catName, String dbName, String tableName,
+ String userName, List<String> groupNames) throws InvalidObjectException, MetaException {
return rawStore.getTablePrivilegeSet(catName, dbName, tableName, userName, groupNames);
}
- @Override
- public PrincipalPrivilegeSet getPartitionPrivilegeSet(String catName, String dbName,
- String tableName, String partition, String userName,
- List<String> groupNames) throws InvalidObjectException, MetaException {
+ @Override public PrincipalPrivilegeSet getPartitionPrivilegeSet(String catName, String dbName, String tableName,
+ String partition, String userName, List<String> groupNames) throws InvalidObjectException, MetaException {
return rawStore.getPartitionPrivilegeSet(catName, dbName, tableName, partition, userName, groupNames);
}
- @Override
- public PrincipalPrivilegeSet getColumnPrivilegeSet(String catName, String dbName,
- String tableName, String partitionName, String columnName,
- String userName, List<String> groupNames)
+ @Override public PrincipalPrivilegeSet getColumnPrivilegeSet(String catName, String dbName, String tableName,
+ String partitionName, String columnName, String userName, List<String> groupNames)
throws InvalidObjectException, MetaException {
return rawStore.getColumnPrivilegeSet(catName, dbName, tableName, partitionName, columnName, userName, groupNames);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalGlobalGrants(
- String principalName, PrincipalType principalType) {
+ @Override public List<HiveObjectPrivilege> listPrincipalGlobalGrants(String principalName,
+ PrincipalType principalType) {
return rawStore.listPrincipalGlobalGrants(principalName, principalType);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName,
- PrincipalType principalType, String catName, String dbName) {
+ @Override public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName, PrincipalType principalType,
+ String catName, String dbName) {
return rawStore.listPrincipalDBGrants(principalName, principalType, catName, dbName);
}
- @Override
- public List<HiveObjectPrivilege> listAllTableGrants(String principalName,
- PrincipalType principalType, String catName, String dbName, String tableName) {
+ @Override public List<HiveObjectPrivilege> listAllTableGrants(String principalName, PrincipalType principalType,
+ String catName, String dbName, String tableName) {
return rawStore.listAllTableGrants(principalName, principalType, catName, dbName, tableName);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalPartitionGrants(
- String principalName, PrincipalType principalType, String catName, String dbName,
- String tableName, List<String> partValues, String partName) {
- return rawStore.listPrincipalPartitionGrants(principalName, principalType, catName, dbName, tableName, partValues, partName);
+ @Override public List<HiveObjectPrivilege> listPrincipalPartitionGrants(String principalName,
+ PrincipalType principalType, String catName, String dbName, String tableName, List<String> partValues,
+ String partName) {
+ return rawStore
+ .listPrincipalPartitionGrants(principalName, principalType, catName, dbName, tableName, partValues, partName);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalTableColumnGrants(
- String principalName, PrincipalType principalType, String catName, String dbName,
- String tableName, String columnName) {
- return rawStore.listPrincipalTableColumnGrants(principalName, principalType, catName, dbName, tableName, columnName);
+ @Override public List<HiveObjectPrivilege> listPrincipalTableColumnGrants(String principalName,
+ PrincipalType principalType, String catName, String dbName, String tableName, String columnName) {
+ return rawStore
+ .listPrincipalTableColumnGrants(principalName, principalType, catName, dbName, tableName, columnName);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants(
- String principalName, PrincipalType principalType, String catName, String dbName,
- String tableName, List<String> partValues, String partName,
- String columnName) {
- return rawStore.listPrincipalPartitionColumnGrants(principalName, principalType, catName, dbName, tableName, partValues, partName, columnName);
+ @Override public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants(String principalName,
+ PrincipalType principalType, String catName, String dbName, String tableName, List<String> partValues,
+ String partName, String columnName) {
+ return rawStore
+ .listPrincipalPartitionColumnGrants(principalName, principalType, catName, dbName, tableName, partValues,
+ partName, columnName);
}
- @Override
- public boolean grantPrivileges(PrivilegeBag privileges)
+ @Override public boolean grantPrivileges(PrivilegeBag privileges)
throws InvalidObjectException, MetaException, NoSuchObjectException {
return rawStore.grantPrivileges(privileges);
}
- @Override
- public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
+ @Override public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
throws InvalidObjectException, MetaException, NoSuchObjectException {
return rawStore.revokePrivileges(privileges, grantOption);
}
- @Override
- public boolean refreshPrivileges(HiveObjectRef objToRefresh, String authorizer, PrivilegeBag grantPrivileges)
- throws InvalidObjectException, MetaException, NoSuchObjectException {
+ @Override public boolean refreshPrivileges(HiveObjectRef objToRefresh, String authorizer,
+ PrivilegeBag grantPrivileges) throws InvalidObjectException, MetaException, NoSuchObjectException {
return rawStore.refreshPrivileges(objToRefresh, authorizer, grantPrivileges);
}
- @Override
- public Role getRole(String roleName) throws NoSuchObjectException {
+ @Override public Role getRole(String roleName) throws NoSuchObjectException {
return rawStore.getRole(roleName);
}
- @Override
- public List<String> listRoleNames() {
+ @Override public List<String> listRoleNames() {
return rawStore.listRoleNames();
}
- @Override
- public List<Role> listRoles(String principalName,
- PrincipalType principalType) {
+ @Override public List<Role> listRoles(String principalName, PrincipalType principalType) {
return rawStore.listRoles(principalName, principalType);
}
- @Override
- public List<RolePrincipalGrant> listRolesWithGrants(String principalName,
- PrincipalType principalType) {
+ @Override public List<RolePrincipalGrant> listRolesWithGrants(String principalName, PrincipalType principalType) {
return rawStore.listRolesWithGrants(principalName, principalType);
}
- @Override
- public List<RolePrincipalGrant> listRoleMembers(String roleName) {
+ @Override public List<RolePrincipalGrant> listRoleMembers(String roleName) {
return rawStore.listRoleMembers(roleName);
}
- @Override
- public Partition getPartitionWithAuth(String catName, String dbName, String tblName,
- List<String> partVals, String userName, List<String> groupNames)
- throws MetaException, NoSuchObjectException, InvalidObjectException {
+ @Override public Partition getPartitionWithAuth(String catName, String dbName, String tblName, List<String> partVals,
+ String userName, List<String> groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1924,8 +1808,7 @@ public class CachedStore implements RawStore, Configurable {
Partition p = sharedCache.getPartitionFromCache(catName, dbName, tblName, partVals);
if (p != null) {
String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
- PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName,
- userName, groupNames);
+ PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName, userName, groupNames);
p.setPrivileges(privs);
} else {
throw new NoSuchObjectException("partition values=" + partVals.toString());
@@ -1933,10 +1816,8 @@ public class CachedStore implements RawStore, Configurable {
return p;
}
- @Override
- public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName,
- short maxParts, String userName, List<String> groupNames)
- throws MetaException, NoSuchObjectException, InvalidObjectException {
+ @Override public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName, short maxParts,
+ String userName, List<String> groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1953,8 +1834,8 @@ public class CachedStore implements RawStore, Configurable {
for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
if (maxParts == -1 || count < maxParts) {
String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
- PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName,
- userName, groupNames);
+ PrincipalPrivilegeSet privs =
+ getPartitionPrivilegeSet(catName, dbName, tblName, partName, userName, groupNames);
part.setPrivileges(privs);
partitions.add(part);
count++;
@@ -1963,9 +1844,8 @@ public class CachedStore implements RawStore, Configurable {
return partitions;
}
- @Override
- public List<String> listPartitionNamesPs(String catName, String dbName, String tblName, List<String> partSpecs,
- short maxParts) throws MetaException, NoSuchObjectException {
+ @Override public List<String> listPartitionNamesPs(String catName, String dbName, String tblName,
+ List<String> partSpecs, short maxParts) throws MetaException, NoSuchObjectException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1991,9 +1871,8 @@ public class CachedStore implements RawStore, Configurable {
return partitionNames;
}
- @Override
- public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName, List<String> partSpecs,
- short maxParts, String userName, List<String> groupNames)
+ @Override public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName,
+ List<String> partSpecs, short maxParts, String userName, List<String> groupNames)
throws MetaException, InvalidObjectException, NoSuchObjectException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -2045,11 +1924,13 @@ public class CachedStore implements RawStore, Configurable {
}
// Note: ideally this should be above both CachedStore and ObjectStore.
- private Map<String, String> adjustStatsParamsForGet(Map<String, String> tableParams,
- Map<String, String> params, long statsWriteId, String validWriteIds) throws MetaException {
- if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a txn table.
- if (areTxnStatsSupported && ((validWriteIds == null)
- || ObjectStore.isCurrentStatsValidForTheQuery(params, statsWriteId, validWriteIds, false))) {
+ private Map<String, String> adjustStatsParamsForGet(Map<String, String> tableParams, Map<String, String> params,
+ long statsWriteId, String validWriteIds) throws MetaException {
+ if (!TxnUtils.isTransactionalTable(tableParams)) {
+ return params; // Not a txn table.
+ }
+ if (areTxnStatsSupported && ((validWriteIds == null) || ObjectStore
+ .isCurrentStatsValidForTheQuery(params, statsWriteId, validWriteIds, false))) {
// Valid stats are supported for txn tables, and either no verification was requested by the
// caller, or the verification has succeeded.
return params;
@@ -2060,16 +1941,15 @@ public class CachedStore implements RawStore, Configurable {
return params;
}
-
// Note: ideally this should be above both CachedStore and ObjectStore.
- public static ColumnStatistics adjustColStatForGet(Map<String, String> tableParams,
- ColumnStatistics colStat, long statsWriteId,
- String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
+ public static ColumnStatistics adjustColStatForGet(Map<String, String> tableParams, ColumnStatistics colStat,
+ long statsWriteId, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
colStat.setIsStatsCompliant(true);
- if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a txn table.
- if (areTxnStatsSupported && ((validWriteIds == null)
- || ObjectStore.isCurrentStatsValidForTheQuery(
- tableParams, statsWriteId, validWriteIds, false))) {
+ if (!TxnUtils.isTransactionalTable(tableParams)) {
+ return colStat; // Not a txn table.
+ }
+ if (areTxnStatsSupported && ((validWriteIds == null) || ObjectStore
+ .isCurrentStatsValidForTheQuery(tableParams, statsWriteId, validWriteIds, false))) {
// Valid stats are supported for txn tables, and either no verification was requested by the
// caller, or the verification has succeeded.
return colStat;
@@ -2080,11 +1960,9 @@ public class CachedStore implements RawStore, Configurable {
}
private static void updateTableColumnsStatsInternal(Configuration conf, ColumnStatistics colStats,
- Map<String, String> newParams, String validWriteIds,
- long writeId) throws MetaException {
- String catName = colStats.getStatsDesc().isSetCatName() ?
- normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
- getDefaultCatalog(conf);
+ Map<String, String> newParams, String validWriteIds, long writeId) throws MetaException {
+ String catName = colStats.getStatsDesc().isSetCatName() ? normalizeIdentifier(
+ colStats.getStatsDesc().getCatName()) : getDefaultCatalog(conf);
String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
if (!shouldCacheTable(catName, dbName, tblName)) {
@@ -2101,17 +1979,17 @@ public class CachedStore implements RawStore, Configurable {
if (!areTxnStatsSupported) {
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
} else {
- String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName),
- table.getParameters(), newParams, writeId, validWriteIds, true);
+ String errorMsg = ObjectStore
+ .verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName), table.getParameters(), newParams, writeId,
+ validWriteIds, true);
if (errorMsg != null) {
throw new MetaException(errorMsg);
}
- if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(),
- validWriteIds, true)) {
+ if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(), validWriteIds, true)) {
// Make sure we set the flag to invalid regardless of the current value.
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
- LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
- + table.getDbName() + "." + table.getTableName());
+ LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + table.getDbName() + "." + table
+ .getTableName());
}
}
}
@@ -2122,12 +2000,9 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj());
}
- @Override
- public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats,
- String validWriteIds, long writeId)
- throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
- Map<String, String> newParams = rawStore.updateTableColumnStatistics(
- colStats, validWriteIds, writeId);
+ @Override public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds,
+ long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+ Map<String, String> newParams = rawStore.updateTableColumnStatistics(colStats, validWriteIds, writeId);
// in case of event based cache update, cache will be updated during commit.
if (newParams != null && !canUseEvents) {
updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId);
@@ -2135,43 +2010,35 @@ public class CachedStore implements RawStore, Configurable {
return newParams;
}
- @Override
- public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
+ @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
List<String> colNames) throws MetaException, NoSuchObjectException {
return getTableColumnStatistics(catName, dbName, tblName, colNames, null);
}
- @Override
- public ColumnStatistics getTableColumnStatistics(
- String catName, String dbName, String tblName, List<String> colNames,
- String validWriteIds)
- throws MetaException, NoSuchObjectException {
+ @Override public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
+ List<String> colNames, String validWriteIds) throws MetaException, NoSuchObjectException {
catName = StringUtils.normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
if (!shouldCacheTable(catName, dbName, tblName)) {
- return rawStore.getTableColumnStatistics(
- catName, dbName, tblName, colNames, validWriteIds);
+ return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
if (table == null) {
// The table is not yet loaded in cache
- return rawStore.getTableColumnStatistics(
- catName, dbName, tblName, colNames, validWriteIds);
+ return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
}
ColumnStatistics columnStatistics =
sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames, validWriteIds, areTxnStatsSupported);
if (columnStatistics == null) {
- LOG.info("Stat of Table {}.{} for column {} is not present in cache." +
- "Getting from raw store", dbName, tblName, colNames);
+ LOG.info("Stat of Table {}.{} for column {} is not present in cache." + "Getting from raw store", dbName, tblName,
+ colNames);
return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
}
return columnStatistics;
}
- @Override
- public boolean deleteTableColumnStatistics(String catName, String dbName, String tblName,
- String colName)
+ @Override public boolean deleteTableColumnStatistics(String catName, String dbName, String tblName, String colName)
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName);
// in case of event based cache update, cache is updated during commit txn
@@ -2187,16 +2054,15 @@ public class CachedStore implements RawStore, Configurable {
return succ;
}
- @Override
- public Map<String, String> updatePartitionColumnStatistics(ColumnStatistics colStats,
- List<String> partVals, String validWriteIds, long writeId)
+ @Override public Map<String, String> updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals,
+ String validWriteIds, long writeId)
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
- Map<String, String> newParams = rawStore.updatePartitionColumnStatistics(
- colStats, partVals, validWriteIds, writeId);
+ Map<String, String> newParams =
+ rawStore.updatePartitionColumnStatistics(colStats, partVals, validWriteIds, writeId);
// in case of event based cache update, cache is updated during commit txn
if (newParams != null && !canUseEvents) {
- String catName = colStats.getStatsDesc().isSetCatName() ?
- normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
+ String catName = colStats.getStatsDesc().isSetCatName() ? normalizeIdentifier(
+ colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
if (!shouldCacheTable(catName, dbName, tblName)) {
@@ -2210,36 +2076,31 @@ public class CachedStore implements RawStore, Configurable {
return newParams;
}
- @Override
- public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
+ @Override public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
return getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, null);
}
- @Override
- public List<ColumnStatistics> getPartitionColumnStatistics(
- String catName, String dbName, String tblName, List<String> partNames,
- List<String> colNames, String writeIdList)
- throws MetaException, NoSuchObjectException {
+ @Override public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
+ List<String> partNames, List<String> colNames, String writeIdList) throws MetaException, NoSuchObjectException {
// If writeIdList is not null, that means stats are requested within a txn context. So set stats compliant to false,
// if areTxnStatsSupported is false or the write id which has updated the stats in not compatible with writeIdList.
// This is done within table lock as the number of partitions may be more than one and we need a consistent view
// for all the partitions.
- List<ColumnStatistics> columnStatistics = sharedCache.getPartitionColStatsListFromCache(catName, dbName, tblName,
- partNames, colNames, writeIdList, areTxnStatsSupported);
+ List<ColumnStatistics> columnStatistics = sharedCache
+ .getPartitionColStatsListFromCache(catName, dbName, tblName, partNames, colNames, writeIdList,
+ areTxnStatsSupported);
if (columnStatistics == null) {
return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, writeIdList);
}
return columnStatistics;
}
- @Override
- public boolean deletePartitionColumnStatistics(String catName, String dbName, String tblName, String partName,
- List<String> partVals, String colName)
+ @Override public boolean deletePartitionColumnStatistics(String catName, String dbName, String tblName,
+ String partName, List<String> partVals, String colName)
throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
- boolean succ =
- rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
+ boolean succ = rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
// in case of event based cache update, cache is updated during commit txn.
if (succ && !canUseEvents) {
catName = normalizeIdentifier(catName);
@@ -2253,17 +2114,13 @@ public class CachedStore implements RawStore, Configurable {
return succ;
}
- @Override
- public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
+ @Override public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
List<String> colNames) throws MetaException, NoSuchObjectException {
return get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, null);
}
- @Override
- public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName,
- List<String> partNames, List<String> colNames,
- String writeIdList)
- throws MetaException, NoSuchObjectException {
+ @Override public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
+ List<String> colNames, String writeIdList) throws MetaException, NoSuchObjectException {
List<ColumnStatisticsObj> colStats;
catName = normalizeIdentifier(catName);
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -2272,14 +2129,12 @@ public class CachedStore implements RawStore, Configurable {
// (incl. due to lack of sync w.r.t. the below rawStore call).
// In case the cache is updated using events, aggregate is calculated locally and thus can be read from cache.
if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && !canUseEvents)) {
- return rawStore.get_aggr_stats_for(
- catName, dbName, tblName, partNames, colNames, writeIdList);
+ return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
}
Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
if (table == null) {
// The table is not yet loaded in cache
- return rawStore.get_aggr_stats_for(
- catName, dbName, tblName, partNames, colNames, writeIdList);
+ return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
}
List<String> allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1);
@@ -2301,21 +2156,21 @@ public class CachedStore implements RawStore, Configurable {
}
}
- LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}",
- tblName, partNames, colNames);
- MergedColumnStatsForPartitions mergedColStats = mergeColStatsForPartitions(catName, dbName, tblName,
- partNames, colNames, sharedCache, type, writeIdList);
+ LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", tblName, partNames,
+ colNames);
+ MergedColumnStatsForPartitions mergedColStats =
+ mergeColStatsForPartitions(catName, dbName, tblName, partNames, colNames, sharedCache, type, writeIdList);
if (mergedColStats == null) {
- LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." +
- partNames + " for columns " + colNames + " is not present in cache. Getting it from raw store");
+ LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." + partNames
+ + " for columns " + colNames + " is not present in cache. Getting it from raw store");
return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
}
return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound());
}
- private MergedColumnStatsForPartitions mergeColStatsForPartitions(
- String catName, String dbName, String tblName, List<String> partNames, List<String> colNames,
- SharedCache sharedCache, StatsType type, String writeIdList) throws MetaException {
+ private MergedColumnStatsForPartitions mergeColStatsForPartitions(String catName, String dbName, String tblName,
+ List<String> partNames, List<String> colNames, SharedCache sharedCache, StatsType type, String writeIdList)
+ throws MetaException {
final boolean useDensityFunctionForNDVEstimation =
MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION);
final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER);
@@ -2335,8 +2190,8 @@ public class CachedStore implements RawStore, Configurable {
// the behavior same as object store.
// 3. Partition is missing or its stat is updated by live(not yet committed) or aborted txn. In this case,
// colStatsWriteId is null. Thus null is returned to keep the behavior same as object store.
- SharedCache.ColumStatsWithWriteId colStatsWriteId = sharedCache.getPartitionColStatsFromCache(catName, dbName,
- tblName, partValue, colName, writeIdList);
+ SharedCache.ColumStatsWithWriteId colStatsWriteId =
+ sharedCache.getPartitionColStatsFromCache(catName, dbName, tblName, partValue, colName, writeIdList);
if (colStatsWriteId == null) {
return null;
}
@@ -2349,15 +2204,14 @@ public class CachedStore implements RawStore, Configurable {
new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, tblName, partName);
colStatsWithPartInfoList.add(colStatsWithPartInfo);
if (colStatsAggregator == null) {
- colStatsAggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
- colStatsForPart.getStatsData().getSetField(), useDensityFunctionForNDVEstimation,
- ndvTuner);
+ colStatsAggregator = ColumnStatsAggregatorFactory
+ .getColumnStatsAggregator(colStatsForPart.getStatsData().getSetField(),
+ useDensityFunctionForNDVEstimation, ndvTuner);
}
partsFoundForColumn++;
} else {
- LOG.debug(
- "Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}",
- dbName, tblName, partName, colName);
+ LOG.debug("Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", dbName, tblName,
+ partName, colName);
}
}
if (colStatsWithPartInfoList.size() > 0) {
@@ -2369,27 +2223,26 @@ public class CachedStore implements RawStore, Configurable {
partsFound = partsFoundForColumn;
}
if (colStatsMap.size() < 1) {
- LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName,
- tblName, partNames, colNames);
+ LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName, tblName, partNames,
+ colNames);
return new MergedColumnStatsForPartitions(new ArrayList<ColumnStatisticsObj>(), 0);
}
}
// Note that enableBitVector does not apply here because ColumnStatisticsObj
// itself will tell whether bitvector is null or not and aggr logic can automatically apply.
- List<ColumnStatisticsObj> colAggrStats = MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
- partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation, ndvTuner);
+ List<ColumnStatisticsObj> colAggrStats = MetaStoreServerUtils
+ .aggrPartitionStats(colStatsMap, partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation,
+ ndvTuner);
if (canUseEvents) {
if (type == StatsType.ALL) {
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound),
- null, partNameToWriteId);
+ StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName),
+ new AggrStats(colAggrStats, partsFound), null, partNameToWriteId);
} else if (type == StatsType.ALLBUTDEFAULT) {
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
- StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), null,
- new AggrStats(colAggrStats, partsFound), partNameToWriteId);
+ StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), null,
+ new AggrStats(colAggrStats, partsFound), partNameToWriteId);
}
}
return new MergedColumnStatsForPartitions(colAggrStats, partsFound);
@@ -2413,440 +2266,355 @@ public class CachedStore implements RawStore, Configurable {
}
}
- @Override
- public long cleanupEvents() {
+ @Override public long cleanupEvents() {
return rawStore.cleanupEvents();
}
- @Override
- public boolean addToken(String tokenIdentifier, String delegationToken) {
+ @Override public boolean addToken(String tokenIdentifier, String delegationToken) {
return rawStore.addToken(tokenIdentifier, delegationToken);
}
- @Override
- public boolean removeToken(String tokenIdentifier) {
+ @Override public boolean removeToken(String tokenIdentifier) {
return rawStore.removeToken(tokenIdentifier);
}
- @Override
- public String getToken(String tokenIdentifier) {
+ @Override public String getToken(String tokenIdentifier) {
return rawStore.getToken(tokenIdentifier);
}
- @Override
- public List<String> getAllTokenIdentifiers() {
+ @Override public List<String> getAllTokenIdentifiers() {
return rawStore.getAllTokenIdentifiers();
}
- @Override
- public int addMasterKey(String key) throws MetaException {
+ @Override public int addMasterKey(String key) throws MetaException {
return rawStore.addMasterKey(key);
}
- @Override
- public void updateMasterKey(Integer seqNo, String key)
- throws NoSuchObjectException, MetaException {
+ @Override public void updateMasterKey(Integer seqNo, String key) throws NoSuchObjectException, MetaException {
rawStore.updateMasterKey(seqNo, key);
}
- @Override
- public boolean removeMasterKey(Integer keySeq) {
+ @Override public boolean removeMasterKey(Integer keySeq) {
return rawStore.removeMasterKey(keySeq);
}
- @Override
- public String[] getMasterKeys() {
+ @Override public String[] getMasterKeys() {
return rawStore.getMasterKeys();
}
- @Override
- public void verifySchema() throws MetaException {
+ @Override public void verifySchema() throws MetaException {
rawStore.verifySchema();
}
- @Override
- public String getMetaStoreSchemaVersion() throws MetaException {
+ @Override public String getMetaStoreSchemaVersion() throws MetaException {
return rawStore.getMetaStoreSchemaVersion();
}
- @Override
- public void setMetaStoreSchemaVersion(String version, String comment)
- throws MetaException {
+ @Override public void setMetaStoreSchemaVersion(String version, String comment) throws MetaException {
rawStore.setMetaStoreSchemaVersion(version, comment);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalDBGrantsAll(
- String principalName, PrincipalType principalType) {
+ @Override public List<HiveObjectPrivilege> listPrincipalDBGrantsAll(String principalName,
+ PrincipalType principalType) {
return rawStore.listPrincipalDBGrantsAll(principalName, principalType);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalTableGrantsAll(
- String principalName, PrincipalType principalType) {
+ @Override public List<HiveObjectPrivilege> listPrincipalTableGrantsAll(String principalName,
+ PrincipalType principalType) {
return rawStore.listPrincipalTableGrantsAll(principalName, principalType);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalPartitionGrantsAll(
- String principalName, PrincipalType principalType) {
+ @Override public List<HiveObjectPrivilege> listPrincipalPartitionGrantsAll(String principalName,
+ PrincipalType principalType) {
return rawStore.listPrincipalPartitionGrantsAll(principalName, principalType);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalTableColumnGrantsAll(
- String principalName, PrincipalType principalType) {
+ @Override public List<HiveObjectPrivilege> listPrincipalTableColumnGrantsAll(String principalName,
+ PrincipalType principalType) {
return rawStore.listPrincipalTableColumnGrantsAll(principalName, principalType);
}
- @Override
- public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrantsAll(
- String principalName, PrincipalType principalType) {
+ @Override public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrantsAll(String principalName,
+ PrincipalType principalType) {
return rawStore.listPrincipalPartitionColumnGrantsAll(principalName, principalType);
}
- @Override
- public List<HiveObjectPrivilege> listGlobalGrantsAll() {
+ @Override public List<HiveObjectPrivilege> listGlobalGrantsAll() {
return rawStore.listGlobalGrantsAll();
}
- @Override
- public List<HiveObjectPrivilege> listDBGrantsAll(String catName, String dbName) {
+ @Override public List<HiveObjectPrivilege> listDBGrantsAll(String catName, String dbName) {
return rawStore.listDBGrantsAll(catName, dbName);
}
- @Override
- public List<HiveObjectPrivilege> listPartitionColumnGrantsAll(String catName, String dbName,
+ @Override public List<HiveObjectPrivilege> listPartitionColumnGrantsAll(String catName, String dbName,
String tableName, String partitionName, String columnName) {
return rawStore.listPartitionColumnGrantsAll(catName, dbName, tableName, partitionName, columnName);
}
- @Override
- public List<HiveObjectPrivilege> listTableGrantsAll(String catName, String dbName,
- String tableName) {
+ @Override public List<HiveObjectPrivilege> listTableGrantsAll(String catName, String dbName, String tableName) {
return rawStore.listTableGrantsAll(catName, dbName, tableName);
}
- @Override
- public List<HiveObjectPrivilege> listPartitionGrantsAll(String catName, String dbName,
- String tableName, String partitionName) {
+ @Override public List<HiveObjectPrivilege> listPartitionGrantsAll(String catName, String dbName, String tableName,
+ String partitionName) {
return rawStore.listPartitionGrantsAll(catName, dbName, tableName, partitionName);
}
- @Override
- public List<HiveObjectPrivilege> listTableColumnGrantsAll(String catName, String dbName,
- String tableName, String columnName) {
+ @Override public List<HiveObjectPrivilege> listTableColumnGrantsAll(String catName, String dbName, String tableName,
+ String columnName) {
return rawStore.listTableColumnGrantsAll(catName, dbName, tableName, columnName);
}
- @Override
- public void createFunction(Function func)
- throws InvalidObjectException, MetaException {
+ @Override public void createFunction(Function func) throws InvalidObjectException, MetaException {
// TODO fucntionCache
rawStore.createFunction(func);
}
- @Override
- public void alterFunction(String catName, String dbName, String funcName,
- Function newFunction) throws InvalidObjectException, MetaException {
+ @Override public void alterFunction(String catName, String dbName, String funcName, Function newFunction)
+ throws InvalidObjectException, MetaException {
// TODO fucntionCache
rawStore.alterFunction(catName, dbName, funcName, newFunction);
}
- @Override
- public void dropFunction(String catName, String dbName, String funcName) throws MetaException,
- NoSuchObjectException, InvalidObjectException, InvalidInputException {
+ @Override public void dropFunction(String catName, String dbName, String funcName)
+ throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
// TODO fucntionCache
rawStore.dropFunction(catName, dbName, funcName);
}
- @Override
- public Function getFunction(String catName, String dbName, String funcName)
- throws MetaException {
+ @Override public Function getFunction(String catName, String dbName, String funcName) throws MetaException {
// TODO fucntionCache
return rawStore.getFunction(catName, dbName, funcName);
}
- @Override
- public List<Function> getAllFunctions(String catName) throws MetaException {
+ @Override public List<Function> getAllFunctions(String catName) throws MetaException {
// TODO fucntionCache
return rawStore.getAllFunctions(catName);
}
- @Override
- public List<String> getFunctions(String catName, String dbName, String pattern)
- throws MetaException {
+ @Override public List<String> getFunctions(String catName, String dbName, String pattern) throws MetaException {
// TODO fucntionCache
return rawStore.getFunctions(catName, dbName, pattern);
}
- @Override
- public NotificationEventResponse getNextNotification(
- NotificationEventRequest rqst) {
+ @Override public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
return rawStore.getNextNotification(rqst);
}
- @Override
- public void addNotificationEvent(NotificationEvent event) throws MetaException {
+ @Override public void addNotificationEvent(NotificationEvent event) throws MetaException {
rawStore.addNotificationEvent(event);
}
- @Override
- public void cleanNotificationEvents(int olderThan) {
+ @Override public void cleanNotificationEvents(int olderThan) {
rawStore.cleanNotificationEvents(olderThan);
}
- @Override
- public CurrentNotificationEventId getCurrentNotificationEventId() {
+ @Override public CurrentNotificationEventId getCurrentNotificationEventId() {
return rawStore.getCurrentNotificationEventId();
}
- @Override
- public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+ @Override public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) {
return rawStore.getNotificationEventsCount(rqst);
}
- @Override
- public void flushCache() {
+ @Override public void flushCache() {
rawStore.flushCache();
}
- @Override
- public ByteBuffer[] getFileMetadata(List<Long> fileIds) throws MetaException {
+ @Override public ByteBuffer[] getFileMetadata(List<Long> fileIds) throws MetaException {
return rawStore.getFileMetadata(fileIds);
}
- @Override
- public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata,
- FileMetadataExprType type) throws MetaException {
+ @Override public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata, FileMetadataExprType type)
+ throws MetaException {
rawStore.putFileMetadata(fileIds, metadata, type);
}
- @Override
- public boolean isFileMetadataSupported() {
+ @Override public boolean isFileMetadataSupported() {
return rawStore.isFileMetadataSupported();
}
- @Override
- public void getFileMetadataByExpr(List<Long> fileIds,
- FileMetadataExprType type, byte[] expr, ByteBuffer[] metadatas,
- ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException {
+ @Override public void getFileMetadataByExpr(List<Long> fileIds, FileMetadataExprType type, byte[] expr,
+ ByteBuffer[] metadatas, ByteBuffer[] exprResults, boolean[] eliminated) throws MetaException {
rawStore.getFileMetadataByExpr(fileIds, type, expr, metadatas, exprResults, eliminated);
}
- @Override
- public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
+ @Override public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) {
return rawStore.getFileMetadataHandler(type);
}
- @Override
- public int getTableCount() throws MetaException {
+ @Override public int getTableCount() throws MetaException {
return rawStore.getTableCount();
}
- @Override
- public int getPartitionCount() throws MetaException {
+ @Override public int getPartitionCount() throws MetaException {
return rawStore.getPartitionCount();
}
- @Override
- public int getDatabaseCount() throws MetaException {
+ @Override public int getDatabaseCount() throws MetaException {
return rawStore.getDatabaseCount();
}
- @Override
- public List<SQLPrimaryKey> getPrimaryKeys(String catName, String db_name, String tbl_name)
+ @Override public List<SQLPrimaryKey> getPrimaryKeys(String catName, String dbName, String tblName)
throws MetaException {
// TODO constraintCache
- return rawStore.getPrimaryKeys(catName, db_name, tbl_name);
+ return rawStore.getPrimaryKeys(catName, dbName, tblName);
}
- @Override
- public List<SQLForeignKey> getForeignKeys(String catName, String parent_db_name,
- String parent_tbl_name, String foreign_db_name, String foreign_tbl_name)
- throws MetaException {
+ @Override public List<SQLForeignKey> getForeignKeys(String catName, String parentDbName, String parentTblName,
+ String foreignDbName, String foreignTblName) throws MetaException {
// TODO constraintCache
- return rawStore.getForeignKeys(catName, parent_db_name, parent_tbl_name, foreign_db_name, foreign_tbl_name);
+ return rawStore.getForeignKeys(catName, parentDbName, parentTblName, foreignDbName, foreignTblName);
}
- @Override
- public List<SQLUniqueConstraint> getUniqueConstraints(String catName, String db_name, String tbl_name)
+ @Override public List<SQLUniqueConstraint> getUniqueConstraints(String catName, String dbName, String tblName)
throws MetaException {
// TODO constraintCache
- return rawStore.getUniqueConstraints(catName, db_name, tbl_name);
+ return rawStore.getUniqueConstraints(catName, dbName, tblName);
}
- @Override
- public List<SQLNotNullConstraint> getNotNullConstraints(String catName, String db_name, String tbl_name)
+ @Override public List<SQLNotNullConstraint> getNotNullConstraints(String catName, String dbName, String tblName)
throws MetaException {
// TODO constraintCache
- return rawStore.getNotNullConstraints(catName, db_name, tbl_name);
+ return rawStore.getNotNullConstraints(catName, dbName, tblName);
}
- @Override
- public List<SQLDefaultConstraint> getDefaultConstraints(String catName, String db_name, String tbl_name)
+ @Override public List<SQLDefaultConstraint> getDefaultConstraints(String catName, String dbName, String tblName)
throws MetaException {
// TODO constraintCache
- return rawStore.getDefaultConstraints(catName, db_name, tbl_name);
+ return rawStore.getDefaultConstraints(catName, dbName, tblName);
}
- @Override
- public List<SQLCheckConstraint> getCheckConstraints(String catName, String db_name, String tbl_name)
+ @Override public List<SQLCheckConstraint> getCheckConstraints(String catName, String dbName, String tblName)
throws MetaException {
// TODO constraintCache
- return rawStore.getCheckConstraints(catName, db_name, tbl_name);
+ return rawStore.getCheckConstraints(catName, dbName, tblName);
}
- @Override
- public List<String> createTableWithConstraints(Table tbl, List<SQLPrimaryKey> primaryKeys,
+ @Override public List<String> createTableWithConstraints(Table tbl, List<SQLPrimaryKey> primaryKeys,
List<SQLForeignKey> foreignKeys, List<SQLUniqueConstraint> uniqueConstraints,
- List<SQLNotNullConstraint> notNullConstraints,
- List<SQLDefaultConstraint> defaultConstraints,
+ List<SQLNotNullConstraint> notNullConstraints, List<SQLDefaultConstraint> defaultConstraints,
List<SQLCheckConstraint> checkConstraints) throws InvalidObjectException, MetaException {
// TODO constraintCache
- List<String> constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys,
- foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints);
+ List<String> constraintNames = rawStore
+ .createTableWithConstraints(tbl, primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints,
+ defaultConstraints, checkConstraints);
// in case of event based cache update, cache is updated during commit.
if (canUseEvents) {
return constraintNames;
}
String dbName = normalizeIdentifier(tbl.getDbName());
String tblName = normalizeIdentifier(tbl.getTableName());
- String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) :
- DEFAULT_CATALOG_NAME;
+ String catName = tbl.isSetCatName() ? normalizeIdentifier(tbl.getCatName()) : DEFAULT_CATALOG_NAME;
if (!shouldCacheTable(catName, dbName, tblName)) {
return constraintNames;
}
sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getCatName()),
- StringUtils.normalizeIdentifier(tbl.getDbName()),
- StringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
+ StringUtils.normalizeIdentifier(tbl.getDbName()), StringUtils.normalizeIdentifier(tbl.getTableName()), tbl);
return constraintNames;
}
- @Override
- public void dropConstraint(String catName, String dbName, String tableName,
- String constraintName, boolean missingOk) throws NoSuchObjectException {
+ @Override public void dropConstraint(String catName, String dbName, String tableName, String constraintName,
+ boolean missingOk) throws NoSuchObjectException {
// TODO constraintCache
rawStore.dropConstraint(catName, dbName, tableName, constraintName, missingOk);
}
- @Override
- public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks)
- throws InvalidObjectException, MetaException {
+ @Override public List<String> addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException {
// TODO constraintCache
return rawStore.addPrimaryKeys(pks);
}
- @Override
- public List<String> addForeignKeys(List<SQLForeignKey> fks)
- throws InvalidObjectException, MetaException {
+ @Override public List<String> addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException {
// TODO constraintCache
return rawStore.addForeignKeys(fks);
}
- @Override
- public List<String> addUniqueConstraints(List<SQLUniqueConstraint> uks)
+ @Override public List<String> addUniqueConstraints(List<SQLUniqueConstraint> uks)
throws InvalidObjectException, MetaException {
// TODO constraintCache
return rawStore.addUniqueConstraints(uks);
}
- @Override
- public List<String> addNotNullConstraints(List<SQLNotNullConstraint> nns)
+ @Override public List<String> addNotNullConstraints(List<SQLNotNullConstraint> nns)
throws InvalidObjectException, MetaException {
// TODO constraintCache
return rawStore.addNotNullConstraints(nns);
}
- @Override
- public List<String> addDefaultConstraints(List<SQLDefaultConstraint> nns)
+ @Override public List<String> addDefaultConstraints(List<SQLDefaultConstraint> nns)
throws InvalidObjectException, MetaException {
// TODO constraintCache
return rawStore.addDefaultConstraints(nns);
}
- @Override
- public List<String> addCheckConstraints(List<SQLCheckConstraint> nns)
+ @Override public List<String> addCheckConstraints(List<SQLCheckConstraint> nns)
throws InvalidObjectException, MetaException {
// TODO constraintCache
return rawStore.addCheckConstraints(nns);
}
// TODO - not clear if we should cache these or not. For now, don't bother
- @Override
- public void createISchema(ISchema schema)
+ @Override public void createISchema(ISchema schema)
throws AlreadyExistsException, NoSuchObjectException, MetaException {
rawStore.createISchema(schema);
}
- @Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String catName, String dbName)
+ @Override public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String catName, String dbName)
throws MetaException, NoSuchObjectException {
return rawStore.getPartitionColStatsForDatabase(catName, dbName);
}
- @Override
- public void alterISchema(ISchemaName schemaName, ISchema newSchema)
+ @Override public void alterISchema(ISchemaName schemaName, ISchema newSchema)
throws NoSuchObjectException, MetaException {
rawStore.alterISchema(schemaName, newSchema);
}
- @Override
- public ISchema getISchema(ISchemaName schemaName) throws MetaException {
+ @Override public ISchema getISchema(ISchemaName schemaName) throws MetaException {
return rawStore.getISchema(schemaName);
}
- @Override
- public void dropISchema(ISchemaName schemaName) throws NoSuchObjectException, MetaException {
+ @Override public void dropISchema(ISchemaName schemaName) throws NoSuchObjectException, MetaException {
rawStore.dropISchema(schemaName);
}
- @Override
- public void addSchemaVersion(SchemaVersion schemaVersion) throws
- AlreadyExistsException, InvalidObjectException, NoSuchObjectException, MetaException {
+ @Override public void addSchemaVersion(SchemaVersion schemaVersion)
+ throws AlreadyExistsException, InvalidObjectException, NoSuchObjectException, MetaException {
rawStore.addSchemaVersion(schemaVersion);
}
- @Override
- public void alterSchemaVersion(SchemaVersionDescriptor version, SchemaVersion newVersion) throws
- NoSuchObjectException, MetaException {
+ @Override public void alterSchemaVersion(SchemaVersionDescriptor version, SchemaVersion newVersion)
+ throws NoSuchObjectException, MetaException {
rawStore.alterSchemaVersion(version, newVersion);
}
- @Override
- public SchemaVersion getSchemaVersion(SchemaVersionDescriptor version) throws MetaException {
+ @Override public SchemaVersion getSchemaVersion(SchemaVersionDescriptor version) throws MetaException {
return rawStore.getSchemaVersion(version);
}
- @Override
- public SchemaVersion getLatestSchemaVersion(ISchemaName schemaName) throws MetaException {
+ @Override public SchemaVersion getLatestSchemaVersion(ISchemaName schemaName) throws MetaException {
return rawStore.getLatestSchemaVersion(schemaName);
}
- @Override
- public List<SchemaVersion> getAllSchemaVersion(ISchemaName schemaName) throws MetaException {
+ @Override public List<SchemaVersion> getAllSchemaVersion(ISchemaName schemaName) throws MetaException {
return rawStore.getAllSchemaVersion(schemaName);
}
- @Override
- public List<SchemaVersion> getSchemaVersionsByColumns(String colName, String colNamespace,
- String type) throws MetaException {
+ @Override public List<SchemaVersion> getSchemaVersionsByColumns(String colName, String colNamespace, String type)
+ throws MetaException {
return rawStore.getSchemaVersionsByColumns(colName, colNamespace, type);
}
- @Override
- public void dropSchemaVersion(SchemaVersionDescriptor version) throws NoSuchObjectException,
- MetaException {
+ @Override public void dropSchemaVersion(SchemaVersionDescriptor version) throws NoSuchObjectException, MetaException {
rawStore.dropSchemaVersion(version);
}
- @Override
- public SerDeInfo getSerDeInfo(String serDeName) throws NoSuchObjectException, MetaException {
+ @Override public SerDeInfo getSerDeInfo(String serDeName) throws NoSuchObjectException, MetaException {
return rawStore.getSerDeInfo(serDeName);
}
- @Override
- public void addSerde(SerDeInfo serde) throws AlreadyExistsException, MetaException {
+ @Override public void addSerde(SerDeInfo serde) throws AlreadyExistsException, MetaException {
rawStore.addSerde(serde);
}
@@ -2854,124 +2622,99 @@ public class CachedStore implements RawStore, Configurable {
return rawStore;
}
- @VisibleForTesting
- public void setRawStore(RawStore rawStore) {
+ @VisibleForTesting public void setRawStore(RawStore rawStore) {
this.rawStore = rawStore;
}
- @Override
- public String getMetastoreDbUuid() throws MetaException {
+ @Override public String getMetastoreDbUuid() throws MetaException {
return rawStore.getMetastoreDbUuid();
}
- @Override
- public void createResourcePlan(WMResourcePlan resourcePlan, String copyFrom, int defaultPoolSize)
+ @Override public void createResourcePlan(WMResourcePlan resourcePlan, String copyFrom, int defaultPoolSize)
throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException {
rawStore.createResourcePlan(resourcePlan, copyFrom, defaultPoolSize);
}
- @Override
- public WMFullResourcePlan getResourcePlan(String name, String ns)
+ @Override public WMFullResourcePlan getResourcePlan(String name, String ns)
throws NoSuchObjectException, MetaException {
return rawStore.getResourcePlan(name, ns);
}
- @Override
- public List<WMResourcePlan> getAllResourcePlans(String ns) throws MetaException {
+ @Override public List<WMResourcePlan> getAllResourcePlans(String ns) throws MetaException {
return rawStore.getAllResourcePlans(ns);
}
- @Override
- public WMFullResourcePlan alterResourcePlan(String name, String ns, WMNullableResourcePlan resourcePlan,
- boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
- throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException,
- MetaException {
- return rawStore.alterResourcePlan(
- name, ns, resourcePlan, canActivateDisabled, canDeactivate, isReplace);
+ @Override public WMFullResourcePlan alterResourcePlan(String name, String ns, WMNullableResourcePlan resourcePlan,
+ boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
+ return rawStore.alterResourcePlan(name, ns, resourcePlan, canActivateDisabled, canDeactivate, isReplace);
}
- @Override
- public WMFullResourcePlan getActiveResourcePlan(String ns) throws MetaException {
+ @Override public WMFullResourcePlan getActiveResourcePlan(String ns) throws MetaException {
return rawStore.getActiveResourcePlan(ns);
}
- @Override
- public WMValidateResourcePlanResponse validateResourcePlan(String name, String ns)
+ @Override public WMValidateResourcePlanResponse validateResourcePlan(String name, String ns)
throws NoSuchObjectException, InvalidObjectException, MetaException {
return rawStore.validateResourcePlan(name, ns);
}
- @Override
- public void dropResourcePlan(String name, String ns) throws NoSuchObjectException, MetaException {
+ @Override public void dropResourcePlan(String name, String ns) throws NoSuchObjectException, MetaException {
rawStore.dropResourcePlan(name, ns);
}
- @Override
- public void createWMTrigger(WMTrigger trigger)
- throws AlreadyExistsException, MetaException, NoSuchObjectException,
- InvalidOperationException {
+ @Override public void createWMTrigger(WMTrigger trigger)
+ throws AlreadyExistsException, MetaException, NoSuchObjectException, InvalidOperationException {
rawStore.createWMTrigger(trigger);
}
- @Override
- public void alterWMTrigger(WMTrigger trigger)
+ @Override public void alterWMTrigger(WMTrigger trigger)
throws NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.alterWMTrigger(trigger);
}
- @Override
- public void dropWMTrigger(String resourcePlanName, String triggerName, String ns)
+ @Override public void dropWMTrigger(String resourcePlanName, String triggerName, String ns)
throws NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.dropWMTrigger(resourcePlanName, triggerName, ns);
}
- @Override
- public List<WMTrigger> getTriggersForResourcePlan(String resourcePlanName, String ns)
+ @Override public List<WMTrigger> getTriggersForResourcePlan(String resourcePlanName, String ns)
throws NoSuchObjectException, MetaException {
return rawStore.getTriggersForResourcePlan(resourcePlanName, ns);
}
- @Override
- public void createPool(WMPool pool) throws AlreadyExistsException, NoSuchObjectException,
- InvalidOperationException, MetaException {
+ @Override public void createPool(WMPool pool)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.createPool(pool);
}
- @Override
- public void alterPool(WMNullablePool pool, String poolPath) throws AlreadyExistsException,
- NoSuchObjectException, InvalidOperationException, MetaException {
+ @Override public void alterPool(WMNullablePool pool, String poolPath)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.alterPool(pool, poolPath);
}
- @Override
- public void dropWMPool(String resourcePlanName, String poolPath, String ns)
+ @Override public void dropWMPool(String resourcePlanName, String poolPath, String ns)
throws NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.dropWMPool(resourcePlanName, poolPath, ns);
}
- @Override
- public void createOrUpdateWMMapping(WMMapping mapping, boolean update)
- throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException,
- MetaException {
+ @Override public void createOrUpdateWMMapping(WMMapping mapping, boolean update)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.createOrUpdateWMMapping(mapping, update);
}
- @Override
- public void dropWMMapping(WMMapping mapping)
+ @Override public void dropWMMapping(WMMapping mapping)
throws NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.dropWMMapping(mapping);
}
- @Override
- public void createWMTriggerToPoolMapping(String resourcePlanName, String triggerName,
- String poolPath, String ns) throws AlreadyExistsException, NoSuchObjectException,
- InvalidOperationException, MetaException {
+ @Override public void createWMTriggerToPoolMapping(String resourcePlanName, String triggerName, String poolPath,
+ String ns) throws AlreadyExistsException, NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.createWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath, ns);
}
- @Override
- public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerName,
- String poolPath, String ns) throws NoSuchObjectException, InvalidOperationException, MetaException {
+ @Override public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerName, String poolPath,
+ String ns) throws NoSuchObjectException, InvalidOperationException, MetaException {
rawStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath, ns);
}
@@ -2979,14 +2722,12 @@ public class CachedStore implements RawStore, Configurable {
return sharedCache.getUpdateCount();
}
- @Override
- public void cleanWriteNotificationEvents(int olderThan) {
+ @Override public void cleanWriteNotificationEvents(int olderThan) {
rawStore.cleanWriteNotificationEvents(olderThan);
}
-
- @Override
- public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+ @Override public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName)
+ throws MetaException {
return rawStore.getAllWriteEventInfo(txnId, dbName, tableName);
}
@@ -2996,8 +2737,8 @@ public class CachedStore implements RawStore, Configurable {
LOG.debug("Trying to match: {} against blacklist pattern: {}", str, pattern);
Matcher matcher = pattern.matcher(str);
if (matcher.matches()) {
- LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(),
- matcher.start(), matcher.end());
+ LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(), matcher.start(),
+ matcher.end());
return false;
}
}
@@ -3010,8 +2751,8 @@ public class CachedStore implements RawStore, Configurable {
LOG.debug("Trying to match: {} against whitelist pattern: {}", str, pattern);
Matcher matcher = pattern.matcher(str);
if (matcher.matches()) {
- LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(),
- matcher.start(), matcher.end());
+ LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(), matcher.start(),
+ matcher.end());
return true;
}
}
@@ -3052,45 +2793,39 @@ public class CachedStore implements RawStore, Configurable {
}
static boolean isBlacklistWhitelistEmpty(Configuration conf) {
- return MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)
- .equals(".*")
- && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST).isEmpty();
+ return
+ MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST).equals(".*")
+ && MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)
+ .isEmpty();
}
- @VisibleForTesting
- void resetCatalogCache() {
+ @VisibleForTesting void resetCatalogCache() {
sharedCache.resetCatalogCache();
setCachePrewarmedState(false);
}
- @Override
- public void addRuntimeStat(RuntimeStat stat) throws MetaException {
+ @Override public void addRuntimeStat(RuntimeStat stat) throws MetaException {
rawStore.addRuntimeStat(stat);
}
- @Override
- public List<RuntimeStat> getRuntimeStats(int maxEntries, int maxCreateTime) throws MetaException {
+ @Override public List<RuntimeStat> getRuntimeStats(int maxEntries, int maxCreateTime) throws MetaException {
return rawStore.getRuntimeStats(maxEntries, maxCreateTime);
}
- @Override
- public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
+ @Override public int deleteRuntimeStats(int maxRetainSecs) throws MetaException {
return rawStore.deleteRuntimeStats(maxRetainSecs);
}
- @Override
- public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
+ @Override public List<TableName> getTableNamesWithStats() throws MetaException, NoSuchObjectException {
return rawStore.getTableNamesWithStats();
}
- @Override
- public List<TableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
+ @Override public List<TableName> getAllTableNamesForStats() throws MetaException, NoSuchObjectException {
return rawStore.getAllTableNamesForStats();
}
- @Override
- public Map<String, List<String>> getPartitionColsWithStats(String catName,
- String dbName, String tableName) throws MetaException, NoSuchObjectException {
+ @Override public Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName, String tableName)
+ throws MetaException, NoSuchObjectException {
return rawStore.getPartitionColsWithStats(catName, dbName, tableName);
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 2c7354a..45b1b0d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.cache;
+import java.lang.reflect.Field;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -27,19 +28,31 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.TreeMap;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.ObjectStore;
import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.Catalog;
@@ -52,21 +65,23 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import static org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals;
import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
public class SharedCache {
private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
+ private static final long MAX_DEFAULT_CACHE_SIZE = 1024 * 1024;
private boolean isCatalogCachePrewarmed = false;
private Map<String, Catalog> catalogCache = new TreeMap<>();
private HashSet<String> catalogsDeletedDuringPrewarm = new HashSet<>();
@@ -79,17 +94,22 @@ public class SharedCache {
private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
// For caching TableWrapper objects. Key is aggregate of database name and table name
- private Map<String, TableWrapper> tableCache = new TreeMap<>();
+ private Cache<String, TableWrapper> tableCache = null;
+ private int concurrencyLevel = -1;
+ private int refreshInterval = 10000;
+
private boolean isTableCachePrewarmed = false;
private HashSet<String> tablesDeletedDuringPrewarm = new HashSet<>();
private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
private static MessageDigest md;
- static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
private AtomicLong cacheUpdateCount = new AtomicLong(0);
- private static long maxCacheSizeInBytes = -1;
- private static long currentCacheSizeInBytes = 0;
- private static HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
+ private long maxCacheSizeInBytes = -1;
+ private HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
+ private Set<String> tableToUpdateSize = new ConcurrentHashSet<>();
+ private ScheduledExecutorService executor = null;
+ private Map<String, Integer> tableSizeMap = null;
enum StatsType {
ALL(0), ALLBUTDEFAULT(1), PARTIAL(2);
@@ -105,6 +125,10 @@ public class SharedCache {
}
}
+ private enum MemberName {
+ TABLE_COL_STATS_CACHE, PARTITION_CACHE, PARTITION_COL_STATS_CACHE, AGGR_COL_STATS_CACHE
+ }
+
static {
try {
md = MessageDigest.getInstance("MD5");
@@ -113,16 +137,98 @@ public class SharedCache {
}
}
+ static class TableWrapperSizeUpdater implements Runnable {
+ private Set<String> setToUpdate;
+ private Cache<String, TableWrapper> cache;
+
+ TableWrapperSizeUpdater(Set<String> set, Cache<String, TableWrapper> cache1) {
+ setToUpdate = set;
+ cache = cache1;
+ }
+
+ @Override
+ public void run() {
+ for (String s : setToUpdate) {
+ refreshTableWrapperInCache(s);
+ }
+ setToUpdate.clear();
+ }
+
+ void refreshTableWrapperInCache(String tblKey) {
+ TableWrapper tw = cache.getIfPresent(tblKey);
+ if (tw != null) {
+ //cache will re-weigh the TableWrapper and record new weight.
+ cache.put(tblKey, tw);
+ }
+ }
+ }
+
+ //concurrency level of table cache. Set to -1 to let Guava use default.
+ public void setConcurrencyLevel(int cl){
+ this.concurrencyLevel = cl;
+ }
+ //number of miliseconds between size updates.
+ public void setRefreshInterval(int interval){
+ this.refreshInterval = interval;
+ }
+ //set the table size map to fake table size. This is for testing only.
+ public void setTableSizeMap(Map<String, Integer> map){
+ this.tableSizeMap = map;
+ }
+
+ public void initialize(Configuration conf) {
+ maxCacheSizeInBytes = MetastoreConf.getSizeVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY);
- public void initialize(long maxSharedCacheSizeInBytes) {
- maxCacheSizeInBytes = maxSharedCacheSizeInBytes;
// Create estimators
if ((maxCacheSizeInBytes > 0) && (sizeEstimators == null)) {
sizeEstimators = IncrementalObjectSizeEstimator.createEstimators(SharedCache.class);
}
+
+ if (tableCache == null) {
+ CacheBuilder<String, TableWrapper> b = CacheBuilder.newBuilder()
+ .maximumWeight(maxCacheSizeInBytes > 0 ? maxCacheSizeInBytes : MAX_DEFAULT_CACHE_SIZE)
+ .weigher(new Weigher<String, TableWrapper>() {
+ @Override
+ public int weigh(String key, TableWrapper value) {
+ return value.getSize();
+ }
+ }).removalListener(new RemovalListener<String, TableWrapper>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, TableWrapper> notification) {
+ LOG.debug("Eviction happened for table " + notification.getKey());
+ LOG.debug("current table cache contains " + tableCache.size() + "entries");
+ TableWrapper tblWrapper = notification.getValue();
+ RemovalCause cause = notification.getCause();
+ if (cause.equals(RemovalCause.COLLECTED) || cause.equals(RemovalCause.EXPIRED)) {
+ byte[] sdHash = tblWrapper.getSdHash();
+ if (sdHash != null) {
+ decrSd(sdHash);
+ }
+ }
+ }
+ });
+
+ if (concurrencyLevel > 0) {
+ b.concurrencyLevel(concurrencyLevel);
+ }
+ tableCache = b.recordStats().build();
+ }
+
+ executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setName("SharedCache table size updater: Thread-" + t.getId());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ executor.scheduleAtFixedRate(new TableWrapperSizeUpdater(tableToUpdateSize, tableCache), 0,
+ refreshInterval, TimeUnit.MILLISECONDS);
+
}
- private static ObjectEstimator getMemorySizeEstimator(Class<?> clazz) {
+ private ObjectEstimator getMemorySizeEstimator(Class<?> clazz) {
ObjectEstimator estimator = sizeEstimators.get(clazz);
if (estimator == null) {
IncrementalObjectSizeEstimator.createEstimators(clazz, sizeEstimators);
@@ -131,21 +237,43 @@ public class SharedCache {
return estimator;
}
- static class TableWrapper {
- Table t;
- String location;
- Map<String, String> parameters;
- byte[] sdHash;
- ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
+ public int getObjectSize(Class<?> clazz, Object obj) {
+ if (sizeEstimators == null) {
+ return 0;
+ }
+
+ try {
+ ObjectEstimator oe = getMemorySizeEstimator(clazz);
+ return oe.estimate(obj, sizeEstimators);
+ } catch (Exception e) {
+ LOG.error("Error while getting object size.", e);
+ }
+ return 0;
+ }
+
+ enum SizeMode {
+ Delta, Snapshot
+ }
+
+ class TableWrapper {
+ private Table t;
+ private String location;
+ private Map<String, String> parameters;
+ private byte[] sdHash;
+ private int otherSize;
+ private int tableColStatsCacheSize;
+ private int partitionCacheSize;
+ private int partitionColStatsCacheSize;
+ private int aggrColStatsCacheSize;
+
+ private ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
// For caching column stats for an unpartitioned table
// Key is column name and the value is the col stat object
- private Map<String, ColumnStatisticsObj> tableColStatsCache =
- new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ private Map<String, ColumnStatisticsObj> tableColStatsCache = new ConcurrentHashMap<String, ColumnStatisticsObj>();
private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
// For caching partition objects
// Ket is partition values and the value is a wrapper around the partition object
- private Map<String, PartitionWrapper> partitionCache =
- new ConcurrentHashMap<String, PartitionWrapper>();
+ private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<String, PartitionWrapper>();
private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
// For caching column stats for a partitioned table
// Key is aggregate of partition values, column name and the value is the col stat object
@@ -164,6 +292,52 @@ public class SharedCache {
this.sdHash = sdHash;
this.location = location;
this.parameters = parameters;
+ this.tableColStatsCacheSize = 0;
+ this.partitionCacheSize = 0;
+ this.partitionColStatsCacheSize = 0;
+ this.aggrColStatsCacheSize = 0;
+ this.otherSize = getTableWrapperSizeWithoutMaps();
+ }
+
+ private int getTableWrapperSizeWithoutMaps() {
+ Class<?> clazz = TableWrapper.class;
+ Field[] fields = clazz.getDeclaredFields();
+ int size = 0;
+ for (Field field : fields) {
+ if (field.getType().equals(ConcurrentHashMap.class)) {
+ continue;
+ }
+ if (field.getType().equals(SharedCache.class)) {
+ continue;
+ }
+ try {
+ field.setAccessible(true);
+ Object val = field.get(this);
+ ObjectEstimator oe = getMemorySizeEstimator(field.getType());
+ if (oe != null) {
+ size += oe.estimate(val, sizeEstimators);
+ }
+ } catch (Exception ex) {
+ LOG.error("Not able to estimate size.", ex);
+ }
+ }
+
+ return size;
+ }
+
+ public int getSize() {
+ //facilitate testing only. In production we won't use tableSizeMap at all.
+ if (tableSizeMap != null) {
+ String tblKey = CacheUtils.buildTableKey(this.t.getCatName(), this.t.getDbName(), this.t.getTableName());
+ if (tableSizeMap.containsKey(tblKey)) {
+ return tableSizeMap.get(tblKey);
+ }
+ }
+ if (sizeEstimators == null) {
+ return 0;
+ }
+ return otherSize + tableColStatsCacheSize + partitionCacheSize + partitionColStatsCacheSize
+ + aggrColStatsCacheSize;
}
public Table getTable() {
@@ -202,15 +376,69 @@ public class SharedCache {
return catName.equals(t.getCatName()) && dbName.equals(t.getDbName());
}
+ private void updateMemberSize(MemberName mn, Integer size, SizeMode mode) {
+ if (sizeEstimators == null) {
+ return;
+ }
+
+ switch (mn) {
+ case TABLE_COL_STATS_CACHE:
+ if (mode == SizeMode.Delta) {
+ tableColStatsCacheSize += size;
+ } else {
+ tableColStatsCacheSize = size;
+ }
+ break;
+ case PARTITION_CACHE:
+ if (mode == SizeMode.Delta) {
+ partitionCacheSize += size;
+ } else {
+ partitionCacheSize = size;
+ }
+ break;
+ case PARTITION_COL_STATS_CACHE:
+ if (mode == SizeMode.Delta) {
+ partitionColStatsCacheSize += size;
+ } else {
+ partitionColStatsCacheSize = size;
+ }
+ break;
+ case AGGR_COL_STATS_CACHE:
+ if (mode == SizeMode.Delta) {
+ aggrColStatsCacheSize += size;
+ } else {
+ aggrColStatsCacheSize = size;
+ }
+ break;
+ default:
+ break;
+ }
+
+ String tblKey = getTblKey();
+ tableToUpdateSize.add(tblKey);
+ }
+
+ String getTblKey() {
+ Table tbl = this.t;
+ String catName = tbl.getCatName();
+ String dbName = tbl.getDbName();
+ String tblName = tbl.getTableName();
+ return CacheUtils.buildTableKey(catName, dbName, tblName);
+ }
+
void cachePartition(Partition part, SharedCache sharedCache) {
try {
tableLock.writeLock().lock();
PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+ int size = getObjectSize(PartitionWrapper.class, wrapper);
+ updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Delta);
isPartitionCacheDirty.set(true);
+
// Invalidate cached aggregate stats
if (!aggrColStatsCache.isEmpty()) {
aggrColStatsCache.clear();
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
} finally {
tableLock.writeLock().unlock();
@@ -220,31 +448,21 @@ public class SharedCache {
boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache, boolean fromPrewarm) {
try {
tableLock.writeLock().lock();
+ int size = 0;
for (Partition part : parts) {
- PartitionWrapper ptnWrapper = makePartitionWrapper(part, sharedCache);
- if (maxCacheSizeInBytes > 0) {
- ObjectEstimator ptnWrapperSizeEstimator =
- getMemorySizeEstimator(PartitionWrapper.class);
- long estimatedMemUsage = ptnWrapperSizeEstimator.estimate(ptnWrapper, sizeEstimators);
- LOG.trace("Memory needed to cache Partition: {} is {} bytes", part, estimatedMemUsage);
- if (isCacheMemoryFull(estimatedMemUsage)) {
- LOG.debug(
- "Cannot cache Partition: {}. Memory needed is {} bytes, whereas the memory remaining is: {} bytes.",
- part, estimatedMemUsage, (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
- return false;
- } else {
- currentCacheSizeInBytes += estimatedMemUsage;
- }
- LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes);
- }
- partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), ptnWrapper);
+ PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+ partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+ size += getObjectSize(PartitionWrapper.class, wrapper);
+
if (!fromPrewarm) {
isPartitionCacheDirty.set(true);
}
}
+ updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Delta);
// Invalidate cached aggregate stats
if (!aggrColStatsCache.isEmpty()) {
aggrColStatsCache.clear();
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
return true;
} finally {
@@ -300,30 +518,36 @@ public class SharedCache {
Partition part = null;
try {
tableLock.writeLock().lock();
- PartitionWrapper wrapper =
- partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+ PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
if (wrapper == null) {
return null;
}
isPartitionCacheDirty.set(true);
+
+ int size = getObjectSize(PartitionWrapper.class, wrapper);
+ updateMemberSize(MemberName.PARTITION_CACHE, -1 * size, SizeMode.Delta);
+
part = CacheUtils.assemble(wrapper, sharedCache);
if (wrapper.getSdHash() != null) {
sharedCache.decrSd(wrapper.getSdHash());
}
// Remove col stats
String partialKey = CacheUtils.buildPartitionCacheKey(partVal);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator = partitionColStatsCache.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, ColumnStatisticsObj> entry = iterator.next();
String key = entry.getKey();
if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ int statsSize = getObjectSize(ColumnStatisticsObj.class, entry.getValue());
+ updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, -1 * statsSize, SizeMode.Delta);
iterator.remove();
}
}
+
// Invalidate cached aggregate stats
if (!aggrColStatsCache.isEmpty()) {
aggrColStatsCache.clear();
+ updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
} finally {
tableLock.writeLock().unlock();
@@ -353,7 +577,7 @@ public class SharedCache {
}
public void alterPartitionAndStats(List<String> partVals, SharedCache sharedCache, long writeId,
- Map<String,String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
+ Map<String, String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
try {
tableLock.writeLock().lock();
PartitionWrapper partitionWrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
@@ -372,8 +596,7 @@ public class SharedCache {
}
}
- public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
- SharedCache sharedCache) {
+ public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts, SharedCache sharedCache) {
try {
tableLock.writeLock().lock();
for (int i = 0; i < partValsList.size(); i++) {
@@ -390,6 +613,7 @@ public class SharedCache {
Map<String, PartitionWrapper> newPartitionCache = new HashMap<String, PartitionWrapper>();
try {
tableLock.writeLock().lock();
+ int size = 0;
for (Partition part : partitions) {
if (isPartitionCacheDirty.compareAndSet(true, false)) {
LOG.debug("Skipping partition cache update for table: " + getTable().getTableName()
@@ -405,8 +629,10 @@ public class SharedCache {
}
wrapper = makePartitionWrapper(part, sharedCache);
newPartitionCache.put(key, wrapper);
+ size += getObjectSize(PartitionWrapper.class, wrapper);
}
partitionCache = newPartitionCache;
+ updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Snapshot);
} finally {
tableLock.writeLock().unlock();
}
@@ -415,38 +641,23 @@ public class SharedCache {
public boolean updateTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
try {
tableLock.writeLock().lock();
+ int statsSize = 0;
for (ColumnStatisticsObj colStatObj : colStatsForTable) {
// Get old stats object if present
String key = colStatObj.getColName();
ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
if (oldStatsObj != null) {
// Update existing stat object's field
+ statsSize -= getObjectSize(ColumnStatisticsObj.class, oldStatsObj);
StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
} else {
// No stats exist for this key; add a new object to the cache
// TODO: get rid of deepCopy after making sure callers don't use references
- if (maxCacheSizeInBytes > 0) {
- ObjectEstimator tblColStatsSizeEstimator =
- getMemorySizeEstimator(ColumnStatisticsObj.class);
- long estimatedMemUsage =
- tblColStatsSizeEstimator.estimate(colStatObj, sizeEstimators);
- LOG.trace("Memory needed to cache Table Column Statistics Object: {} is {} bytes",
- colStatObj, estimatedMemUsage);
- if (isCacheMemoryFull(estimatedMemUsage)) {
- LOG.debug(
- "Cannot cache Table Column Statistics Object: {}. Memory needed is {} bytes, "
- + "whereas the memory remaining is: {} bytes.",
- colStatObj, estimatedMemUsage,
- (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
- return false;
- } else {
- currentCacheSizeInBytes += estimatedMemUsage;
- }
- LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes);
- }
tableColStatsCache.put(key, colStatObj.deepCopy());
}
+ statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
}
+ updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, statsSize, SizeMode.Delta);
isTableColStatsCacheDirty.set(true);
return true;
} finally {
@@ -455,29 +666,30 @@ public class SharedCache {
}
public void refreshTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
- Map<String, ColumnStatisticsObj> newTableColStatsCache =
- new HashMap<String, ColumnStatisticsObj>();
+ Map<String, ColumnStatisticsObj> newTableColStatsCache = new HashMap<String, ColumnStatisticsObj>();
try {
tableLock.writeLock().lock();
+ int statsSize = 0;
for (ColumnStatisticsObj colStatObj : colStatsForTable) {
if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping table col stats cache update for table: "
- + getTable().getTableName() + "; the table col stats list we have is dirty.");
+ LOG.debug("Skipping table col stats cache update for table: " + getTable().getTableName()
+ + "; the table col stats list we have is dirty.");
return;
}
String key = colStatObj.getColName();
// TODO: get rid of deepCopy after making sure callers don't use references
newTableColStatsCache.put(key, colStatObj.deepCopy());
+ statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
}
tableColStatsCache = newTableColStatsCache;
+ updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
} finally {
tableLock.writeLock().unlock();
}
}
public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, List<String> colNames,
- String validWriteIds, boolean areTxnStatsSupported)
- throws MetaException {
+ String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
try {
tableLock.readLock().lock();
@@ -488,7 +700,7 @@ public class SharedCache {
}
}
return CachedStore.adjustColStatForGet(getTable().getParameters(), new ColumnStatistics(csd, colStatObjs),
- getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
+ getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
} finally {
tableLock.readLock().unlock();
}
@@ -499,8 +711,10 @@ public class SharedCache {
tableLock.writeLock().lock();
if (colName == null) {
tableColStatsCache.clear();
+ updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot);
} else {
tableColStatsCache.remove(colName);
+ updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
isTableColStatsCacheDirty.set(true);
} finally {
@@ -512,6 +726,7 @@ public class SharedCache {
try {
tableLock.writeLock().lock();
tableColStatsCache.clear();
+ updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot);
isTableColStatsCacheDirty.set(true);
} finally {
tableLock.writeLock().unlock();
@@ -522,7 +737,7 @@ public class SharedCache {
try {
tableLock.readLock().lock();
ColumnStatisticsObj statisticsObj =
- partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+ partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
if (statisticsObj == null || writeIdList == null) {
return new ColumStatsWithWriteId(-1, statisticsObj);
}
@@ -546,23 +761,22 @@ public class SharedCache {
}
}
- public List<ColumnStatistics> getPartColStatsList(List<String> partNames, List<String> colNames,
- String writeIdList, boolean txnStatSupported) throws MetaException {
+ public List<ColumnStatistics> getPartColStatsList(List<String> partNames, List<String> colNames, String writeIdList,
+ boolean txnStatSupported) throws MetaException {
List<ColumnStatistics> colStatObjs = new ArrayList<>();
try {
tableLock.readLock().lock();
Table tbl = getTable();
for (String partName : partNames) {
- ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false,
- tbl.getDbName(), tbl.getTableName());
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false, tbl.getDbName(), tbl.getTableName());
csd.setCatName(tbl.getCatName());
csd.setPartName(partName);
csd.setLastAnalyzed(0); //TODO : Need to get last analysed. This is not being used by anybody now.
List<ColumnStatisticsObj> statObject = new ArrayList<>();
- List<String> partVal = Warehouse.getPartValuesFromPartName(partName);
+ List<String> partVal = Warehouse.getPartValuesFromPartName(partName);
for (String colName : colNames) {
ColumnStatisticsObj statisticsObj =
- partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+ partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
if (statisticsObj != null) {
statObject.add(statisticsObj);
} else {
@@ -576,17 +790,17 @@ public class SharedCache {
if (!txnStatSupported) {
columnStatistics.setIsStatsCompliant(false);
} else {
- PartitionWrapper wrapper =
- partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+ PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
if (wrapper == null) {
columnStatistics.setIsStatsCompliant(false);
} else {
Partition partition = wrapper.getPartition();
- if (!ObjectStore.isCurrentStatsValidForTheQuery(partition.getParameters(),
- partition.getWriteId(), writeIdList, false)) {
+ if (!ObjectStore
+ .isCurrentStatsValidForTheQuery(partition.getParameters(), partition.getWriteId(), writeIdList,
+ false)) {
LOG.debug("The current cached store transactional partition column statistics for {}.{}.{} "
- + "(write ID {}) are not valid for current query ({})", tbl.getDbName(),
- tbl.getTableName(), partName, partition.getWriteId(), writeIdList);
+ + "(write ID {}) are not valid for current query ({})", tbl.getDbName(), tbl.getTableName(),
+ partName, partition.getWriteId(), writeIdList);
columnStatistics.setIsStatsCompliant(false);
}
}
@@ -600,46 +814,31 @@ public class SharedCache {
return colStatObjs;
}
- public boolean updatePartitionColStats(List<String> partVal,
- List<ColumnStatisticsObj> colStatsObjs) {
+ public boolean updatePartitionColStats(List<String> partVal, List<ColumnStatisticsObj> colStatsObjs) {
try {
tableLock.writeLock().lock();
+ int statsSize = 0;
for (ColumnStatisticsObj colStatObj : colStatsObjs) {
// Get old stats object if present
String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
if (oldStatsObj != null) {
// Update existing stat object's field
+ statsSize -= getObjectSize(ColumnStatisticsObj.class, oldStatsObj);
StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
} else {
// No stats exist for this key; add a new object to the cache
// TODO: get rid of deepCopy after making sure callers don't use references
- if (maxCacheSizeInBytes > 0) {
- ObjectEstimator ptnColStatsSizeEstimator =
- getMemorySizeEstimator(ColumnStatisticsObj.class);
- long estimatedMemUsage =
- ptnColStatsSizeEstimator.estimate(colStatObj, sizeEstimators);
- LOG.trace("Memory needed to cache Partition Column Statistics Object: {} is {} bytes",
- colStatObj, estimatedMemUsage);
- if (isCacheMemoryFull(estimatedMemUsage)) {
- LOG.debug(
- "Cannot cache Partition Column Statistics Object: {}. Memory needed is {} bytes, "
- + "whereas the memory remaining is: {} bytes.",
- colStatObj, estimatedMemUsage,
- (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
- return false;
- } else {
- currentCacheSizeInBytes += estimatedMemUsage;
- }
- LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes);
- }
partitionColStatsCache.put(key, colStatObj.deepCopy());
}
+ statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
}
+ updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, statsSize, SizeMode.Delta);
isPartitionColStatsCacheDirty.set(true);
// Invalidate cached aggregate stats
if (!aggrColStatsCache.isEmpty()) {
aggrColStatsCache.clear();
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
} finally {
tableLock.writeLock().unlock();
@@ -650,11 +849,17 @@ public class SharedCache {
public void removePartitionColStats(List<String> partVals, String colName) {
try {
tableLock.writeLock().lock();
- partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+ ColumnStatisticsObj statsObj =
+ partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+ if (statsObj != null) {
+ int statsSize = getObjectSize(ColumnStatisticsObj.class, statsObj);
+ updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, -1 * statsSize, SizeMode.Delta);
+ }
isPartitionColStatsCacheDirty.set(true);
// Invalidate cached aggregate stats
if (!aggrColStatsCache.isEmpty()) {
aggrColStatsCache.clear();
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
} finally {
tableLock.writeLock().unlock();
@@ -665,10 +870,12 @@ public class SharedCache {
try {
tableLock.writeLock().lock();
partitionColStatsCache.clear();
+ updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, 0, SizeMode.Snapshot);
isPartitionColStatsCacheDirty.set(true);
// Invalidate cached aggregate stats
if (!aggrColStatsCache.isEmpty()) {
aggrColStatsCache.clear();
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot);
}
} finally {
tableLock.writeLock().unlock();
@@ -676,15 +883,15 @@ public class SharedCache {
}
public void refreshPartitionColStats(List<ColumnStatistics> partitionColStats) {
- Map<String, ColumnStatisticsObj> newPartitionColStatsCache =
- new HashMap<String, ColumnStatisticsObj>();
+ Map<String, ColumnStatisticsObj> newPartitionColStatsCache = new HashMap<String, ColumnStatisticsObj>();
try {
tableLock.writeLock().lock();
String tableName = StringUtils.normalizeIdentifier(getTable().getTableName());
+ int statsSize = 0;
for (ColumnStatistics cs : partitionColStats) {
if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping partition column stats cache update for table: "
- + getTable().getTableName() + "; the partition column stats list we have is dirty");
+ LOG.debug("Skipping partition column stats cache update for table: " + getTable().getTableName()
+ + "; the partition column stats list we have is dirty");
return;
}
List<String> partVal;
@@ -693,26 +900,26 @@ public class SharedCache {
List<ColumnStatisticsObj> colStatsObjs = cs.getStatsObj();
for (ColumnStatisticsObj colStatObj : colStatsObjs) {
if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping partition column stats cache update for table: "
- + getTable().getTableName() + "; the partition column list we have is dirty");
+ LOG.debug("Skipping partition column stats cache update for table: " + getTable().getTableName()
+ + "; the partition column list we have is dirty");
return;
}
- String key =
- CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+ String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
newPartitionColStatsCache.put(key, colStatObj.deepCopy());
+ statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj);
}
} catch (MetaException e) {
LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
}
}
partitionColStatsCache = newPartitionColStatsCache;
+ updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
} finally {
tableLock.writeLock().unlock();
}
}
- public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames,
- StatsType statsType) {
+ public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames, StatsType statsType) {
List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
try {
tableLock.readLock().lock();
@@ -739,12 +946,14 @@ public class SharedCache {
AggrStats aggrStatsAllButDefaultPartition) {
try {
tableLock.writeLock().lock();
+ int statsSize = 0;
if (aggrStatsAllPartitions != null) {
for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
if (statObj != null) {
List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
aggrColStatsCache.put(statObj.getColName(), aggrStats);
+ statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
}
}
}
@@ -756,9 +965,11 @@ public class SharedCache {
aggrStats = new ArrayList<ColumnStatisticsObj>();
}
aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+ statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
}
}
}
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
isAggrPartitionColStatsCacheDirty.set(true);
} finally {
tableLock.writeLock().unlock();
@@ -767,10 +978,10 @@ public class SharedCache {
public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
AggrStats aggrStatsAllButDefaultPartition, SharedCache sharedCache, Map<List<String>, Long> partNameToWriteId) {
- Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
- new HashMap<String, List<ColumnStatisticsObj>>();
+ Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache = new HashMap<String, List<ColumnStatisticsObj>>();
try {
tableLock.writeLock().lock();
+ int statsSize = 0;
if (partNameToWriteId != null) {
for (Entry<List<String>, Long> partValuesWriteIdSet : partNameToWriteId.entrySet()) {
List<String> partValues = partValuesWriteIdSet.getKey();
@@ -784,8 +995,8 @@ public class SharedCache {
// skip updating the aggregate stats in the cache.
long writeId = partition.getWriteId();
if (writeId != partValuesWriteIdSet.getValue()) {
- LOG.info("Could not refresh the aggregate stat as partition " + partValues + " has write id " +
- partValuesWriteIdSet.getValue() + " instead of " + writeId);
+ LOG.info("Could not refresh the aggregate stat as partition " + partValues + " has write id "
+ + partValuesWriteIdSet.getValue() + " instead of " + writeId);
return;
}
}
@@ -793,22 +1004,23 @@ public class SharedCache {
if (aggrStatsAllPartitions != null) {
for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping aggregate stats cache update for table: "
- + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+ LOG.debug("Skipping aggregate stats cache update for table: " + getTable().getTableName()
+ + "; the aggregate stats list we have is dirty");
return;
}
if (statObj != null) {
List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+ statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
}
}
}
if (aggrStatsAllButDefaultPartition != null) {
for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping aggregate stats cache update for table: "
- + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+ LOG.debug("Skipping aggregate stats cache update for table: " + getTable().getTableName()
+ + "; the aggregate stats list we have is dirty");
return;
}
if (statObj != null) {
@@ -817,10 +1029,12 @@ public class SharedCache {
aggrStats = new ArrayList<ColumnStatisticsObj>();
}
aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+ statsSize += getObjectSize(ColumnStatisticsObj.class, statObj);
}
}
}
aggrColStatsCache = newAggrColStatsCache;
+ updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, statsSize, SizeMode.Snapshot);
} finally {
tableLock.writeLock().unlock();
}
@@ -871,10 +1085,10 @@ public class SharedCache {
}
static class PartitionWrapper {
- Partition p;
- String location;
- Map<String, String> parameters;
- byte[] sdHash;
+ private Partition p;
+ private String location;
+ private Map<String, String> parameters;
+ private byte[] sdHash;
PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
this.p = p;
@@ -901,8 +1115,8 @@ public class SharedCache {
}
static class StorageDescriptorWrapper {
- StorageDescriptor sd;
- int refCount = 0;
+ private StorageDescriptor sd;
+ private int refCount = 0;
StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
this.sd = sd;
@@ -921,6 +1135,7 @@ public class SharedCache {
public static class ColumStatsWithWriteId {
private long writeId;
private ColumnStatisticsObj columnStatisticsObj;
+
public ColumStatsWithWriteId(long writeId, ColumnStatisticsObj columnStatisticsObj) {
this.writeId = writeId;
this.columnStatisticsObj = columnStatisticsObj;
@@ -1049,8 +1264,7 @@ public class SharedCache {
// 1. Don't add databases that were deleted while we were preparing list for prewarm
// 2. Skip overwriting exisiting db object
// (which is present because it was added after prewarm started)
- String key = CacheUtils.buildDbKey(dbCopy.getCatalogName().toLowerCase(),
- dbCopy.getName().toLowerCase());
+ String key = CacheUtils.buildDbKey(dbCopy.getCatalogName().toLowerCase(), dbCopy.getName().toLowerCase());
if (databasesDeletedDuringPrewarm.contains(key)) {
continue;
}
@@ -1104,8 +1318,9 @@ public class SharedCache {
cacheLock.readLock().lock();
for (String pair : databaseCache.keySet()) {
String[] n = CacheUtils.splitDbName(pair);
- if (catName.equals(n[0]))
+ if (catName.equals(n[0])) {
results.add(n[1]);
+ }
}
} finally {
cacheLock.readLock().unlock();
@@ -1173,9 +1388,9 @@ public class SharedCache {
}
}
- public boolean populateTableInCache(Table table, ColumnStatistics tableColStats,
- List<Partition> partitions, List<ColumnStatistics> partitionColStats,
- AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+ public boolean populateTableInCache(Table table, ColumnStatistics tableColStats, List<Partition> partitions,
+ List<ColumnStatistics> partitionColStats, AggrStats aggrStatsAllPartitions,
+ AggrStats aggrStatsAllButDefaultPartition) {
String catName = StringUtils.normalizeIdentifier(table.getCatName());
String dbName = StringUtils.normalizeIdentifier(table.getDbName());
String tableName = StringUtils.normalizeIdentifier(table.getTableName());
@@ -1185,23 +1400,6 @@ public class SharedCache {
return false;
}
TableWrapper tblWrapper = createTableWrapper(catName, dbName, tableName, table);
- if (maxCacheSizeInBytes > 0) {
- ObjectEstimator tblWrapperSizeEstimator = getMemorySizeEstimator(TableWrapper.class);
- long estimatedMemUsage = tblWrapperSizeEstimator.estimate(tblWrapper, sizeEstimators);
- LOG.debug("Memory needed to cache Database: {}'s Table: {}, is {} bytes", dbName, tableName,
- estimatedMemUsage);
- if (isCacheMemoryFull(estimatedMemUsage)) {
- LOG.debug(
- "Cannot cache Database: {}'s Table: {}. Memory needed is {} bytes, "
- + "whereas the memory we have remaining is: {} bytes.",
- dbName, tableName, estimatedMemUsage,
- (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes));
- return false;
- } else {
- currentCacheSizeInBytes += estimatedMemUsage;
- }
- LOG.debug("Current cache size: {} bytes", currentCacheSizeInBytes);
- }
if (!table.isSetPartitionKeys() && (tableColStats != null)) {
if (table.getPartitionKeys().isEmpty() && (tableColStats != null)) {
return false;
@@ -1227,8 +1425,7 @@ public class SharedCache {
}
}
}
- tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
+ tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
}
tblWrapper.isPartitionCacheDirty.set(false);
tblWrapper.isTableColStatsCacheDirty.set(false);
@@ -1238,17 +1435,13 @@ public class SharedCache {
cacheLock.writeLock().lock();
// 2. Skip overwriting exisiting table object
// (which is present because it was added after prewarm started)
- tableCache.putIfAbsent(CacheUtils.buildTableKey(catName, dbName, tableName), tblWrapper);
+ tableCache.put(CacheUtils.buildTableKey(catName, dbName, tableName), tblWrapper);
return true;
} finally {
cacheLock.writeLock().unlock();
}
}
- private static boolean isCacheMemoryFull(long estimatedMemUsage) {
- return (0.8*maxCacheSizeInBytes) < (currentCacheSizeInBytes + estimatedMemUsage);
- }
-
public void completeTableCachePrewarm() {
try {
cacheLock.writeLock().lock();
@@ -1263,8 +1456,7 @@ public class SharedCache {
Table t = null;
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper =
- tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
if (tblWrapper != null) {
t = CacheUtils.assemble(tblWrapper, this);
}
@@ -1286,8 +1478,7 @@ public class SharedCache {
}
}
- private TableWrapper createTableWrapper(String catName, String dbName, String tblName,
- Table tbl) {
+ private TableWrapper createTableWrapper(String catName, String dbName, String tblName, Table tbl) {
TableWrapper wrapper;
Table tblCopy = tbl.deepCopy();
tblCopy.setCatName(normalizeIdentifier(catName));
@@ -1318,19 +1509,19 @@ public class SharedCache {
if (!isTableCachePrewarmed) {
tablesDeletedDuringPrewarm.add(CacheUtils.buildTableKey(catName, dbName, tblName));
}
- TableWrapper tblWrapper =
- tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+ String tblKey = CacheUtils.buildTableKey(catName, dbName, tblName);
+ TableWrapper tblWrapper = tableCache.getIfPresent(tblKey);
if (tblWrapper == null) {
//in case of retry, ignore second try.
return;
}
- if (tblWrapper != null) {
- byte[] sdHash = tblWrapper.getSdHash();
- if (sdHash != null) {
- decrSd(sdHash);
- }
- isTableCacheDirty.set(true);
+
+ byte[] sdHash = tblWrapper.getSdHash();
+ if (sdHash != null) {
+ decrSd(sdHash);
}
+ tableCache.invalidate(tblKey);
+ isTableCacheDirty.set(true);
} finally {
cacheLock.writeLock().unlock();
}
@@ -1339,8 +1530,7 @@ public class SharedCache {
public void alterTableInCache(String catName, String dbName, String tblName, Table newTable) {
try {
cacheLock.writeLock().lock();
- TableWrapper tblWrapper =
- tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.updateTableObj(newTable, this);
String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
@@ -1354,11 +1544,10 @@ public class SharedCache {
}
public void alterTableAndStatsInCache(String catName, String dbName, String tblName, long writeId,
- List<ColumnStatisticsObj> colStatsObjs, Map<String,String> newParams) {
+ List<ColumnStatisticsObj> colStatsObjs, Map<String, String> newParams) {
try {
cacheLock.writeLock().lock();
- TableWrapper tblWrapper =
- tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper == null) {
LOG.info("Table " + tblName + " is missing from cache. Cannot update table stats in cache");
return;
@@ -1369,8 +1558,8 @@ public class SharedCache {
//tblWrapper.updateTableObj(newTable, this);
String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
- tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper);
tblWrapper.updateTableColStats(colStatsObjs);
+ tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper);
isTableCacheDirty.set(true);
} finally {
cacheLock.writeLock().unlock();
@@ -1381,7 +1570,7 @@ public class SharedCache {
List<Table> tables = new ArrayList<>();
try {
cacheLock.readLock().lock();
- for (TableWrapper wrapper : tableCache.values()) {
+ for (TableWrapper wrapper : tableCache.asMap().values()) {
if (wrapper.sameDatabase(catName, dbName)) {
tables.add(CacheUtils.assemble(wrapper, this));
}
@@ -1396,7 +1585,7 @@ public class SharedCache {
List<String> tableNames = new ArrayList<>();
try {
cacheLock.readLock().lock();
- for (TableWrapper wrapper : tableCache.values()) {
+ for (TableWrapper wrapper : tableCache.asMap().values()) {
if (wrapper.sameDatabase(catName, dbName)) {
tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
}
@@ -1407,16 +1596,14 @@ public class SharedCache {
return tableNames;
}
- public List<String> listCachedTableNames(String catName, String dbName, String pattern,
- int maxTables) {
+ public List<String> listCachedTableNames(String catName, String dbName, String pattern, int maxTables) {
List<String> tableNames = new ArrayList<>();
try {
cacheLock.readLock().lock();
int count = 0;
- for (TableWrapper wrapper : tableCache.values()) {
- if (wrapper.sameDatabase(catName, dbName)
- && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
- && (maxTables == -1 || count < maxTables)) {
+ for (TableWrapper wrapper : tableCache.asMap().values()) {
+ if (wrapper.sameDatabase(catName, dbName) && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) && (
+ maxTables == -1 || count < maxTables)) {
tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
count++;
}
@@ -1427,17 +1614,15 @@ public class SharedCache {
return tableNames;
}
- public List<String> listCachedTableNames(String catName, String dbName, String pattern,
- TableType tableType, int limit) {
+ public List<String> listCachedTableNames(String catName, String dbName, String pattern, TableType tableType,
+ int limit) {
List<String> tableNames = new ArrayList<>();
try {
cacheLock.readLock().lock();
int count = 0;
- for (TableWrapper wrapper : tableCache.values()) {
- if (wrapper.sameDatabase(catName, dbName)
- && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
- && wrapper.getTable().getTableType().equals(tableType.toString())
- && (limit == -1 || count < limit)) {
+ for (TableWrapper wrapper : tableCache.asMap().values()) {
+ if (wrapper.sameDatabase(catName, dbName) && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+ && wrapper.getTable().getTableType().equals(tableType.toString()) && (limit == -1 || count < limit)) {
tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
count++;
}
@@ -1456,7 +1641,7 @@ public class SharedCache {
Map<String, TableWrapper> newCacheForDB = new TreeMap<>();
for (Table tbl : tables) {
String tblName = StringUtils.normalizeIdentifier(tbl.getTableName());
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.updateTableObj(tbl, this);
} else {
@@ -1466,7 +1651,7 @@ public class SharedCache {
}
try {
cacheLock.writeLock().lock();
- Iterator<Entry<String, TableWrapper>> entryIterator = tableCache.entrySet().iterator();
+ Iterator<Entry<String, TableWrapper>> entryIterator = tableCache.asMap().entrySet().iterator();
while (entryIterator.hasNext()) {
String key = entryIterator.next().getKey();
if (key.startsWith(CacheUtils.buildDbKeyWithDelimiterSuffix(catName, dbName))) {
@@ -1480,11 +1665,11 @@ public class SharedCache {
}
}
- public ColumnStatistics getTableColStatsFromCache(String catName, String dbName,
- String tblName, List<String> colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
+ public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, String tblName,
+ List<String> colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper == null) {
LOG.info("Table " + tblName + " is missing from cache.");
return null;
@@ -1496,11 +1681,10 @@ public class SharedCache {
}
}
- public void removeTableColStatsFromCache(String catName, String dbName, String tblName,
- String colName) {
+ public void removeTableColStatsFromCache(String catName, String dbName, String tblName, String colName) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removeTableColStats(colName);
} else {
@@ -1514,7 +1698,7 @@ public class SharedCache {
public void removeAllTableColStatsFromCache(String catName, String dbName, String tblName) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removeAllTableColStats();
} else {
@@ -1529,8 +1713,7 @@ public class SharedCache {
List<ColumnStatisticsObj> colStatsForTable) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper =
- tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
if (tblWrapper != null) {
tblWrapper.updateTableColStats(colStatsForTable);
} else {
@@ -1545,8 +1728,7 @@ public class SharedCache {
List<ColumnStatisticsObj> colStatsForTable) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper =
- tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
if (tblWrapper != null) {
tblWrapper.refreshTableColStats(colStatsForTable);
} else {
@@ -1560,14 +1742,13 @@ public class SharedCache {
public int getCachedTableCount() {
try {
cacheLock.readLock().lock();
- return tableCache.size();
+ return tableCache.asMap().size();
} finally {
cacheLock.readLock().unlock();
}
}
- public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
- List<String> tableTypes) {
+ public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes) {
List<TableMeta> tableMetas = new ArrayList<>();
try {
cacheLock.readLock().lock();
@@ -1576,8 +1757,7 @@ public class SharedCache {
for (Table table : listCachedTables(catName, dbName)) {
if (CacheUtils.matches(table.getTableName(), tableNames)) {
if (tableTypes == null || tableTypes.contains(table.getTableType())) {
- TableMeta metaData =
- new TableMeta(dbName, table.getTableName(), table.getTableType());
+ TableMeta metaData = new TableMeta(dbName, table.getTableName(), table.getTableType());
metaData.setCatName(catName);
metaData.setComments(table.getParameters().get("comment"));
tableMetas.add(metaData);
@@ -1595,7 +1775,8 @@ public class SharedCache {
public void addPartitionToCache(String catName, String dbName, String tblName, Partition part) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ String tblKey = CacheUtils.buildTableKey(catName, dbName, tblName);
+ TableWrapper tblWrapper = tableCache.getIfPresent(tblKey);
if (tblWrapper != null) {
tblWrapper.cachePartition(part, this);
}
@@ -1604,11 +1785,10 @@ public class SharedCache {
}
}
- public void addPartitionsToCache(String catName, String dbName, String tblName,
- Iterable<Partition> parts) {
+ public void addPartitionsToCache(String catName, String dbName, String tblName, Iterable<Partition> parts) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.cachePartitions(parts, this, false);
}
@@ -1617,12 +1797,11 @@ public class SharedCache {
}
}
- public Partition getPartitionFromCache(String catName, String dbName, String tblName,
- List<String> partVals) {
+ public Partition getPartitionFromCache(String catName, String dbName, String tblName, List<String> partVals) {
Partition part = null;
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
part = tblWrapper.getPartition(partVals, this);
}
@@ -1632,12 +1811,11 @@ public class SharedCache {
return part;
}
- public boolean existPartitionFromCache(String catName, String dbName, String tblName,
- List<String> partVals) {
+ public boolean existPartitionFromCache(String catName, String dbName, String tblName, List<String> partVals) {
boolean existsPart = false;
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
existsPart = tblWrapper.containsPartition(partVals);
}
@@ -1647,14 +1825,15 @@ public class SharedCache {
return existsPart;
}
- public Partition removePartitionFromCache(String catName, String dbName, String tblName,
- List<String> partVals) {
+ public Partition removePartitionFromCache(String catName, String dbName, String tblName, List<String> partVals) {
Partition part = null;
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
part = tblWrapper.removePartition(partVals, this);
+ } else {
+ LOG.warn("This is abnormal");
}
} finally {
cacheLock.readLock().unlock();
@@ -1662,11 +1841,10 @@ public class SharedCache {
return part;
}
- public void removePartitionsFromCache(String catName, String dbName, String tblName,
- List<List<String>> partVals) {
+ public void removePartitionsFromCache(String catName, String dbName, String tblName, List<List<String>> partVals) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removePartitions(partVals, this);
}
@@ -1675,12 +1853,11 @@ public class SharedCache {
}
}
- public List<Partition> listCachedPartitions(String catName, String dbName, String tblName,
- int max) {
+ public List<Partition> listCachedPartitions(String catName, String dbName, String tblName, int max) {
List<Partition> parts = new ArrayList<Partition>();
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
parts = tblWrapper.listPartitions(max, this);
}
@@ -1690,11 +1867,11 @@ public class SharedCache {
return parts;
}
- public void alterPartitionInCache(String catName, String dbName, String tblName,
- List<String> partVals, Partition newPart) {
+ public void alterPartitionInCache(String catName, String dbName, String tblName, List<String> partVals,
+ Partition newPart) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.alterPartition(partVals, newPart, this);
}
@@ -1704,11 +1881,10 @@ public class SharedCache {
}
public void alterPartitionAndStatsInCache(String catName, String dbName, String tblName, long writeId,
- List<String> partVals, Map<String,String> parameters,
- List<ColumnStatisticsObj> colStatsObjs) {
+ List<String> partVals, Map<String, String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters, colStatsObjs);
}
@@ -1717,11 +1893,11 @@ public class SharedCache {
}
}
- public void alterPartitionsInCache(String catName, String dbName, String tblName,
- List<List<String>> partValsList, List<Partition> newParts) {
+ public void alterPartitionsInCache(String catName, String dbName, String tblName, List<List<String>> partValsList,
+ List<Partition> newParts) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.alterPartitions(partValsList, newParts, this);
}
@@ -1730,11 +1906,10 @@ public class SharedCache {
}
}
- public void refreshPartitionsInCache(String catName, String dbName, String tblName,
- List<Partition> partitions) {
+ public void refreshPartitionsInCache(String catName, String dbName, String tblName, List<Partition> partitions) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.refreshPartitions(partitions, this);
}
@@ -1743,11 +1918,11 @@ public class SharedCache {
}
}
- public void removePartitionColStatsFromCache(String catName, String dbName, String tblName,
- List<String> partVals, String colName) {
+ public void removePartitionColStatsFromCache(String catName, String dbName, String tblName, List<String> partVals,
+ String colName) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removePartitionColStats(partVals, colName);
}
@@ -1759,7 +1934,7 @@ public class SharedCache {
public void removeAllPartitionColStatsFromCache(String catName, String dbName, String tblName) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removeAllPartitionColStats();
}
@@ -1768,12 +1943,11 @@ public class SharedCache {
}
}
- public void updatePartitionColStatsInCache(String catName, String dbName, String tableName,
- List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
+ public void updatePartitionColStatsInCache(String catName, String dbName, String tableName, List<String> partVals,
+ List<ColumnStatisticsObj> colStatsObjs) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper =
- tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName));
if (tblWrapper != null) {
tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
}
@@ -1782,12 +1956,12 @@ public class SharedCache {
}
}
- public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, String dbName,
- String tblName, List<String> partVal, String colName, String writeIdList) {
+ public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, String dbName, String tblName,
+ List<String> partVal, String colName, String writeIdList) {
ColumStatsWithWriteId colStatObj = null;
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
colStatObj = tblWrapper.getPartitionColStats(partVal, colName, writeIdList);
}
@@ -1798,12 +1972,11 @@ public class SharedCache {
}
public List<ColumnStatistics> getPartitionColStatsListFromCache(String catName, String dbName, String tblName,
- List<String> partNames, List<String> colNames,
- String writeIdList, boolean txnStatSupported) {
+ List<String> partNames, List<String> colNames, String writeIdList, boolean txnStatSupported) {
List<ColumnStatistics> colStatObjs = null;
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames, writeIdList, txnStatSupported);
}
@@ -1819,7 +1992,7 @@ public class SharedCache {
List<ColumnStatistics> partitionColStats) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.refreshPartitionColStats(partitionColStats);
}
@@ -1828,11 +2001,11 @@ public class SharedCache {
}
}
- public List<ColumnStatisticsObj> getAggrStatsFromCache(String catName, String dbName,
- String tblName, List<String> colNames, StatsType statsType) {
+ public List<ColumnStatisticsObj> getAggrStatsFromCache(String catName, String dbName, String tblName,
+ List<String> colNames, StatsType statsType) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
return tblWrapper.getAggrPartitionColStats(colNames, statsType);
}
@@ -1842,14 +2015,13 @@ public class SharedCache {
return null;
}
- public void addAggregateStatsToCache(String catName, String dbName, String tblName,
- AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+ public void addAggregateStatsToCache(String catName, String dbName, String tblName, AggrStats aggrStatsAllPartitions,
+ AggrStats aggrStatsAllButDefaultPartition) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
- tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
+ tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
}
} finally {
cacheLock.readLock().unlock();
@@ -1858,13 +2030,13 @@ public class SharedCache {
public void refreshAggregateStatsInCache(String catName, String dbName, String tblName,
AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition,
- Map<List<String>, Long> partNameToWriteId) {
+ Map<List<String>, Long> partNameToWriteId) {
try {
cacheLock.readLock().lock();
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
- tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition, this, partNameToWriteId);
+ tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition, this,
+ partNameToWriteId);
}
} finally {
cacheLock.readLock().unlock();
@@ -1903,8 +2075,8 @@ public class SharedCache {
}
@VisibleForTesting
- Map<String, TableWrapper> getTableCache() {
- return tableCache;
+ void clearTableCache() {
+ tableCache.invalidateAll();
}
@VisibleForTesting
@@ -1928,6 +2100,11 @@ public class SharedCache {
isTableCacheDirty.set(false);
}
+ public void printCacheStats() {
+ CacheStats cs = tableCache.stats();
+ LOG.info(cs.toString());
+ }
+
public long getUpdateCount() {
return cacheUpdateCount.get();
}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index e30d4a8..420369d 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog;
import org.apache.hadoop.hive.metastore.Deadline;
@@ -61,7 +62,6 @@ import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataI
import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector;
import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -69,12 +69,12 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import jline.internal.Log;
-
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
-@Category(MetastoreCheckinTest.class)
-public class TestCachedStore {
+/**
+ * Unit tests for CachedStore
+ */
+@Category(MetastoreCheckinTest.class) public class TestCachedStore {
// cs_db1
Database db1;
// cs_db2
@@ -94,8 +94,7 @@ public class TestCachedStore {
List<Partition> db2Ptbl1Ptns;
List<String> db2Ptbl1PtnNames;
- @Before
- public void setUp() throws Exception {
+ @Before public void setUp() throws Exception {
Deadline.registerIfNot(10000000);
Deadline.startTimer("");
Configuration conf = MetastoreConf.newMetastoreConf();
@@ -130,8 +129,7 @@ public class TestCachedStore {
objectStore.shutdown();
}
- @After
- public void teardown() throws Exception {
+ @After public void teardown() throws Exception {
Deadline.startTimer("");
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
@@ -155,8 +153,7 @@ public class TestCachedStore {
* Methods that test CachedStore
*********************************************************************************************/
- @Test
- public void testPrewarm() throws Exception {
+ @Test public void testPrewarm() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -199,8 +196,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testPrewarmBlackList() throws Exception {
+ @Test public void testPrewarmBlackList() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -223,8 +219,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testPrewarmWhiteList() throws Exception {
+ @Test public void testPrewarmWhiteList() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -270,8 +265,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testCacheUpdate() throws Exception {
+ @Test public void testCacheUpdate() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -353,8 +347,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testCreateAndGetDatabase() throws Exception {
+ @Test public void testCreateAndGetDatabase() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -379,12 +372,12 @@ public class TestCachedStore {
Assert.assertEquals(3, allDatabases.size());
// Add another db via CachedStore
String dbName1 = "testCreateAndGetDatabase1";
- Database db1 = createDatabaseObject(dbName1, dbOwner);
- cachedStore.createDatabase(db1);
- db1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
+ Database localDb1 = createDatabaseObject(dbName1, dbOwner);
+ cachedStore.createDatabase(localDb1);
+ localDb1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
// Read db via ObjectStore
dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
- Assert.assertEquals(db1, dbRead);
+ Assert.assertEquals(localDb1, dbRead);
allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME);
Assert.assertEquals(4, allDatabases.size());
// Clean up
@@ -393,8 +386,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testDropDatabase() throws Exception {
+ @Test public void testDropDatabase() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -424,12 +416,12 @@ public class TestCachedStore {
Assert.assertEquals(2, allDatabases.size());
// Create another db via CachedStore and drop via ObjectStore
String dbName1 = "testDropDatabase1";
- Database db1 = createDatabaseObject(dbName1, dbOwner);
- cachedStore.createDatabase(db1);
- db1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
+ Database localDb1 = createDatabaseObject(dbName1, dbOwner);
+ cachedStore.createDatabase(localDb1);
+ localDb1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
// Read db via ObjectStore
dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1);
- Assert.assertEquals(db1, dbRead);
+ Assert.assertEquals(localDb1, dbRead);
allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME);
Assert.assertEquals(3, allDatabases.size());
objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1);
@@ -440,8 +432,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testAlterDatabase() throws Exception {
+ @Test public void testAlterDatabase() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -480,8 +471,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testCreateAndGetTable() throws Exception {
+ @Test public void testCreateAndGetTable() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -527,7 +517,6 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
// Note: the 44Kb approximation has been determined based on trial/error.
// If this starts failing on different env, might need another look.
public void testGetAllTablesPrewarmMemoryLimit() throws Exception {
@@ -553,8 +542,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testGetAllTablesBlacklist() throws Exception {
+ @Test public void testGetAllTablesBlacklist() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -579,8 +567,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testGetAllTablesWhitelist() throws Exception {
+ @Test public void testGetAllTablesWhitelist() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -605,8 +592,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testGetTableByPattern() throws Exception {
+ @Test public void testGetTableByPattern() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -631,8 +617,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testAlterTable() throws Exception {
+ @Test public void testAlterTable() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -653,8 +638,8 @@ public class TestCachedStore {
String newOwner = "newOwner";
Table db1Utbl1ReadAlt = new Table(db1Utbl1Read);
db1Utbl1ReadAlt.setOwner(newOwner);
- cachedStore.alterTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName(), db1Utbl1ReadAlt,
- "0");
+ cachedStore
+ .alterTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName(), db1Utbl1ReadAlt, "0");
db1Utbl1Read =
cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1ReadAlt.getDbName(), db1Utbl1ReadAlt.getTableName());
Table db1Utbl1ReadOS =
@@ -664,8 +649,8 @@ public class TestCachedStore {
Table db2Utbl1Read = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
Table db2Utbl1ReadAlt = new Table(db2Utbl1Read);
db2Utbl1ReadAlt.setOwner(newOwner);
- objectStore.alterTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName(), db2Utbl1ReadAlt,
- "0");
+ objectStore
+ .alterTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName(), db2Utbl1ReadAlt, "0");
updateCache(cachedStore);
db2Utbl1Read =
objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1ReadAlt.getDbName(), db2Utbl1ReadAlt.getTableName());
@@ -675,8 +660,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testDropTable() throws Exception {
+ @Test public void testDropTable() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -716,12 +700,11 @@ public class TestCachedStore {
/**********************************************************************************************
* Methods that test SharedCache
- * @throws MetaException
- * @throws NoSuchObjectException
+ * @throws MetaException
+ * @throws NoSuchObjectException
*********************************************************************************************/
- @Test
- public void testSharedStoreDb() throws NoSuchObjectException, MetaException {
+ @Test public void testSharedStoreDb() throws NoSuchObjectException, MetaException {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -731,13 +714,13 @@ public class TestCachedStore {
cachedStore.setConfForTest(conf);
SharedCache sharedCache = CachedStore.getSharedCache();
- Database db1 = createDatabaseObject("db1", "user1");
- Database db2 = createDatabaseObject("db2", "user1");
- Database db3 = createDatabaseObject("db3", "user1");
+ Database localDb1 = createDatabaseObject("db1", "user1");
+ Database localDb2 = createDatabaseObject("db2", "user1");
+ Database localDb3 = createDatabaseObject("db3", "user1");
Database newDb1 = createDatabaseObject("newdb1", "user1");
- sharedCache.addDatabaseToCache(db1);
- sharedCache.addDatabaseToCache(db2);
- sharedCache.addDatabaseToCache(db3);
+ sharedCache.addDatabaseToCache(localDb1);
+ sharedCache.addDatabaseToCache(localDb2);
+ sharedCache.addDatabaseToCache(localDb3);
Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
sharedCache.alterDatabaseInCache(DEFAULT_CATALOG_NAME, "db1", newDb1);
Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
@@ -750,8 +733,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testSharedStoreTable() {
+ @Test public void testSharedStoreTable() {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -836,8 +818,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testSharedStorePartition() {
+ @Test public void testSharedStorePartition() {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -1002,14 +983,14 @@ public class TestCachedStore {
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100);
aggrStats = cachedStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, aggrPartVals, colNames);
Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100);
-
+
objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(),
Warehouse.makePartName(tbl.getPartitionKeys(), partVals1), partVals1, colName);
objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(),
Warehouse.makePartName(tbl.getPartitionKeys(), partVals2), partVals2, colName);
objectStore.dropPartition(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), partVals1);
objectStore.dropPartition(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), partVals2);
- objectStore.dropTable(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName()) ;
+ objectStore.dropTable(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName());
objectStore.dropDatabase(DEFAULT_CATALOG_NAME, db.getName());
cachedStore.shutdown();
}
@@ -1185,8 +1166,7 @@ public class TestCachedStore {
cachedStore.shutdown();
}
- @Test
- public void testMultiThreadedSharedCacheOps() throws Exception {
+ @Test public void testMultiThreadedSharedCacheOps() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb");
@@ -1199,8 +1179,7 @@ public class TestCachedStore {
List<String> dbNames = new ArrayList<String>(Arrays.asList("db1", "db2", "db3", "db4", "db5"));
List<Callable<Object>> tasks = new ArrayList<Callable<Object>>();
ExecutorService executor = Executors.newFixedThreadPool(50, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
+ @Override public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
@@ -1332,6 +1311,199 @@ public class TestCachedStore {
cachedStore.shutdown();
}
+ @Test public void testPartitionSize() {
+ Configuration conf = MetastoreConf.newMetastoreConf();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5Kb");
+ MetaStoreTestUtils.setConfForStandloneMode(conf);
+ CachedStore cachedStore = new CachedStore();
+ CachedStore.clearSharedCache();
+ cachedStore.setConfForTestExceptSharedCache(conf);
+
+ String dbName = "db1";
+ String tbl1Name = "tbl1";
+ String tbl2Name = "tbl2";
+ String owner = "user1";
+ Database db = createDatabaseObject(dbName, owner);
+
+ FieldSchema col1 = new FieldSchema("col1", "int", "integer column");
+ FieldSchema col2 = new FieldSchema("col2", "string", "string column");
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(col1);
+ cols.add(col2);
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ Table tbl1 = createTestTbl(dbName, tbl1Name, owner, cols, ptnCols);
+ Table tbl2 = createTestTbl(dbName, tbl2Name, owner, cols, ptnCols);
+
+ Map<String, Integer> tableSizeMap = new HashMap<>();
+ String tbl1Key = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, dbName, tbl1Name);
+ String tbl2Key = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, dbName, tbl2Name);
+ tableSizeMap.put(tbl1Key, 1000);
+ tableSizeMap.put(tbl2Key, 4500);
+
+ Partition part1 = new Partition();
+ StorageDescriptor sd1 = new StorageDescriptor();
+ List<FieldSchema> cols1 = new ArrayList<>();
+ cols1.add(new FieldSchema("col1", "int", ""));
+ Map<String, String> params1 = new HashMap<>();
+ params1.put("key", "value");
+ sd1.setCols(cols1);
+ sd1.setParameters(params1);
+ sd1.setLocation("loc1");
+ part1.setSd(sd1);
+ part1.setValues(Arrays.asList("201701"));
+
+ Partition part2 = new Partition();
+ StorageDescriptor sd2 = new StorageDescriptor();
+ List<FieldSchema> cols2 = new ArrayList<>();
+ cols2.add(new FieldSchema("col1", "int", ""));
+ Map<String, String> params2 = new HashMap<>();
+ params2.put("key", "value");
+ sd2.setCols(cols2);
+ sd2.setParameters(params2);
+ sd2.setLocation("loc2");
+ part2.setSd(sd2);
+ part2.setValues(Arrays.asList("201702"));
+
+ Partition part3 = new Partition();
+ StorageDescriptor sd3 = new StorageDescriptor();
+ List<FieldSchema> cols3 = new ArrayList<>();
+ cols3.add(new FieldSchema("col3", "int", ""));
+ Map<String, String> params3 = new HashMap<>();
+ params3.put("key2", "value2");
+ sd3.setCols(cols3);
+ sd3.setParameters(params3);
+ sd3.setLocation("loc3");
+ part3.setSd(sd3);
+ part3.setValues(Arrays.asList("201703"));
+
+ Partition newPart1 = new Partition();
+ newPart1.setDbName(dbName);
+ newPart1.setTableName(tbl1Name);
+ StorageDescriptor newSd1 = new StorageDescriptor();
+ List<FieldSchema> newCols1 = new ArrayList<>();
+ newCols1.add(new FieldSchema("newcol1", "int", ""));
+ Map<String, String> newParams1 = new HashMap<>();
+ newParams1.put("key", "value");
+ newSd1.setCols(newCols1);
+ newSd1.setParameters(params1);
+ newSd1.setLocation("loc1new");
+ newPart1.setSd(newSd1);
+ newPart1.setValues(Arrays.asList("201701"));
+
+ SharedCache sharedCache = cachedStore.getSharedCache();
+ sharedCache.setConcurrencyLevel(1);
+ sharedCache.setTableSizeMap(tableSizeMap);
+ sharedCache.initialize(conf);
+
+ sharedCache.addDatabaseToCache(db);
+ sharedCache.addTableToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, tbl1);
+ sharedCache.addTableToCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, tbl2);
+
+ sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part1);
+ sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part2);
+ sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part3);
+
+ Partition p = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, Arrays.asList("201701"));
+ Assert.assertNull(p);
+
+ sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, newPart1);
+ p = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, Arrays.asList("201701"));
+ Assert.assertNotNull(p);
+ cachedStore.shutdown();
+ }
+
+ @Test public void testShowTables() throws Exception {
+ Configuration conf = MetastoreConf.newMetastoreConf();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5kb");
+ MetaStoreTestUtils.setConfForStandloneMode(conf);
+ CachedStore cachedStore = new CachedStore();
+ CachedStore.clearSharedCache();
+
+ cachedStore.setConfForTestExceptSharedCache(conf);
+ ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore();
+ //set up table size map
+ Map<String, Integer> tableSizeMap = new HashMap<>();
+ String db1Utbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName());
+ String db1Ptbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName());
+ String db2Utbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
+ String db2Ptbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName());
+ tableSizeMap.put(db1Utbl1TblKey, 4000);
+ tableSizeMap.put(db1Ptbl1TblKey, 4000);
+ tableSizeMap.put(db2Utbl1TblKey, 4000);
+ tableSizeMap.put(db2Ptbl1TblKey, 4000);
+
+ SharedCache sc = cachedStore.getSharedCache();
+ sc.setConcurrencyLevel(1);
+ sc.setTableSizeMap(tableSizeMap);
+ sc.initialize(conf);
+
+ // Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
+ CachedStore.prewarm(objectStore);
+
+ List<String> db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName());
+ Assert.assertEquals(2, db1Tables.size());
+ List<String> db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName());
+ Assert.assertEquals(2, db2Tables.size());
+
+ cachedStore.shutdown();
+ }
+
+ @Test public void testTableEviction() throws Exception {
+ Configuration conf = MetastoreConf.newMetastoreConf();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5kb");
+ MetaStoreTestUtils.setConfForStandloneMode(conf);
+ CachedStore cachedStore = new CachedStore();
+ CachedStore.clearSharedCache();
+
+ cachedStore.setConfForTestExceptSharedCache(conf);
+ ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore();
+ //set up table size map
+ Map<String, Integer> tableSizeMap = new HashMap<>();
+ String db1Utbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName());
+ String db1Ptbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName());
+ String db2Utbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
+ String db2Ptbl1TblKey =
+ CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName());
+ tableSizeMap.put(db1Utbl1TblKey, 4000);
+ tableSizeMap.put(db1Ptbl1TblKey, 4000);
+ tableSizeMap.put(db2Utbl1TblKey, 4000);
+ tableSizeMap.put(db2Ptbl1TblKey, 4000);
+ Table tblDb1Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName());
+ Table tblDb1Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName());
+ Table tblDb2Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName());
+ Table tblDb2Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName());
+
+ SharedCache sc = cachedStore.getSharedCache();
+ sc.setConcurrencyLevel(1);
+ sc.setTableSizeMap(tableSizeMap);
+ sc.initialize(conf);
+
+ sc.addDatabaseToCache(db1);
+ sc.addDatabaseToCache(db2);
+ sc.addTableToCache(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName(), tblDb1Utbl1);
+ sc.addTableToCache(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), tblDb1Ptbl1);
+ sc.addTableToCache(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName(), tblDb2Utbl1);
+ sc.addTableToCache(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName(), tblDb2Ptbl1);
+
+ List<String> db1Tables = sc.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName());
+ Assert.assertEquals(0, db1Tables.size());
+ List<String> db2Tables = sc.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName());
+ Assert.assertEquals(1, db2Tables.size());
+
+ cachedStore.shutdown();
+ }
+
private Table createTestTbl(String dbName, String tblName, String tblOwner, List<FieldSchema> cols,
List<FieldSchema> ptnCols) {
String serdeLocation = "file:/tmp";
@@ -1535,8 +1707,8 @@ public class TestCachedStore {
}
class TableAndColStats {
- Table table;
- ColumnStatistics colStats;
+ private Table table;
+ private ColumnStatistics colStats;
TableAndColStats(Table table, ColumnStatistics colStats) {
this.table = table;
@@ -1592,8 +1764,8 @@ public class TestCachedStore {
}
class PartitionObjectsAndNames {
- List<Partition> ptns;
- List<String> ptnNames;
+ private List<Partition> ptns;
+ private List<String> ptnNames;
PartitionObjectsAndNames(List<Partition> ptns, List<String> ptnNames) {
this.ptns = ptns;