You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:29 UTC

[20/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 0000000,8ff056f..9bee0db
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@@ -1,0 -1,2532 +1,2532 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.cache;
+ 
+ 
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.EmptyStackException;
+ import java.util.HashMap;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Stack;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.hadoop.conf.Configurable;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.common.DatabaseName;
+ import org.apache.hadoop.hive.common.StatsSetupConst;
+ import org.apache.hadoop.hive.common.TableName;
+ import org.apache.hadoop.hive.metastore.Deadline;
+ import org.apache.hadoop.hive.metastore.FileMetadataHandler;
+ import org.apache.hadoop.hive.metastore.ObjectStore;
+ import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+ import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+ import org.apache.hadoop.hive.metastore.RawStore;
+ import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.Warehouse;
 -import org.apache.hadoop.hive.metastore.api.AggrStats;
 -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 -import org.apache.hadoop.hive.metastore.api.Catalog;
 -import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 -import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 -import org.apache.hadoop.hive.metastore.api.Database;
 -import org.apache.hadoop.hive.metastore.api.FieldSchema;
 -import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 -import org.apache.hadoop.hive.metastore.api.Function;
 -import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 -import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 -import org.apache.hadoop.hive.metastore.api.ISchema;
 -import org.apache.hadoop.hive.metastore.api.ISchemaName;
 -import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 -import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 -import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
 -import org.apache.hadoop.hive.metastore.api.MetaException;
 -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 -import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 -import org.apache.hadoop.hive.metastore.api.Partition;
 -import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 -import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
 -import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
 -import org.apache.hadoop.hive.metastore.api.PrincipalType;
 -import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
 -import org.apache.hadoop.hive.metastore.api.WMNullablePool;
 -import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan;
 -import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 -import org.apache.hadoop.hive.metastore.api.WMTrigger;
 -import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
++import org.apache.hadoop.hive.metastore.api.*;
+ import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
+ import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
+ import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory;
 -import org.apache.hadoop.hive.metastore.api.Role;
 -import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
 -import org.apache.hadoop.hive.metastore.api.RuntimeStat;
 -import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
 -import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
 -import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 -import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 -import org.apache.hadoop.hive.metastore.api.SchemaVersion;
 -import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
 -import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 -import org.apache.hadoop.hive.metastore.api.Table;
 -import org.apache.hadoop.hive.metastore.api.TableMeta;
 -import org.apache.hadoop.hive.metastore.api.Type;
 -import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 -import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 -import org.apache.hadoop.hive.metastore.api.WMMapping;
 -import org.apache.hadoop.hive.metastore.api.WMPool;
 -import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+ import org.apache.hadoop.hive.metastore.utils.FileUtils;
+ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
+ import org.apache.hadoop.hive.metastore.utils.StringUtils;
+ import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ 
+ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+ import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+ 
+ // TODO filter->expr
+ // TODO functionCache
+ // TODO constraintCache
+ // TODO need sd nested copy?
+ // TODO String intern
+ // TODO monitor event queue
+ // TODO initial load slow?
+ // TODO size estimation
+ 
+ public class CachedStore implements RawStore, Configurable {
+   private static ScheduledExecutorService cacheUpdateMaster = null;
+   private static List<Pattern> whitelistPatterns = null;
+   private static List<Pattern> blacklistPatterns = null;
+   // Default value set to 100 milliseconds for test purpose
+   private static long DEFAULT_CACHE_REFRESH_PERIOD = 100;
+   // Time after which metastore cache is updated from metastore DB by the background update thread
+   private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
+   private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
+   private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm();
+   private RawStore rawStore = null;
+   private Configuration conf;
+   private PartitionExpressionProxy expressionProxy = null;
+   private static final SharedCache sharedCache = new SharedCache();
+ 
+   static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName());
+ 
+   @Override
+   public void setConf(Configuration conf) {
+     setConfInternal(conf);
+     initBlackListWhiteList(conf);
+     initSharedCache(conf);
+     startCacheUpdateService(conf, false, true);
+   }
+ 
+   /**
+    * Similar to setConf but used from within the tests
+    * This does start the background thread for prewarm and update
+    * @param conf
+    */
+   void setConfForTest(Configuration conf) {
+     setConfInternal(conf);
+     initBlackListWhiteList(conf);
+     initSharedCache(conf);
+   }
+ 
+   private void setConfInternal(Configuration conf) {
+     String rawStoreClassName =
+         MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
+     if (rawStore == null) {
+       try {
+         rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance();
+       } catch (Exception e) {
+         throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
+       }
+     }
+     rawStore.setConf(conf);
+     Configuration oldConf = this.conf;
+     this.conf = conf;
+     if (expressionProxy != null && conf != oldConf) {
+       LOG.warn("Unexpected setConf when we were already configured");
+     } else {
+       expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
+     }
+   }
+ 
+   private void initSharedCache(Configuration conf) {
+     long maxSharedCacheSizeInBytes =
+         MetastoreConf.getSizeVar(conf, ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY);
+     sharedCache.initialize(maxSharedCacheSizeInBytes);
+     if (maxSharedCacheSizeInBytes > 0) {
+       LOG.info("Maximum memory that the cache will use: {} GB",
+           maxSharedCacheSizeInBytes / (1024 * 1024 * 1024));
+     }
+   }
+ 
+   @VisibleForTesting
+   /**
+    * This initializes the caches in SharedCache by getting the objects from Metastore DB via
+    * ObjectStore and populating the respective caches
+    */
+   static void prewarm(RawStore rawStore) {
+     if (isCachePrewarmed.get()) {
+       return;
+     }
+     long startTime = System.nanoTime();
+     LOG.info("Prewarming CachedStore");
+     while (!isCachePrewarmed.get()) {
+       // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy
+       Deadline.registerIfNot(1000000);
+       Collection<String> catalogsToCache;
+       try {
+         catalogsToCache = catalogsToCache(rawStore);
+         LOG.info("Going to cache catalogs: "
+             + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", "));
+         List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size());
+         for (String catName : catalogsToCache) {
+           catalogs.add(rawStore.getCatalog(catName));
+         }
+         sharedCache.populateCatalogsInCache(catalogs);
+       } catch (MetaException | NoSuchObjectException e) {
+         LOG.warn("Failed to populate catalogs in cache, going to try again", e);
+         // try again
+         continue;
+       }
+       LOG.info("Finished prewarming catalogs, starting on databases");
+       List<Database> databases = new ArrayList<>();
+       for (String catName : catalogsToCache) {
+         try {
+           List<String> dbNames = rawStore.getAllDatabases(catName);
+           LOG.info("Number of databases to prewarm in catalog {}: {}", catName, dbNames.size());
+           for (String dbName : dbNames) {
+             try {
+               databases.add(rawStore.getDatabase(catName, dbName));
+             } catch (NoSuchObjectException e) {
+               // Continue with next database
+               LOG.warn("Failed to cache database "
+                   + DatabaseName.getQualified(catName, dbName) + ", moving on", e);
+             }
+           }
+         } catch (MetaException e) {
+           LOG.warn("Failed to cache databases in catalog " + catName + ", moving on", e);
+         }
+       }
+       sharedCache.populateDatabasesInCache(databases);
+       LOG.info(
+           "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache");
+       int numberOfDatabasesCachedSoFar = 0;
+       for (Database db : databases) {
+         String catName = StringUtils.normalizeIdentifier(db.getCatalogName());
+         String dbName = StringUtils.normalizeIdentifier(db.getName());
+         List<String> tblNames;
+         try {
+           tblNames = rawStore.getAllTables(catName, dbName);
+         } catch (MetaException e) {
+           LOG.warn("Failed to cache tables for database "
+               + DatabaseName.getQualified(catName, dbName) + ", moving on");
+           // Continue with next database
+           continue;
+         }
+         tblsPendingPrewarm.addTableNamesForPrewarming(tblNames);
+         int totalTablesToCache = tblNames.size();
+         int numberOfTablesCachedSoFar = 0;
+         while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) {
+           try {
+             String tblName =
+                 StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm());
+             if (!shouldCacheTable(catName, dbName, tblName)) {
+               continue;
+             }
+             Table table;
+             try {
+               table = rawStore.getTable(catName, dbName, tblName);
+             } catch (MetaException e) {
+               // It is possible the table is deleted during fetching tables of the database,
+               // in that case, continue with the next table
+               continue;
+             }
+             List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+             try {
+               ColumnStatistics tableColStats = null;
+               List<Partition> partitions = null;
+               List<ColumnStatistics> partitionColStats = null;
+               AggrStats aggrStatsAllPartitions = null;
+               AggrStats aggrStatsAllButDefaultPartition = null;
+               if (table.isSetPartitionKeys()) {
+                 Deadline.startTimer("getPartitions");
+                 partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE);
+                 Deadline.stopTimer();
+                 List<String> partNames = new ArrayList<>(partitions.size());
+                 for (Partition p : partitions) {
+                   partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues()));
+                 }
+                 if (!partNames.isEmpty()) {
+                   // Get partition column stats for this table
+                   Deadline.startTimer("getPartitionColumnStatistics");
+                   partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName,
+                       tblName, partNames, colNames);
+                   Deadline.stopTimer();
+                   // Get aggregate stats for all partitions of a table and for all but default
+                   // partition
+                   Deadline.startTimer("getAggrPartitionColumnStatistics");
+                   aggrStatsAllPartitions =
+                       rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+                   Deadline.stopTimer();
+                   // Remove default partition from partition names and get aggregate
+                   // stats again
+                   List<FieldSchema> partKeys = table.getPartitionKeys();
+                   String defaultPartitionValue =
+                       MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+                   List<String> partCols = new ArrayList<>();
+                   List<String> partVals = new ArrayList<>();
+                   for (FieldSchema fs : partKeys) {
+                     partCols.add(fs.getName());
+                     partVals.add(defaultPartitionValue);
+                   }
+                   String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+                   partNames.remove(defaultPartitionName);
+                   Deadline.startTimer("getAggrPartitionColumnStatistics");
+                   aggrStatsAllButDefaultPartition =
+                       rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+                   Deadline.stopTimer();
+                 }
+               } else {
+                 Deadline.startTimer("getTableColumnStatistics");
+                 tableColStats =
+                     rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
+                 Deadline.stopTimer();
+               }
++              // TODO## should this take write ID into account? or at least cache write ID to verify?
+               // If the table could not cached due to memory limit, stop prewarm
+               boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions,
+                   partitionColStats, aggrStatsAllPartitions, aggrStatsAllButDefaultPartition);
+               if (isSuccess) {
+                 LOG.trace("Cached Database: {}'s Table: {}.", dbName, tblName);
+               } else {
+                 LOG.info(
+                     "Unable to cache Database: {}'s Table: {}, since the cache memory is full. "
+                         + "Will stop attempting to cache any more tables.",
+                     dbName, tblName);
+                 completePrewarm(startTime);
+                 return;
+               }
+             } catch (MetaException | NoSuchObjectException e) {
+               // Continue with next table
+               continue;
+             }
+             LOG.debug("Processed database: {}'s table: {}. Cached {} / {}  tables so far.", dbName,
+                 tblName, ++numberOfTablesCachedSoFar, totalTablesToCache);
+           } catch (EmptyStackException e) {
+             // We've prewarmed this database, continue with the next one
+             continue;
+           }
+         }
+         LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName,
+             ++numberOfDatabasesCachedSoFar, databases.size());
+       }
+       completePrewarm(startTime);
+     }
+   }
+ 
+   private static void completePrewarm(long startTime) {
+     isCachePrewarmed.set(true);
+     LOG.info("CachedStore initialized");
+     long endTime = System.nanoTime();
+     LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms");
+     sharedCache.completeTableCachePrewarm();
+   }
+ 
+   static class TablesPendingPrewarm {
+     private Stack<String> tableNames = new Stack<>();
+ 
+     private synchronized void addTableNamesForPrewarming(List<String> tblNames) {
+       tableNames.clear();
+       if (tblNames != null) {
+         tableNames.addAll(tblNames);
+       }
+     }
+ 
+     private synchronized boolean hasMoreTablesToPrewarm() {
+       return !tableNames.empty();
+     }
+ 
+     private synchronized String getNextTableNameToPrewarm() {
+       return tableNames.pop();
+     }
+ 
+     private synchronized void prioritizeTableForPrewarm(String tblName) {
+       // If the table is in the pending prewarm list, move it to the top
+       if (tableNames.remove(tblName)) {
+         tableNames.push(tblName);
+       }
+     }
+   }
+ 
+   @VisibleForTesting
+   static void setCachePrewarmedState(boolean state) {
+     isCachePrewarmed.set(state);
+   }
+ 
+   private static void initBlackListWhiteList(Configuration conf) {
+     if (whitelistPatterns == null || blacklistPatterns == null) {
+       whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf,
+           MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST));
+       blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf,
+           MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST));
+     }
+   }
+ 
+   private static Collection<String> catalogsToCache(RawStore rs) throws MetaException {
+     Collection<String> confValue =
+         MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE);
+     if (confValue == null || confValue.isEmpty() ||
+         (confValue.size() == 1 && confValue.contains(""))) {
+       return rs.getCatalogs();
+     } else {
+       return confValue;
+     }
+   }
+ 
+   @VisibleForTesting
+   /**
+    * This starts a background thread, which initially populates the SharedCache and later
+    * periodically gets updates from the metastore db
+    *
+    * @param conf
+    * @param runOnlyOnce
+    * @param shouldRunPrewarm
+    */
+   static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce,
+       boolean shouldRunPrewarm) {
+     if (cacheUpdateMaster == null) {
+       initBlackListWhiteList(conf);
+       if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) {
+         cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf,
+             ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS);
+       }
+       LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS);
+       cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+           Thread t = Executors.defaultThreadFactory().newThread(r);
+           t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId());
+           t.setDaemon(true);
+           return t;
+         }
+       });
+       if (!runOnlyOnce) {
+         cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
+             cacheRefreshPeriodMS, TimeUnit.MILLISECONDS);
+       }
+     }
+     if (runOnlyOnce) {
+       // Some tests control the execution of the background update thread
+       cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0,
+           TimeUnit.MILLISECONDS);
+     }
+   }
+ 
+   @VisibleForTesting
+   static synchronized boolean stopCacheUpdateService(long timeout) {
+     boolean tasksStoppedBeforeShutdown = false;
+     if (cacheUpdateMaster != null) {
+       LOG.info("CachedStore: shutting down cache update service");
+       try {
+         tasksStoppedBeforeShutdown =
+             cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+       } catch (InterruptedException e) {
+         LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to "
+             + "complete before shutting down. Will make a hard stop now.");
+       }
+       cacheUpdateMaster.shutdownNow();
+       cacheUpdateMaster = null;
+     }
+     return tasksStoppedBeforeShutdown;
+   }
+ 
+   @VisibleForTesting
+   static void setCacheRefreshPeriod(long time) {
+     cacheRefreshPeriodMS = time;
+   }
+ 
+   static class CacheUpdateMasterWork implements Runnable {
+     private boolean shouldRunPrewarm = true;
+     private final RawStore rawStore;
+ 
+     CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) {
+       this.shouldRunPrewarm = shouldRunPrewarm;
+       String rawStoreClassName =
+           MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName());
+       try {
+         rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance();
+         rawStore.setConf(conf);
+       } catch (InstantiationException | IllegalAccessException | MetaException e) {
+         // MetaException here really means ClassNotFound (see the utility method).
+         // So, if any of these happen, that means we can never succeed.
+         throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e);
+       }
+     }
+ 
+     @Override
+     public void run() {
+       if (!shouldRunPrewarm) {
+         // TODO: prewarm and update can probably be merged.
+         update();
+       } else {
+         try {
+           prewarm(rawStore);
+         } catch (Exception e) {
+           LOG.error("Prewarm failure", e);
+           return;
+         }
+       }
+     }
+ 
+     void update() {
+       Deadline.registerIfNot(1000000);
+       LOG.debug("CachedStore: updating cached objects");
+       try {
+         for (String catName : catalogsToCache(rawStore)) {
+           List<String> dbNames = rawStore.getAllDatabases(catName);
+           // Update the database in cache
+           updateDatabases(rawStore, catName, dbNames);
+           for (String dbName : dbNames) {
+             // Update the tables in cache
+             updateTables(rawStore, catName, dbName);
+             List<String> tblNames;
+             try {
+               tblNames = rawStore.getAllTables(catName, dbName);
+             } catch (MetaException e) {
+               // Continue with next database
+               continue;
+             }
+             for (String tblName : tblNames) {
+               if (!shouldCacheTable(catName, dbName, tblName)) {
+                 continue;
+               }
+               // Update the table column stats for a table in cache
+               updateTableColStats(rawStore, catName, dbName, tblName);
+               // Update the partitions for a table in cache
+               updateTablePartitions(rawStore, catName, dbName, tblName);
+               // Update the partition col stats for a table in cache
+               updateTablePartitionColStats(rawStore, catName, dbName, tblName);
+               // Update aggregate partition column stats for a table in cache
+               updateTableAggregatePartitionColStats(rawStore, catName, dbName, tblName);
+             }
+           }
+       }
+       sharedCache.incrementUpdateCount();
+       } catch (MetaException e) {
+         LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e);
+       }
+     }
+ 
+ 
+     private void updateDatabases(RawStore rawStore, String catName, List<String> dbNames) {
+       // Prepare the list of databases
+       List<Database> databases = new ArrayList<>();
+       for (String dbName : dbNames) {
+         Database db;
+         try {
+           db = rawStore.getDatabase(catName, dbName);
+           databases.add(db);
+         } catch (NoSuchObjectException e) {
+           LOG.info("Updating CachedStore: database - " + catName + "." + dbName
+               + " does not exist.", e);
+         }
+       }
+       sharedCache.refreshDatabasesInCache(databases);
+     }
+ 
+     private void updateTables(RawStore rawStore, String catName, String dbName) {
+       List<Table> tables = new ArrayList<>();
+       try {
+         List<String> tblNames = rawStore.getAllTables(catName, dbName);
+         for (String tblName : tblNames) {
+           if (!shouldCacheTable(catName, dbName, tblName)) {
+             continue;
+           }
+           Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName),
+               StringUtils.normalizeIdentifier(dbName),
+               StringUtils.normalizeIdentifier(tblName));
+           tables.add(table);
+         }
+         sharedCache.refreshTablesInCache(catName, dbName, tables);
+       } catch (MetaException e) {
+         LOG.debug("Unable to refresh cached tables for database: " + dbName, e);
+       }
+     }
+ 
+ 
+     private void updateTableColStats(RawStore rawStore, String catName, String dbName, String tblName) {
+       try {
+         Table table = rawStore.getTable(catName, dbName, tblName);
+         if (!table.isSetPartitionKeys()) {
+           List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+           Deadline.startTimer("getTableColumnStatistics");
++          // TODO## should this take write ID into account? or at least cache write ID to verify?
+           ColumnStatistics tableColStats =
+               rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
+           Deadline.stopTimer();
+           if (tableColStats != null) {
++            // TODO## should this take write ID into account? or at least cache write ID to verify?
+             sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(catName),
+                 StringUtils.normalizeIdentifier(dbName),
+                 StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj());
+           }
+         }
+       } catch (MetaException | NoSuchObjectException e) {
+         LOG.info("Unable to refresh table column stats for table: " + tblName, e);
+       }
+     }
+ 
+     private void updateTablePartitions(RawStore rawStore, String catName, String dbName, String tblName) {
+       try {
+         Deadline.startTimer("getPartitions");
+         List<Partition> partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE);
+         Deadline.stopTimer();
+         sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName),
+             StringUtils.normalizeIdentifier(dbName),
+             StringUtils.normalizeIdentifier(tblName), partitions);
+       } catch (MetaException | NoSuchObjectException e) {
+         LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+       }
+     }
+ 
+     private void updateTablePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) {
+       try {
+         Table table = rawStore.getTable(catName, dbName, tblName);
+         List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+         List<String> partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1);
+         // Get partition column stats for this table
+         Deadline.startTimer("getPartitionColumnStatistics");
++        // TODO## should this take write ID into account? or at least cache write ID to verify?
+         List<ColumnStatistics> partitionColStats =
+             rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
+         Deadline.stopTimer();
+         sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats);
+       } catch (MetaException | NoSuchObjectException e) {
+         LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e);
+       }
+     }
+ 
+   // Update cached aggregate stats for all partitions of a table and for all
+   // but default partition
+     private void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName,
+                                                        String tblName) {
+       try {
+         Table table = rawStore.getTable(catName, dbName, tblName);
+         if (table == null) {
+           // BUG FIX: guard against a concurrently dropped table; previously
+           // getColumnNamesForTable()/getPartitionKeys() would NPE here.
+           return;
+         }
+         List<String> partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1);
+         List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table);
+         if ((partNames != null) && (partNames.size() > 0)) {
+           // First pass: aggregate stats over every partition.
+           Deadline.startTimer("getAggregareStatsForAllPartitions");
+           AggrStats aggrStatsAllPartitions =
+               rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+           Deadline.stopTimer();
+           // Remove default partition from partition names and get aggregate stats again
+           List<FieldSchema> partKeys = table.getPartitionKeys();
+           String defaultPartitionValue =
+               MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME);
+           List<String> partCols = new ArrayList<String>();
+           List<String> partVals = new ArrayList<String>();
+           for (FieldSchema fs : partKeys) {
+             partCols.add(fs.getName());
+             partVals.add(defaultPartitionValue);
+           }
+           String defaultPartitionName = FileUtils.makePartName(partCols, partVals);
+           partNames.remove(defaultPartitionName);
+           Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault");
+           AggrStats aggrStatsAllButDefaultPartition =
+               rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
+           Deadline.stopTimer();
+           sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+               StringUtils.normalizeIdentifier(dbName),
+               StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
+               aggrStatsAllButDefaultPartition);
+         }
+       } catch (MetaException | NoSuchObjectException e) {
+         LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
+             e);
+       }
+     }
+   }
+ 
+   @Override
+   public Configuration getConf() {
+     // The cached store exposes the wrapped RawStore's configuration.
+     return rawStore.getConf();
+   }
+ 
+   @Override
+   public void shutdown() {
+     // Shutdown is delegated; the shared cache has no resources to close here.
+     rawStore.shutdown();
+   }
+ 
+   @Override
+   public boolean openTransaction() {
+     // Transaction handling is delegated entirely to the underlying RawStore.
+     return rawStore.openTransaction();
+   }
+ 
+   @Override
+   public boolean commitTransaction() {
+     // Transaction handling is delegated entirely to the underlying RawStore.
+     return rawStore.commitTransaction();
+   }
+ 
+   @Override
+   public boolean isActiveTransaction() {
+     // Transaction handling is delegated entirely to the underlying RawStore.
+     return rawStore.isActiveTransaction();
+   }
+ 
+   @Override
+   public void rollbackTransaction() {
+     // Transaction handling is delegated entirely to the underlying RawStore.
+     // NOTE(review): cache mutations done before a rollback are not undone
+     // here — confirm callers refresh via the updater thread.
+     rawStore.rollbackTransaction();
+   }
+ 
+   @Override
+   public void createCatalog(Catalog cat) throws MetaException {
+     // Write-through: persist in the raw store first, then mirror into the cache.
+     rawStore.createCatalog(cat);
+     sharedCache.addCatalogToCache(cat);
+   }
+ 
+   @Override
+   public void alterCatalog(String catName, Catalog cat) throws MetaException,
+       InvalidOperationException {
+     // Write-through: alter in the raw store, then update the cached copy.
+     rawStore.alterCatalog(catName, cat);
+     sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat);
+   }
+ 
+   @Override
+   public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+     // Until the catalog cache is prewarmed, reads must go to the raw store.
+     if (!sharedCache.isCatalogCachePrewarmed()) {
+       return rawStore.getCatalog(catalogName);
+     }
+     Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName));
+     if (cat == null) {
+       // After prewarm the cache is authoritative: a miss means no such catalog.
+       throw new NoSuchObjectException();
+     }
+     return cat;
+   }
+ 
+   @Override
+   public List<String> getCatalogs() throws MetaException {
+     // Serve from the cache once prewarmed; otherwise fall back to the raw store.
+     if (!sharedCache.isCatalogCachePrewarmed()) {
+       return rawStore.getCatalogs();
+     }
+     return sharedCache.listCachedCatalogs();
+   }
+ 
+   @Override
+   public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException {
+     // Drop from the raw store first; evict from the cache only on success.
+     rawStore.dropCatalog(catalogName);
+     // Consistency fix: use the same identifier normalization as every other
+     // cache-key computation in this class instead of a bare toLowerCase().
+     catalogName = normalizeIdentifier(catalogName);
+     sharedCache.removeCatalogFromCache(catalogName);
+   }
+ 
+   @Override
+   public void createDatabase(Database db) throws InvalidObjectException, MetaException {
+     // Write-through: persist in the raw store first, then mirror into the cache.
+     rawStore.createDatabase(db);
+     sharedCache.addDatabaseToCache(db);
+   }
+ 
+   @Override
+   public Database getDatabase(String catName, String dbName) throws NoSuchObjectException {
+     // Until the database cache is prewarmed, reads go to the raw store.
+     if (!sharedCache.isDatabaseCachePrewarmed()) {
+       return rawStore.getDatabase(catName, dbName);
+     }
+     // Removed a redundant dbName.toLowerCase(): normalizeIdentifier already
+     // lower-cases the identifier, so the extra call was dead work.
+     Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
+             StringUtils.normalizeIdentifier(dbName));
+     if (db == null) {
+       // After prewarm the cache is authoritative: a miss means no such database.
+       throw new NoSuchObjectException();
+     }
+     return db;
+   }
+ 
+   @Override
+   public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException {
+     // Evict from the cache only after the raw-store drop succeeds.
+     boolean succ = rawStore.dropDatabase(catName, dbName);
+     if (succ) {
+       sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName),
+           StringUtils.normalizeIdentifier(dbName));
+     }
+     return succ;
+   }
+ 
+   @Override
+   public boolean alterDatabase(String catName, String dbName, Database db)
+       throws NoSuchObjectException, MetaException {
+     // Update the cached copy only after the raw-store alter succeeds.
+     boolean succ = rawStore.alterDatabase(catName, dbName, db);
+     if (succ) {
+       sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName),
+           StringUtils.normalizeIdentifier(dbName), db);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public List<String> getDatabases(String catName, String pattern) throws MetaException {
+     // Serve from the cache once prewarmed; otherwise fall back to the raw store.
+     if (!sharedCache.isDatabaseCachePrewarmed()) {
+       return rawStore.getDatabases(catName, pattern);
+     }
+     return sharedCache.listCachedDatabases(catName, pattern);
+   }
+ 
+   @Override
+   public List<String> getAllDatabases(String catName) throws MetaException {
+     // Serve from the cache once prewarmed; otherwise fall back to the raw store.
+     if (!sharedCache.isDatabaseCachePrewarmed()) {
+       return rawStore.getAllDatabases(catName);
+     }
+     return sharedCache.listCachedDatabases(catName);
+   }
+ 
+   @Override
+   public boolean createType(Type type) {
+     // Types are not cached; pass straight through to the raw store.
+     return rawStore.createType(type);
+   }
+ 
+   @Override
+   public Type getType(String typeName) {
+     // Types are not cached; pass straight through to the raw store.
+     return rawStore.getType(typeName);
+   }
+ 
+   @Override
+   public boolean dropType(String typeName) {
+     // Types are not cached; pass straight through to the raw store.
+     return rawStore.dropType(typeName);
+   }
+ 
+   // Reconcile the declared table type with the "EXTERNAL" table property:
+   // a managed table marked EXTERNAL=true becomes external, and an external
+   // table marked EXTERNAL=false becomes managed. The table is updated in place.
+   private void validateTableType(Table tbl) {
+     String tableType = tbl.getTableType();
+     boolean isExternal = Boolean.parseBoolean(tbl.getParameters().get("EXTERNAL"));
+     if (isExternal && TableType.MANAGED_TABLE.toString().equals(tableType)) {
+       tableType = TableType.EXTERNAL_TABLE.toString();
+     } else if (!isExternal && TableType.EXTERNAL_TABLE.toString().equals(tableType)) {
+       tableType = TableType.MANAGED_TABLE.toString();
+     }
+     tbl.setTableType(tableType);
+   }
+ 
+   @Override
+   public void createTable(Table tbl) throws InvalidObjectException, MetaException {
+     // Write-through: persist first, then cache only if the table passes the
+     // configured whitelist/blacklist (shouldCacheTable).
+     rawStore.createTable(tbl);
+     String catName = normalizeIdentifier(tbl.getCatName());
+     String dbName = normalizeIdentifier(tbl.getDbName());
+     String tblName = normalizeIdentifier(tbl.getTableName());
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return;
+     }
+     validateTableType(tbl);
+     sharedCache.addTableToCache(catName, dbName, tblName, tbl);
+   }
+ 
+   @Override
+   public boolean dropTable(String catName, String dbName, String tblName)
+       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
+     // Evict from the cache only after the raw-store drop succeeds and only
+     // if the table was eligible for caching in the first place.
+     boolean succ = rawStore.dropTable(catName, dbName, tblName);
+     if (succ) {
+       catName = normalizeIdentifier(catName);
+       dbName = normalizeIdentifier(dbName);
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       sharedCache.removeTableFromCache(catName, dbName, tblName);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public Table getTable(String catName, String dbName, String tblName) throws MetaException {
++    // Convenience overload: no transactional snapshot (txnId -1, no write-ID list).
++    return getTable(catName, dbName, tblName, -1, null);
++  }
++
++  // TODO: if writeIdList is not null, check isolation level compliance for SVS,
++  // possibly with getTableFromCache() with table snapshot in cache.
++  @Override
++  public Table getTable(String catName, String dbName, String tblName,
++                        long txnId, String writeIdList)
++      throws MetaException {
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
 -      return rawStore.getTable(catName, dbName, tblName);
++      return rawStore.getTable(catName, dbName, tblName, txnId,writeIdList);
+     }
+     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
++    // A caller-supplied writeIdList requires snapshot-consistent data, which
++    // the cache cannot provide yet (see TODO above), so go to the raw store.
 -    if (tbl == null) {
++    if (tbl == null || writeIdList != null) {
+       // This table is not yet loaded in cache
+       // If the prewarm thread is working on this table's database,
+       // let's move this table to the top of tblNamesBeingPrewarmed stack,
+       // so that it gets loaded to the cache faster and is available for subsequent requests
+       tblsPendingPrewarm.prioritizeTableForPrewarm(tblName);
 -      return rawStore.getTable(catName, dbName, tblName);
++      return rawStore.getTable(catName, dbName, tblName, txnId, writeIdList);
+     }
++    // NOTE(review): tbl is always non-null here (the branch above returned
++    // otherwise), so this check is redundant. setRewriteEnabled(self-value)
++    // presumably marks the Thrift field as set — confirm before simplifying.
+     if (tbl != null) {
+       tbl.unsetPrivileges();
+       tbl.setRewriteEnabled(tbl.isRewriteEnabled());
+     }
+     return tbl;
+   }
+ 
+   @Override
+   public boolean addPartition(Partition part) throws InvalidObjectException, MetaException {
+     // Write-through: cache the partition only after the raw-store add succeeds
+     // and only if the owning table is eligible for caching.
+     boolean succ = rawStore.addPartition(part);
+     if (succ) {
+       String dbName = normalizeIdentifier(part.getDbName());
+       String tblName = normalizeIdentifier(part.getTableName());
+       // Partitions may arrive without a catalog name; default it.
+       String catName = part.isSetCatName() ? normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME;
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       sharedCache.addPartitionToCache(catName, dbName, tblName, part);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
+       throws InvalidObjectException, MetaException {
+     // Bulk write-through counterpart of addPartition(Partition).
+     boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts);
+     if (succ) {
+       catName = normalizeIdentifier(catName);
+       dbName = normalizeIdentifier(dbName);
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       sharedCache.addPartitionsToCache(catName, dbName, tblName, parts);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec,
+       boolean ifNotExists) throws InvalidObjectException, MetaException {
+     // PartitionSpec variant: iterate the proxy and cache each partition
+     // individually after a successful raw-store add.
+     boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists);
+     if (succ) {
+       catName = normalizeIdentifier(catName);
+       dbName = normalizeIdentifier(dbName);
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+       while (iterator.hasNext()) {
+         Partition part = iterator.next();
+         sharedCache.addPartitionToCache(catName, dbName, tblName, part);
+       }
+     }
+     return succ;
+   }
+ 
+   @Override
+   public Partition getPartition(String catName, String dbName, String tblName, List<String> part_vals)
+       throws MetaException, NoSuchObjectException {
++    // Convenience overload: no transactional snapshot (txnId -1, no write-ID list).
++    return getPartition(catName, dbName, tblName, part_vals, -1, null);
++  }
++
++  // TODO: the same as getTable()
++  @Override
++  public Partition getPartition(String catName, String dbName, String tblName,
++                                List<String> part_vals, long txnId, String writeIdList)
++      throws MetaException, NoSuchObjectException {
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
 -      return rawStore.getPartition(catName, dbName, tblName, part_vals);
++      return rawStore.getPartition(
++          catName, dbName, tblName, part_vals, txnId, writeIdList);
+     }
+     Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, part_vals);
++    // As in getTable(): a non-null writeIdList forces a raw-store read since
++    // the cache does not hold snapshot-consistent partitions yet.
 -    if (part == null) {
++    if (part == null || writeIdList != null) {
+       // The table containing the partition is not yet loaded in cache
++      // NOTE(review): unlike the raw store, a cache miss here returns null
++      // only via the raw store; no NoSuchObjectException is raised locally.
 -      return rawStore.getPartition(catName, dbName, tblName, part_vals);
++      return rawStore.getPartition(
++          catName, dbName, tblName, part_vals, txnId, writeIdList);
+     }
+     return part;
+   }
+ 
+   @Override
+   public boolean doesPartitionExist(String catName, String dbName, String tblName,
+       List<FieldSchema> partKeys, List<String> part_vals)
+       throws MetaException, NoSuchObjectException {
+     // Answer existence checks from the cache when the table is cached;
+     // otherwise consult the raw store.
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
+     }
+     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (tbl == null) {
+       // The table containing the partition is not yet loaded in cache
+       return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals);
+     }
+     return sharedCache.existPartitionFromCache(catName, dbName, tblName, part_vals);
+   }
+ 
+   @Override
+   public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals)
+       throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
+     // Evict the cached partition only after the raw-store drop succeeds.
+     boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals);
+     if (succ) {
+       catName = normalizeIdentifier(catName);
+       dbName = normalizeIdentifier(dbName);
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       sharedCache.removePartitionFromCache(catName, dbName, tblName, part_vals);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames)
+       throws MetaException, NoSuchObjectException {
+     // Bulk drop: convert partition names back to value lists before evicting
+     // from the cache, which is keyed on values.
+     rawStore.dropPartitions(catName, dbName, tblName, partNames);
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return;
+     }
+     List<List<String>> partVals = new ArrayList<>();
+     for (String partName : partNames) {
+       partVals.add(partNameToVals(partName));
+     }
+     sharedCache.removePartitionsFromCache(catName, dbName, tblName, partVals);
+   }
+ 
+   @Override
+   public List<Partition> getPartitions(String catName, String dbName, String tblName, int max)
+       throws MetaException, NoSuchObjectException {
+     // Serve partitions from the cache when the table is cached and loaded.
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.getPartitions(catName, dbName, tblName, max);
+     }
+     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (tbl == null) {
+       // The table containing the partitions is not yet loaded in cache
+       return rawStore.getPartitions(catName, dbName, tblName, max);
+     }
+     List<Partition> parts = sharedCache.listCachedPartitions(catName, dbName, tblName, max);
+     return parts;
+   }
+ 
+   @Override
+   public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName,
+       String baseLocationToNotShow, int max) {
+     // Not served from the cache; pass straight through to the raw store.
+     return rawStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max);
+   }
+ 
+   @Override
 -  public void alterTable(String catName, String dbName, String tblName, Table newTable)
 -      throws InvalidObjectException, MetaException {
 -    rawStore.alterTable(catName, dbName, tblName, newTable);
++  public void alterTable(String catName, String dbName, String tblName, Table newTable,
++      long txnId, String validWriteIds) throws InvalidObjectException, MetaException {
++    rawStore.alterTable(catName, dbName, tblName, newTable, txnId, validWriteIds);
+     catName = normalizeIdentifier(catName);
+     dbName = normalizeIdentifier(dbName);
+     tblName = normalizeIdentifier(tblName);
+     String newTblName = normalizeIdentifier(newTable.getTableName());
+     // Skip cache maintenance only when neither the old nor the new name is
+     // eligible for caching (a rename can move a table in or out of the cache).
+     if (!shouldCacheTable(catName, dbName, tblName) &&
+         !shouldCacheTable(catName, dbName, newTblName)) {
+       return;
+     }
+     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (tbl == null) {
+       // The table is not yet loaded in cache
+       // NOTE(review): in that case the altered/renamed table is not added to
+       // the cache here — presumably picked up later by prewarm/refresh; confirm.
+       return;
+     }
+     // Three rename transitions: stay cached, enter the cache, or leave it.
+     if (shouldCacheTable(catName, dbName, tblName) && shouldCacheTable(catName, dbName, newTblName)) {
+       // If old table is in the cache and the new table can also be cached
+       sharedCache.alterTableInCache(catName, dbName, tblName, newTable);
+     } else if (!shouldCacheTable(catName, dbName, tblName) && shouldCacheTable(catName, dbName, newTblName)) {
+       // If old table is *not* in the cache but the new table can be cached
+       sharedCache.addTableToCache(catName, dbName, newTblName, newTable);
+     } else if (shouldCacheTable(catName, dbName, tblName) && !shouldCacheTable(catName, dbName, newTblName)) {
+       // If old table is in the cache but the new table *cannot* be cached
+       sharedCache.removeTableFromCache(catName, dbName, tblName);
+     }
+   }
+ 
+   @Override
+   public void updateCreationMetadata(String catName, String dbname, String tablename, CreationMetadata cm)
+       throws MetaException {
+     // Pass-through; NOTE(review): the cached table is not refreshed here —
+     // confirm materialized-view creation metadata is not served from the cache.
+     rawStore.updateCreationMetadata(catName, dbname, tablename, cm);
+   }
+ 
+   @Override
+   public List<String> getTables(String catName, String dbName, String pattern) throws MetaException {
+     // Any whitelist/blacklist filtering means the cache may be incomplete for
+     // listings, so fall back to the raw store in that case (or before prewarm).
+     if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+       return rawStore.getTables(catName, dbName, pattern);
+     }
+     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
+         StringUtils.normalizeIdentifier(dbName), pattern, (short) -1);
+   }
+ 
+   @Override
+   public List<String> getTables(String catName, String dbName, String pattern, TableType tableType)
+       throws MetaException {
+     // Same listing rule as getTables(pattern): the cache must be complete.
+     if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+       return rawStore.getTables(catName, dbName, pattern, tableType);
+     }
+     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
+         StringUtils.normalizeIdentifier(dbName), pattern, tableType);
+   }
+ 
+   @Override
+   public List<String> getMaterializedViewsForRewriting(String catName, String dbName)
+       throws MetaException, NoSuchObjectException {
+     // Not served from the cache; pass straight through to the raw store.
+     return rawStore.getMaterializedViewsForRewriting(catName, dbName);
+   }
+ 
+   @Override
+   public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames,
+                                       List<String> tableTypes) throws MetaException {
+     // TODO Check if all required tables are allowed, if so, get it from cache
+     if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+       return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes);
+     }
+     return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName),
+         StringUtils.normalizeIdentifier(dbNames),
+         StringUtils.normalizeIdentifier(tableNames), tableTypes);
+   }
+ 
+   @Override
+   public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames)
+       throws MetaException, UnknownDBException {
+     // If any requested table is excluded from the cache, answer the whole
+     // batch from the raw store so results stay consistent.
+     dbName = normalizeIdentifier(dbName);
+     catName = normalizeIdentifier(catName);
+     boolean missSomeInCache = false;
+     for (String tblName : tblNames) {
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         missSomeInCache = true;
+         break;
+       }
+     }
+     if (!isCachePrewarmed.get() || missSomeInCache) {
+       return rawStore.getTableObjectsByName(catName, dbName, tblNames);
+     }
+     List<Table> tables = new ArrayList<>();
+     for (String tblName : tblNames) {
+       tblName = normalizeIdentifier(tblName);
+       Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
+       if (tbl == null) {
+         // Cache miss for a cacheable table: fetch it individually.
+         tbl = rawStore.getTable(catName, dbName, tblName);
+       }
+       tables.add(tbl);
+     }
+     return tables;
+   }
+ 
+   @Override
+   public List<String> getAllTables(String catName, String dbName) throws MetaException {
+     // Listings require a complete cache: no name filters and prewarm finished.
+     if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+       return rawStore.getAllTables(catName, dbName);
+     }
+     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
+         StringUtils.normalizeIdentifier(dbName));
+   }
+ 
+   @Override
+   public List<String> listTableNamesByFilter(String catName, String dbName, String filter,
+                                              short max_tables)
+       throws MetaException, UnknownDBException {
+     // Listings require a complete cache: no name filters and prewarm finished.
+     if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) {
+       return rawStore.listTableNamesByFilter(catName, dbName, filter, max_tables);
+     }
+     return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName),
+         StringUtils.normalizeIdentifier(dbName), filter, max_tables);
+   }
+ 
+   @Override
+   public List<String> listPartitionNames(String catName, String dbName, String tblName,
+       short max_parts) throws MetaException {
+     // Serve partition names from the cache; fall back to the raw store when
+     // the table is filtered out or not yet loaded by prewarm.
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
+     }
+     Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (tbl == null) {
+       // The table is not yet loaded in cache
+       return rawStore.listPartitionNames(catName, dbName, tblName, max_parts);
+     }
+     List<String> partitionNames = new ArrayList<>();
+     int count = 0;
+     for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, max_parts)) {
+       if (max_parts == -1 || count < max_parts) {
+         partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues()));
+         // BUG FIX: count was never incremented, so the max_parts bound in this
+         // loop was never enforced.
+         count++;
+       }
+     }
+     return partitionNames;
+   }
+ 
+   @Override
+   public PartitionValuesResponse listPartitionValues(String catName, String db_name, String tbl_name,
+       List<FieldSchema> cols, boolean applyDistinct, String filter, boolean ascending,
+       List<FieldSchema> order, long maxParts) throws MetaException {
+     // Deliberately unsupported by the cached store.
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   public void alterPartition(String catName, String dbName, String tblName, List<String> partVals,
 -                             Partition newPart) throws InvalidObjectException, MetaException {
 -    rawStore.alterPartition(catName, dbName, tblName, partVals, newPart);
++                             Partition newPart, long queryTxnId, String queryValidWriteIds)
++                                 throws InvalidObjectException, MetaException {
++    rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryTxnId, queryValidWriteIds);
+     catName = normalizeIdentifier(catName);
+     dbName = normalizeIdentifier(dbName);
+     tblName = normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return;
+     }
+     // Write-through update of the cached partition after a successful alter.
+     sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, newPart);
+   }
+ 
+   @Override
+   public void alterPartitions(String catName, String dbName, String tblName,
 -                              List<List<String>> partValsList, List<Partition> newParts)
