You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/18 16:39:47 UTC

[impala] 02/05: fe: improve logging for metadata loading

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 50e0fbf889465eb288a5ef69477ccbc627681105
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Wed May 29 21:23:39 2019 -0700

    fe: improve logging for metadata loading
    
    This annotates various catalogd-internal calls associated with
    refreshing and loading metadata with a 'String reason' parameter, useful
    for logging. This can help explain why a particular piece of metadata
    was loaded and, in the case of REFRESH calls, who issued the original
    refresh.
    
    Additionally, some of the log statements were improved to add a bit of
    extra detail such as the list of partitions being refreshed
    (abbreviated) and whether or not the table schema is being refreshed.
    
    Change-Id: I4d90a5f075b05d2dc96692b3349abe35ce24b8b8
    Reviewed-on: http://gerrit.cloudera.org:8080/13463
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../apache/impala/analysis/ResetMetadataStmt.java  |  11 ++-
 .../impala/catalog/CatalogServiceCatalog.java      |  35 +++----
 .../org/apache/impala/catalog/DataSourceTable.java |   3 +-
 .../java/org/apache/impala/catalog/HBaseTable.java |   3 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  27 ++++--
 .../org/apache/impala/catalog/IncompleteTable.java |   3 +-
 .../java/org/apache/impala/catalog/KuduTable.java  |   3 +-
 .../main/java/org/apache/impala/catalog/Table.java |   3 +-
 .../org/apache/impala/catalog/TableLoader.java     |   7 +-
 .../org/apache/impala/catalog/TableLoadingMgr.java |  24 +++--
 .../main/java/org/apache/impala/catalog/View.java  |   3 +-
 .../impala/catalog/events/MetastoreEvents.java     |  15 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |  91 ++++++++++++-------
 .../catalog/CatalogObjectToFromThriftTest.java     |  14 +--
 .../org/apache/impala/catalog/CatalogTest.java     | 101 +++++++++++----------
 .../catalog/CatalogdTableInvalidatorTest.java      |   2 +-
 .../events/MetastoreEventsProcessorTest.java       |   4 +-
 .../apache/impala/testutil/ImpaladTestCatalog.java |   2 +-
 18 files changed, 207 insertions(+), 144 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index 2ca4cf5..ac4e69e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -21,8 +21,11 @@ import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.authorization.User;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
 
