You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/03/19 17:54:32 UTC
[3/4] hive git commit: HIVE-18264: CachedStore: Store cached
partitions/col stats within the table cache and make prewarm non-blocking
(Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index d28b196..d37b201 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hive.metastore.cache;
import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.hadoop.hive.metastore.api.ISchemaName;
import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -35,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -95,8 +94,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.SchemaVersion;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.Type;
@@ -124,130 +121,50 @@ import com.google.common.annotations.VisibleForTesting;
// TODO constraintCache
// TODO need sd nested copy?
// TODO String intern
-// TODO restructure HBaseStore
// TODO monitor event queue
// TODO initial load slow?
// TODO size estimation
-// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation
public class CachedStore implements RawStore, Configurable {
private static ScheduledExecutorService cacheUpdateMaster = null;
- private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true);
- private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
- private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock(
- true);
- private static ReentrantReadWriteLock partitionAggrColStatsCacheLock =
- new ReentrantReadWriteLock(true);
- private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false);
- private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
private static List<Pattern> whitelistPatterns = null;
private static List<Pattern> blacklistPatterns = null;
+ // Default value set to 100 milliseconds for test purpose
+ private static long DEFAULT_CACHE_REFRESH_PERIOD = 100;
+ // Time after which metastore cache is updated from metastore DB by the background update thread
+ private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
+ private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
private RawStore rawStore = null;
private Configuration conf;
private PartitionExpressionProxy expressionProxy = null;
- // Default value set to 100 milliseconds for test purpose
- private static long cacheRefreshPeriod = 100;
-
- /** A wrapper over SharedCache. Allows one to get SharedCache safely; should be merged
- * into SharedCache itself (see the TODO on the class). */
- private static final SharedCacheWrapper sharedCacheWrapper = new SharedCacheWrapper();
+ private static final SharedCache sharedCache = new SharedCache();
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
- static class TableWrapper {
- Table t;
- String location;
- Map<String, String> parameters;
- byte[] sdHash;
- TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
- this.t = t;
- this.sdHash = sdHash;
- this.location = location;
- this.parameters = parameters;
- }
- public Table getTable() {
- return t;
- }
- public byte[] getSdHash() {
- return sdHash;
- }
- public String getLocation() {
- return location;
- }
- public Map<String, String> getParameters() {
- return parameters;
- }
- }
-
- static class PartitionWrapper {
- Partition p;
- String location;
- Map<String, String> parameters;
- byte[] sdHash;
- PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
- this.p = p;
- this.sdHash = sdHash;
- this.location = location;
- this.parameters = parameters;
- }
- public Partition getPartition() {
- return p;
- }
- public byte[] getSdHash() {
- return sdHash;
- }
- public String getLocation() {
- return location;
- }
- public Map<String, String> getParameters() {
- return parameters;
- }
- }
+ public CachedStore() {
- static class StorageDescriptorWrapper {
- StorageDescriptor sd;
- int refCount = 0;
- StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
- this.sd = sd;
- this.refCount = refCount;
- }
- public StorageDescriptor getSd() {
- return sd;
- }
- public int getRefCount() {
- return refCount;
- }
}
- public CachedStore() {
+ @Override
+ public void setConf(Configuration conf) {
+ setConfInternal(conf);
+ initBlackListWhiteList(conf);
+ startCacheUpdateService(conf, false, true);
}
- public static void initSharedCacheAsync(Configuration conf) {
- String clazzName = null;
- boolean isEnabled = false;
- try {
- clazzName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL);
- isEnabled = JavaUtils.getClass(clazzName, RawStore.class).isAssignableFrom(CachedStore.class);
- } catch (MetaException e) {
- LOG.error("Cannot instantiate metastore class", e);
- }
- if (!isEnabled) {
- LOG.debug("CachedStore is not enabled; using " + clazzName);
- return;
- }
- sharedCacheWrapper.startInit(conf);
+ /**
+ * Similar to setConf but used from within the tests
+ * This does start the background thread for prewarm and update
+ * @param conf
+ */
+ void setConfForTest(Configuration conf) {
+ setConfInternal(conf);
+ initBlackListWhiteList(conf);
}
- @Override
- public void setConf(Configuration conf) {
- String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
- ObjectStore.class.getName());
+ private void setConfInternal(Configuration conf) {
+ String rawStoreClassName =
+ MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
if (rawStore == null) {
try {
rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
@@ -260,94 +177,145 @@ public class CachedStore implements RawStore, Configurable {
this.conf = conf;
if (expressionProxy != null && conf != oldConf) {
LOG.warn("Unexpected setConf when we were already configured");
- }
- if (expressionProxy == null || conf != oldConf) {
+ } else {
expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
}
- initBlackListWhiteList(conf);
}
@VisibleForTesting
- static void prewarm(RawStore rawStore) throws Exception {
- // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
- Deadline.registerIfNot(1000000);
- List<String> dbNames = rawStore.getAllDatabases();
- LOG.info("Number of databases to prewarm: " + dbNames.size());
- SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
- for (int i = 0; i < dbNames.size(); i++) {
- String dbName = StringUtils.normalizeIdentifier(dbNames.get(i));
- // Cache partition column stats
- Deadline.startTimer("getColStatsForDatabase");
- List<ColStatsObjWithSourceInfo> colStatsForDB =
- rawStore.getPartitionColStatsForDatabase(dbName);
- Deadline.stopTimer();
- if (colStatsForDB != null) {
- sharedCache.addPartitionColStatsToCache(colStatsForDB);
+ /**
+ * This initializes the caches in SharedCache by getting the objects from Metastore DB via
+ * ObjectStore and populating the respective caches
+ *
+ * @param rawStore
+ * @throws Exception
+ */
+ static void prewarm(RawStore rawStore) {
+ if (isCachePrewarmed.get()) {
+ return;
+ }
+ long startTime = System.nanoTime();
+ LOG.info("Prewarming CachedStore");
+ while (!isCachePrewarmed.get()) {
+ // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+ Deadline.registerIfNot(1000000);
+ List<String> dbNames;
+ try {
+ dbNames = rawStore.getAllDatabases();
+ } catch (MetaException e) {
+ // Try again
+ continue;
}
- LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size());
- Database db = rawStore.getDatabase(dbName);
- sharedCache.addDatabaseToCache(dbName, db);
- List<String> tblNames = rawStore.getAllTables(dbName);
- LOG.debug("Tables in database: {} : {}", dbName, tblNames);
- for (int j = 0; j < tblNames.size(); j++) {
- String tblName = StringUtils.normalizeIdentifier(tblNames.get(j));
- if (!shouldCacheTable(dbName, tblName)) {
- LOG.info("Not caching database: {}'s table: {}", dbName, tblName);
+ LOG.info("Number of databases to prewarm: {}", dbNames.size());
+ List<Database> databases = new ArrayList<>(dbNames.size());
+ for (String dbName : dbNames) {
+ try {
+ databases.add(rawStore.getDatabase(dbName));
+ } catch (NoSuchObjectException e) {
+ // Continue with next database
continue;
}
- LOG.info("Caching database: {}'s table: {}. Cached {} / {} tables so far.", dbName,
- tblName, j, tblNames.size());
- Table table = null;
- table = rawStore.getTable(dbName, tblName);
- // It is possible the table is deleted during fetching tables of the database,
- // in that case, continue with the next table
- if (table == null) {
+ }
+ sharedCache.populateDatabasesInCache(databases);
+ LOG.debug(
+ "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache");
+ int numberOfDatabasesCachedSoFar = 0;
+ for (String dbName : dbNames) {
+ dbName = StringUtils.normalizeIdentifier(dbName);
+ List<String> tblNames;
+ try {
+ tblNames = rawStore.getAllTables(dbName);
+ } catch (MetaException e) {
+ // Continue with next database
continue;
}
- sharedCache.addTableToCache(dbName, tblName, table);
- if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
- Deadline.startTimer("getPartitions");
- List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
- Deadline.stopTimer();
- for (Partition partition : partitions) {
- sharedCache.addPartitionToCache(dbName, tblName, partition);
+ int numberOfTablesCachedSoFar = 0;
+ for (String tblName : tblNames) {
+ tblName = StringUtils.normalizeIdentifier(tblName);
+ if (!shouldCacheTable(dbName, tblName)) {
+ continue;
}
- }
- // Cache table column stats
- List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- Deadline.startTimer("getTableColumnStatistics");
- ColumnStatistics tableColStats =
- rawStore.getTableColumnStatistics(dbName, tblName, colNames);
- Deadline.stopTimer();
- if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) {
- sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj());
- }
- // Cache aggregate stats for all partitions of a table and for all but default partition
- List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
- if ((partNames != null) && (partNames.size() > 0)) {
- AggrStats aggrStatsAllPartitions =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- // Remove default partition from partition names and get aggregate
- // stats again
- List<FieldSchema> partKeys = table.getPartitionKeys();
- String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
- List<String> partCols = new ArrayList<String>();
- List<String> partVals = new ArrayList<String>();
- for (FieldSchema fs : partKeys) {
- partCols.add(fs.getName());
- partVals.add(defaultPartitionValue);
+ Table table;
+ try {
+ table = rawStore.getTable(dbName, tblName);
+ } catch (MetaException e) {
+ // It is possible the table is deleted during fetching tables of the database,
+ // in that case, continue with the next table
+ continue;
}
- String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
- partNames.remove(defaultPartitionName);
- AggrStats aggrStatsAllButDefaultPartition =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ try {
+ ColumnStatistics tableColStats = null;
+ List<Partition> partitions = null;
+ List<ColumnStatistics> partitionColStats = null;
+ AggrStats aggrStatsAllPartitions = null;
+ AggrStats aggrStatsAllButDefaultPartition = null;
+ if (table.isSetPartitionKeys()) {
+ Deadline.startTimer("getPartitions");
+ partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
+ Deadline.stopTimer();
+ List<String> partNames = new ArrayList<>(partitions.size());
+ for (Partition p : partitions) {
+ partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+ }
+ if (!partNames.isEmpty()) {
+ // Get partition column stats for this table
+ Deadline.startTimer("getPartitionColumnStatistics");
+ partitionColStats =
+ rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Get aggregate stats for all partitions of a table and for all but default
+ // partition
+ Deadline.startTimer("getAggrPartitionColumnStatistics");
+ aggrStatsAllPartitions =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Remove default partition from partition names and get aggregate
+ // stats again
+ List<FieldSchema> partKeys = table.getPartitionKeys();
+ String defaultPartitionValue =
+ MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+ List<String> partCols = new ArrayList<>();
+ List<String> partVals = new ArrayList<>();
+ for (FieldSchema fs : partKeys) {
+ partCols.add(fs.getName());
+ partVals.add(defaultPartitionValue);
+ }
+ String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+ partNames.remove(defaultPartitionName);
+ Deadline.startTimer("getAggrPartitionColumnStatistics");
+ aggrStatsAllButDefaultPartition =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ }
+ } else {
+ Deadline.startTimer("getTableColumnStatistics");
+ tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ }
+ sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats,
+ aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+ } catch (MetaException | NoSuchObjectException e) {
+ // Continue with next table
+ continue;
+ }
+ LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName,
+ tblName, ++numberOfTablesCachedSoFar, tblNames.size());
}
+ LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
+ ++numberOfDatabasesCachedSoFar, dbNames.size());
}
+ isCachePrewarmed.set(true);
}
- // Notify all blocked threads that prewarm is complete now
- sharedCacheWrapper.notifyAllBlocked();
+ LOG.info("CachedStore initialized");
+ long endTime = System.nanoTime();
+ LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
+ sharedCache.completeTableCachePrewarm();
+ }
+
+ @VisibleForTesting
+ static void setCachePrewarmedState(boolean state) {
+ isCachePrewarmed.set(state);
}
private static void initBlackListWhiteList(Configuration conf) {
@@ -356,20 +324,27 @@ public class CachedStore implements RawStore, Configurable {
MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
- // The last specified blacklist pattern gets precedence
- Collections.reverse(blacklistPatterns);
}
}
@VisibleForTesting
- synchronized static void startCacheUpdateService(Configuration conf) {
+ /**
+ * This starts a background thread, which initially populates the SharedCache and later
+ * periodically gets updates from the metastore db
+ *
+ * @param conf
+ * @param runOnlyOnce
+ * @param shouldRunPrewarm
+ */
+ static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+ boolean shouldRunPrewarm) {
if (cacheUpdateMaster == null) {
initBlackListWhiteList(conf);
if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
- cacheRefreshPeriod = MetastoreConf.getTimeVar(conf,
+ cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
}
- LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriod);
+ LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS);
cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -379,13 +354,20 @@ public class CachedStore implements RawStore, Configurable {
return t;
}
});
- cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod,
+ if (!runOnlyOnce) {
+ cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
+ cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+ }
+ }
+ if (runOnlyOnce) {
+ // Some tests control the execution of the background update thread
+ cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
TimeUnit.MILLISECONDS);
}
}
@VisibleForTesting
- synchronized static boolean stopCacheUpdateService(long timeout) {
+ static synchronized boolean stopCacheUpdateService(long timeout) {
boolean tasksStoppedBeforeShutdown = false;
if (cacheUpdateMaster != null) {
LOG.info("CachedStore: shutting down cache update service");
@@ -404,167 +386,83 @@ public class CachedStore implements RawStore, Configurable {
@VisibleForTesting
static void setCacheRefreshPeriod(long time) {
- cacheRefreshPeriod = time;
+ cacheRefreshPeriodMS = time;
}
static class CacheUpdateMasterWork implements Runnable {
- private boolean isFirstRun = true;
+ private boolean shouldRunPrewarm = true;
private final RawStore rawStore;
- public CacheUpdateMasterWork(Configuration conf) {
- String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL,
- ObjectStore.class.getName());
+ CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
+ this.shouldRunPrewarm = shouldRunPrewarm;
+ String rawStoreClassName =
+ MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
try {
rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance();
rawStore.setConf(conf);
} catch (InstantiationException | IllegalAccessException | MetaException e) {
// MetaException here really means ClassNotFound (see the utility method).
// So, if any of these happen, that means we can never succeed.
- sharedCacheWrapper.updateInitState(e, true);
throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
}
}
@Override
public void run() {
- if (isFirstRun) {
- while (isFirstRun) {
- try {
- long startTime = System.nanoTime();
- LOG.info("Prewarming CachedStore");
- prewarm(rawStore);
- LOG.info("CachedStore initialized");
- long endTime = System.nanoTime();
- LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
- } catch (Exception e) {
- LOG.error("Prewarm failure", e);
- sharedCacheWrapper.updateInitState(e, false);
- return;
- }
- sharedCacheWrapper.updateInitState(null, false);
- isFirstRun = false;
- }
- } else {
+ if (!shouldRunPrewarm) {
// TODO: prewarm and update can probably be merged.
update();
+ } else {
+ try {
+ prewarm(rawStore);
+ } catch (Exception e) {
+ LOG.error("Prewarm failure", e);
+ return;
+ }
}
}
- public void update() {
+ void update() {
Deadline.registerIfNot(1000000);
LOG.debug("CachedStore: updating cached objects");
+ List<String> dbNames;
try {
- List<String> dbNames = rawStore.getAllDatabases();
- if (dbNames != null) {
- // Update the database in cache
- updateDatabases(rawStore, dbNames);
- for (String dbName : dbNames) {
- updateDatabasePartitionColStats(rawStore, dbName);
- // Update the tables in cache
- updateTables(rawStore, dbName);
- List<String> tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe());
- for (String tblName : tblNames) {
- if (!shouldCacheTable(dbName, tblName)) {
- continue;
- }
- // Update the partitions for a table in cache
- updateTablePartitions(rawStore, dbName, tblName);
- // Update the table column stats for a table in cache
- updateTableColStats(rawStore, dbName, tblName);
- // Update aggregate column stats cache
- updateAggregateStatsCache(rawStore, dbName, tblName);
- }
- }
- }
- } catch (Exception e) {
- LOG.error("Updating CachedStore: error happen when refresh; ignoring", e);
+ dbNames = rawStore.getAllDatabases();
+ } catch (MetaException e) {
+ LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e);
+ return;
}
- }
-
- private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) {
- try {
- Deadline.startTimer("getColStatsForDatabasePartitions");
- List<ColStatsObjWithSourceInfo> colStatsForDB =
- rawStore.getPartitionColStatsForDatabase(dbName);
- Deadline.stopTimer();
- if (colStatsForDB != null) {
- if (partitionColStatsCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping partition column stats cache update; the partition column stats "
- + "list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe()
- .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB);
- }
- }
- } catch (MetaException | NoSuchObjectException e) {
- LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}",
- dbName, e);
- } finally {
- if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) {
- partitionColStatsCacheLock.writeLock().unlock();
+ // Update the database in cache
+ updateDatabases(rawStore, dbNames);
+ for (String dbName : dbNames) {
+ // Update the tables in cache
+ updateTables(rawStore, dbName);
+ List<String> tblNames;
+ try {
+ tblNames = rawStore.getAllTables(dbName);
+ } catch (MetaException e) {
+ // Continue with next database
+ continue;
}
- }
- }
-
- // Update cached aggregate stats for all partitions of a table and for all
- // but default partition
- private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) {
- try {
- Table table = rawStore.getTable(dbName, tblName);
- List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
- List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- if ((partNames != null) && (partNames.size() > 0)) {
- Deadline.startTimer("getAggregareStatsForAllPartitions");
- AggrStats aggrStatsAllPartitions =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- Deadline.stopTimer();
- // Remove default partition from partition names and get aggregate stats again
- List<FieldSchema> partKeys = table.getPartitionKeys();
- String defaultPartitionValue =
- MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
- List<String> partCols = new ArrayList<String>();
- List<String> partVals = new ArrayList<String>();
- for (FieldSchema fs : partKeys) {
- partCols.add(fs.getName());
- partVals.add(defaultPartitionValue);
- }
- String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
- partNames.remove(defaultPartitionName);
- Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
- AggrStats aggrStatsAllButDefaultPartition =
- rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
- Deadline.stopTimer();
- if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) {
- if (partitionAggrColStatsCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug(
- "Skipping aggregate column stats cache update; the aggregate column stats we "
- + "have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache(
- StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName),
- aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
- }
+ for (String tblName : tblNames) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ continue;
}
- }
- } catch (MetaException | NoSuchObjectException e) {
- LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
- e);
- } finally {
- if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) {
- partitionAggrColStatsCacheLock.writeLock().unlock();
+ // Update the table column stats for a table in cache
+ updateTableColStats(rawStore, dbName, tblName);
+ // Update the partitions for a table in cache
+ updateTablePartitions(rawStore, dbName, tblName);
+ // Update the partition col stats for a table in cache
+ updateTablePartitionColStats(rawStore, dbName, tblName);
+ // Update aggregate partition column stats for a table in cache
+ updateTableAggregatePartitionColStats(rawStore, dbName, tblName);
}
}
+ sharedCache.incrementUpdateCount();
}
private void updateDatabases(RawStore rawStore, List<String> dbNames) {
- // Prepare the list of databases
- List<Database> databases = new ArrayList<>();
+ List<Database> databases = new ArrayList<>(dbNames.size());
for (String dbName : dbNames) {
Database db;
try {
@@ -574,24 +472,9 @@ public class CachedStore implements RawStore, Configurable {
LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e);
}
}
- // Update the cached database objects
- try {
- if (databaseCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isDatabaseCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping database cache update; the database list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshDatabases(databases);
- }
- } finally {
- if (databaseCacheLock.isWriteLockedByCurrentThread()) {
- databaseCacheLock.writeLock().unlock();
- }
- }
+ sharedCache.refreshDatabasesInCache(databases);
}
- // Update the cached table objects
private void updateTables(RawStore rawStore, String dbName) {
List<Table> tables = new ArrayList<>();
try {
@@ -600,81 +483,99 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
continue;
}
- Table table =
- rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName));
+ Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName));
tables.add(table);
}
- if (tableCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isTableCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping table cache update; the table list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables);
- }
+ sharedCache.refreshTablesInCache(dbName, tables);
} catch (MetaException e) {
- LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e);
- } finally {
- if (tableCacheLock.isWriteLockedByCurrentThread()) {
- tableCacheLock.writeLock().unlock();
+ LOG.debug("Unable to refresh cached tables for database: " + dbName, e);
+ }
+ }
+
+ private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+ try {
+ Table table = rawStore.getTable(dbName, tblName);
+ if (!table.isSetPartitionKeys()) {
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ Deadline.startTimer("getTableColumnStatistics");
+ ColumnStatistics tableColStats =
+ rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ Deadline.stopTimer();
+ if (tableColStats != null) {
+ sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ }
}
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Unable to refresh table column stats for table: " + tblName, e);
}
}
- // Update the cached partition objects for a table
private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) {
try {
Deadline.startTimer("getPartitions");
List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE);
Deadline.stopTimer();
- if (partitionCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isPartitionCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping partition cache update; the partition list we have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshPartitions(
- StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), partitions);
- }
+ sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), partitions);
} catch (MetaException | NoSuchObjectException e) {
LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
- } finally {
- if (partitionCacheLock.isWriteLockedByCurrentThread()) {
- partitionCacheLock.writeLock().unlock();
- }
}
}
- // Update the cached col stats for this table
- private void updateTableColStats(RawStore rawStore, String dbName, String tblName) {
+ private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) {
try {
Table table = rawStore.getTable(dbName, tblName);
List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
- Deadline.startTimer("getTableColumnStatistics");
- ColumnStatistics tableColStats =
- rawStore.getTableColumnStatistics(dbName, tblName, colNames);
+ List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+ // Get partition column stats for this table
+ Deadline.startTimer("getPartitionColumnStatistics");
+ List<ColumnStatistics> partitionColStats =
+ rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames);
Deadline.stopTimer();
- if (tableColStats != null) {
- if (tableColStatsCacheLock.writeLock().tryLock()) {
- // Skip background updates if we detect change
- if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
- LOG.debug("Skipping table column stats cache update; the table column stats list we "
- + "have is dirty.");
- return;
- }
- sharedCacheWrapper.getUnsafe().refreshTableColStats(
- StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+ sharedCache.refreshPartitionColStatsInCache(dbName, tblName, partitionColStats);
+ } catch (MetaException | NoSuchObjectException e) {
+ LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+ }
+ }
+
+ // Update cached aggregate stats for all partitions of a table and for all
+ // but default partition
+ private void updateTableAggregatePartitionColStats(RawStore rawStore, String dbName,
+ String tblName) {
+ try {
+ Table table = rawStore.getTable(dbName, tblName);
+ List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
+ List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+ if ((partNames != null) && (partNames.size() > 0)) {
+ Deadline.startTimer("getAggregareStatsForAllPartitions");
+ AggrStats aggrStatsAllPartitions =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ // Remove default partition from partition names and get aggregate stats again
+ List<FieldSchema> partKeys = table.getPartitionKeys();
+ String defaultPartitionValue =
+ MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+ List<String> partCols = new ArrayList<String>();
+ List<String> partVals = new ArrayList<String>();
+ for (FieldSchema fs : partKeys) {
+ partCols.add(fs.getName());
+ partVals.add(defaultPartitionValue);
}
+ String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+ partNames.remove(defaultPartitionName);
+ Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
+ AggrStats aggrStatsAllButDefaultPartition =
+ rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ Deadline.stopTimer();
+ sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
}
} catch (MetaException | NoSuchObjectException e) {
- LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e);
- } finally {
- if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) {
- tableColStatsCacheLock.writeLock().unlock();
- }
+ LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
+ e);
}
}
}
@@ -712,35 +613,17 @@ public class CachedStore implements RawStore, Configurable {
@Override
public void createDatabase(Database db) throws InvalidObjectException, MetaException {
rawStore.createDatabase(db);
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- try {
- // Wait if background cache update is happening
- databaseCacheLock.readLock().lock();
- isDatabaseCacheDirty.set(true);
- sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()),
- db.deepCopy());
- } finally {
- databaseCacheLock.readLock().unlock();
- }
+ sharedCache.addDatabaseToCache(db);
}
@Override
public Database getDatabase(String dbName) throws NoSuchObjectException {
- SharedCache sharedCache;
-
- if (!sharedCacheWrapper.isInitialized()) {
+ if (!sharedCache.isDatabaseCachePrewarmed()) {
return rawStore.getDatabase(dbName);
}
-
- try {
- sharedCache = sharedCacheWrapper.get();
- } catch (MetaException e) {
- throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx?
- }
- Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
+ dbName = dbName.toLowerCase();
+ Database db =
+ sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
if (db == null) {
throw new NoSuchObjectException();
}
@@ -748,68 +631,39 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException {
- boolean succ = rawStore.dropDatabase(dbname);
+ public boolean dropDatabase(String dbName) throws NoSuchObjectException, MetaException {
+ boolean succ = rawStore.dropDatabase(dbName);
if (succ) {
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- databaseCacheLock.readLock().lock();
- isDatabaseCacheDirty.set(true);
- sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname));
- } finally {
- databaseCacheLock.readLock().unlock();
- }
+ dbName = dbName.toLowerCase();
+ sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
}
return succ;
}
@Override
- public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException,
- MetaException {
+ public boolean alterDatabase(String dbName, Database db)
+ throws NoSuchObjectException, MetaException {
boolean succ = rawStore.alterDatabase(dbName, db);
if (succ) {
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- databaseCacheLock.readLock().lock();
- isDatabaseCacheDirty.set(true);
- sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
- } finally {
- databaseCacheLock.readLock().unlock();
- }
+ dbName = dbName.toLowerCase();
+ sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db);
}
return succ;
}
@Override
public List<String> getDatabases(String pattern) throws MetaException {
- if (!sharedCacheWrapper.isInitialized()) {
+ if (!sharedCache.isDatabaseCachePrewarmed()) {
return rawStore.getDatabases(pattern);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> results = new ArrayList<>();
- for (String dbName : sharedCache.listCachedDatabases()) {
- dbName = StringUtils.normalizeIdentifier(dbName);
- if (CacheUtils.matches(dbName, pattern)) {
- results.add(dbName);
- }
- }
- return results;
+ return sharedCache.listCachedDatabases(pattern);
}
@Override
public List<String> getAllDatabases() throws MetaException {
- if (!sharedCacheWrapper.isInitialized()) {
+ if (!sharedCache.isDatabaseCachePrewarmed()) {
return rawStore.getAllDatabases();
}
- SharedCache sharedCache = sharedCacheWrapper.get();
return sharedCache.listCachedDatabases();
}
@@ -854,24 +708,13 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
validateTableType(tbl);
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.addTableToCache(dbName, tblName, tbl);
- } finally {
- tableCacheLock.readLock().unlock();
- }
+ sharedCache.addTableToCache(dbName, tblName, tbl);
}
@Override
- public boolean dropTable(String dbName, String tblName) throws MetaException,
- NoSuchObjectException, InvalidObjectException, InvalidInputException {
+ public boolean dropTable(String dbName, String tblName)
+ throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
boolean succ = rawStore.dropTable(dbName, tblName);
if (succ) {
dbName = StringUtils.normalizeIdentifier(dbName);
@@ -879,28 +722,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- // Remove table
- try {
- // Wait if background table cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.removeTableFromCache(dbName, tblName);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Remove table col stats
- try {
- // Wait if background table col stats cache update is happening
- tableColStatsCacheLock.readLock().lock();
- isTableColStatsCacheDirty.set(true);
- sharedCache.removeTableColStatsFromCache(dbName, tblName);
- } finally {
- tableColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removeTableFromCache(dbName, tblName);
}
return succ;
}
@@ -909,11 +731,14 @@ public class CachedStore implements RawStore, Configurable {
public Table getTable(String dbName, String tblName) throws MetaException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getTable(dbName, tblName);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // This table is not yet loaded in cache
+ return rawStore.getTable(dbName, tblName);
+ }
if (tbl != null) {
tbl.unsetPrivileges();
tbl.setRewriteEnabled(tbl.isRewriteEnabled());
@@ -930,27 +755,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.addPartitionToCache(dbName, tblName, part);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.addPartitionToCache(dbName, tblName, part);
}
return succ;
}
@@ -965,29 +770,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- for (Partition part : parts) {
- sharedCache.addPartitionToCache(dbName, tblName, part);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.addPartitionsToCache(dbName, tblName, parts);
}
return succ;
}
@@ -1002,30 +785,10 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
- while (iterator.hasNext()) {
- Partition part = iterator.next();
- sharedCache.addPartitionToCache(dbName, tblName, part);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
+ PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+ while (iterator.hasNext()) {
+ Partition part = iterator.next();
+ sharedCache.addPartitionToCache(dbName, tblName, part);
}
}
return succ;
@@ -1036,16 +799,13 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
-
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getPartition(dbName, tblName, part_vals);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- Partition part =
- sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
+ Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals);
if (part == null) {
- // TODO Manage privileges
- throw new NoSuchObjectException("partition values=" + part_vals.toString());
+ // The table containing the partition is not yet loaded in cache
+ return rawStore.getPartition(dbName, tblName, part_vals);
}
return part;
}
@@ -1055,10 +815,14 @@ public class CachedStore implements RawStore, Configurable {
List<String> part_vals) throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.doesPartitionExist(dbName, tblName, part_vals);
+ }
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table containing the partition is not yet loaded in cache
return rawStore.doesPartitionExist(dbName, tblName, part_vals);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
return sharedCache.existPartitionFromCache(dbName, tblName, part_vals);
}
@@ -1072,50 +836,40 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- // Remove partition
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove partition col stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
}
return succ;
}
@Override
+ public void dropPartitions(String dbName, String tblName, List<String> partNames)
+ throws MetaException, NoSuchObjectException {
+ rawStore.dropPartitions(dbName, tblName, partNames);
+ dbName = StringUtils.normalizeIdentifier(dbName);
+ tblName = StringUtils.normalizeIdentifier(tblName);
+ if (!shouldCacheTable(dbName, tblName)) {
+ return;
+ }
+ List<List<String>> partVals = new ArrayList<List<String>>();
+ for (String partName : partNames) {
+ partVals.add(partNameToVals(partName));
+ }
+ sharedCache.removePartitionsFromCache(dbName, tblName, partVals);
+ }
+
+ @Override
public List<Partition> getPartitions(String dbName, String tblName, int max)
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitions(dbName, tblName, max);
+ }
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table containing the partitions is not yet loaded in cache
return rawStore.getPartitions(dbName, tblName, max);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Partition> parts = sharedCache.listCachedPartitions(dbName, tblName, max);
return parts;
}
@@ -1130,73 +884,20 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table is not yet loaded in cache
return;
}
-
- if (shouldCacheTable(dbName, newTblName)) {
- validateTableType(newTable);
- // Update table cache
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), newTable);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Update partition cache (key might have changed since table name is a
- // component of key)
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName),
- StringUtils.normalizeIdentifier(tblName), newTable);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- } else {
- // Remove the table and its cached partitions, stats etc,
- // since it does not pass the whitelist/blacklist filter.
- // Remove table
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.removeTableFromCache(dbName, tblName);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Remove partitions
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.removePartitionsFromCache(dbName, tblName);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove partition col stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Update aggregate partition col stats keys wherever applicable
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+ // If old table is in the cache and the new table can also be cached
+ sharedCache.alterTableInCache(dbName, tblName, newTable);
+ } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) {
+ // If old table is *not* in the cache but the new table can be cached
+ sharedCache.addTableToCache(dbName, newTblName, newTable);
+ } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) {
+ // If old table is in the cache but the new table *cannot* be cached
+ sharedCache.removeTableFromCache(dbName, tblName);
}
}
@@ -1208,34 +909,21 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getTables(String dbName, String pattern) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getTables(dbName, pattern);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> tableNames = new ArrayList<>();
- for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- if (CacheUtils.matches(table.getTableName(), pattern)) {
- tableNames.add(table.getTableName());
- }
- }
- return tableNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+ (short) -1);
}
@Override
public List<String> getTables(String dbName, String pattern, TableType tableType)
throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getTables(dbName, pattern, tableType);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> tableNames = new ArrayList<>();
- for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- if (CacheUtils.matches(table.getTableName(), pattern) &&
- table.getTableType().equals(tableType.toString())) {
- tableNames.add(table.getTableName());
- }
- }
- return tableNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern,
+ tableType);
}
@Override
@@ -1248,10 +936,9 @@ public class CachedStore implements RawStore, Configurable {
public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes)
throws MetaException {
// TODO Check if all required tables are allowed, if so, get it from cache
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getTableMeta(dbNames, tableNames, tableTypes);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames),
StringUtils.normalizeIdentifier(tableNames), tableTypes);
}
@@ -1268,10 +955,9 @@ public class CachedStore implements RawStore, Configurable {
break;
}
}
- if (!sharedCacheWrapper.isInitialized() || missSomeInCache) {
+ if (!isCachePrewarmed.get() || missSomeInCache) {
return rawStore.getTableObjectsByName(dbName, tblNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Table> tables = new ArrayList<>();
for (String tblName : tblNames) {
tblName = StringUtils.normalizeIdentifier(tblName);
@@ -1286,38 +972,20 @@ public class CachedStore implements RawStore, Configurable {
@Override
public List<String> getAllTables(String dbName) throws MetaException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.getAllTables(dbName);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- return getAllTablesInternal(dbName, sharedCache);
- }
-
- private static List<String> getAllTablesInternal(String dbName, SharedCache sharedCache) {
- List<String> tblNames = new ArrayList<>();
- for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName()));
- }
- return tblNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName));
}
@Override
public List<String> listTableNamesByFilter(String dbName, String filter, short max_tables)
throws MetaException, UnknownDBException {
- if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) {
+ if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
return rawStore.listTableNamesByFilter(dbName, filter, max_tables);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- List<String> tableNames = new ArrayList<>();
- int count = 0;
- for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) {
- if (CacheUtils.matches(table.getTableName(), filter)
- && (max_tables == -1 || count < max_tables)) {
- tableNames.add(table.getTableName());
- count++;
- }
- }
- return tableNames;
+ return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter,
+ max_tables);
}
@Override
@@ -1325,16 +993,19 @@ public class CachedStore implements RawStore, Configurable {
short max_parts) throws MetaException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.listPartitionNames(dbName, tblName, max_parts);
+ }
+ Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+ if (tbl == null) {
+ // The table is not yet loaded in cache
return rawStore.listPartitionNames(dbName, tblName, max_parts);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<String> partitionNames = new ArrayList<>();
- Table t = sharedCache.getTableFromCache(dbName, tblName);
int count = 0;
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) {
if (max_parts == -1 || count < max_parts) {
- partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+ partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
}
}
return partitionNames;
@@ -1363,37 +1034,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- // Update partition cache
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Update partition column stats cache
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
}
@Override
@@ -1405,61 +1046,23 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- // Update partition cache
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- for (int i = 0; i < partValsList.size(); i++) {
- List<String> partVals = partValsList.get(i);
- Partition newPart = newParts.get(i);
- sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Update partition column stats cache
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- for (int i = 0; i < partValsList.size(); i++) {
- List<String> partVals = partValsList.get(i);
- Partition newPart = newParts.get(i);
- sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart);
- }
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts);
}
private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
- throws MetaException, NoSuchObjectException {
- List<Partition> parts = sharedCache.listCachedPartitions(
- StringUtils.normalizeIdentifier(table.getDbName()),
- StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+ throws MetaException, NoSuchObjectException {
+ List<Partition> parts =
+ sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()),
+ StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
for (Partition part : parts) {
result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
}
if (defaultPartName == null || defaultPartName.isEmpty()) {
defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
}
- return expressionProxy.filterPartitionsByExpr(
- table.getPartitionKeys(), expr, defaultPartName, result);
+ return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
+ result);
}
@Override
@@ -1474,13 +1077,17 @@ public class CachedStore implements RawStore, Configurable {
String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
result);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<String> partNames = new LinkedList<>();
Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts,
+ result);
+ }
boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr,
defaultPartitionName, maxParts, partNames, sharedCache);
return hasUnknownPartitions;
@@ -1497,13 +1104,16 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
List<String> partNames = new LinkedList<>();
Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.getNumPartitionsByExpr(dbName, tblName, expr);
+ }
getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
sharedCache);
return partNames.size();
@@ -1526,10 +1136,14 @@ public class CachedStore implements RawStore, Configurable {
List<String> partNames) throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitionsByNames(dbName, tblName, partNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.getPartitionsByNames(dbName, tblName, partNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Partition> partitions = new ArrayList<>();
for (String partName : partNames) {
Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName));
@@ -1702,14 +1316,17 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException, InvalidObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals);
- if (p!=null) {
- Table t = sharedCache.getTableFromCache(dbName, tblName);
- String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals);
+ if (p != null) {
+ String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
userName, groupNames);
p.setPrivileges(privs);
@@ -1723,16 +1340,19 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException, InvalidObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- Table t = sharedCache.getTableFromCache(dbName, tblName);
List<Partition> partitions = new ArrayList<>();
int count = 0;
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
if (maxParts == -1 || count < maxParts) {
- String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+ String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName,
userName, groupNames);
part.setPrivileges(privs);
@@ -1749,13 +1369,16 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<String> partNames = new ArrayList<>();
int count = 0;
- Table t = sharedCache.getTableFromCache(dbName, tblName);
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
boolean psMatch = true;
for (int i=0;i<partVals.size();i++) {
@@ -1770,7 +1393,7 @@ public class CachedStore implements RawStore, Configurable {
continue;
}
if (maxParts == -1 || count < maxParts) {
- partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues()));
+ partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
count++;
}
}
@@ -1783,13 +1406,17 @@ public class CachedStore implements RawStore, Configurable {
throws MetaException, InvalidObjectException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
+ return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
+ groupNames);
+ }
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName,
groupNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
List<Partition> partitions = new ArrayList<>();
- Table t = sharedCache.getTableFromCache(dbName, tblName);
int count = 0;
for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) {
boolean psMatch = true;
@@ -1805,7 +1432,7 @@ public class CachedStore implements RawStore, Configurable {
continue;
}
if (maxParts == -1 || count < maxParts) {
- String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues());
+ String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
PrincipalPrivilegeSet privs =
getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames);
part.setPrivileges(privs);
@@ -1825,35 +1452,19 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
return succ;
}
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
- Table tbl = getTable(dbName, tblName);
List<String> colNames = new ArrayList<>();
for (ColumnStatisticsObj statsObj : statsObjs) {
colNames.add(statsObj.getColName());
}
- StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
- // Update table
- try {
- // Wait if background cache update is happening
- tableCacheLock.readLock().lock();
- isTableCacheDirty.set(true);
- sharedCache.alterTableInCache(dbName, tblName, tbl);
- } finally {
- tableCacheLock.readLock().unlock();
- }
- // Update table col stats
- try {
- // Wait if background cache update is happening
- tableColStatsCacheLock.readLock().lock();
- isTableColStatsCacheDirty.set(true);
- sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
- } finally {
- tableColStatsCacheLock.readLock().unlock();
- }
+ StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
+ sharedCache.alterTableInCache(dbName, tblName, table);
+ sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs);
}
return succ;
}
@@ -1863,24 +1474,18 @@ public class CachedStore implements RawStore, Configurable {
List<String> colNames) throws MetaException, NoSuchObjectException {
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
- List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
- for (String colName : colNames) {
- String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName);
- ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey);
- if (colStat != null) {
- colStatObjs.add(colStat);
- }
- }
- if (colStatObjs.isEmpty()) {
- return null;
- } else {
- return new ColumnStatistics(csd, colStatObjs);
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.getTableColumnStatistics(dbName, tblName, colNames);
}
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
+ List<ColumnStatisticsObj> colStatObjs =
+ sharedCache.getTableColStatsFromCache(dbName, tblName, colNames);
+ return new ColumnStatistics(csd, colStatObjs);
}
@Override
@@ -1893,18 +1498,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- tableColStatsCacheLock.readLock().lock();
- isTableColStatsCacheDirty.set(true);
- sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
- } finally {
- tableColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removeTableColStatsFromCache(dbName, tblName, colName);
}
return succ;
}
@@ -1919,10 +1513,6 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
Partition part = getPartition(dbName, tblName, partVals);
List<String> colNames = new ArrayList<>();
@@ -1930,34 +1520,8 @@ public class CachedStore implements RawStore, Configurable {
colNames.add(statsObj.getColName());
}
StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
- // Update partition
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Update partition column stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals,
- colStats.getStatsObj());
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.alterPartitionInCache(dbName, tblName, partVals, part);
+ sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj());
}
return succ;
}
@@ -1981,27 +1545,7 @@ public class CachedStore implements RawStore, Configurable {
if (!shouldCacheTable(dbName, tblName)) {
return succ;
}
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return succ;
- }
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
- // Remove aggregate partition col stats for this table
- try {
- // Wait if background cache update is happening
- partitionAggrColStatsCacheLock.readLock().lock();
- isPartitionAggrColStatsCacheDirty.set(true);
- sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
- } finally {
- partitionAggrColStatsCacheLock.readLock().unlock();
- }
+ sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName);
}
return succ;
}
@@ -2012,10 +1556,14 @@ public class CachedStore implements RawStore, Configurable {
List<ColumnStatisticsObj> colStats;
dbName = StringUtils.normalizeIdentifier(dbName);
tblName = StringUtils.normalizeIdentifier(tblName);
- if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+ if (!shouldCacheTable(dbName, tblName)) {
rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
}
- SharedCache sharedCache = sharedCacheWrapper.get();
+ Table table = sharedCache.getTableFromCache(dbName, tblName);
+ if (table == null) {
+ // The table is not yet loaded in cache
+ return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames);
+ }
List<String> allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1);
if (partNames.size() == allPartNames.size()) {
colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL);
@@ -2054,10 +1602,8 @@ public class CachedStore implements RawStore, Configurable {
List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList =
new ArrayList<ColStatsObjWithSourceInfo>();
for (String partName : partNames) {
- String colStatsCacheKey =
- CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName);
ColumnStatisticsObj colStatsForPart =
- sharedCache.getCachedPartitionColStats(colStatsCacheKey);
+ sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName);
if (colStatsForPart != null) {
ColStatsObjWithSourceInfo colStatsWithPartInfo =
new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName);
@@ -2173,54 +1719,6 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
- public void dropPartitions(String dbName, String tblName, List<String> partNames)
- throws MetaException, NoSuchObjectException {
- rawStore.dropPartitions(dbName, tblName, partNames);
- dbName = StringUtils.normalizeIdentifier(dbName);
- tblName = StringUtils.normalizeIdentifier(tblName);
- if (!shouldCacheTable(dbName, tblName)) {
- return;
- }
- SharedCache sharedCache = sharedCacheWrapper.get();
- if (sharedCache == null) {
- return;
- }
- // Remove partitions
- try {
- // Wait if background cache update is happening
- partitionCacheLock.readLock().lock();
- isPartitionCacheDirty.set(true);
- for (String partName : partNames) {
- List<String> vals = partNameToVals(partName);
- sharedCache.removePartitionFromCache(dbName, tblName, vals);
- }
- } finally {
- partitionCacheLock.readLock().unlock();
- }
- // Remove partition col stats
- try {
- // Wait if background cache update is happening
- partitionColStatsCacheLock.readLock().lock();
- isPartitionColStatsCacheDirty.set(true);
- for (String partName : partNames) {
- List<String> part_vals = partNameToVals(partName);
- sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
- }
- } finally {
- partitionColStatsCacheLock.readLock().unlock();
- }
-
<TRUNCATED>