++                              List<List<String>> partValsList, List<Partition> newParts,
++                              long writeId, long txnId, String validWriteIds)
+       throws InvalidObjectException, MetaException {
 -    rawStore.alterPartitions(catName, dbName, tblName, partValsList, newParts);
++    rawStore.alterPartitions(
++        catName, dbName, tblName, partValsList, newParts, writeId, txnId, validWriteIds);
+     catName = normalizeIdentifier(catName);
+     dbName = normalizeIdentifier(dbName);
+     tblName = normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return;
+     }
++    // TODO: modify the following method for the case when writeIdList != null.
+     sharedCache.alterPartitionsInCache(catName, dbName, tblName, partValsList, newParts);
+   }
+ 
+   // Build the full cached partition-name list for 'table' into 'result', then
+   // prune it in place with the serialized filter expression. Returns whether
+   // the expression could not be fully evaluated (unknown partitions remain).
+   private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr,
+       String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache)
+       throws MetaException, NoSuchObjectException {
+     List<Partition> parts =
+         sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getCatName()),
+             StringUtils.normalizeIdentifier(table.getDbName()),
+             StringUtils.normalizeIdentifier(table.getTableName()), maxParts);
+     for (Partition part : parts) {
+       result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
+     }
+     if (defaultPartName == null || defaultPartName.isEmpty()) {
+       defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
+     }
+     return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName,
+         result);
+   }
+ 
+   @Override
+   public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName,
+       String filter, short maxParts)
+       throws MetaException, NoSuchObjectException {
+     // String-filter evaluation is not implemented on the cache; pass through.
+     return rawStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts);
+   }
+ 
+   @Override
+   public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr,
+       String defaultPartitionName, short maxParts, List<Partition> result) throws TException {
+     // Answer expression-based partition pruning from the cache when possible.
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts,
+           result);
+     }
+     List<String> partNames = new LinkedList<>();
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts,
+           result);
+     }
+     boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr,
+         defaultPartitionName, maxParts, partNames, sharedCache);
+     // BUG FIX: the surviving partitions were never copied into 'result', so
+     // the cached path always handed an empty partition list back to callers.
+     for (String partName : partNames) {
+       Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partNameToVals(partName));
+       if (part != null) {
+         result.add(part);
+       }
+     }
+     return hasUnknownPartitions;
+   }
+ 
+   @Override
+   public int getNumPartitionsByFilter(String catName, String dbName, String tblName, String filter)
+       throws MetaException, NoSuchObjectException {
+     // String-filter evaluation is not implemented on the cache; pass through.
+     return rawStore.getNumPartitionsByFilter(catName, dbName, tblName, filter);
+   }
+ 
+   @Override
+   public int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr)
+       throws MetaException, NoSuchObjectException {
+     // Count surviving partitions by pruning cached names with the expression.
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
+     }
+     String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
+     List<String> partNames = new LinkedList<>();
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr);
+     }
+     getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames,
+         sharedCache);
+     return partNames.size();
+   }
+ 
+   // Decode a partition name of the form "k1=v1/k2=v2/..." into its list of
+   // unescaped values, in key order. A null name yields null.
+   private static List<String> partNameToVals(String name) {
+     if (name == null) {
+       return null;
+     }
+     List<String> values = new ArrayList<>();
+     for (String pair : name.split("/")) {
+       int eq = pair.indexOf('=');
+       values.add(FileUtils.unescapePathName(pair.substring(eq + 1)));
+     }
+     return values;
+   }
+ 
+   @Override
+   public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName,
+       List<String> partNames) throws MetaException, NoSuchObjectException {
+     // Resolve each requested name against the cache.
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames);
+     }
+     List<Partition> partitions = new ArrayList<>();
+     for (String partName : partNames) {
+       Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partNameToVals(partName));
+       // Names missing from the cache are silently skipped (no raw-store retry).
+       if (part!=null) {
+         partitions.add(part);
+       }
+     }
+     return partitions;
+   }
+ 
+   @Override
+   public Table markPartitionForEvent(String catName, String dbName, String tblName,
+       Map<String, String> partVals, PartitionEventType evtType)
+       throws MetaException, UnknownTableException, InvalidPartitionException,
+       UnknownPartitionException {
+     // Event markers are not cached; pass straight through to the raw store.
+     return rawStore.markPartitionForEvent(catName, dbName, tblName, partVals, evtType);
+   }
+ 
+   @Override
+   public boolean isPartitionMarkedForEvent(String catName, String dbName, String tblName,
+       Map<String, String> partName, PartitionEventType evtType)
+       throws MetaException, UnknownTableException, InvalidPartitionException,
+       UnknownPartitionException {
+     // Event markers are not cached; pass straight through to the raw store.
+     return rawStore.isPartitionMarkedForEvent(catName, dbName, tblName, partName, evtType);
+   }
+ 
+   @Override
+   public boolean addRole(String rowName, String ownerName)
+       throws InvalidObjectException, MetaException, NoSuchObjectException {
+     // Roles are not cached; pass through. NOTE(review): "rowName" looks like
+     // a typo for "roleName" inherited from the RawStore interface — confirm.
+     return rawStore.addRole(rowName, ownerName);
+   }
+ 
+   @Override
+   public boolean removeRole(String roleName)
+       throws MetaException, NoSuchObjectException {
+     // Roles are not cached; pass straight through to the raw store.
+     return rawStore.removeRole(roleName);
+   }
+ 
+   @Override
+   public boolean grantRole(Role role, String userName,
+       PrincipalType principalType, String grantor, PrincipalType grantorType,
+       boolean grantOption)
+       throws MetaException, NoSuchObjectException, InvalidObjectException {
+     // Role grants are not cached; pass straight through to the raw store.
+     return rawStore.grantRole(role, userName, principalType, grantor, grantorType, grantOption);
+   }
+ 
+   @Override
+   public boolean revokeRole(Role role, String userName,
+       PrincipalType principalType, boolean grantOption)
+       throws MetaException, NoSuchObjectException {
+     return rawStore.revokeRole(role, userName, principalType, grantOption);
+   }
+ 
+   @Override
+   public PrincipalPrivilegeSet getUserPrivilegeSet(String userName,
+       List<String> groupNames) throws InvalidObjectException, MetaException {
+     return rawStore.getUserPrivilegeSet(userName, groupNames);
+   }
+ 
+   @Override
+   public PrincipalPrivilegeSet getDBPrivilegeSet(String catName, String dbName, String userName,
+       List<String> groupNames) throws InvalidObjectException, MetaException {
+     return rawStore.getDBPrivilegeSet(catName, dbName, userName, groupNames);
+   }
+ 
+   @Override
+   public PrincipalPrivilegeSet getTablePrivilegeSet(String catName, String dbName,
+       String tableName, String userName, List<String> groupNames)
+       throws InvalidObjectException, MetaException {
+     return rawStore.getTablePrivilegeSet(catName, dbName, tableName, userName, groupNames);
+   }
+ 
+   @Override
+   public PrincipalPrivilegeSet getPartitionPrivilegeSet(String catName, String dbName,
+       String tableName, String partition, String userName,
+       List<String> groupNames) throws InvalidObjectException, MetaException {
+     return rawStore.getPartitionPrivilegeSet(catName, dbName, tableName, partition, userName, groupNames);
+   }
+ 
+   @Override
+   public PrincipalPrivilegeSet getColumnPrivilegeSet(String catName, String dbName,
+       String tableName, String partitionName, String columnName,
+       String userName, List<String> groupNames)
+       throws InvalidObjectException, MetaException {
+     return rawStore.getColumnPrivilegeSet(catName, dbName, tableName, partitionName, columnName, userName, groupNames);
+   }
+ 
+   @Override
+   public List<HiveObjectPrivilege> listPrincipalGlobalGrants(
+       String principalName, PrincipalType principalType) {
+     return rawStore.listPrincipalGlobalGrants(principalName, principalType);
+   }
+ 
+   @Override
+   public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName,
+       PrincipalType principalType, String catName, String dbName) {
+     return rawStore.listPrincipalDBGrants(principalName, principalType, catName, dbName);
+   }
+ 
+   @Override
+   public List<HiveObjectPrivilege> listAllTableGrants(String principalName,
+       PrincipalType principalType, String catName, String dbName, String tableName) {
+     return rawStore.listAllTableGrants(principalName, principalType, catName, dbName, tableName);
+   }
+ 
+   @Override
+   public List<HiveObjectPrivilege> listPrincipalPartitionGrants(
+       String principalName, PrincipalType principalType, String catName, String dbName,
+       String tableName, List<String> partValues, String partName) {
+     return rawStore.listPrincipalPartitionGrants(principalName, principalType, catName, dbName, tableName, partValues, partName);
+   }
+ 
+   @Override
+   public List<HiveObjectPrivilege> listPrincipalTableColumnGrants(
+       String principalName, PrincipalType principalType, String catName, String dbName,
+       String tableName, String columnName) {
+     return rawStore.listPrincipalTableColumnGrants(principalName, principalType, catName, dbName, tableName, columnName);
+   }
+ 
+   @Override
+   public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants(
+       String principalName, PrincipalType principalType, String catName, String dbName,
+       String tableName, List<String> partValues, String partName,
+       String columnName) {
+     return rawStore.listPrincipalPartitionColumnGrants(principalName, principalType, catName, dbName, tableName, partValues, partName, columnName);
+   }
+ 
+   @Override
+   public boolean grantPrivileges(PrivilegeBag privileges)
+       throws InvalidObjectException, MetaException, NoSuchObjectException {
+     return rawStore.grantPrivileges(privileges);
+   }
+ 
+   @Override
+   public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption)
+       throws InvalidObjectException, MetaException, NoSuchObjectException {
+     return rawStore.revokePrivileges(privileges, grantOption);
+   }
+ 
+   @Override
+   public boolean refreshPrivileges(HiveObjectRef objToRefresh, String authorizer, PrivilegeBag grantPrivileges)
+       throws InvalidObjectException, MetaException, NoSuchObjectException {
+     return rawStore.refreshPrivileges(objToRefresh, authorizer, grantPrivileges);
+   }
+ 
+   @Override
+   public Role getRole(String roleName) throws NoSuchObjectException {
+     return rawStore.getRole(roleName);
+   }
+ 
+   @Override
+   public List<String> listRoleNames() {
+     return rawStore.listRoleNames();
+   }
+ 
+   @Override
+   public List<Role> listRoles(String principalName,
+       PrincipalType principalType) {
+     return rawStore.listRoles(principalName, principalType);
+   }
+ 
+   @Override
+   public List<RolePrincipalGrant> listRolesWithGrants(String principalName,
+       PrincipalType principalType) {
+     return rawStore.listRolesWithGrants(principalName, principalType);
+   }
+ 
+   @Override
+   public List<RolePrincipalGrant> listRoleMembers(String roleName) {
+     return rawStore.listRoleMembers(roleName);
+   }
+ 
+   @Override
+   public Partition getPartitionWithAuth(String catName, String dbName, String tblName,
+       List<String> partVals, String userName, List<String> groupNames)
+       throws MetaException, NoSuchObjectException, InvalidObjectException {
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames);
+     }
+     Partition p = sharedCache.getPartitionFromCache(catName, dbName, tblName, partVals);
+     if (p != null) {
+       String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals);
+       PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName,
+           userName, groupNames);
+       p.setPrivileges(privs);
+     }
+     return p;
+   }
+ 
+   @Override
+   public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName,
+       short maxParts, String userName, List<String> groupNames)
+       throws MetaException, NoSuchObjectException, InvalidObjectException {
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames);
+     }
+     List<Partition> partitions = new ArrayList<>();
+     int count = 0;
+     for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
+       if (maxParts == -1 || count < maxParts) {
+         String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
+         PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName,
+             userName, groupNames);
+         part.setPrivileges(privs);
+         partitions.add(part);
+         count++;
+       }
+     }
+     return partitions;
+   }
+ 
+   @Override
+   public List<String> listPartitionNamesPs(String catName, String dbName, String tblName,
+       List<String> partVals, short maxParts)
+       throws MetaException, NoSuchObjectException {
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts);
+     }
+     List<String> partNames = new ArrayList<>();
+     int count = 0;
+     for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
+       boolean psMatch = true;
+       for (int i=0;i<partVals.size();i++) {
+         String psVal = partVals.get(i);
+         String partVal = part.getValues().get(i);
+         if (psVal!=null && !psVal.isEmpty() && !psVal.equals(partVal)) {
+           psMatch = false;
+           break;
+         }
+       }
+       if (!psMatch) {
+         continue;
+       }
+       if (maxParts == -1 || count < maxParts) {
+         partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues()));
+         count++;
+       }
+     }
+     return partNames;
+   }
+ 
+   @Override
+   public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName,
+       List<String> partVals, short maxParts, String userName, List<String> groupNames)
+       throws MetaException, InvalidObjectException, NoSuchObjectException {
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
+       return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partVals, maxParts, userName,
+           groupNames);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+     if (table == null) {
+       // The table is not yet loaded in cache
+       return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partVals, maxParts, userName,
+           groupNames);
+     }
+     List<Partition> partitions = new ArrayList<>();
+     int count = 0;
+     for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) {
+       boolean psMatch = true;
+       for (int i = 0; i < partVals.size(); i++) {
+         String psVal = partVals.get(i);
+         String partVal = part.getValues().get(i);
+         if (psVal != null && !psVal.isEmpty() && !psVal.equals(partVal)) {
+           psMatch = false;
+           break;
+         }
+       }
+       if (!psMatch) {
+         continue;
+       }
+       if (maxParts == -1 || count < maxParts) {
+         String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues());
+         PrincipalPrivilegeSet privs =
+             getPartitionPrivilegeSet(catName, dbName, tblName, partName, userName, groupNames);
+         part.setPrivileges(privs);
+         partitions.add(part);
+       }
+     }
+     return partitions;
+   }
+ 
+   @Override
 -  public boolean updateTableColumnStatistics(ColumnStatistics colStats)
