You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/03/19 17:54:31 UTC
[2/4] hive git commit: HIVE-18264: CachedStore: Store cached
partitions/col stats within the table cache and make prewarm non-blocking
(Vaibhav Gumashta reviewed by Daniel Dai, Alexander Kolbasov)
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index 32ea174..cf92eda 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -21,15 +21,20 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -38,11 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,15 +51,21 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class SharedCache {
- private Map<String, Database> databaseCache = new TreeMap<>();
- private Map<String, TableWrapper> tableCache = new TreeMap<>();
- private Map<String, PartitionWrapper> partitionCache = new TreeMap<>();
- private Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<>();
- private Map<String, ColumnStatisticsObj> tableColStatsCache = new TreeMap<>();
- private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<>();
- private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
- new HashMap<String, List<ColumnStatisticsObj>>();
+ private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true);
+ // For caching Database objects. Key is database name
+ private Map<String, Database> databaseCache = new ConcurrentHashMap<String, Database>();
+ private boolean isDatabaseCachePrewarmed = false;
+ private HashSet<String> databasesDeletedDuringPrewarm = new HashSet<String>();
+ private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false);
+ // For caching TableWrapper objects. Key is aggregate of database name and table name
+ private Map<String, TableWrapper> tableCache = new ConcurrentHashMap<String, TableWrapper>();
+ private boolean isTableCachePrewarmed = false;
+ private HashSet<String> tablesDeletedDuringPrewarm = new HashSet<String>();
+ private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false);
+ private Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new ConcurrentHashMap<>();
private static MessageDigest md;
+ static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName());
+ private AtomicLong cacheUpdateCount = new AtomicLong(0);
static enum StatsType {
ALL(0), ALLBUTDEFAULT(1);
@@ -74,8 +81,6 @@ public class SharedCache {
}
}
- private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class);
-
static {
try {
md = MessageDigest.getInstance("MD5");
@@ -84,43 +89,804 @@ public class SharedCache {
}
}
- public synchronized Database getDatabaseFromCache(String name) {
- return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null;
+ static class TableWrapper {
+ Table t;
+ String location;
+ Map<String, String> parameters;
+ byte[] sdHash;
+ ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true);
+ // For caching column stats for an unpartitioned table
+ // Key is column name and the value is the col stat object
+ private Map<String, ColumnStatisticsObj> tableColStatsCache =
+ new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false);
+ // For caching partition objects
+ // Ket is partition values and the value is a wrapper around the partition object
+ private Map<String, PartitionWrapper> partitionCache = new ConcurrentHashMap<String, PartitionWrapper>();
+ private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false);
+ // For caching column stats for a partitioned table
+ // Key is aggregate of partition values, column name and the value is the col stat object
+ private Map<String, ColumnStatisticsObj> partitionColStatsCache =
+ new ConcurrentHashMap<String, ColumnStatisticsObj>();
+ private AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false);
+ // For caching aggregate column stats for all and all minus default partition
+ // Key is column name and the value is a list of 2 col stat objects
+ // (all partitions and all but default)
+ private Map<String, List<ColumnStatisticsObj>> aggrColStatsCache =
+ new ConcurrentHashMap<String, List<ColumnStatisticsObj>>();
+ private AtomicBoolean isAggrPartitionColStatsCacheDirty = new AtomicBoolean(false);
+
+ TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) {
+ this.t = t;
+ this.sdHash = sdHash;
+ this.location = location;
+ this.parameters = parameters;
+ }
+
+ public Table getTable() {
+ return t;
+ }
+
+ public void setTable(Table t) {
+ this.t = t;
+ }
+
+ public byte[] getSdHash() {
+ return sdHash;
+ }
+
+ public void setSdHash(byte[] sdHash) {
+ this.sdHash = sdHash;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ public void setParameters(Map<String, String> parameters) {
+ this.parameters = parameters;
+ }
+
+ void cachePartition(Partition part, SharedCache sharedCache) {
+ try {
+ tableLock.writeLock().lock();
+ PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+ partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+ isPartitionCacheDirty.set(true);
+ // Invalidate cached aggregate stats
+ if (!aggrColStatsCache.isEmpty()) {
+ aggrColStatsCache.clear();
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ void cachePartitions(List<Partition> parts, SharedCache sharedCache) {
+ try {
+ tableLock.writeLock().lock();
+ for (Partition part : parts) {
+ PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache);
+ partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper);
+ isPartitionCacheDirty.set(true);
+ }
+ // Invalidate cached aggregate stats
+ if (!aggrColStatsCache.isEmpty()) {
+ aggrColStatsCache.clear();
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public Partition getPartition(List<String> partVals, SharedCache sharedCache) {
+ Partition part = null;
+ try {
+ tableLock.readLock().lock();
+ PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+ if (wrapper == null) {
+ return null;
+ }
+ part = CacheUtils.assemble(wrapper, sharedCache);
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return part;
+ }
+
+ public List<Partition> listPartitions(int max, SharedCache sharedCache) {
+ List<Partition> parts = new ArrayList<>();
+ int count = 0;
+ try {
+ tableLock.readLock().lock();
+ for (PartitionWrapper wrapper : partitionCache.values()) {
+ if (max == -1 || count < max) {
+ parts.add(CacheUtils.assemble(wrapper, sharedCache));
+ count++;
+ }
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return parts;
+ }
+
+ public boolean containsPartition(List<String> partVals) {
+ boolean containsPart = false;
+ try {
+ tableLock.readLock().lock();
+ containsPart = partitionCache.containsKey(CacheUtils.buildPartitionCacheKey(partVals));
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return containsPart;
+ }
+
+ public Partition removePartition(List<String> partVal, SharedCache sharedCache) {
+ Partition part = null;
+ try {
+ tableLock.writeLock().lock();
+ PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal));
+ isPartitionCacheDirty.set(true);
+ if (wrapper.getSdHash() != null) {
+ sharedCache.decrSd(wrapper.getSdHash());
+ }
+ part = CacheUtils.assemble(wrapper, sharedCache);
+ // Remove col stats
+ String partialKey = CacheUtils.buildPartitionCacheKey(partVal);
+ Iterator<Entry<String, ColumnStatisticsObj>> iterator =
+ partitionColStatsCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<String, ColumnStatisticsObj> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
+ iterator.remove();
+ }
+ }
+ // Invalidate cached aggregate stats
+ if (!aggrColStatsCache.isEmpty()) {
+ aggrColStatsCache.clear();
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ return part;
+ }
+
+ public void removePartitions(List<List<String>> partVals, SharedCache sharedCache) {
+ try {
+ tableLock.writeLock().lock();
+ for (List<String> partVal : partVals) {
+ removePartition(partVal, sharedCache);
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void alterPartition(List<String> partVals, Partition newPart, SharedCache sharedCache) {
+ try {
+ tableLock.writeLock().lock();
+ removePartition(partVals, sharedCache);
+ cachePartition(newPart, sharedCache);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
+ SharedCache sharedCache) {
+ try {
+ tableLock.writeLock().lock();
+ for (int i = 0; i < partValsList.size(); i++) {
+ List<String> partVals = partValsList.get(i);
+ Partition newPart = newParts.get(i);
+ alterPartition(partVals, newPart, sharedCache);
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void refreshPartitions(List<Partition> partitions, SharedCache sharedCache) {
+ Map<String, PartitionWrapper> newPartitionCache = new HashMap<String, PartitionWrapper>();
+ try {
+ tableLock.writeLock().lock();
+ for (Partition part : partitions) {
+ if (isPartitionCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition cache update for table: " + getTable().getTableName()
+ + "; the partition list we have is dirty.");
+ return;
+ }
+ String key = CacheUtils.buildPartitionCacheKey(part.getValues());
+ PartitionWrapper wrapper = partitionCache.get(key);
+ if (wrapper != null) {
+ if (wrapper.getSdHash() != null) {
+ sharedCache.decrSd(wrapper.getSdHash());
+ }
+ }
+ wrapper = makePartitionWrapper(part, sharedCache);
+ newPartitionCache.put(key, wrapper);
+ }
+ partitionCache = newPartitionCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void updateTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+ try {
+ tableLock.writeLock().lock();
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ // Get old stats object if present
+ String key = colStatObj.getColName();
+ ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ // Update existing stat object's field
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ // TODO: get rid of deepCopy after making sure callers don't use references
+ tableColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ }
+ isTableColStatsCacheDirty.set(true);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void refreshTableColStats(List<ColumnStatisticsObj> colStatsForTable) {
+ Map<String, ColumnStatisticsObj> newTableColStatsCache =
+ new HashMap<String, ColumnStatisticsObj>();
+ try {
+ tableLock.writeLock().lock();
+ for (ColumnStatisticsObj colStatObj : colStatsForTable) {
+ if (isTableColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table col stats cache update for table: "
+ + getTable().getTableName() + "; the table col stats list we have is dirty.");
+ return;
+ }
+ String key = colStatObj.getColName();
+ // TODO: get rid of deepCopy after making sure callers don't use references
+ newTableColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ tableColStatsCache = newTableColStatsCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public List<ColumnStatisticsObj> getCachedTableColStats(List<String> colNames) {
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+ try {
+ tableLock.readLock().lock();
+ for (String colName : colNames) {
+ ColumnStatisticsObj colStatObj = tableColStatsCache.get(colName);
+ if (colStatObj != null) {
+ colStatObjs.add(colStatObj);
+ }
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return colStatObjs;
+ }
+
+ public void removeTableColStats(String colName) {
+ try {
+ tableLock.writeLock().lock();
+ tableColStatsCache.remove(colName);
+ isTableColStatsCacheDirty.set(true);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public ColumnStatisticsObj getPartitionColStats(List<String> partVal, String colName) {
+ try {
+ tableLock.readLock().lock();
+ return partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ }
+
+ public void updatePartitionColStats(List<String> partVal,
+ List<ColumnStatisticsObj> colStatsObjs) {
+ try {
+ tableLock.writeLock().lock();
+ addPartitionColStatsToCache(partVal, colStatsObjs);
+ isPartitionColStatsCacheDirty.set(true);
+ // Invalidate cached aggregate stats
+ if (!aggrColStatsCache.isEmpty()) {
+ aggrColStatsCache.clear();
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void removePartitionColStats(List<String> partVals, String colName) {
+ try {
+ tableLock.writeLock().lock();
+ partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName));
+ isPartitionColStatsCacheDirty.set(true);
+ // Invalidate cached aggregate stats
+ if (!aggrColStatsCache.isEmpty()) {
+ aggrColStatsCache.clear();
+ }
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ private void addPartitionColStatsToCache(List<String> partVal,
+ List<ColumnStatisticsObj> colStatsObjs) {
+ for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+ // Get old stats object if present
+ String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+ ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
+ if (oldStatsObj != null) {
+ // Update existing stat object's field
+ StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
+ } else {
+ // No stats exist for this key; add a new object to the cache
+ // TODO: get rid of deepCopy after making sure callers don't use references
+ partitionColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ }
+ }
+
+ public void refreshPartitionColStats(List<ColumnStatistics> partitionColStats) {
+ Map<String, ColumnStatisticsObj> newPartitionColStatsCache =
+ new HashMap<String, ColumnStatisticsObj>();
+ try {
+ tableLock.writeLock().lock();
+ String tableName = StringUtils.normalizeIdentifier(getTable().getTableName());
+ for (ColumnStatistics cs : partitionColStats) {
+ if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition column stats cache update for table: "
+ + getTable().getTableName() + "; the partition column stats list we have is dirty");
+ return;
+ }
+ List<String> partVal;
+ try {
+ partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+ List<ColumnStatisticsObj> colStatsObjs = cs.getStatsObj();
+ for (ColumnStatisticsObj colStatObj : colStatsObjs) {
+ if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping partition column stats cache update for table: "
+ + getTable().getTableName() + "; the partition column list we have is dirty");
+ return;
+ }
+ String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName());
+ newPartitionColStatsCache.put(key, colStatObj.deepCopy());
+ }
+ } catch (MetaException e) {
+ LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+ }
+ }
+ partitionColStatsCache = newPartitionColStatsCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public List<ColumnStatisticsObj> getAggrPartitionColStats(List<String> colNames,
+ StatsType statsType) {
+ List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
+ try {
+ tableLock.readLock().lock();
+ for (String colName : colNames) {
+ List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(colName);
+ // If unable to find stats for a column, return null so we can build stats
+ if (colStatList == null) {
+ return null;
+ }
+ ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
+ // If unable to find stats for this StatsType, return null so we can build stats
+ if (colStatObj == null) {
+ return null;
+ }
+ colStats.add(colStatObj);
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return colStats;
+ }
+
+ public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+ AggrStats aggrStatsAllButDefaultPartition) {
+ try {
+ tableLock.writeLock().lock();
+ if (aggrStatsAllPartitions != null) {
+ for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
+ if (statObj != null) {
+ List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
+ aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
+ aggrColStatsCache.put(statObj.getColName(), aggrStats);
+ }
+ }
+ }
+ if (aggrStatsAllButDefaultPartition != null) {
+ for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
+ if (statObj != null) {
+ List<ColumnStatisticsObj> aggrStats = aggrColStatsCache.get(statObj.getColName());
+ if (aggrStats == null) {
+ aggrStats = new ArrayList<ColumnStatisticsObj>();
+ }
+ aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+ }
+ }
+ }
+ isAggrPartitionColStatsCacheDirty.set(true);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
+ AggrStats aggrStatsAllButDefaultPartition) {
+ Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
+ new HashMap<String, List<ColumnStatisticsObj>>();
+ try {
+ tableLock.writeLock().lock();
+ if (aggrStatsAllPartitions != null) {
+ for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
+ if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping aggregate stats cache update for table: "
+ + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+ return;
+ }
+ if (statObj != null) {
+ List<ColumnStatisticsObj> aggrStats = new ArrayList<ColumnStatisticsObj>();
+ aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy());
+ newAggrColStatsCache.put(statObj.getColName(), aggrStats);
+ }
+ }
+ }
+ if (aggrStatsAllButDefaultPartition != null) {
+ for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) {
+ if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping aggregate stats cache update for table: "
+ + getTable().getTableName() + "; the aggregate stats list we have is dirty");
+ return;
+ }
+ if (statObj != null) {
+ List<ColumnStatisticsObj> aggrStats = newAggrColStatsCache.get(statObj.getColName());
+ if (aggrStats == null) {
+ aggrStats = new ArrayList<ColumnStatisticsObj>();
+ }
+ aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy());
+ }
+ }
+ }
+ aggrColStatsCache = newAggrColStatsCache;
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
+ private void updateTableObj(Table newTable, SharedCache sharedCache) {
+ byte[] sdHash = getSdHash();
+ // Remove old table object's sd hash
+ if (sdHash != null) {
+ sharedCache.decrSd(sdHash);
+ }
+ Table tblCopy = newTable.deepCopy();
+ if (tblCopy.getPartitionKeys() != null) {
+ for (FieldSchema fs : tblCopy.getPartitionKeys()) {
+ fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
+ }
+ }
+ setTable(tblCopy);
+ if (tblCopy.getSd() != null) {
+ sdHash = MetaStoreUtils.hashStorageDescriptor(tblCopy.getSd(), md);
+ StorageDescriptor sd = tblCopy.getSd();
+ sharedCache.increSd(sd, sdHash);
+ tblCopy.setSd(null);
+ setSdHash(sdHash);
+ setLocation(sd.getLocation());
+ setParameters(sd.getParameters());
+ } else {
+ setSdHash(null);
+ setLocation(null);
+ setParameters(null);
+ }
+ }
+
+ private PartitionWrapper makePartitionWrapper(Partition part, SharedCache sharedCache) {
+ Partition partCopy = part.deepCopy();
+ PartitionWrapper wrapper;
+ if (part.getSd() != null) {
+ byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
+ StorageDescriptor sd = part.getSd();
+ sharedCache.increSd(sd, sdHash);
+ partCopy.setSd(null);
+ wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
+ } else {
+ wrapper = new PartitionWrapper(partCopy, null, null, null);
+ }
+ return wrapper;
+ }
+ }
+
+ static class PartitionWrapper {
+ Partition p;
+ String location;
+ Map<String, String> parameters;
+ byte[] sdHash;
+
+ PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) {
+ this.p = p;
+ this.sdHash = sdHash;
+ this.location = location;
+ this.parameters = parameters;
+ }
+
+ public Partition getPartition() {
+ return p;
+ }
+
+ public byte[] getSdHash() {
+ return sdHash;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+ }
+
+ static class StorageDescriptorWrapper {
+ StorageDescriptor sd;
+ int refCount = 0;
+
+ StorageDescriptorWrapper(StorageDescriptor sd, int refCount) {
+ this.sd = sd;
+ this.refCount = refCount;
+ }
+
+ public StorageDescriptor getSd() {
+ return sd;
+ }
+
+ public int getRefCount() {
+ return refCount;
+ }
+ }
+
+ public Database getDatabaseFromCache(String name) {
+ Database db = null;
+ try {
+ cacheLock.readLock().lock();
+ if (databaseCache.get(name) != null) {
+ db = databaseCache.get(name).deepCopy();
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return db;
+ }
+
+ public void populateDatabasesInCache(List<Database> databases) {
+ for (Database db : databases) {
+ Database dbCopy = db.deepCopy();
+ // ObjectStore also stores db name in lowercase
+ dbCopy.setName(dbCopy.getName().toLowerCase());
+ try {
+ cacheLock.writeLock().lock();
+ // Since we allow write operations on cache while prewarm is happening:
+ // 1. Don't add databases that were deleted while we were preparing list for prewarm
+ // 2. Skip overwriting exisiting db object
+ // (which is present because it was added after prewarm started)
+ if (databasesDeletedDuringPrewarm.contains(dbCopy.getName().toLowerCase())) {
+ continue;
+ }
+ databaseCache.putIfAbsent(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+ databasesDeletedDuringPrewarm.clear();
+ isDatabaseCachePrewarmed = true;
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
}
- public synchronized void addDatabaseToCache(String dbName, Database db) {
- Database dbCopy = db.deepCopy();
- dbCopy.setName(StringUtils.normalizeIdentifier(dbName));
- databaseCache.put(dbName, dbCopy);
+ public boolean isDatabaseCachePrewarmed() {
+ return isDatabaseCachePrewarmed;
}
- public synchronized void removeDatabaseFromCache(String dbName) {
- databaseCache.remove(dbName);
+ public void addDatabaseToCache(Database db) {
+ try {
+ cacheLock.writeLock().lock();
+ Database dbCopy = db.deepCopy();
+ // ObjectStore also stores db name in lowercase
+ dbCopy.setName(dbCopy.getName().toLowerCase());
+ databaseCache.put(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy);
+ isDatabaseCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized List<String> listCachedDatabases() {
- return new ArrayList<>(databaseCache.keySet());
+ public void removeDatabaseFromCache(String dbName) {
+ try {
+ cacheLock.writeLock().lock();
+ // If db cache is not yet prewarmed, add this to a set which the prewarm thread can check
+ // so that the prewarm thread does not add it back
+ if (!isDatabaseCachePrewarmed) {
+ databasesDeletedDuringPrewarm.add(dbName.toLowerCase());
+ }
+ if (databaseCache.remove(dbName) != null) {
+ isDatabaseCacheDirty.set(true);
+ }
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ public List<String> listCachedDatabases() {
+ List<String> results = new ArrayList<>();
+ try {
+ cacheLock.readLock().lock();
+ results.addAll(databaseCache.keySet());
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return results;
+ }
+
+ public List<String> listCachedDatabases(String pattern) {
+ List<String> results = new ArrayList<>();
+ try {
+ cacheLock.readLock().lock();
+ for (String dbName : databaseCache.keySet()) {
+ dbName = StringUtils.normalizeIdentifier(dbName);
+ if (CacheUtils.matches(dbName, pattern)) {
+ results.add(dbName);
+ }
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return results;
}
- public synchronized void alterDatabaseInCache(String dbName, Database newDb) {
- removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName));
- addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy());
+ /**
+ * Replaces the old db object with the new one.
+ * This will add the new database to cache if it does not exist.
+ * @param dbName
+ * @param newDb
+ */
+ public void alterDatabaseInCache(String dbName, Database newDb) {
+ try {
+ cacheLock.writeLock().lock();
+ removeDatabaseFromCache(dbName);
+ addDatabaseToCache(newDb.deepCopy());
+ isDatabaseCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ public void refreshDatabasesInCache(List<Database> databases) {
+ try {
+ cacheLock.writeLock().lock();
+ if (isDatabaseCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping database cache update; the database list we have is dirty.");
+ return;
+ }
+ databaseCache.clear();
+ for (Database db : databases) {
+ addDatabaseToCache(db);
+ }
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ public int getCachedDatabaseCount() {
+ try {
+ cacheLock.readLock().lock();
+ return databaseCache.size();
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ }
+
+ public void populateTableInCache(Table table, ColumnStatistics tableColStats,
+ List<Partition> partitions, List<ColumnStatistics> partitionColStats,
+ AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+ String dbName = StringUtils.normalizeIdentifier(table.getDbName());
+ String tableName = StringUtils.normalizeIdentifier(table.getTableName());
+ // Since we allow write operations on cache while prewarm is happening:
+ // 1. Don't add tables that were deleted while we were preparing list for prewarm
+ if (tablesDeletedDuringPrewarm.contains(CacheUtils.buildTableCacheKey(dbName, tableName))) {
+ return;
+ }
+ TableWrapper tblWrapper = createTableWrapper(dbName, tableName, table);
+ if (!table.isSetPartitionKeys() && (tableColStats != null)) {
+ tblWrapper.updateTableColStats(tableColStats.getStatsObj());
+ } else {
+ if (partitions != null) {
+ tblWrapper.cachePartitions(partitions, this);
+ }
+ if (partitionColStats != null) {
+ for (ColumnStatistics cs : partitionColStats) {
+ List<String> partVal;
+ try {
+ partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null);
+ List<ColumnStatisticsObj> colStats = cs.getStatsObj();
+ tblWrapper.updatePartitionColStats(partVal, colStats);
+ } catch (MetaException e) {
+ LOG.debug("Unable to cache partition column stats for table: " + tableName, e);
+ }
+ }
+ }
+ tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
+ }
+ try {
+ cacheLock.writeLock().lock();
+ // 2. Skip overwriting exisiting table object
+ // (which is present because it was added after prewarm started)
+ tableCache.putIfAbsent(CacheUtils.buildTableCacheKey(dbName, tableName), tblWrapper);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized int getCachedDatabaseCount() {
- return databaseCache.size();
+ public void completeTableCachePrewarm() {
+ try {
+ cacheLock.writeLock().lock();
+ tablesDeletedDuringPrewarm.clear();
+ isTableCachePrewarmed = true;
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
}
- public synchronized Table getTableFromCache(String dbName, String tableName) {
- TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
- if (tblWrapper == null) {
- return null;
+ public Table getTableFromCache(String dbName, String tableName) {
+ Table t = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ t = CacheUtils.assemble(tblWrapper, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- Table t = CacheUtils.assemble(tblWrapper, this);
return t;
}
- public synchronized void addTableToCache(String dbName, String tblName, Table tbl) {
+ public TableWrapper addTableToCache(String dbName, String tblName, Table tbl) {
+ try {
+ cacheLock.writeLock().lock();
+ TableWrapper wrapper = createTableWrapper(dbName, tblName, tbl);
+ tableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), wrapper);
+ isTableCacheDirty.set(true);
+ return wrapper;
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
+ private TableWrapper createTableWrapper(String dbName, String tblName, Table tbl) {
+ TableWrapper wrapper;
Table tblCopy = tbl.deepCopy();
tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName));
tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName));
@@ -129,7 +895,6 @@ public class SharedCache {
fs.setName(StringUtils.normalizeIdentifier(fs.getName()));
}
}
- TableWrapper wrapper;
if (tbl.getSd() != null) {
byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md);
StorageDescriptor sd = tbl.getSd();
@@ -139,482 +904,452 @@ public class SharedCache {
} else {
wrapper = new TableWrapper(tblCopy, null, null, null);
}
- tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper);
+ return wrapper;
}
- public synchronized void removeTableFromCache(String dbName, String tblName) {
- TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName));
- byte[] sdHash = tblWrapper.getSdHash();
- if (sdHash!=null) {
- decrSd(sdHash);
+ public void removeTableFromCache(String dbName, String tblName) {
+ try {
+ cacheLock.writeLock().lock();
+ // If table cache is not yet prewarmed, add this to a set which the prewarm thread can check
+ // so that the prewarm thread does not add it back
+ if (!isTableCachePrewarmed) {
+ tablesDeletedDuringPrewarm.add(CacheUtils.buildTableCacheKey(dbName, tblName));
+ }
+ TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+ byte[] sdHash = tblWrapper.getSdHash();
+ if (sdHash != null) {
+ decrSd(sdHash);
+ }
+ isTableCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
}
}
- public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) {
- return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null;
- }
-
- public synchronized void removeTableColStatsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- tableColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public void alterTableInCache(String dbName, String tblName, Table newTable) {
+ try {
+ cacheLock.writeLock().lock();
+ TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.updateTableObj(newTable, this);
+ String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+ String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
+ tableCache.put(CacheUtils.buildTableCacheKey(newDbName, newTblName), tblWrapper);
+ isTableCacheDirty.set(true);
}
+ } finally {
+ cacheLock.writeLock().unlock();
}
}
- public synchronized void removeTableColStatsFromCache(String dbName, String tblName,
- String colName) {
- if (colName == null) {
- removeTableColStatsFromCache(dbName, tblName);
- } else {
- tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName));
+ public List<Table> listCachedTables(String dbName) {
+ List<Table> tables = new ArrayList<>();
+ try {
+ cacheLock.readLock().lock();
+ for (TableWrapper wrapper : tableCache.values()) {
+ if (wrapper.getTable().getDbName().equals(dbName)) {
+ tables.add(CacheUtils.assemble(wrapper, this));
+ }
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return tables;
}
- public synchronized void updateTableColStatsInCache(String dbName, String tableName,
- List<ColumnStatisticsObj> colStatsForTable) {
- for (ColumnStatisticsObj colStatObj : colStatsForTable) {
- // Get old stats object if present
- String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
- ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key);
- if (oldStatsObj != null) {
- LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName()
- + ", of table: " + tableName + " and database: " + dbName);
- // Update existing stat object's field
- StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
- } else {
- // No stats exist for this key; add a new object to the cache
- tableColStatsCache.put(key, colStatObj);
+ public List<String> listCachedTableNames(String dbName) {
+ List<String> tableNames = new ArrayList<>();
+ try {
+ cacheLock.readLock().lock();
+ for (TableWrapper wrapper : tableCache.values()) {
+ if (wrapper.getTable().getDbName().equals(dbName)) {
+ tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+ }
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return tableNames;
}
- public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) {
- removeTableFromCache(dbName, tblName);
- addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
- StringUtils.normalizeIdentifier(newTable.getTableName()), newTable);
+ public List<String> listCachedTableNames(String dbName, String pattern, short maxTables) {
+ List<String> tableNames = new ArrayList<String>();
+ try {
+ cacheLock.readLock().lock();
+ int count = 0;
+ for (TableWrapper wrapper : tableCache.values()) {
+ if ((wrapper.getTable().getDbName().equals(dbName))
+ && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+ && (maxTables == -1 || count < maxTables)) {
+ tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+ count++;
+ }
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return tableNames;
}
- public synchronized void alterTableInPartitionCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
- for (Partition part : partitions) {
- removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues());
- part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName()));
- part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName()));
- addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()),
- StringUtils.normalizeIdentifier(newTable.getTableName()), part);
+ public List<String> listCachedTableNames(String dbName, String pattern, TableType tableType) {
+ List<String> tableNames = new ArrayList<String>();
+ try {
+ cacheLock.readLock().lock();
+ for (TableWrapper wrapper : tableCache.values()) {
+ if ((wrapper.getTable().getDbName().equals(dbName))
+ && CacheUtils.matches(wrapper.getTable().getTableName(), pattern)
+ && wrapper.getTable().getTableType().equals(tableType.toString())) {
+ tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName()));
+ }
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return tableNames;
}
- public synchronized void alterTableInTableColStatsCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- tableColStatsCache.entrySet().iterator();
- Map<String, ColumnStatisticsObj> newTableColStats =
- new HashMap<>();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- ColumnStatisticsObj colStatObj = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) {
- String[] decomposedKey = CacheUtils.splitTableColStats(key);
- String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]);
- newTableColStats.put(newKey, colStatObj);
- iterator.remove();
+ public void refreshTablesInCache(String dbName, List<Table> tables) {
+ try {
+ cacheLock.writeLock().lock();
+ if (isTableCacheDirty.compareAndSet(true, false)) {
+ LOG.debug("Skipping table cache update; the table list we have is dirty.");
+ return;
+ }
+ Map<String, TableWrapper> newTableCache = new HashMap<String, TableWrapper>();
+ for (Table tbl : tables) {
+ String tblName = StringUtils.normalizeIdentifier(tbl.getTableName());
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.updateTableObj(tbl, this);
+ } else {
+ tblWrapper = createTableWrapper(dbName, tblName, tbl);
}
+ newTableCache.put(CacheUtils.buildTableCacheKey(dbName, tblName), tblWrapper);
}
- tableColStatsCache.putAll(newTableColStats);
+ tableCache.clear();
+ tableCache = newTableCache;
+ } finally {
+ cacheLock.writeLock().unlock();
}
}
- public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- List<Partition> partitions = listCachedPartitions(dbName, tblName, -1);
- Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
- for (Partition part : partitions) {
- String oldPartialPartitionKey =
- CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues());
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- ColumnStatisticsObj colStatObj = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
- Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
- // New key has the new table name
- String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
- (List<String>) decomposedKey[2], (String) decomposedKey[3]);
- newPartitionColStats.put(newKey, colStatObj);
- iterator.remove();
- }
- }
+ public List<ColumnStatisticsObj> getTableColStatsFromCache(String dbName, String tblName,
+ List<String> colNames) {
+ List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ colStatObjs = tblWrapper.getCachedTableColStats(colNames);
}
- partitionColStatsCache.putAll(newPartitionColStats);
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return colStatObjs;
}
- public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName,
- Table newTable) {
- if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) {
- Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
- new HashMap<String, List<ColumnStatisticsObj>>();
- String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
- aggrColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
- String key = entry.getKey();
- List<ColumnStatisticsObj> value = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) {
- Object[] decomposedKey = CacheUtils.splitAggrColStats(key);
- // New key has the new table name
- String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(),
- (String) decomposedKey[2]);
- newAggrColStatsCache.put(newKey, value);
- iterator.remove();
- }
+ public void removeTableColStatsFromCache(String dbName, String tblName, String colName) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.removeTableColStats(colName);
}
- aggrColStatsCache.putAll(newAggrColStatsCache);
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized int getCachedTableCount() {
- return tableCache.size();
+ public void updateTableColStatsInCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ tblWrapper.updateTableColStats(colStatsForTable);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
}
- public synchronized List<Table> listCachedTables(String dbName) {
- List<Table> tables = new ArrayList<>();
- for (TableWrapper wrapper : tableCache.values()) {
- if (wrapper.getTable().getDbName().equals(dbName)) {
- tables.add(CacheUtils.assemble(wrapper, this));
+ public void refreshTableColStatsInCache(String dbName, String tableName,
+ List<ColumnStatisticsObj> colStatsForTable) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshTableColStats(colStatsForTable);
}
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ }
+
+ public int getCachedTableCount() {
+ try {
+ cacheLock.readLock().lock();
+ return tableCache.size();
+ } finally {
+ cacheLock.readLock().unlock();
}
- return tables;
}
- public synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) {
+ public List<TableMeta> getTableMeta(String dbNames, String tableNames,
+ List<String> tableTypes) {
List<TableMeta> tableMetas = new ArrayList<>();
- for (String dbName : listCachedDatabases()) {
- if (CacheUtils.matches(dbName, dbNames)) {
- for (Table table : listCachedTables(dbName)) {
- if (CacheUtils.matches(table.getTableName(), tableNames)) {
- if (tableTypes==null || tableTypes.contains(table.getTableType())) {
- TableMeta metaData = new TableMeta(
- dbName, table.getTableName(), table.getTableType());
+ try {
+ cacheLock.readLock().lock();
+ for (String dbName : listCachedDatabases()) {
+ if (CacheUtils.matches(dbName, dbNames)) {
+ for (Table table : listCachedTables(dbName)) {
+ if (CacheUtils.matches(table.getTableName(), tableNames)) {
+ if (tableTypes == null || tableTypes.contains(table.getTableType())) {
+ TableMeta metaData =
+ new TableMeta(dbName, table.getTableName(), table.getTableType());
metaData.setComments(table.getParameters().get("comment"));
tableMetas.add(metaData);
+ }
}
}
}
}
+ } finally {
+ cacheLock.readLock().unlock();
}
return tableMetas;
}
- public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) {
- Partition partCopy = part.deepCopy();
- PartitionWrapper wrapper;
- if (part.getSd()!=null) {
- byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md);
- StorageDescriptor sd = part.getSd();
- increSd(sd, sdHash);
- partCopy.setSd(null);
- wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters());
- } else {
- wrapper = new PartitionWrapper(partCopy, null, null, null);
+ public void addPartitionToCache(String dbName, String tblName, Partition part) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.cachePartition(part, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper);
}
- public synchronized Partition getPartitionFromCache(String key) {
- PartitionWrapper wrapper = partitionCache.get(key);
- if (wrapper == null) {
- return null;
+ public void addPartitionsToCache(String dbName, String tblName, List<Partition> parts) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.cachePartitions(parts, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- Partition p = CacheUtils.assemble(wrapper, this);
- return p;
- }
-
- public synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
- return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
}
- public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
- return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
- }
-
- public synchronized Partition removePartitionFromCache(String dbName, String tblName,
- List<String> part_vals) {
- PartitionWrapper wrapper =
- partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
- if (wrapper.getSdHash() != null) {
- decrSd(wrapper.getSdHash());
+ public Partition getPartitionFromCache(String dbName, String tblName,
+ List<String> partVals) {
+ Partition part = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ part = tblWrapper.getPartition(partVals, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
- return wrapper.getPartition();
+ return part;
}
- /**
- * Given a db + table, remove all partitions for this table from the cache
- * @param dbName
- * @param tblName
- * @return
- */
- public synchronized void removePartitionsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, PartitionWrapper> entry = iterator.next();
- String key = entry.getKey();
- PartitionWrapper wrapper = entry.getValue();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
- if (wrapper.getSdHash() != null) {
- decrSd(wrapper.getSdHash());
- }
+ public boolean existPartitionFromCache(String dbName, String tblName, List<String> partVals) {
+ boolean existsPart = false;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ existsPart = tblWrapper.containsPartition(partVals);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return existsPart;
}
- // Remove cached column stats for all partitions of all tables in a db
- public synchronized void removePartitionColStatsFromCache(String dbName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public Partition removePartitionFromCache(String dbName, String tblName,
+ List<String> partVals) {
+ Partition part = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ part = tblWrapper.removePartition(partVals, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return part;
}
- // Remove cached column stats for all partitions of a table
- public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public void removePartitionsFromCache(String dbName, String tblName,
+ List<List<String>> partVals) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.removePartitions(partVals, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- // Remove cached column stats for a particular partition of a table
- public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
- List<String> partVals) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+ List<Partition> parts = new ArrayList<Partition>();
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ parts = tblWrapper.listPartitions(max, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
+ return parts;
}
- // Remove cached column stats for a particular partition and a particular column of a table
- public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
- List<String> partVals, String colName) {
- partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+ public void alterPartitionInCache(String dbName, String tblName, List<String> partVals,
+ Partition newPart) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.alterPartition(partVals, newPart, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
}
- public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
- List<Partition> partitions = new ArrayList<>();
- int count = 0;
- for (PartitionWrapper wrapper : partitionCache.values()) {
- if (wrapper.getPartition().getDbName().equals(dbName)
- && wrapper.getPartition().getTableName().equals(tblName)
- && (max == -1 || count < max)) {
- partitions.add(CacheUtils.assemble(wrapper, this));
- count++;
+ public void alterPartitionsInCache(String dbName, String tblName, List<List<String>> partValsList,
+ List<Partition> newParts) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.alterPartitions(partValsList, newParts, this);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
- return partitions;
}
- public synchronized void alterPartitionInCache(String dbName, String tblName,
- List<String> partVals, Partition newPart) {
- removePartitionFromCache(dbName, tblName, partVals);
- addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()),
- StringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+ public void refreshPartitionsInCache(String dbName, String tblName, List<Partition> partitions) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshPartitions(partitions, this);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
}
- public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
- List<String> partVals, Partition newPart) {
- String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
- Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
- Iterator<Entry<String, ColumnStatisticsObj>> iterator =
- partitionColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, ColumnStatisticsObj> entry = iterator.next();
- String key = entry.getKey();
- ColumnStatisticsObj colStatObj = entry.getValue();
- if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
- Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
- String newKey =
- CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()),
- StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
- (String) decomposedKey[3]);
- newPartitionColStats.put(newKey, colStatObj);
- iterator.remove();
+ public void removePartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVals, String colName) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.removePartitionColStats(partVals, colName);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
- partitionColStatsCache.putAll(newPartitionColStats);
}
- public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+ public void updatePartitionColStatsInCache(String dbName, String tableName,
List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
- for (ColumnStatisticsObj colStatObj : colStatsObjs) {
- // Get old stats object if present
- String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
- ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
- if (oldStatsObj != null) {
- // Update existing stat object's field
- LOG.debug("CachedStore: updating partition column stats for column: "
- + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
- StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
- } else {
- // No stats exist for this key; add a new object to the cache
- partitionColStatsCache.put(key, colStatObj);
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tableName));
+ if (tblWrapper != null) {
+ tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized int getCachedPartitionCount() {
- return partitionCache.size();
- }
-
- public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
- return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
- }
-
- public synchronized void addPartitionColStatsToCache(
- List<ColStatsObjWithSourceInfo> colStatsForDB) {
- for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) {
- List<String> partVals;
- try {
- partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName());
- ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj();
- String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(),
- colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName());
- partitionColStatsCache.put(key, colStatObj);
- } catch (MetaException e) {
- LOG.info("Unable to add partition stats for: {} to SharedCache",
- colStatWithSourceInfo.getPartName(), e);
+ public ColumnStatisticsObj getPartitionColStatsFromCache(String dbName, String tblName,
+ List<String> partVal, String colName) {
+ ColumnStatisticsObj colStatObj = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null){
+ colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
-
- }
-
- public synchronized void refreshPartitionColStats(String dbName,
- List<ColStatsObjWithSourceInfo> colStatsForDB) {
- LOG.debug("CachedStore: updating cached partition column stats objects for database: {}",
- dbName);
- removePartitionColStatsFromCache(dbName);
- addPartitionColStatsToCache(colStatsForDB);
+ return colStatObj;
}
- public synchronized void addAggregateStatsToCache(String dbName, String tblName,
- AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
- if (aggrStatsAllPartitions != null) {
- for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) {
- String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
- List<ColumnStatisticsObj> value = new ArrayList<ColumnStatisticsObj>();
- value.add(StatsType.ALL.getPosition(), colStatObj);
- aggrColStatsCache.put(key, value);
- }
- }
- if (aggrStatsAllButDefaultPartition != null) {
- for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) {
- String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
- List<ColumnStatisticsObj> value = aggrColStatsCache.get(key);
- if ((value != null) && (value.size() > 0)) {
- value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj);
- }
+ public void refreshPartitionColStatsInCache(String dbName, String tblName,
+ List<ColumnStatistics> partitionColStats) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshPartitionColStats(partitionColStats);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
public List<ColumnStatisticsObj> getAggrStatsFromCache(String dbName, String tblName,
List<String> colNames, StatsType statsType) {
- List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
- for (String colName : colNames) {
- String key = CacheUtils.buildKey(dbName, tblName, colName);
- List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(key);
- // If unable to find stats for a column, return null so we can build stats
- if (colStatList == null) {
- return null;
- }
- ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
- // If unable to find stats for this StatsType, return null so we can build
- // stats
- if (colStatObj == null) {
- return null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ return tblWrapper.getAggrPartitionColStats(colNames, statsType);
}
- colStats.add(colStatObj);
+ } finally {
+ cacheLock.readLock().unlock();
}
- return colStats;
+ return null;
}
- public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) {
- String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
- Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
- aggrColStatsCache.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
- String key = entry.getKey();
- if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
- iterator.remove();
+ public void addAggregateStatsToCache(String dbName, String tblName,
+ AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null){
+ tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
}
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized void refreshAggregateStatsCache(String dbName, String tblName,
+ public void refreshAggregateStatsInCache(String dbName, String tblName,
AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
- LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName,
- tblName);
- removeAggrPartitionColStatsFromCache(dbName, tblName);
- addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
- }
-
- public synchronized void addTableColStatsToCache(String dbName, String tableName,
- List<ColumnStatisticsObj> colStatsForTable) {
- for (ColumnStatisticsObj colStatObj : colStatsForTable) {
- String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
- tableColStatsCache.put(key, colStatObj);
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableCacheKey(dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
+ aggrStatsAllButDefaultPartition);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
}
}
- public synchronized void refreshTableColStats(String dbName, String tableName,
- List<ColumnStatisticsObj> colStatsForTable) {
- LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
- + " and table: " + tableName);
- // Remove all old cache entries for this table
- removeTableColStatsFromCache(dbName, tableName);
- // Add new entries to cache
- addTableColStatsToCache(dbName, tableName, colStatsForTable);
- }
-
- public void increSd(StorageDescriptor sd, byte[] sdHash) {
+ public synchronized void increSd(StorageDescriptor sd, byte[] sdHash) {
ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
if (sdCache.containsKey(byteArray)) {
sdCache.get(byteArray).refCount++;
@@ -626,7 +1361,7 @@ public class SharedCache {
}
}
- public void decrSd(byte[] sdHash) {
+ public synchronized void decrSd(byte[] sdHash) {
ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray);
sdWrapper.refCount--;
@@ -635,50 +1370,11 @@ public class SharedCache {
}
}
- public StorageDescriptor getSdFromCache(byte[] sdHash) {
+ public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) {
StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash));
return sdWrapper.getSd();
}
- // Replace databases in databaseCache with the new list
- public synchronized void refreshDatabases(List<Database> databases) {
- LOG.debug("CachedStore: updating cached database objects");
- for (String dbName : listCachedDatabases()) {
- removeDatabaseFromCache(dbName);
- }
- for (Database db : databases) {
- addDatabaseToCache(db.getName(), db);
- }
- }
-
- // Replace tables in tableCache with the new list
- public synchronized void refreshTables(String dbName, List<Table> tables) {
- LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
- for (Table tbl : listCachedTables(dbName)) {
- removeTableFromCache(dbName, tbl.getTableName());
- }
- for (Table tbl : tables) {
- addTableToCache(dbName, tbl.getTableName(), tbl);
- }
- }
-
- public synchronized void refreshPartitions(String dbName, String tblName,
- List<Partition> partitions) {
- LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
- + " and table: " + tblName);
- Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
- while (iterator.hasNext()) {
- PartitionWrapper partitionWrapper = iterator.next().getValue();
- if (partitionWrapper.getPartition().getDbName().equals(dbName)
- && partitionWrapper.getPartition().getTableName().equals(tblName)) {
- iterator.remove();
- }
- }
- for (Partition part : partitions) {
- addPartitionToCache(dbName, tblName, part);
- }
- }
-
@VisibleForTesting
Map<String, Database> getDatabaseCache() {
return databaseCache;
@@ -690,17 +1386,15 @@ public class SharedCache {
}
@VisibleForTesting
- Map<String, PartitionWrapper> getPartitionCache() {
- return partitionCache;
- }
-
- @VisibleForTesting
Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
return sdCache;
}
- @VisibleForTesting
- Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
- return partitionColStatsCache;
+ public long getUpdateCount() {
+ return cacheUpdateCount.get();
+ }
+
+ public void incrementUpdateCount() {
+ cacheUpdateCount.incrementAndGet();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 5aee2e3..e373753 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -339,7 +339,7 @@ public class MetaStoreUtils {
* @param md message descriptor to use to generate the hash
* @return the hash as a byte array
*/
- public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
+ public static synchronized byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) {
// Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different
// results for hashes based on the OS or JVM being used.
md.reset();
@@ -434,6 +434,15 @@ public class MetaStoreUtils {
return colNames;
}
+ public static List<String> getColumnNamesForPartition(Partition partition) {
+ List<String> colNames = new ArrayList<>();
+ Iterator<FieldSchema> colsIterator = partition.getSd().getColsIterator();
+ while (colsIterator.hasNext()) {
+ colNames.add(colsIterator.next().getName());
+ }
+ return colNames;
+ }
+
/**
* validateName
*
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 5edf8b3..e9527c7 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -1046,13 +1046,6 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
}
- @Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException {
- // TODO Auto-generated method stub
- return null;
- }
-
public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException,
NoSuchObjectException {
objectStore.createISchema(schema);
http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 132cdc3..8fc0c83 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -1033,13 +1033,6 @@ public class DummyRawStoreForJdoConnection implements RawStore {
String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
}
- @Override
- public List<ColStatsObjWithSourceInfo> getPartitionColStatsForDatabase(String dbName)
- throws MetaException, NoSuchObjectException {
- // TODO Auto-generated method stub
- return null;
- }
-
public void createISchema(ISchema schema) throws AlreadyExistsException, MetaException {
}