@@ -68,6 +71,9 @@ public class ResetMetadataStmt extends StatementBase {
   // The type of action.
   private final Action action_;
 
+  // Set during analysis.
+  private User requestingUser_;
+
   private ResetMetadataStmt(Action action, String db, TableName tableName,
       PartitionSpec partitionSpec) {
     Preconditions.checkNotNull(action);
@@ -124,6 +130,7 @@ public class ResetMetadataStmt extends StatementBase {
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
+    requestingUser_ = analyzer.getUser();
     switch (action_) {
       case INVALIDATE_METADATA_TABLE:
       case REFRESH_TABLE:
@@ -214,8 +221,10 @@ public class ResetMetadataStmt extends StatementBase {
     return result.toString();
   }
 
-  public TResetMetadataRequest toThrift() {
+  public TResetMetadataRequest toThrift() throws InternalException {
     TResetMetadataRequest params = new TResetMetadataRequest();
+    params.setHeader(new TCatalogServiceRequestHeader());
+    params.header.setRequesting_user(requestingUser_.getShortName());
     params.setIs_refresh(action_.isRefresh());
     if (tableName_ != null) {
       params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 8c4678f..db143fb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.hadoop.fs.Hdfs;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -481,8 +480,8 @@ public class CatalogServiceCatalog extends Catalog {
    * Adds a list of cache directive IDs for the given table name. Asynchronously
    * refreshes the table metadata once all cache directives complete.
    */
-  public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
-    tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
+  public void watchCacheDirs(List<Long> dirIds, TTableName tblName, String reason) {
+    tableLoadingMgr_.watchCacheDirs(dirIds, tblName, reason);
   }
 
   /**
@@ -512,7 +511,8 @@ public class CatalogServiceCatalog extends Catalog {
     TTableName tableName = request.table_name;
     LOG.info("Fetching partition statistics for: " + tableName.getDb_name() + "."
         + tableName.getTable_name());
-    Table table = getOrLoadTable(tableName.db_name, tableName.table_name);
+    Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
+        "needed to fetch partition stats");
 
     // Table could be null if it does not exist anymore.
     if (table == null) {
@@ -1695,7 +1695,7 @@ public class CatalogServiceCatalog extends Catalog {
    * and the current cached value will be returned. This may mean that a missing table
    * (not yet loaded table) will be returned.
    */
-  public Table getOrLoadTable(String dbName, String tblName)
+  public Table getOrLoadTable(String dbName, String tblName, String reason)
       throws CatalogException {
     TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
     TableLoadingMgr.LoadRequest loadReq;
@@ -1707,7 +1707,7 @@ public class CatalogServiceCatalog extends Catalog {
       Table tbl = getTable(dbName, tblName);
       if (tbl == null || tbl.isLoaded()) return tbl;
       previousCatalogVersion = tbl.getCatalogVersion();
-      loadReq = tableLoadingMgr_.loadAsync(tableName);
+      loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
     } finally {
       versionLock_.readLock().unlock();
     }
@@ -1951,7 +1951,7 @@ public class CatalogServiceCatalog extends Catalog {
    * metadata load from concurrent table modifications and assigns a new catalog version.
    * Throws a CatalogException if there is an error loading table metadata.
    */
-  public TCatalogObject reloadTable(Table tbl) throws CatalogException {
+  public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
     LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
@@ -1973,7 +1973,7 @@ public class CatalogServiceCatalog extends Catalog {
           throw new TableLoadingException("Error loading metadata for table: " +
               dbName + "." + tblName, e);
         }
-        tbl.load(true, msClient.getHiveClient(), msTbl);
+        tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
@@ -2146,10 +2146,10 @@ public class CatalogServiceCatalog extends Catalog {
    * @throws DatabaseNotFoundException if Db doesn't exist.
    */
   public boolean reloadPartitionIfExists(String dbName, String tblName,
-      List<TPartitionKeyValue> tPartSpec) throws CatalogException {
+      List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
     Table table = getTable(dbName, tblName);
     if (table == null || table instanceof IncompleteTable) return false;
-    reloadPartition(table, tPartSpec);
+    reloadPartition(table, tPartSpec, reason);
     return true;
   }
 
@@ -2158,11 +2158,11 @@ public class CatalogServiceCatalog extends Catalog {
    * otherwise. Throws CatalogException if reloadTable() is unsuccessful. Throws
    * DatabaseNotFoundException if Db doesn't exist.
    */
-  public boolean reloadTableIfExists(String dbName, String tblName)
+  public boolean reloadTableIfExists(String dbName, String tblName, String reason)
       throws CatalogException {
     Table table = getTable(dbName, tblName);
     if (table == null || table instanceof IncompleteTable) return false;
-    reloadTable(table);
+    reloadTable(table, reason);
     return true;
   }
 
@@ -2467,8 +2467,8 @@ public class CatalogServiceCatalog extends Catalog {
    * 'partitionSpec' in table 'tbl'. Returns the resulting table's TCatalogObject after
    * the partition metadata was reloaded.
    */
-  public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
-      throws CatalogException {
+  public TCatalogObject reloadPartition(Table tbl,
+      List<TPartitionKeyValue> partitionSpec, String reason) throws CatalogException {
     if (!tryLockTable(tbl)) {
       throw new CatalogException(String.format("Error reloading partition of table %s " +
           "due to lock contention", tbl.getFullName()));
@@ -2484,8 +2484,8 @@ public class CatalogServiceCatalog extends Catalog {
       String partitionName = hdfsPartition == null
           ? HdfsTable.constructPartitionName(partitionSpec)
           : hdfsPartition.getPartitionName();
-      LOG.info(String.format("Refreshing partition metadata: %s %s",
-          hdfsTable.getFullName(), partitionName));
+      LOG.info(String.format("Refreshing partition metadata: %s %s (%s)",
+          hdfsTable.getFullName(), partitionName, reason));
       try (MetaStoreClient msClient = getMetaStoreClient()) {
         org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
         try {
@@ -2789,7 +2789,8 @@ public class CatalogServiceCatalog extends Catalog {
       Table table;
       try {
         table = getOrLoadTable(
-            objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name());
+            objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
+            "needed by coordinator");
       } catch (DatabaseNotFoundException e) {
         return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
       }
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index ee90c6f..4bf6d68 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -171,7 +171,8 @@ public class DataSourceTable extends Table implements FeDataSourceTable {
 
   @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
     Preconditions.checkNotNull(msTbl);
     msTable_ = msTbl;
     clearColumns();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index 0b81b5d..d75afa7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -98,7 +98,8 @@ public class HBaseTable extends Table implements FeHBaseTable {
    */
   @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
     Preconditions.checkNotNull(getMetaStoreTable());
     try (Timer.Context timer = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time()) {
       msTable_ = msTbl;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 5dc0f03..980cf47 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -91,6 +91,7 @@ import org.slf4j.LoggerFactory;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
@@ -615,7 +616,15 @@ public class HdfsTable extends Table implements FeFsTable {
     // - how many block locations did we reuse/load individually/load via batch
     // - how many partitions did we read metadata for
     // - etc...
-    LOG.info("Loaded file and block metadata for {}", getFullName());
+    String partNames = Joiner.on(", ").join(
+        Iterables.limit(Iterables.transform(parts, HdfsPartition::getPartitionName), 3));
+    if (partsByPath.size() > 3) {
+      partNames += String.format(", and %s others",
+          Iterables.size(parts) - 3);
+    }
+
+    LOG.info("Loaded file and block metadata for {} partitions: {}", getFullName(),
+        partNames);
   }
 
   /**
@@ -878,8 +887,9 @@ public class HdfsTable extends Table implements FeFsTable {
 
   @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
-    load(reuseMetadata, client, msTbl, true, true, null);
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
+    load(reuseMetadata, client, msTbl, true, true, null, reason);
   }
 
   /**
@@ -909,13 +919,15 @@ public class HdfsTable extends Table implements FeFsTable {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       boolean loadParitionFileMetadata, boolean loadTableSchema,
-      Set<String> partitionsToUpdate) throws TableLoadingException {
+      Set<String> partitionsToUpdate, String reason) throws TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
-    String annotation = String.format("%s metadata for %s partition(s) of %s.%s",
+    String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)",
         reuseMetadata ? "Reloading" : "Loading",
+        loadTableSchema ? "table definition and " : "",
         partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()),
-        msTbl.getDbName(), msTbl.getTableName());
+        msTbl.getDbName(), msTbl.getTableName(), reason);
+    LOG.info(annotation);;
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
       // turn all exceptions into TableLoadingException
       msTable_ = msTbl;
@@ -932,7 +944,6 @@ public class HdfsTable extends Table implements FeFsTable {
         // Load partition and file metadata
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
-          LOG.info("Incrementally loading table metadata for: " + getFullName());
           Preconditions.checkState(
               partitionsToUpdate == null || loadParitionFileMetadata);
           updateMdFromHmsTable(msTbl);
@@ -944,8 +955,8 @@ public class HdfsTable extends Table implements FeFsTable {
           }
           LOG.info("Incrementally loaded table metadata for: " + getFullName());
         } else {
-          // Load all partitions from Hive Metastore, including file metadata.
           LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
+          // Load all partitions from Hive Metastore, including file metadata.
           List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
               MetaStoreUtil.fetchAllPartitions(
                   client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index ac2930e..be8ac97 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -80,7 +80,8 @@ public class IncompleteTable extends Table implements FeIncompleteTable {
 
   @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
     if (cause_ instanceof TableLoadingException) {
       throw (TableLoadingException) cause_;
     } else {
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 329c5aa..338c979 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -289,7 +289,8 @@ public class KuduTable extends Table implements FeKuduTable {
    */
   @Override
   public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index f3ddd63..b0264c2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -219,7 +219,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * valid existing metadata.
    */
   public abstract void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException;
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException;
 
   /**
    * Sets 'tableStats_' by extracting the table statistics from the given HMS table.
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 1c6c815..db29a7b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.catalog;
 
-import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.metastore.TableType;
@@ -54,10 +53,10 @@ public class TableLoader {
    * Returns new instance of Table, If there were any errors loading the table metadata
    * an IncompleteTable will be returned that contains details on the error.
    */
-  public Table load(Db db, String tblName) {
+  public Table load(Db db, String tblName, String reason) {
     Stopwatch sw = new Stopwatch().start();
     String fullTblName = db.getName() + "." + tblName;
-    String annotation = "Loading metadata for: " + fullTblName;
+    String annotation = "Loading metadata for: " + fullTblName + " (" + reason + ")";
     LOG.info(annotation);
     Table table;
     // turn all exceptions into TableLoadingException
@@ -81,7 +80,7 @@ public class TableLoader {
         throw new TableLoadingException(
             "Unrecognized table type for table: " + fullTblName);
       }
-      table.load(false, msClient.getHiveClient(), msTbl);
+      table.load(false, msClient.getHiveClient(), msTbl, reason);
       table.validate();
     } catch (TableLoadingException e) {
       table = IncompleteTable.createFailedMetadataLoadTable(db, tblName, e);
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
index b79af30..e2ad513 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
@@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.log4j.Logger;
@@ -151,8 +152,8 @@ public class TableLoadingMgr {
 
   // Tables for the async refresh thread to process. Synchronization must be handled
   // externally.
-  private final LinkedBlockingQueue<TTableName> refreshThreadWork_ =
-      new LinkedBlockingQueue<TTableName>();
+  private final LinkedBlockingQueue<Pair<TTableName, String>> refreshThreadWork_ =
+      new LinkedBlockingQueue<>();
 
   private final CatalogServiceCatalog catalog_;
   private final TableLoader tblLoader_;
@@ -172,7 +173,8 @@ public class TableLoadingMgr {
       @Override
       public Void call() throws Exception {
         while(true) {
-          execAsyncRefreshWork(refreshThreadWork_.take());
+          Pair<TTableName, String> work = refreshThreadWork_.take();
+          execAsyncRefreshWork(work.first, /* reason=*/work.second);
         }
       }});
   }
@@ -206,7 +208,8 @@ public class TableLoadingMgr {
    * asyncRefreshThread_ will refresh the table metadata. After processing the
    * request the watch will be deleted.
    */
-  public void watchCacheDirs(List<Long> cacheDirIds, final TTableName tblName) {
+  public void watchCacheDirs(List<Long> cacheDirIds, final TTableName tblName,
+      final String reason) {
     synchronized (pendingTableCacheDirs_) {
       // A single table may have multiple pending cache requests since one request
       // gets submitted per-partition.
@@ -214,7 +217,7 @@ public class TableLoadingMgr {
       if (existingCacheReqIds == null) {
         existingCacheReqIds = cacheDirIds;
         pendingTableCacheDirs_.put(tblName, cacheDirIds);
-        refreshThreadWork_.add(tblName);
+        refreshThreadWork_.add(Pair.create(tblName, reason));
       } else {
         existingCacheReqIds.addAll(cacheDirIds);
       }
@@ -227,7 +230,7 @@ public class TableLoadingMgr {
    * the same underlying loading task (Future) will be used, helping to prevent duplicate
    * loads of the same table.
    */
-  public LoadRequest loadAsync(final TTableName tblName)
+  public LoadRequest loadAsync(final TTableName tblName, final String reason)
       throws DatabaseNotFoundException {
     final Db parentDb = catalog_.getDb(tblName.getDb_name());
     if (parentDb == null) {
@@ -238,7 +241,7 @@ public class TableLoadingMgr {
     FutureTask<Table> tableLoadTask = new FutureTask<Table>(new Callable<Table>() {
         @Override
         public Table call() throws Exception {
-          return tblLoader_.load(parentDb, tblName.table_name);
+          return tblLoader_.load(parentDb, tblName.table_name, reason);
         }});
 
     FutureTask<Table> existingValue = loadingTables_.putIfAbsent(tblName, tableLoadTask);
@@ -297,7 +300,8 @@ public class TableLoadingMgr {
     try {
       // TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would
       // just need to add a mechanism for moving loaded tables into the Catalog.
-      catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name());
+      catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name(),
+          "background load");
     } catch (CatalogException e) {
       // Ignore.
     } finally {
@@ -312,12 +316,12 @@ public class TableLoadingMgr {
    * anyway, and if the table failed to load, then we do not want to hide errors by
    * reloading it 'silently' in response to the completion of an HDFS caching request.
    */
-  private void execAsyncRefreshWork(TTableName tblName) {
+  private void execAsyncRefreshWork(TTableName tblName, String reason) {
     if (!waitForCacheDirs(tblName)) return;
     try {
       Table tbl = catalog_.getTable(tblName.getDb_name(), tblName.getTable_name());
       if (tbl == null || tbl instanceof IncompleteTable || !tbl.isLoaded()) return;
-      catalog_.reloadTable(tbl);
+      catalog_.reloadTable(tbl, reason);
     } catch (CatalogException e) {
       LOG.error("Error reloading cached table: ", e);
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/View.java b/fe/src/main/java/org/apache/impala/catalog/View.java
index abc6be8..a705f91 100644
--- a/fe/src/main/java/org/apache/impala/catalog/View.java
+++ b/fe/src/main/java/org/apache/impala/catalog/View.java
@@ -82,7 +82,8 @@ public class View extends Table implements FeView {
 
   @Override
   public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+      org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
     try {
       clearColumns();
       msTable_ = msTbl;
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 07251c9..d626b52 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -779,7 +779,8 @@ public class MetastoreEvents {
         // Ignore event if table or database is not in catalog. Throw exception if
         // refresh fails. If the partition does not exist in metastore the reload
         // method below removes it from the catalog
-        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+            "processing partition-level INSERT event from HMS")) {
           debugLog("Refresh of table {} partition {} after insert "
                   + "event failed as the table is not present in the catalog.",
               getFullyQualifiedTblName(), (tPartSpec));
@@ -811,7 +812,8 @@ public class MetastoreEvents {
       try {
         // Ignore event if table or database is not in the catalog. Throw exception if
         // refresh fails.
-        if (!catalog_.reloadTableIfExists(dbName_, tblName_)) {
+        if (!catalog_.reloadTableIfExists(dbName_, tblName_,
+              "processing table-level INSERT event from HMS")) {
           debugLog("Automatic refresh table {} failed as the table is not "
               + "present either catalog or metastore.", getFullyQualifiedTblName());
         } else {
@@ -1366,7 +1368,8 @@ public class MetastoreEvents {
         for (Partition partition : addedPartitions_) {
           List<TPartitionKeyValue> tPartSpec =
               getTPartitionSpecFromHmsPartition(msTbl_, partition);
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+              "processing ADD_PARTITION event from HMS")) {
             debugLog("Refresh partitions on table {} failed "
                 + "as table was not present in the catalog.", getFullyQualifiedTblName());
             success = false;
@@ -1453,7 +1456,8 @@ public class MetastoreEvents {
         // Ignore event if table or database is not in catalog. Throw exception if
         // refresh fails.
 
-        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+            "processing ALTER_PARTITION event from HMS")) {
           debugLog("Refresh of table {} partition {} failed as the table "
                   + "is not present in the catalog.", getFullyQualifiedTblName(),
               constructPartitionStringFromTPartitionSpec(tPartSpec));
@@ -1538,7 +1542,8 @@ public class MetastoreEvents {
           for (Map.Entry<String, String> entry : partSpec.entrySet()) {
             tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
           }
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+              "processing DROP_PARTITION event from HMS")) {
             debugLog("Could not refresh partition {} of table {} as table "
                     + "was not present in the catalog.",
                     getFullyQualifiedTblName());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 2dfca63..ce4570b 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -512,7 +512,8 @@ public class CatalogOpExecutor {
     Reference<Long> numUpdatedPartitions = new Reference<>(0L);
 
     TableName tableName = TableName.fromThrift(params.getTable_name());
-    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+        "Load for ALTER TABLE");
     tryLock(tbl);
     // Get a new catalog version to assign to the table being altered.
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
@@ -699,7 +700,7 @@ public class CatalogOpExecutor {
 
       if (reloadMetadata) {
         loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
-            reloadTableSchema, null);
+            reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
         addTableToCatalogUpdate(tbl, response.result);
       }
       // now that HMS alter operation has succeeded, add this version to list of inflight
@@ -766,7 +767,8 @@ public class CatalogOpExecutor {
             params.getAlter_type());
     }
 
-    loadTableMetadata(tbl, newCatalogVersion, true, true, null);
+    loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER KUDU TABLE " +
+        params.getAlter_type().name());
     addTableToCatalogUpdate(tbl, response.result);
   }
 
@@ -778,16 +780,16 @@ public class CatalogOpExecutor {
    */
   private void loadTableMetadata(Table tbl, long newCatalogVersion,
       boolean reloadFileMetadata, boolean reloadTableSchema,
-      Set<String> partitionsToUpdate) throws CatalogException {
+      Set<String> partitionsToUpdate, String reason) throws CatalogException {
     Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl =
           getMetaStoreTable(msClient, tbl);
       if (tbl instanceof HdfsTable) {
         ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
-            reloadFileMetadata, reloadTableSchema, partitionsToUpdate);
+            reloadFileMetadata, reloadTableSchema, partitionsToUpdate, reason);
       } else {
-        tbl.load(true, msClient.getHiveClient(), msTbl);
+        tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
     }
     tbl.setCatalogVersion(newCatalogVersion);
@@ -832,7 +834,8 @@ public class CatalogOpExecutor {
     Preconditions.checkState(params.getColumns() != null &&
         params.getColumns().size() > 0,
           "Null or empty column list given as argument to DdlExecutor.alterView");
-    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+        "Load for ALTER VIEW");
     Preconditions.checkState(tbl instanceof View, "Expected view: %s",
         tableName);
     tryLock(tbl);
@@ -859,7 +862,7 @@ public class CatalogOpExecutor {
       }
       applyAlterTable(msTbl, true);
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        tbl.load(true, msClient.getHiveClient(), msTbl);
+        tbl.load(true, msClient.getHiveClient(), msTbl, "ALTER VIEW");
       }
       addSummary(resp, "View has been altered.");
       tbl.setCatalogVersion(newCatalogVersion);
@@ -1282,7 +1285,7 @@ public class CatalogOpExecutor {
   private void dropStats(TDropStatsParams params, TDdlExecResponse resp)
       throws ImpalaException {
     Table table = getExistingTable(params.getTable_name().getDb_name(),
-        params.getTable_name().getTable_name());
+        params.getTable_name().getTable_name(), "Load for DROP STATS");
     Preconditions.checkNotNull(table);
     if (!catalog_.tryLockTable(table)) {
       throw new InternalException(String.format("Error dropping stats for table %s " +
@@ -1316,7 +1319,7 @@ public class CatalogOpExecutor {
           }
         }
       }
-      loadTableMetadata(table, newCatalogVersion, false, true, null);
+      loadTableMetadata(table, newCatalogVersion, false, true, null, "DROP STATS");
       addTableToCatalogUpdate(table, resp.result);
       addSummary(resp, "Stats have been dropped.");
     } finally {
@@ -1539,7 +1542,8 @@ public class CatalogOpExecutor {
     // In the LocalCatalog configuration, however, this is often necessary.
     try {
       catalog_.getOrLoadTable(params.getTable_name().db_name,
-          params.getTable_name().table_name);
+          params.getTable_name().table_name, "Load for DROP TABLE/VIEW");
+
     } catch (CatalogException e) {
       // Ignore exceptions -- the above was just to trigger loading. Failure to load
       // or non-existence of the database will be handled down below.
@@ -1686,7 +1690,8 @@ public class CatalogOpExecutor {
     TTableName tblName = params.getTable_name();
     Table table = null;
     try {
-      table = getExistingTable(tblName.getDb_name(), tblName.getTable_name());
+      table = getExistingTable(tblName.getDb_name(), tblName.getTable_name(),
+          "Load for TRUNCATE TABLE");
     } catch (TableNotFoundException e) {
       if (params.if_exists) {
         addSummary(resp, "Table does not exist.");
@@ -1724,7 +1729,7 @@ public class CatalogOpExecutor {
       }
       addSummary(resp, "Table has been truncated.");
 
-      loadTableMetadata(table, newCatalogVersion, true, true, null);
+      loadTableMetadata(table, newCatalogVersion, true, true, null, "TRUNCATE");
       addTableToCatalogUpdate(table, resp.result);
     } finally {
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
@@ -2014,7 +2019,8 @@ public class CatalogOpExecutor {
         long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
             cacheOp.getCache_pool_name(), replication);
         catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
-            new TTableName(newTable.getDbName(), newTable.getTableName()));
+            new TTableName(newTable.getDbName(), newTable.getTableName()),
+                "CREATE TABLE CACHED");
         applyAlterTable(newTable, true);
       }
       Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
@@ -2110,7 +2116,8 @@ public class CatalogOpExecutor {
       }
       return;
     }
-    Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
+    Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl(),
+        "Load source for CREATE TABLE LIKE");
     org.apache.hadoop.hive.metastore.api.Table tbl =
         srcTable.getMetaStoreTable().deepCopy();
     Preconditions.checkState(!KuduTable.isKuduTable(tbl),
@@ -2464,7 +2471,8 @@ public class CatalogOpExecutor {
     // Update the partition metadata to include the cache directive id.
     if (!cacheIds.isEmpty()) {
       applyAlterHmsPartitions(msTbl, msClient, tableName, hmsPartitionsToCache);
-      catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
+      catalog_.watchCacheDirs(cacheIds, tableName.toThrift(),
+         "ALTER TABLE CACHE PARTITIONS");
     }
   }
 
@@ -2924,7 +2932,8 @@ public class CatalogOpExecutor {
 
       // Submit a request to watch these cache directives. The TableLoadingMgr will
       // asynchronously refresh the table metadata once the directives complete.
-      catalog_.watchCacheDirs(cacheDirIds, tableName.toThrift());
+      catalog_.watchCacheDirs(cacheDirIds, tableName.toThrift(),
+          "ALTER TABLE SET CACHED");
     } else {
       // Uncache the table.
       if (cacheDirId != null) HdfsCachingUtil.removeTblCacheDirective(msTbl);
@@ -2995,7 +3004,8 @@ public class CatalogOpExecutor {
         // Once the cache directives are submitted, observe the status of the caching
         // until no more progress is made -- either fully cached or out of cache memory
         if (!cacheDirs.isEmpty()) {
-          catalog_.watchCacheDirs(cacheDirs, tableName.toThrift());
+          catalog_.watchCacheDirs(cacheDirs, tableName.toThrift(),
+              "ALTER PARTITION SET CACHED");
         }
         if (!partition.isMarkedCached()) {
           modifiedParts.add(partition);
@@ -3080,7 +3090,8 @@ public class CatalogOpExecutor {
           String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
     }
     if (!cacheIds.isEmpty()) {
-      catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
+      catalog_.watchCacheDirs(cacheIds, tableName.toThrift(),
+          "ALTER TABLE RECOVER PARTITIONS");
     }
   }
 
@@ -3493,6 +3504,9 @@ public class CatalogOpExecutor {
    */
   public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
       throws CatalogException {
+    String cmdString = String.format("%s issued by %s",
+        req.is_refresh ? "REFRESH":"INVALIDATE",
+        req.header != null ? req.header.requesting_user : "unknown user");
     TResetMetadataResponse resp = new TResetMetadataResponse();
     resp.setResult(new TCatalogUpdateResult());
     resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());
@@ -3529,13 +3543,15 @@ public class CatalogOpExecutor {
           // If the table is not loaded, no need to perform refresh after the initial
           // metadata load.
           boolean needsRefresh = tbl.isLoaded();
-          tbl = getExistingTable(tblName.getDb(), tblName.getTbl());
+          tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
+              "Load triggered by " + cmdString);
           if (tbl != null) {
             if (needsRefresh) {
               if (req.isSetPartition_spec()) {
-                updatedThriftTable = catalog_.reloadPartition(tbl, req.getPartition_spec());
+                updatedThriftTable = catalog_.reloadPartition(tbl,
+                    req.getPartition_spec(), cmdString);
               } else {
-                updatedThriftTable = catalog_.reloadTable(tbl);
+                updatedThriftTable = catalog_.reloadTable(tbl, cmdString);
               }
             } else {
               // Table was loaded from scratch, so it's already "refreshed".
@@ -3610,7 +3626,8 @@ public class CatalogOpExecutor {
       throws ImpalaException {
     TUpdateCatalogResponse response = new TUpdateCatalogResponse();
     // Only update metastore for Hdfs tables.
-    Table table = getExistingTable(update.getDb_name(), update.getTarget_table());
+    Table table = getExistingTable(update.getDb_name(), update.getTarget_table(),
+        "Load for INSERT");
     if (!(table instanceof HdfsTable)) {
       throw new InternalException("Unexpected table type: " +
           update.getTarget_table());
@@ -3658,14 +3675,14 @@ public class CatalogOpExecutor {
           // consistent.
           String partName = partition.getPartitionName() + "/";
 
-          // Attempt to remove this partition name from from partsToCreate. If remove
+          // Attempt to remove this partition name from partsToCreate. If remove
           // returns true, it indicates the partition already exists.
           if (partsToCreate.remove(partName)) {
             affectedExistingPartitions.add(partition);
             if (partition.isMarkedCached()) {
-              // The partition was targeted by the insert and is also a cached. Since
+              // The partition was targeted by the insert and is also cached. Since
               // data was written to the partition, a watch needs to be placed on the
-              // cache cache directive so the TableLoadingMgr can perform an async
+              // cache directive so the TableLoadingMgr can perform an async
               // refresh once all data becomes cached.
               cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
                   partition.getParameters()));
@@ -3762,7 +3779,8 @@ public class CatalogOpExecutor {
       }
       // Submit the watch request for the given cache directives.
       if (!cacheDirIds.isEmpty()) {
-        catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift());
+        catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift(),
+            "INSERT into cached partitions");
       }
 
       response.setResult(new TCatalogUpdateResult());
@@ -3777,7 +3795,8 @@ public class CatalogOpExecutor {
             new TStatus(TErrorCode.OK, new ArrayList<String>()));
       }
 
-      loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
+      loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
+          "INSERT");
       // After loading metadata, fire insert events if external event processing is
       // enabled.
       createInsertEvents(table, affectedExistingPartitions, update.is_overwrite);
@@ -3876,8 +3895,9 @@ public class CatalogOpExecutor {
    * TODO: Track object IDs to
    * know when a table has been dropped and re-created with the same name.
    */
-  private Table getExistingTable(String dbName, String tblName) throws CatalogException {
-    Table tbl = catalog_.getOrLoadTable(dbName, tblName);
+  private Table getExistingTable(String dbName, String tblName, String reason)
+      throws CatalogException {
+    Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason);
     if (tbl == null) {
       throw new TableNotFoundException("Table not found: " + dbName + "." + tblName);
     }
@@ -4017,7 +4037,8 @@ public class CatalogOpExecutor {
   private void alterCommentOnTableOrView(TableName tableName, String comment,
       TDdlExecResponse response) throws CatalogException, InternalException,
       ImpalaRuntimeException {
-    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+        "Load for ALTER COMMENT");
     tryLock(tbl);
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
@@ -4034,7 +4055,7 @@ public class CatalogOpExecutor {
         msTbl.getParameters().put("comment", comment);
       }
       applyAlterTable(msTbl, true);
-      loadTableMetadata(tbl, newCatalogVersion, false, false, null);
+      loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT");
       addTableToCatalogUpdate(tbl, response.result);
       addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table"));
     } finally {
@@ -4045,7 +4066,8 @@ public class CatalogOpExecutor {
   private void alterCommentOnColumn(TableName tableName, String columnName,
       String comment, TDdlExecResponse response) throws CatalogException,
       InternalException, ImpalaRuntimeException {
-    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+    Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+        "Load for ALTER COLUMN COMMENT");
     tryLock(tbl);
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
@@ -4067,7 +4089,8 @@ public class CatalogOpExecutor {
         }
         applyAlterTable(msTbl, true);
       }
-      loadTableMetadata(tbl, newCatalogVersion, false, true, null);
+      loadTableMetadata(tbl, newCatalogVersion, false, true, null,
+          "ALTER COLUMN COMMENT");
       addTableToCatalogUpdate(tbl, response.result);
       addSummary(response, "Column has been altered.");
     } finally {
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 0431373..995f3ba 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -63,7 +63,7 @@ public class CatalogObjectToFromThriftTest {
     String[] dbNames = {"functional", "functional_avro", "functional_parquet",
                         "functional_seq"};
     for (String dbName: dbNames) {
-      Table table = catalog_.getOrLoadTable(dbName, "alltypes");
+      Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test");
       Assert.assertEquals(24, ((HdfsTable)table).getPartitions().size());
       Assert.assertEquals(24, ((HdfsTable)table).getPartitionIds().size());
 
@@ -131,7 +131,7 @@ public class CatalogObjectToFromThriftTest {
   @Test
   public void TestMismatchedAvroAndTableSchemas() throws CatalogException {
     Table table = catalog_.getOrLoadTable("functional_avro_snap",
-        "schema_resolution_test");
+        "schema_resolution_test", "test");
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "schema_resolution_test");
     Assert.assertTrue(thriftTable.isSetTable_type());
@@ -151,7 +151,7 @@ public class CatalogObjectToFromThriftTest {
   @Test
   public void TestHBaseTables() throws CatalogException {
     String dbName = "functional_hbase";
-    Table table = catalog_.getOrLoadTable(dbName, "alltypes");
+    Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test");
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "alltypes");
     Assert.assertEquals(thriftTable.db_name, dbName);
@@ -180,7 +180,7 @@ public class CatalogObjectToFromThriftTest {
   public void TestHBaseTableWithBinaryEncodedCols()
       throws CatalogException {
     String dbName = "functional_hbase";
-    Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary");
+    Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary", "test");
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "alltypessmallbinary");
     Assert.assertEquals(thriftTable.db_name, dbName);
@@ -217,7 +217,7 @@ public class CatalogObjectToFromThriftTest {
     Assume.assumeTrue(
         "Skipping this test since it is only supported when running against Hive-2",
         TestUtils.getHiveMajorVersion() == 2);
-    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl");
+    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test");
     Assert.assertNotNull(table);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "hive_index_tbl");
@@ -226,7 +226,7 @@ public class CatalogObjectToFromThriftTest {
 
   @Test
   public void TestTableLoadingErrors() throws ImpalaException {
-    Table table = catalog_.getOrLoadTable("functional", "alltypes");
+    Table table = catalog_.getOrLoadTable("functional", "alltypes", "test");
     HdfsTable hdfsTable = (HdfsTable) table;
     // Get any partition with valid HMS parameters to create a
     // dummy partition.
@@ -250,7 +250,7 @@ public class CatalogObjectToFromThriftTest {
 
   @Test
   public void TestView() throws CatalogException {
-    Table table = catalog_.getOrLoadTable("functional", "view_view");
+    Table table = catalog_.getOrLoadTable("functional", "view_view", "test");
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "view_view");
     Assert.assertEquals(thriftTable.db_name, "functional");
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index ad47845..8fd46f2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -130,50 +130,52 @@ public class CatalogTest {
     Db functionalDb = catalog_.getDb("functional");
     assertNotNull(functionalDb);
     assertEquals(functionalDb.getName(), "functional");
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view_sub"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypessmall"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserror"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserrornonulls"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesagg"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesaggnonulls"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesnopart"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesinsert"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "complex_view"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "testtbl"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "dimtbl"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "jointbl"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "liketbl"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "greptiny"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "rankingssmall"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "view_view"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view_sub", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypessmall", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserror", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserrornonulls", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesagg", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesaggnonulls", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesnopart", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesinsert", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "complex_view", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "testtbl", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "dimtbl", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "jointbl", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "liketbl", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "greptiny", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "rankingssmall", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "view_view", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl", "test"));
     // IMP-163 - table with string partition column does not load if there are partitions
-    assertNotNull(catalog_.getOrLoadTable("functional", "StringPartitionKey"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "StringPartitionKey", "test"));
     // Test non-existent table
-    assertNull(catalog_.getOrLoadTable("functional", "nonexistenttable"));
+    assertNull(catalog_.getOrLoadTable("functional", "nonexistenttable", "test"));
 
     // functional_seq contains the same tables as functional
     Db testDb = catalog_.getDb("functional_seq");
     assertNotNull(testDb);
     assertEquals(testDb.getName(), "functional_seq");
-    assertNotNull(catalog_.getOrLoadTable("functional_seq", "alltypes"));
-    assertNotNull(catalog_.getOrLoadTable("functional_seq", "testtbl"));
+    assertNotNull(catalog_.getOrLoadTable("functional_seq", "alltypes", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional_seq", "testtbl", "test"));
 
     Db hbaseDb = catalog_.getDb("functional_hbase");
     assertNotNull(hbaseDb);
     assertEquals(hbaseDb.getName(), "functional_hbase");
     // Loading succeeds for an HBase table that has binary columns and an implicit key
     // column mapping
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmallbinary"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmall"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "hbasealltypeserror"));
+    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmallbinary",
+        "test"));
+    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmall", "test"));
+    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "hbasealltypeserror",
+        "test"));
     assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(),
-        "hbasealltypeserrornonulls"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypesagg"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "stringids"));
+        "hbasealltypeserrornonulls", "test"));
+    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypesagg", "test"));
+    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "stringids", "test"));
 
     checkTableCols(functionalDb, "alltypes", 2,
         new String[]
@@ -323,8 +325,8 @@ public class CatalogTest {
         new Type[] {Type.DATE, Type.INT, Type.DATE});
 
     // case-insensitive lookup
-    assertEquals(catalog_.getOrLoadTable("functional", "alltypes"),
-        catalog_.getOrLoadTable("functional", "AllTypes"));
+    assertEquals(catalog_.getOrLoadTable("functional", "alltypes", "test"),
+        catalog_.getOrLoadTable("functional", "AllTypes", "test"));
   }
 
   // Count of listFiles (list status + blocks) calls
@@ -352,7 +354,8 @@ public class CatalogTest {
         /*tblWasRemoved=*/new Reference<Boolean>(),
         /*dbWasAdded=*/new Reference<Boolean>());
 
-    HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", "AllTypes");
+    HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", "AllTypes",
+        "test");
     StorageStatistics opsCounts = stats.get(DFSOpsCountStatistics.NAME);
 
     // We expect:
@@ -375,7 +378,7 @@ public class CatalogTest {
 
     // Now test REFRESH on the table...
     stats.reset();
-    catalog_.reloadTable(table);
+    catalog_.reloadTable(table, "test");
 
     // Again, we expect only one getFileStatus call, for the top-level directory.
     assertEquals(1L, (long)opsCounts.getLong(GET_FILE_STATUS));
@@ -393,17 +396,17 @@ public class CatalogTest {
     List<TPartitionKeyValue> partitionSpec = ImmutableList.of(
         new TPartitionKeyValue("year", "2010"),
         new TPartitionKeyValue("month", "10"));
-    catalog_.reloadPartition(table, partitionSpec);
+    catalog_.reloadPartition(table, partitionSpec, "test");
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
 
     // Loading or reloading an unpartitioned table with some files in it should not make
     // an RPC per file.
     stats.reset();
     HdfsTable unpartTable = (HdfsTable)catalog_.getOrLoadTable(
-        "functional", "alltypesaggmultifilesnopart");
+        "functional", "alltypesaggmultifilesnopart", "test");
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
     stats.reset();
-    catalog_.reloadTable(unpartTable);
+    catalog_.reloadTable(unpartTable, "test");
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
 
     // Simulate an empty partition, which will trigger the full
@@ -414,7 +417,7 @@ public class CatalogTest {
         .getPartitionFromThriftPartitionSpec(partitionSpec);
     hdfsPartition.setFileDescriptors(new ArrayList<>());
     stats.reset();
-    catalog_.reloadPartition(table, partitionSpec);
+    catalog_.reloadPartition(table, partitionSpec, "test");
 
     // Should not scan the directory file-by-file, should use a single
     // listLocatedStatus() to get the whole directory (partition)
@@ -427,7 +430,7 @@ public class CatalogTest {
   @Test
   public void TestPartitions() throws CatalogException {
     HdfsTable table =
-        (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypes");
+        (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypes", "test");
     checkAllTypesPartitioning(table, true);
   }
 
@@ -477,7 +480,8 @@ public class CatalogTest {
   @Test
   public void testStats() throws CatalogException {
     // make sure the stats for functional.alltypesagg look correct
-    HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypesAgg");
+    HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypesAgg",
+        "test");
 
     Column idCol = table.getColumn("id");
     assertEquals(idCol.getStats().getAvgSerializedSize(),
@@ -550,7 +554,8 @@ public class CatalogTest {
   public void testColStatsColTypeMismatch() throws Exception {
     // First load a table that has column stats.
     //catalog_.refreshTable("functional", "alltypesagg", false);
-    HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "alltypesagg");
+    HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "alltypesagg",
+        "test");
 
     // Now attempt to update a column's stats with mismatched stats data and ensure
     // we get the expected results.
@@ -643,7 +648,7 @@ public class CatalogTest {
       gflags.setPull_incremental_statistics(true);
 
       // Partitioned table with stats. Load the table prior to fetching.
-      catalog_.getOrLoadTable("functional", "alltypesagg");
+      catalog_.getOrLoadTable("functional", "alltypesagg", "test");
       expectStatistics("functional", "alltypesagg", 11);
 
       // Partitioned table with stats. Invalidate the table prior to fetching.
@@ -675,7 +680,7 @@ public class CatalogTest {
   public void testInternalHBaseTable() throws CatalogException {
     // Cast will fail if table not an HBaseTable
    HBaseTable table = (HBaseTable)
-       catalog_.getOrLoadTable("functional_hbase", "internal_hbase_table");
+       catalog_.getOrLoadTable("functional_hbase", "internal_hbase_table", "test");
     assertNotNull("functional_hbase.internal_hbase_table was not found", table);
   }
 
@@ -687,14 +692,14 @@ public class CatalogTest {
 
   @Test
   public void testCreateTableMetadata() throws CatalogException {
-    Table table = catalog_.getOrLoadTable("functional", "alltypes");
+    Table table = catalog_.getOrLoadTable("functional", "alltypes", "test");
     // Tables are created via Impala so the metadata should have been populated properly.
     // alltypes is an external table.
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.EXTERNAL_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
     // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
-    table = catalog_.getOrLoadTable("functional", "alltypesinsert");
+    table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.MANAGED_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
@@ -707,7 +712,7 @@ public class CatalogTest {
     Assume.assumeTrue(
         "Skipping this test since it is only supported when running against Hive-2",
         TestUtils.getHiveMajorVersion() == 2);
-    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl");
+    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test");
     assertTrue(table instanceof IncompleteTable);
     IncompleteTable incompleteTable = (IncompleteTable) table;
     assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
@@ -718,7 +723,7 @@ public class CatalogTest {
   @Test
   public void testLoadingUnsupportedTableTypes() throws CatalogException {
     // Table with unsupported SerDe library.
-    Table table = catalog_.getOrLoadTable("functional", "bad_serde");
+    Table table = catalog_.getOrLoadTable("functional", "bad_serde", "test");
     assertTrue(table instanceof IncompleteTable);
     IncompleteTable incompleteTable = (IncompleteTable) table;
     assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
@@ -728,7 +733,7 @@ public class CatalogTest {
 
     // Impala does not yet support Hive's LazyBinaryColumnarSerDe which can be
     // used for RCFILE tables.
-    table = catalog_.getOrLoadTable("functional_rc", "rcfile_lazy_binary_serde");
+    table = catalog_.getOrLoadTable("functional_rc", "rcfile_lazy_binary_serde", "test");
     assertTrue(table instanceof IncompleteTable);
     incompleteTable = (IncompleteTable) table;
     assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
index 4410ea5..e20566e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
@@ -64,7 +64,7 @@ public class CatalogdTableInvalidatorTest {
             2, /*invalidateTablesOnMemoryPressure=*/false, /*oldGenFullThreshold=*/
             0.6, /*gcInvalidationFraction=*/0.1));
     Assert.assertFalse(catalog_.getDb(dbName).getTable(tblName).isLoaded());
-    Table table = catalog_.getOrLoadTable(dbName, tblName);
+    Table table = catalog_.getOrLoadTable(dbName, tblName, "test");
     Assert.assertTrue(table.isLoaded());
     Assert.assertEquals(ticker.now_, table.getLastUsedTime());
     long previousTriggerCount = catalog_.getCatalogdTableInvalidator().scanCount_.get();
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index d9bce89..9d1ddbe 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -682,7 +682,7 @@ public class MetastoreEventsProcessorTest {
     eventsProcessor_.processEvents();
 
     // Simulate a load table
-    Table tbl = catalog_.getOrLoadTable(dbName, tblName);
+    Table tbl = catalog_.getOrLoadTable(dbName, tblName, "test");
     Partition partition = null;
     if (isPartitionInsert) {
       // Get the partition from metastore. This should now contain the new file.
@@ -2622,7 +2622,7 @@ public class MetastoreEventsProcessorTest {
   }
 
   private Table loadTable(String dbName, String tblName) throws CatalogException {
-    Table loadedTable = catalog_.getOrLoadTable(dbName, tblName);
+    Table loadedTable = catalog_.getOrLoadTable(dbName, tblName, "test");
     assertFalse("Table should have been loaded after getOrLoadTable call",
         loadedTable instanceof IncompleteTable);
     return loadedTable;
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index 81bb9bd..336fc88 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -125,7 +125,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
     // The table was not yet loaded. Load it in to the catalog now.
     Table newTbl = null;
     try {
-      newTbl = srcCatalog_.getOrLoadTable(dbName, tblName);
+      newTbl = srcCatalog_.getOrLoadTable(dbName, tblName, "test");
     } catch (CatalogException e) {
       throw new IllegalStateException("Unexpected table loading failure.", e);
     }