++  public boolean updateTableColumnStatistics(ColumnStatistics colStats, long txnId, String validWriteIds, long writeId)
+       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
 -    boolean succ = rawStore.updateTableColumnStatistics(colStats);
++    boolean succ = rawStore.updateTableColumnStatistics(colStats, txnId, validWriteIds, writeId);
+     if (succ) {
+       String catName = colStats.getStatsDesc().isSetCatName() ?
+           normalizeIdentifier(colStats.getStatsDesc().getCatName()) :
+           getDefaultCatalog(conf);
+       String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
+       String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
+       if (table == null) {
+         // The table is not yet loaded in cache
+         return succ;
+       }
+       List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+       List<String> colNames = new ArrayList<>();
+       for (ColumnStatisticsObj statsObj : statsObjs) {
+         colNames.add(statsObj.getColName());
+       }
+       StatsSetupConst.setColumnStatsState(table.getParameters(), colNames);
+       sharedCache.alterTableInCache(catName, dbName, tblName, table);
+       sharedCache.updateTableColStatsInCache(catName, dbName, tblName, statsObjs);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName,
+       List<String> colNames) throws MetaException, NoSuchObjectException {
++    return getTableColumnStatistics(catName, dbName, tblName, colNames, -1, null);
++  }
++
++  // TODO: the same as getTable()
++  @Override
++  public ColumnStatistics getTableColumnStatistics(
++      String catName, String dbName, String tblName, List<String> colNames,
++      long txnId, String writeIdList)
++      throws MetaException, NoSuchObjectException {
+     catName = StringUtils.normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
 -      return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
++      return rawStore.getTableColumnStatistics(
++          catName, dbName, tblName, colNames, txnId, writeIdList);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
 -    if (table == null) {
++    if (table == null || writeIdList != null) {
+       // The table is not yet loaded in cache
 -      return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames);
++      return rawStore.getTableColumnStatistics(
++          catName, dbName, tblName, colNames, txnId, writeIdList);
+     }
+     ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
+     List<ColumnStatisticsObj> colStatObjs =
+         sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames);
+     return new ColumnStatistics(csd, colStatObjs);
+   }
+ 
+   @Override
+   public boolean deleteTableColumnStatistics(String catName, String dbName, String tblName,
+                                              String colName)
+       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+     boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName);
+     if (succ) {
+       catName = normalizeIdentifier(catName);
+       dbName = normalizeIdentifier(dbName);
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       sharedCache.removeTableColStatsFromCache(catName, dbName, tblName, colName);
+     }
+     return succ;
+   }
+ 
+   @Override
 -  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals)
++  public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals,
++      long txnId, String validWriteIds, long writeId)
+       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
 -    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals);
++    boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals, txnId, validWriteIds, writeId);
+     if (succ) {
+       String catName = colStats.getStatsDesc().isSetCatName() ?
+           normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME;
+       String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName());
+       String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName());
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
+       Partition part = getPartition(catName, dbName, tblName, partVals);
+       List<String> colNames = new ArrayList<>();
+       for (ColumnStatisticsObj statsObj : statsObjs) {
+         colNames.add(statsObj.getColName());
+       }
+       StatsSetupConst.setColumnStatsState(part.getParameters(), colNames);
+       sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, part);
+       sharedCache.updatePartitionColStatsInCache(catName, dbName, tblName, partVals, colStats.getStatsObj());
+     }
+     return succ;
+   }
+ 
+   @Override
+   // TODO: calculate from cached values.
+   public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName,
+       List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException {
+     return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames);
+   }
+ 
+   @Override
++  public List<ColumnStatistics> getPartitionColumnStatistics(
++      String catName, String dbName, String tblName, List<String> partNames,
++      List<String> colNames, long txnId, String writeIdList)
++      throws MetaException, NoSuchObjectException {
++    return rawStore.getPartitionColumnStatistics(
++        catName, dbName, tblName, partNames, colNames, txnId, writeIdList);
++  }
++
++  @Override
+   public boolean deletePartitionColumnStatistics(String catName, String dbName, String tblName, String partName,
+       List<String> partVals, String colName)
+       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
+     boolean succ =
+         rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName);
+     if (succ) {
+       catName = normalizeIdentifier(catName);
+       dbName = normalizeIdentifier(dbName);
+       tblName = normalizeIdentifier(tblName);
+       if (!shouldCacheTable(catName, dbName, tblName)) {
+         return succ;
+       }
+       sharedCache.removePartitionColStatsFromCache(catName, dbName, tblName, partVals, colName);
+     }
+     return succ;
+   }
+ 
+   @Override
+   public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames,
+       List<String> colNames) throws MetaException, NoSuchObjectException {
++    return get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, -1, null);
++  }
++
++  @Override
++  // TODO: the same as getTable() for transactional stats.
++  public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName,
++                                      List<String> partNames, List<String> colNames,
++                                      long txnId, String writeIdList)
++      throws MetaException, NoSuchObjectException {
+     List<ColumnStatisticsObj> colStats;
+     catName = normalizeIdentifier(catName);
+     dbName = StringUtils.normalizeIdentifier(dbName);
+     tblName = StringUtils.normalizeIdentifier(tblName);
+     if (!shouldCacheTable(catName, dbName, tblName)) {
 -      rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
++      return rawStore.get_aggr_stats_for(
++          catName, dbName, tblName, partNames, colNames, txnId, writeIdList);
+     }
+     Table table = sharedCache.getTableFromCache(catName, dbName, tblName);
 -    if (table == null) {
++    if (table == null || writeIdList != null) {
+       // The table is not yet loaded in cache
 -      return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames);
++      return rawStore.get_aggr_stats_for(
++          catName, dbName, tblName, partNames, colNames, txnId, writeIdList);
+     }
+     List<String> allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1);
+     if (partNames.size() == allPartNames.size()) {
+       colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALL);
+       if (colStats != null) {
+         return new AggrStats(colStats, partNames.size());
+       }
+     } else if (partNames.size() == (allPartNames.size() - 1)) {
+       String defaultPartitionName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
+       if (!partNames.contains(defaultPartitionName)) {
+         colStats =
+             sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALLBUTDEFAULT);
+         if (colStats != null) {
+           return new AggrStats(colStats, partNames.size());
+         }
+       }
+     }
+     LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}",
+         tblName, partNames, colNames);
+     MergedColumnStatsForPartitions mergedColStats =
+         mergeColStatsForPartitions(catName, dbName, tblName, partNames, colNames, sharedCache);
+     return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound());
+   }
+ 
+   private MergedColumnStatsForPartitions mergeColStatsForPartitions(
+       String catName, String dbName, String tblName, List<String> partNames, List<String> colNames,
+       SharedCache sharedCache) throws MetaException {
+     final boolean useDensityFunctionForNDVEstimation =
+         MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION);
+     final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER);
+     Map<ColumnStatsAggregator, List<ColStatsObjWithSourceInfo>> colStatsMap = new HashMap<>();
+     boolean areAllPartsFound = true;
+     long partsFound = 0;
+     for (String colName : colNames) {
+       long partsFoundForColumn = 0;
+       ColumnStatsAggregator colStatsAggregator = null;
+       List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList = new ArrayList<>();
+       for (String partName : partNames) {
+         ColumnStatisticsObj colStatsForPart =
+             sharedCache.getPartitionColStatsFromCache(catName, dbName, tblName, partNameToVals(partName), colName);
+         if (colStatsForPart != null) {
+           ColStatsObjWithSourceInfo colStatsWithPartInfo =
+               new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, tblName, partName);
+           colStatsWithPartInfoList.add(colStatsWithPartInfo);
+           if (colStatsAggregator == null) {
+             colStatsAggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator(
+                 colStatsForPart.getStatsData().getSetField(), useDensityFunctionForNDVEstimation,
+                 ndvTuner);
+           }
+           partsFoundForColumn++;
+         } else {
+           LOG.debug(
+               "Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}",
+               dbName, tblName, partName, colName);
+         }
+       }
+       if (colStatsWithPartInfoList.size() > 0) {
+         colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList);
+       }
+       if (partsFoundForColumn == partNames.size()) {
+         partsFound++;
+       }
+       if (colStatsMap.size() < 1) {
+         LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= {}", dbName,
+             tblName, partNames, colNames);
+         return new MergedColumnStatsForPartitions(new ArrayList<ColumnStatisticsObj>(), 0);
+       }
+     }
+     // Note that enableBitVector does not apply here because ColumnStatisticsObj
+     // itself will tell whether bitvector is null or not and aggr logic can automatically apply.
+     return new MergedColumnStatsForPartitions(MetaStoreUtils.aggrPartitionStats(colStatsMap,
+         partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner), partsFound);
+   }
+ 
+   class MergedColumnStatsForPartitions {
+     List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
+     long partsFound;
+ 
+     MergedColumnStatsForPartitions(List<ColumnStatisticsObj> colStats, long partsFound) {
+       this.colStats = colStats;
+       this.partsFound = partsFound;
+     }
+ 
+     List<ColumnStatisticsObj> getColStats() {
+       return colStats;
+     }
+ 
+     long getPartsFound() {
+       return partsFound;
+     }
+   }
+ 
+   @Override
+   public long cleanupEvents() {
+     return rawStore.cleanupEvents();
+   }
+ 
+   @Override
+   public boolean addToken(String tokenIdentifier, String delegationToken) {
+     return rawStore.addToken(tokenIdentifier, delegationToken);
+   }
+ 
+   @Override
+   public boolean removeToken(String tokenIdentifier) {
+     return rawStore.removeToken(tokenIdentifier);
+   }
+ 
+   @Override
+   public String getToken(String tokenIdentifier) {
+     return rawStore.getToken(tokenIdentifier);
+   }
+ 
+   @Override
+   public List<String> getAllTokenIdentifiers() {
+    

<TRUNCATED>