You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/18 16:39:47 UTC
[impala] 02/05: fe: improve logging for metadata loading
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 50e0fbf889465eb288a5ef69477ccbc627681105
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Wed May 29 21:23:39 2019 -0700
fe: improve logging for metadata loading
This annotates various catalogd-internal calls associated with
refreshing and loading metadata with a 'String reason' parameter, useful
for logging. This can help understand why a particular piece of metadata
was loaded, and in the case of REFRESH calls, who issued the original
refresh.
Additionally, some of the log statements were improved to add a bit of
extra detail such as the list of partitions being refreshed
(abbreviated) and whether or not the table schema is being refreshed.
Change-Id: I4d90a5f075b05d2dc96692b3349abe35ce24b8b8
Reviewed-on: http://gerrit.cloudera.org:8080/13463
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
.../apache/impala/analysis/ResetMetadataStmt.java | 11 ++-
.../impala/catalog/CatalogServiceCatalog.java | 35 +++----
.../org/apache/impala/catalog/DataSourceTable.java | 3 +-
.../java/org/apache/impala/catalog/HBaseTable.java | 3 +-
.../java/org/apache/impala/catalog/HdfsTable.java | 27 ++++--
.../org/apache/impala/catalog/IncompleteTable.java | 3 +-
.../java/org/apache/impala/catalog/KuduTable.java | 3 +-
.../main/java/org/apache/impala/catalog/Table.java | 3 +-
.../org/apache/impala/catalog/TableLoader.java | 7 +-
.../org/apache/impala/catalog/TableLoadingMgr.java | 24 +++--
.../main/java/org/apache/impala/catalog/View.java | 3 +-
.../impala/catalog/events/MetastoreEvents.java | 15 ++-
.../apache/impala/service/CatalogOpExecutor.java | 91 ++++++++++++-------
.../catalog/CatalogObjectToFromThriftTest.java | 14 +--
.../org/apache/impala/catalog/CatalogTest.java | 101 +++++++++++----------
.../catalog/CatalogdTableInvalidatorTest.java | 2 +-
.../events/MetastoreEventsProcessorTest.java | 4 +-
.../apache/impala/testutil/ImpaladTestCatalog.java | 2 +-
18 files changed, 207 insertions(+), 144 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index 2ca4cf5..ac4e69e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -21,8 +21,11 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.impala.authorization.Privilege;
+import org.apache.impala.authorization.User;
import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TTableName;
@@ -68,6 +71,9 @@ public class ResetMetadataStmt extends StatementBase {
// The type of action.
private final Action action_;
+ // Set during analysis.
+ private User requestingUser_;
+
private ResetMetadataStmt(Action action, String db, TableName tableName,
PartitionSpec partitionSpec) {
Preconditions.checkNotNull(action);
@@ -124,6 +130,7 @@ public class ResetMetadataStmt extends StatementBase {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
+ requestingUser_ = analyzer.getUser();
switch (action_) {
case INVALIDATE_METADATA_TABLE:
case REFRESH_TABLE:
@@ -214,8 +221,10 @@ public class ResetMetadataStmt extends StatementBase {
return result.toString();
}
- public TResetMetadataRequest toThrift() {
+ public TResetMetadataRequest toThrift() throws InternalException {
TResetMetadataRequest params = new TResetMetadataRequest();
+ params.setHeader(new TCatalogServiceRequestHeader());
+ params.header.setRequesting_user(requestingUser_.getShortName());
params.setIs_refresh(action_.isRefresh());
if (tableName_ != null) {
params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 8c4678f..db143fb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.hadoop.fs.Hdfs;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -481,8 +480,8 @@ public class CatalogServiceCatalog extends Catalog {
* Adds a list of cache directive IDs for the given table name. Asynchronously
* refreshes the table metadata once all cache directives complete.
*/
- public void watchCacheDirs(List<Long> dirIds, TTableName tblName) {
- tableLoadingMgr_.watchCacheDirs(dirIds, tblName);
+ public void watchCacheDirs(List<Long> dirIds, TTableName tblName, String reason) {
+ tableLoadingMgr_.watchCacheDirs(dirIds, tblName, reason);
}
/**
@@ -512,7 +511,8 @@ public class CatalogServiceCatalog extends Catalog {
TTableName tableName = request.table_name;
LOG.info("Fetching partition statistics for: " + tableName.getDb_name() + "."
+ tableName.getTable_name());
- Table table = getOrLoadTable(tableName.db_name, tableName.table_name);
+ Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
+ "needed to fetch partition stats");
// Table could be null if it does not exist anymore.
if (table == null) {
@@ -1695,7 +1695,7 @@ public class CatalogServiceCatalog extends Catalog {
* and the current cached value will be returned. This may mean that a missing table
* (not yet loaded table) will be returned.
*/
- public Table getOrLoadTable(String dbName, String tblName)
+ public Table getOrLoadTable(String dbName, String tblName, String reason)
throws CatalogException {
TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
TableLoadingMgr.LoadRequest loadReq;
@@ -1707,7 +1707,7 @@ public class CatalogServiceCatalog extends Catalog {
Table tbl = getTable(dbName, tblName);
if (tbl == null || tbl.isLoaded()) return tbl;
previousCatalogVersion = tbl.getCatalogVersion();
- loadReq = tableLoadingMgr_.loadAsync(tableName);
+ loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
} finally {
versionLock_.readLock().unlock();
}
@@ -1951,7 +1951,7 @@ public class CatalogServiceCatalog extends Catalog {
* metadata load from concurrent table modifications and assigns a new catalog version.
* Throws a CatalogException if there is an error loading table metadata.
*/
- public TCatalogObject reloadTable(Table tbl) throws CatalogException {
+ public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
@@ -1973,7 +1973,7 @@ public class CatalogServiceCatalog extends Catalog {
throw new TableLoadingException("Error loading metadata for table: " +
dbName + "." + tblName, e);
}
- tbl.load(true, msClient.getHiveClient(), msTbl);
+ tbl.load(true, msClient.getHiveClient(), msTbl, reason);
}
tbl.setCatalogVersion(newCatalogVersion);
LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
@@ -2146,10 +2146,10 @@ public class CatalogServiceCatalog extends Catalog {
* @throws DatabaseNotFoundException if Db doesn't exist.
*/
public boolean reloadPartitionIfExists(String dbName, String tblName,
- List<TPartitionKeyValue> tPartSpec) throws CatalogException {
+ List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
Table table = getTable(dbName, tblName);
if (table == null || table instanceof IncompleteTable) return false;
- reloadPartition(table, tPartSpec);
+ reloadPartition(table, tPartSpec, reason);
return true;
}
@@ -2158,11 +2158,11 @@ public class CatalogServiceCatalog extends Catalog {
* otherwise. Throws CatalogException if reloadTable() is unsuccessful. Throws
* DatabaseNotFoundException if Db doesn't exist.
*/
- public boolean reloadTableIfExists(String dbName, String tblName)
+ public boolean reloadTableIfExists(String dbName, String tblName, String reason)
throws CatalogException {
Table table = getTable(dbName, tblName);
if (table == null || table instanceof IncompleteTable) return false;
- reloadTable(table);
+ reloadTable(table, reason);
return true;
}
@@ -2467,8 +2467,8 @@ public class CatalogServiceCatalog extends Catalog {
* 'partitionSpec' in table 'tbl'. Returns the resulting table's TCatalogObject after
* the partition metadata was reloaded.
*/
- public TCatalogObject reloadPartition(Table tbl, List<TPartitionKeyValue> partitionSpec)
- throws CatalogException {
+ public TCatalogObject reloadPartition(Table tbl,
+ List<TPartitionKeyValue> partitionSpec, String reason) throws CatalogException {
if (!tryLockTable(tbl)) {
throw new CatalogException(String.format("Error reloading partition of table %s " +
"due to lock contention", tbl.getFullName()));
@@ -2484,8 +2484,8 @@ public class CatalogServiceCatalog extends Catalog {
String partitionName = hdfsPartition == null
? HdfsTable.constructPartitionName(partitionSpec)
: hdfsPartition.getPartitionName();
- LOG.info(String.format("Refreshing partition metadata: %s %s",
- hdfsTable.getFullName(), partitionName));
+ LOG.info(String.format("Refreshing partition metadata: %s %s (%s)",
+ hdfsTable.getFullName(), partitionName, reason));
try (MetaStoreClient msClient = getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
try {
@@ -2789,7 +2789,8 @@ public class CatalogServiceCatalog extends Catalog {
Table table;
try {
table = getOrLoadTable(
- objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name());
+ objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
+ "needed by coordinator");
} catch (DatabaseNotFoundException e) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index ee90c6f..4bf6d68 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -171,7 +171,8 @@ public class DataSourceTable extends Table implements FeDataSourceTable {
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
Preconditions.checkNotNull(msTbl);
msTable_ = msTbl;
clearColumns();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index 0b81b5d..d75afa7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -98,7 +98,8 @@ public class HBaseTable extends Table implements FeHBaseTable {
*/
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
Preconditions.checkNotNull(getMetaStoreTable());
try (Timer.Context timer = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time()) {
msTable_ = msTbl;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 5dc0f03..980cf47 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -91,6 +91,7 @@ import org.slf4j.LoggerFactory;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
@@ -615,7 +616,15 @@ public class HdfsTable extends Table implements FeFsTable {
// - how many block locations did we reuse/load individually/load via batch
// - how many partitions did we read metadata for
// - etc...
- LOG.info("Loaded file and block metadata for {}", getFullName());
+ String partNames = Joiner.on(", ").join(
+ Iterables.limit(Iterables.transform(parts, HdfsPartition::getPartitionName), 3));
+ if (partsByPath.size() > 3) {
+ partNames += String.format(", and %s others",
+ Iterables.size(parts) - 3);
+ }
+
+ LOG.info("Loaded file and block metadata for {} partitions: {}", getFullName(),
+ partNames);
}
/**
@@ -878,8 +887,9 @@ public class HdfsTable extends Table implements FeFsTable {
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
- load(reuseMetadata, client, msTbl, true, true, null);
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
+ load(reuseMetadata, client, msTbl, true, true, null, reason);
}
/**
@@ -909,13 +919,15 @@ public class HdfsTable extends Table implements FeFsTable {
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl,
boolean loadParitionFileMetadata, boolean loadTableSchema,
- Set<String> partitionsToUpdate) throws TableLoadingException {
+ Set<String> partitionsToUpdate, String reason) throws TableLoadingException {
final Timer.Context context =
getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
- String annotation = String.format("%s metadata for %s partition(s) of %s.%s",
+ String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)",
reuseMetadata ? "Reloading" : "Loading",
+ loadTableSchema ? "table definition and " : "",
partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()),
- msTbl.getDbName(), msTbl.getTableName());
+ msTbl.getDbName(), msTbl.getTableName(), reason);
+ LOG.info(annotation);;
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
// turn all exceptions into TableLoadingException
msTable_ = msTbl;
@@ -932,7 +944,6 @@ public class HdfsTable extends Table implements FeFsTable {
// Load partition and file metadata
if (reuseMetadata) {
// Incrementally update this table's partitions and file metadata
- LOG.info("Incrementally loading table metadata for: " + getFullName());
Preconditions.checkState(
partitionsToUpdate == null || loadParitionFileMetadata);
updateMdFromHmsTable(msTbl);
@@ -944,8 +955,8 @@ public class HdfsTable extends Table implements FeFsTable {
}
LOG.info("Incrementally loaded table metadata for: " + getFullName());
} else {
- // Load all partitions from Hive Metastore, including file metadata.
LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
+ // Load all partitions from Hive Metastore, including file metadata.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
MetaStoreUtil.fetchAllPartitions(
client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index ac2930e..be8ac97 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -80,7 +80,8 @@ public class IncompleteTable extends Table implements FeIncompleteTable {
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
if (cause_ instanceof TableLoadingException) {
throw (TableLoadingException) cause_;
} else {
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 329c5aa..338c979 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -289,7 +289,8 @@ public class KuduTable extends Table implements FeKuduTable {
*/
@Override
public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
final Timer.Context context =
getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index f3ddd63..b0264c2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -219,7 +219,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
* valid existing metadata.
*/
public abstract void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException;
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException;
/**
* Sets 'tableStats_' by extracting the table statistics from the given HMS table.
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 1c6c815..db29a7b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -17,7 +17,6 @@
package org.apache.impala.catalog;
-import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.metastore.TableType;
@@ -54,10 +53,10 @@ public class TableLoader {
* Returns new instance of Table, If there were any errors loading the table metadata
* an IncompleteTable will be returned that contains details on the error.
*/
- public Table load(Db db, String tblName) {
+ public Table load(Db db, String tblName, String reason) {
Stopwatch sw = new Stopwatch().start();
String fullTblName = db.getName() + "." + tblName;
- String annotation = "Loading metadata for: " + fullTblName;
+ String annotation = "Loading metadata for: " + fullTblName + " (" + reason + ")";
LOG.info(annotation);
Table table;
// turn all exceptions into TableLoadingException
@@ -81,7 +80,7 @@ public class TableLoader {
throw new TableLoadingException(
"Unrecognized table type for table: " + fullTblName);
}
- table.load(false, msClient.getHiveClient(), msTbl);
+ table.load(false, msClient.getHiveClient(), msTbl, reason);
table.validate();
} catch (TableLoadingException e) {
table = IncompleteTable.createFailedMetadataLoadTable(db, tblName, e);
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
index b79af30..e2ad513 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
@@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.log4j.Logger;
@@ -151,8 +152,8 @@ public class TableLoadingMgr {
// Tables for the async refresh thread to process. Synchronization must be handled
// externally.
- private final LinkedBlockingQueue<TTableName> refreshThreadWork_ =
- new LinkedBlockingQueue<TTableName>();
+ private final LinkedBlockingQueue<Pair<TTableName, String>> refreshThreadWork_ =
+ new LinkedBlockingQueue<>();
private final CatalogServiceCatalog catalog_;
private final TableLoader tblLoader_;
@@ -172,7 +173,8 @@ public class TableLoadingMgr {
@Override
public Void call() throws Exception {
while(true) {
- execAsyncRefreshWork(refreshThreadWork_.take());
+ Pair<TTableName, String> work = refreshThreadWork_.take();
+ execAsyncRefreshWork(work.first, /* reason=*/work.second);
}
}});
}
@@ -206,7 +208,8 @@ public class TableLoadingMgr {
* asyncRefreshThread_ will refresh the table metadata. After processing the
* request the watch will be deleted.
*/
- public void watchCacheDirs(List<Long> cacheDirIds, final TTableName tblName) {
+ public void watchCacheDirs(List<Long> cacheDirIds, final TTableName tblName,
+ final String reason) {
synchronized (pendingTableCacheDirs_) {
// A single table may have multiple pending cache requests since one request
// gets submitted per-partition.
@@ -214,7 +217,7 @@ public class TableLoadingMgr {
if (existingCacheReqIds == null) {
existingCacheReqIds = cacheDirIds;
pendingTableCacheDirs_.put(tblName, cacheDirIds);
- refreshThreadWork_.add(tblName);
+ refreshThreadWork_.add(Pair.create(tblName, reason));
} else {
existingCacheReqIds.addAll(cacheDirIds);
}
@@ -227,7 +230,7 @@ public class TableLoadingMgr {
* the same underlying loading task (Future) will be used, helping to prevent duplicate
* loads of the same table.
*/
- public LoadRequest loadAsync(final TTableName tblName)
+ public LoadRequest loadAsync(final TTableName tblName, final String reason)
throws DatabaseNotFoundException {
final Db parentDb = catalog_.getDb(tblName.getDb_name());
if (parentDb == null) {
@@ -238,7 +241,7 @@ public class TableLoadingMgr {
FutureTask<Table> tableLoadTask = new FutureTask<Table>(new Callable<Table>() {
@Override
public Table call() throws Exception {
- return tblLoader_.load(parentDb, tblName.table_name);
+ return tblLoader_.load(parentDb, tblName.table_name, reason);
}});
FutureTask<Table> existingValue = loadingTables_.putIfAbsent(tblName, tableLoadTask);
@@ -297,7 +300,8 @@ public class TableLoadingMgr {
try {
// TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would
// just need to add a mechanism for moving loaded tables into the Catalog.
- catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name());
+ catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name(),
+ "background load");
} catch (CatalogException e) {
// Ignore.
} finally {
@@ -312,12 +316,12 @@ public class TableLoadingMgr {
* anyway, and if the table failed to load, then we do not want to hide errors by
* reloading it 'silently' in response to the completion of an HDFS caching request.
*/
- private void execAsyncRefreshWork(TTableName tblName) {
+ private void execAsyncRefreshWork(TTableName tblName, String reason) {
if (!waitForCacheDirs(tblName)) return;
try {
Table tbl = catalog_.getTable(tblName.getDb_name(), tblName.getTable_name());
if (tbl == null || tbl instanceof IncompleteTable || !tbl.isLoaded()) return;
- catalog_.reloadTable(tbl);
+ catalog_.reloadTable(tbl, reason);
} catch (CatalogException e) {
LOG.error("Error reloading cached table: ", e);
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/View.java b/fe/src/main/java/org/apache/impala/catalog/View.java
index abc6be8..a705f91 100644
--- a/fe/src/main/java/org/apache/impala/catalog/View.java
+++ b/fe/src/main/java/org/apache/impala/catalog/View.java
@@ -82,7 +82,8 @@ public class View extends Table implements FeView {
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
- org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+ org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+ throws TableLoadingException {
try {
clearColumns();
msTable_ = msTbl;
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 07251c9..d626b52 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -779,7 +779,8 @@ public class MetastoreEvents {
// Ignore event if table or database is not in catalog. Throw exception if
// refresh fails. If the partition does not exist in metastore the reload
// method below removes it from the catalog
- if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+ if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+ "processing partition-level INSERT event from HMS")) {
debugLog("Refresh of table {} partition {} after insert "
+ "event failed as the table is not present in the catalog.",
getFullyQualifiedTblName(), (tPartSpec));
@@ -811,7 +812,8 @@ public class MetastoreEvents {
try {
// Ignore event if table or database is not in the catalog. Throw exception if
// refresh fails.
- if (!catalog_.reloadTableIfExists(dbName_, tblName_)) {
+ if (!catalog_.reloadTableIfExists(dbName_, tblName_,
+ "processing table-level INSERT event from HMS")) {
debugLog("Automatic refresh table {} failed as the table is not "
+ "present either catalog or metastore.", getFullyQualifiedTblName());
} else {
@@ -1366,7 +1368,8 @@ public class MetastoreEvents {
for (Partition partition : addedPartitions_) {
List<TPartitionKeyValue> tPartSpec =
getTPartitionSpecFromHmsPartition(msTbl_, partition);
- if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+ if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+ "processing ADD_PARTITION event from HMS")) {
debugLog("Refresh partitions on table {} failed "
+ "as table was not present in the catalog.", getFullyQualifiedTblName());
success = false;
@@ -1453,7 +1456,8 @@ public class MetastoreEvents {
// Ignore event if table or database is not in catalog. Throw exception if
// refresh fails.
- if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+ if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+ "processing ALTER_PARTITION event from HMS")) {
debugLog("Refresh of table {} partition {} failed as the table "
+ "is not present in the catalog.", getFullyQualifiedTblName(),
constructPartitionStringFromTPartitionSpec(tPartSpec));
@@ -1538,7 +1542,8 @@ public class MetastoreEvents {
for (Map.Entry<String, String> entry : partSpec.entrySet()) {
tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
}
- if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec)) {
+ if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+ "processing DROP_PARTITION event from HMS")) {
debugLog("Could not refresh partition {} of table {} as table "
+ "was not present in the catalog.",
getFullyQualifiedTblName());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 2dfca63..ce4570b 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -512,7 +512,8 @@ public class CatalogOpExecutor {
Reference<Long> numUpdatedPartitions = new Reference<>(0L);
TableName tableName = TableName.fromThrift(params.getTable_name());
- Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+ Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+ "Load for ALTER TABLE");
tryLock(tbl);
// Get a new catalog version to assign to the table being altered.
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
@@ -699,7 +700,7 @@ public class CatalogOpExecutor {
if (reloadMetadata) {
loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
- reloadTableSchema, null);
+ reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
addTableToCatalogUpdate(tbl, response.result);
}
// now that HMS alter operation has succeeded, add this version to list of inflight
@@ -766,7 +767,8 @@ public class CatalogOpExecutor {
params.getAlter_type());
}
- loadTableMetadata(tbl, newCatalogVersion, true, true, null);
+ loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER KUDU TABLE " +
+ params.getAlter_type().name());
addTableToCatalogUpdate(tbl, response.result);
}
@@ -778,16 +780,16 @@ public class CatalogOpExecutor {
*/
private void loadTableMetadata(Table tbl, long newCatalogVersion,
boolean reloadFileMetadata, boolean reloadTableSchema,
- Set<String> partitionsToUpdate) throws CatalogException {
+ Set<String> partitionsToUpdate, String reason) throws CatalogException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
getMetaStoreTable(msClient, tbl);
if (tbl instanceof HdfsTable) {
((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
- reloadFileMetadata, reloadTableSchema, partitionsToUpdate);
+ reloadFileMetadata, reloadTableSchema, partitionsToUpdate, reason);
} else {
- tbl.load(true, msClient.getHiveClient(), msTbl);
+ tbl.load(true, msClient.getHiveClient(), msTbl, reason);
}
}
tbl.setCatalogVersion(newCatalogVersion);
@@ -832,7 +834,8 @@ public class CatalogOpExecutor {
Preconditions.checkState(params.getColumns() != null &&
params.getColumns().size() > 0,
"Null or empty column list given as argument to DdlExecutor.alterView");
- Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+ Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+ "Load for ALTER VIEW");
Preconditions.checkState(tbl instanceof View, "Expected view: %s",
tableName);
tryLock(tbl);
@@ -859,7 +862,7 @@ public class CatalogOpExecutor {
}
applyAlterTable(msTbl, true);
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
- tbl.load(true, msClient.getHiveClient(), msTbl);
+ tbl.load(true, msClient.getHiveClient(), msTbl, "ALTER VIEW");
}
addSummary(resp, "View has been altered.");
tbl.setCatalogVersion(newCatalogVersion);
@@ -1282,7 +1285,7 @@ public class CatalogOpExecutor {
private void dropStats(TDropStatsParams params, TDdlExecResponse resp)
throws ImpalaException {
Table table = getExistingTable(params.getTable_name().getDb_name(),
- params.getTable_name().getTable_name());
+ params.getTable_name().getTable_name(), "Load for DROP STATS");
Preconditions.checkNotNull(table);
if (!catalog_.tryLockTable(table)) {
throw new InternalException(String.format("Error dropping stats for table %s " +
@@ -1316,7 +1319,7 @@ public class CatalogOpExecutor {
}
}
}
- loadTableMetadata(table, newCatalogVersion, false, true, null);
+ loadTableMetadata(table, newCatalogVersion, false, true, null, "DROP STATS");
addTableToCatalogUpdate(table, resp.result);
addSummary(resp, "Stats have been dropped.");
} finally {
@@ -1539,7 +1542,8 @@ public class CatalogOpExecutor {
// In the LocalCatalog configuration, however, this is often necessary.
try {
catalog_.getOrLoadTable(params.getTable_name().db_name,
- params.getTable_name().table_name);
+ params.getTable_name().table_name, "Load for DROP TABLE/VIEW");
+
} catch (CatalogException e) {
// Ignore exceptions -- the above was just to trigger loading. Failure to load
// or non-existence of the database will be handled down below.
@@ -1686,7 +1690,8 @@ public class CatalogOpExecutor {
TTableName tblName = params.getTable_name();
Table table = null;
try {
- table = getExistingTable(tblName.getDb_name(), tblName.getTable_name());
+ table = getExistingTable(tblName.getDb_name(), tblName.getTable_name(),
+ "Load for TRUNCATE TABLE");
} catch (TableNotFoundException e) {
if (params.if_exists) {
addSummary(resp, "Table does not exist.");
@@ -1724,7 +1729,7 @@ public class CatalogOpExecutor {
}
addSummary(resp, "Table has been truncated.");
- loadTableMetadata(table, newCatalogVersion, true, true, null);
+ loadTableMetadata(table, newCatalogVersion, true, true, null, "TRUNCATE");
addTableToCatalogUpdate(table, resp.result);
} finally {
Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
@@ -2014,7 +2019,8 @@ public class CatalogOpExecutor {
long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
cacheOp.getCache_pool_name(), replication);
catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
- new TTableName(newTable.getDbName(), newTable.getTableName()));
+ new TTableName(newTable.getDbName(), newTable.getTableName()),
+ "CREATE TABLE CACHED");
applyAlterTable(newTable, true);
}
Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
@@ -2110,7 +2116,8 @@ public class CatalogOpExecutor {
}
return;
}
- Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
+ Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl(),
+ "Load source for CREATE TABLE LIKE");
org.apache.hadoop.hive.metastore.api.Table tbl =
srcTable.getMetaStoreTable().deepCopy();
Preconditions.checkState(!KuduTable.isKuduTable(tbl),
@@ -2464,7 +2471,8 @@ public class CatalogOpExecutor {
// Update the partition metadata to include the cache directive id.
if (!cacheIds.isEmpty()) {
applyAlterHmsPartitions(msTbl, msClient, tableName, hmsPartitionsToCache);
- catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
+ catalog_.watchCacheDirs(cacheIds, tableName.toThrift(),
+ "ALTER TABLE CACHE PARTITIONS");
}
}
@@ -2924,7 +2932,8 @@ public class CatalogOpExecutor {
// Submit a request to watch these cache directives. The TableLoadingMgr will
// asynchronously refresh the table metadata once the directives complete.
- catalog_.watchCacheDirs(cacheDirIds, tableName.toThrift());
+ catalog_.watchCacheDirs(cacheDirIds, tableName.toThrift(),
+ "ALTER TABLE SET CACHED");
} else {
// Uncache the table.
if (cacheDirId != null) HdfsCachingUtil.removeTblCacheDirective(msTbl);
@@ -2995,7 +3004,8 @@ public class CatalogOpExecutor {
// Once the cache directives are submitted, observe the status of the caching
// until no more progress is made -- either fully cached or out of cache memory
if (!cacheDirs.isEmpty()) {
- catalog_.watchCacheDirs(cacheDirs, tableName.toThrift());
+ catalog_.watchCacheDirs(cacheDirs, tableName.toThrift(),
+ "ALTER PARTITION SET CACHED");
}
if (!partition.isMarkedCached()) {
modifiedParts.add(partition);
@@ -3080,7 +3090,8 @@ public class CatalogOpExecutor {
String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
}
if (!cacheIds.isEmpty()) {
- catalog_.watchCacheDirs(cacheIds, tableName.toThrift());
+ catalog_.watchCacheDirs(cacheIds, tableName.toThrift(),
+ "ALTER TABLE RECOVER PARTITIONS");
}
}
@@ -3493,6 +3504,9 @@ public class CatalogOpExecutor {
*/
public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
throws CatalogException {
+ String cmdString = String.format("%s issued by %s", // reason passed to loads below
+ req.is_refresh ? "REFRESH" : "INVALIDATE",
+ req.header != null ? req.header.requesting_user : "unknown user");
TResetMetadataResponse resp = new TResetMetadataResponse();
resp.setResult(new TCatalogUpdateResult());
resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());
@@ -3529,13 +3543,15 @@ public class CatalogOpExecutor {
// If the table is not loaded, no need to perform refresh after the initial
// metadata load.
boolean needsRefresh = tbl.isLoaded();
- tbl = getExistingTable(tblName.getDb(), tblName.getTbl());
+ tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
+ "Load triggered by " + cmdString);
if (tbl != null) {
if (needsRefresh) {
if (req.isSetPartition_spec()) {
- updatedThriftTable = catalog_.reloadPartition(tbl, req.getPartition_spec());
+ updatedThriftTable = catalog_.reloadPartition(tbl,
+ req.getPartition_spec(), cmdString);
} else {
- updatedThriftTable = catalog_.reloadTable(tbl);
+ updatedThriftTable = catalog_.reloadTable(tbl, cmdString);
}
} else {
// Table was loaded from scratch, so it's already "refreshed".
@@ -3610,7 +3626,8 @@ public class CatalogOpExecutor {
throws ImpalaException {
TUpdateCatalogResponse response = new TUpdateCatalogResponse();
// Only update metastore for Hdfs tables.
- Table table = getExistingTable(update.getDb_name(), update.getTarget_table());
+ Table table = getExistingTable(update.getDb_name(), update.getTarget_table(),
+ "Load for INSERT");
if (!(table instanceof HdfsTable)) {
throw new InternalException("Unexpected table type: " +
update.getTarget_table());
@@ -3658,14 +3675,14 @@ public class CatalogOpExecutor {
// consistent.
String partName = partition.getPartitionName() + "/";
- // Attempt to remove this partition name from from partsToCreate. If remove
+ // Attempt to remove this partition name from partsToCreate. If remove
// returns true, it indicates the partition already exists.
if (partsToCreate.remove(partName)) {
affectedExistingPartitions.add(partition);
if (partition.isMarkedCached()) {
- // The partition was targeted by the insert and is also a cached. Since
+ // The partition was targeted by the insert and is also cached. Since
// data was written to the partition, a watch needs to be placed on the
- // cache cache directive so the TableLoadingMgr can perform an async
+ // cache directive so the TableLoadingMgr can perform an async
// refresh once all data becomes cached.
cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
partition.getParameters()));
@@ -3762,7 +3779,8 @@ public class CatalogOpExecutor {
}
// Submit the watch request for the given cache directives.
if (!cacheDirIds.isEmpty()) {
- catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift());
+ catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift(),
+ "INSERT into cached partitions");
}
response.setResult(new TCatalogUpdateResult());
@@ -3777,7 +3795,8 @@ public class CatalogOpExecutor {
new TStatus(TErrorCode.OK, new ArrayList<String>()));
}
- loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
+ loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
+ "INSERT");
// After loading metadata, fire insert events if external event processing is
// enabled.
createInsertEvents(table, affectedExistingPartitions, update.is_overwrite);
@@ -3876,8 +3895,9 @@ public class CatalogOpExecutor {
* TODO: Track object IDs to
* know when a table has been dropped and re-created with the same name.
*/
- private Table getExistingTable(String dbName, String tblName) throws CatalogException {
- Table tbl = catalog_.getOrLoadTable(dbName, tblName);
+ private Table getExistingTable(String dbName, String tblName, String reason)
+ throws CatalogException {
+ Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason);
if (tbl == null) {
throw new TableNotFoundException("Table not found: " + dbName + "." + tblName);
}
@@ -4017,7 +4037,8 @@ public class CatalogOpExecutor {
private void alterCommentOnTableOrView(TableName tableName, String comment,
TDdlExecResponse response) throws CatalogException, InternalException,
ImpalaRuntimeException {
- Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+ Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+ "Load for ALTER COMMENT");
tryLock(tbl);
try {
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
@@ -4034,7 +4055,7 @@ public class CatalogOpExecutor {
msTbl.getParameters().put("comment", comment);
}
applyAlterTable(msTbl, true);
- loadTableMetadata(tbl, newCatalogVersion, false, false, null);
+ loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT");
addTableToCatalogUpdate(tbl, response.result);
addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table"));
} finally {
@@ -4045,7 +4066,8 @@ public class CatalogOpExecutor {
private void alterCommentOnColumn(TableName tableName, String columnName,
String comment, TDdlExecResponse response) throws CatalogException,
InternalException, ImpalaRuntimeException {
- Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl());
+ Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
+ "Load for ALTER COLUMN COMMENT");
tryLock(tbl);
try {
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
@@ -4067,7 +4089,8 @@ public class CatalogOpExecutor {
}
applyAlterTable(msTbl, true);
}
- loadTableMetadata(tbl, newCatalogVersion, false, true, null);
+ loadTableMetadata(tbl, newCatalogVersion, false, true, null,
+ "ALTER COLUMN COMMENT");
addTableToCatalogUpdate(tbl, response.result);
addSummary(response, "Column has been altered.");
} finally {
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 0431373..995f3ba 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -63,7 +63,7 @@ public class CatalogObjectToFromThriftTest {
String[] dbNames = {"functional", "functional_avro", "functional_parquet",
"functional_seq"};
for (String dbName: dbNames) {
- Table table = catalog_.getOrLoadTable(dbName, "alltypes");
+ Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test");
Assert.assertEquals(24, ((HdfsTable)table).getPartitions().size());
Assert.assertEquals(24, ((HdfsTable)table).getPartitionIds().size());
@@ -131,7 +131,7 @@ public class CatalogObjectToFromThriftTest {
@Test
public void TestMismatchedAvroAndTableSchemas() throws CatalogException {
Table table = catalog_.getOrLoadTable("functional_avro_snap",
- "schema_resolution_test");
+ "schema_resolution_test", "test");
TTable thriftTable = getThriftTable(table);
Assert.assertEquals(thriftTable.tbl_name, "schema_resolution_test");
Assert.assertTrue(thriftTable.isSetTable_type());
@@ -151,7 +151,7 @@ public class CatalogObjectToFromThriftTest {
@Test
public void TestHBaseTables() throws CatalogException {
String dbName = "functional_hbase";
- Table table = catalog_.getOrLoadTable(dbName, "alltypes");
+ Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test");
TTable thriftTable = getThriftTable(table);
Assert.assertEquals(thriftTable.tbl_name, "alltypes");
Assert.assertEquals(thriftTable.db_name, dbName);
@@ -180,7 +180,7 @@ public class CatalogObjectToFromThriftTest {
public void TestHBaseTableWithBinaryEncodedCols()
throws CatalogException {
String dbName = "functional_hbase";
- Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary");
+ Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary", "test");
TTable thriftTable = getThriftTable(table);
Assert.assertEquals(thriftTable.tbl_name, "alltypessmallbinary");
Assert.assertEquals(thriftTable.db_name, dbName);
@@ -217,7 +217,7 @@ public class CatalogObjectToFromThriftTest {
Assume.assumeTrue(
"Skipping this test since it is only supported when running against Hive-2",
TestUtils.getHiveMajorVersion() == 2);
- Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl");
+ Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test");
Assert.assertNotNull(table);
TTable thriftTable = getThriftTable(table);
Assert.assertEquals(thriftTable.tbl_name, "hive_index_tbl");
@@ -226,7 +226,7 @@ public class CatalogObjectToFromThriftTest {
@Test
public void TestTableLoadingErrors() throws ImpalaException {
- Table table = catalog_.getOrLoadTable("functional", "alltypes");
+ Table table = catalog_.getOrLoadTable("functional", "alltypes", "test");
HdfsTable hdfsTable = (HdfsTable) table;
// Get any partition with valid HMS parameters to create a
// dummy partition.
@@ -250,7 +250,7 @@ public class CatalogObjectToFromThriftTest {
@Test
public void TestView() throws CatalogException {
- Table table = catalog_.getOrLoadTable("functional", "view_view");
+ Table table = catalog_.getOrLoadTable("functional", "view_view", "test");
TTable thriftTable = getThriftTable(table);
Assert.assertEquals(thriftTable.tbl_name, "view_view");
Assert.assertEquals(thriftTable.db_name, "functional");
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index ad47845..8fd46f2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -130,50 +130,52 @@ public class CatalogTest {
Db functionalDb = catalog_.getDb("functional");
assertNotNull(functionalDb);
assertEquals(functionalDb.getName(), "functional");
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypes"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view_sub"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypessmall"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserror"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserrornonulls"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypesagg"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypesaggnonulls"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypesnopart"));
- assertNotNull(catalog_.getOrLoadTable("functional", "alltypesinsert"));
- assertNotNull(catalog_.getOrLoadTable("functional", "complex_view"));
- assertNotNull(catalog_.getOrLoadTable("functional", "testtbl"));
- assertNotNull(catalog_.getOrLoadTable("functional", "dimtbl"));
- assertNotNull(catalog_.getOrLoadTable("functional", "jointbl"));
- assertNotNull(catalog_.getOrLoadTable("functional", "liketbl"));
- assertNotNull(catalog_.getOrLoadTable("functional", "greptiny"));
- assertNotNull(catalog_.getOrLoadTable("functional", "rankingssmall"));
- assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall"));
- assertNotNull(catalog_.getOrLoadTable("functional", "view_view"));
- assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypes", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view_sub", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypessmall", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserror", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserrornonulls", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypesagg", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypesaggnonulls", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypesnopart", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "alltypesinsert", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "complex_view", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "testtbl", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "dimtbl", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "jointbl", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "liketbl", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "greptiny", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "rankingssmall", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "view_view", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl", "test"));
// IMP-163 - table with string partition column does not load if there are partitions
- assertNotNull(catalog_.getOrLoadTable("functional", "StringPartitionKey"));
+ assertNotNull(catalog_.getOrLoadTable("functional", "StringPartitionKey", "test"));
// Test non-existent table
- assertNull(catalog_.getOrLoadTable("functional", "nonexistenttable"));
+ assertNull(catalog_.getOrLoadTable("functional", "nonexistenttable", "test"));
// functional_seq contains the same tables as functional
Db testDb = catalog_.getDb("functional_seq");
assertNotNull(testDb);
assertEquals(testDb.getName(), "functional_seq");
- assertNotNull(catalog_.getOrLoadTable("functional_seq", "alltypes"));
- assertNotNull(catalog_.getOrLoadTable("functional_seq", "testtbl"));
+ assertNotNull(catalog_.getOrLoadTable("functional_seq", "alltypes", "test"));
+ assertNotNull(catalog_.getOrLoadTable("functional_seq", "testtbl", "test"));
Db hbaseDb = catalog_.getDb("functional_hbase");
assertNotNull(hbaseDb);
assertEquals(hbaseDb.getName(), "functional_hbase");
// Loading succeeds for an HBase table that has binary columns and an implicit key
// column mapping
- assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmallbinary"));
- assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmall"));
- assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "hbasealltypeserror"));
+ assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmallbinary",
+ "test"));
+ assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmall", "test"));
+ assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "hbasealltypeserror",
+ "test"));
assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(),
- "hbasealltypeserrornonulls"));
- assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypesagg"));
- assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "stringids"));
+ "hbasealltypeserrornonulls", "test"));
+ assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypesagg", "test"));
+ assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "stringids", "test"));
checkTableCols(functionalDb, "alltypes", 2,
new String[]
@@ -323,8 +325,8 @@ public class CatalogTest {
new Type[] {Type.DATE, Type.INT, Type.DATE});
// case-insensitive lookup
- assertEquals(catalog_.getOrLoadTable("functional", "alltypes"),
- catalog_.getOrLoadTable("functional", "AllTypes"));
+ assertEquals(catalog_.getOrLoadTable("functional", "alltypes", "test"),
+ catalog_.getOrLoadTable("functional", "AllTypes", "test"));
}
// Count of listFiles (list status + blocks) calls
@@ -352,7 +354,8 @@ public class CatalogTest {
/*tblWasRemoved=*/new Reference<Boolean>(),
/*dbWasAdded=*/new Reference<Boolean>());
- HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", "AllTypes");
+ HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", "AllTypes",
+ "test");
StorageStatistics opsCounts = stats.get(DFSOpsCountStatistics.NAME);
// We expect:
@@ -375,7 +378,7 @@ public class CatalogTest {
// Now test REFRESH on the table...
stats.reset();
- catalog_.reloadTable(table);
+ catalog_.reloadTable(table, "test");
// Again, we expect only one getFileStatus call, for the top-level directory.
assertEquals(1L, (long)opsCounts.getLong(GET_FILE_STATUS));
@@ -393,17 +396,17 @@ public class CatalogTest {
List<TPartitionKeyValue> partitionSpec = ImmutableList.of(
new TPartitionKeyValue("year", "2010"),
new TPartitionKeyValue("month", "10"));
- catalog_.reloadPartition(table, partitionSpec);
+ catalog_.reloadPartition(table, partitionSpec, "test");
assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
// Loading or reloading an unpartitioned table with some files in it should not make
// an RPC per file.
stats.reset();
HdfsTable unpartTable = (HdfsTable)catalog_.getOrLoadTable(
- "functional", "alltypesaggmultifilesnopart");
+ "functional", "alltypesaggmultifilesnopart", "test");
assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
stats.reset();
- catalog_.reloadTable(unpartTable);
+ catalog_.reloadTable(unpartTable, "test");
assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
// Simulate an empty partition, which will trigger the full
@@ -414,7 +417,7 @@ public class CatalogTest {
.getPartitionFromThriftPartitionSpec(partitionSpec);
hdfsPartition.setFileDescriptors(new ArrayList<>());
stats.reset();
- catalog_.reloadPartition(table, partitionSpec);
+ catalog_.reloadPartition(table, partitionSpec, "test");
// Should not scan the directory file-by-file, should use a single
// listLocatedStatus() to get the whole directory (partition)
@@ -427,7 +430,7 @@ public class CatalogTest {
@Test
public void TestPartitions() throws CatalogException {
HdfsTable table =
- (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypes");
+ (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypes", "test");
checkAllTypesPartitioning(table, true);
}
@@ -477,7 +480,8 @@ public class CatalogTest {
@Test
public void testStats() throws CatalogException {
// make sure the stats for functional.alltypesagg look correct
- HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypesAgg");
+ HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypesAgg",
+ "test");
Column idCol = table.getColumn("id");
assertEquals(idCol.getStats().getAvgSerializedSize(),
@@ -550,7 +554,8 @@ public class CatalogTest {
public void testColStatsColTypeMismatch() throws Exception {
// First load a table that has column stats.
//catalog_.refreshTable("functional", "alltypesagg", false);
- HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "alltypesagg");
+ HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "alltypesagg",
+ "test");
// Now attempt to update a column's stats with mismatched stats data and ensure
// we get the expected results.
@@ -643,7 +648,7 @@ public class CatalogTest {
gflags.setPull_incremental_statistics(true);
// Partitioned table with stats. Load the table prior to fetching.
- catalog_.getOrLoadTable("functional", "alltypesagg");
+ catalog_.getOrLoadTable("functional", "alltypesagg", "test");
expectStatistics("functional", "alltypesagg", 11);
// Partitioned table with stats. Invalidate the table prior to fetching.
@@ -675,7 +680,7 @@ public class CatalogTest {
public void testInternalHBaseTable() throws CatalogException {
// Cast will fail if table not an HBaseTable
HBaseTable table = (HBaseTable)
- catalog_.getOrLoadTable("functional_hbase", "internal_hbase_table");
+ catalog_.getOrLoadTable("functional_hbase", "internal_hbase_table", "test");
assertNotNull("functional_hbase.internal_hbase_table was not found", table);
}
@@ -687,14 +692,14 @@ public class CatalogTest {
@Test
public void testCreateTableMetadata() throws CatalogException {
- Table table = catalog_.getOrLoadTable("functional", "alltypes");
+ Table table = catalog_.getOrLoadTable("functional", "alltypes", "test");
// Tables are created via Impala so the metadata should have been populated properly.
// alltypes is an external table.
assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
assertEquals(TableType.EXTERNAL_TABLE.toString(),
table.getMetaStoreTable().getTableType());
// alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
- table = catalog_.getOrLoadTable("functional", "alltypesinsert");
+ table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
assertEquals(TableType.MANAGED_TABLE.toString(),
table.getMetaStoreTable().getTableType());
@@ -707,7 +712,7 @@ public class CatalogTest {
Assume.assumeTrue(
"Skipping this test since it is only supported when running against Hive-2",
TestUtils.getHiveMajorVersion() == 2);
- Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl");
+ Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test");
assertTrue(table instanceof IncompleteTable);
IncompleteTable incompleteTable = (IncompleteTable) table;
assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
@@ -718,7 +723,7 @@ public class CatalogTest {
@Test
public void testLoadingUnsupportedTableTypes() throws CatalogException {
// Table with unsupported SerDe library.
- Table table = catalog_.getOrLoadTable("functional", "bad_serde");
+ Table table = catalog_.getOrLoadTable("functional", "bad_serde", "test");
assertTrue(table instanceof IncompleteTable);
IncompleteTable incompleteTable = (IncompleteTable) table;
assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
@@ -728,7 +733,7 @@ public class CatalogTest {
// Impala does not yet support Hive's LazyBinaryColumnarSerDe which can be
// used for RCFILE tables.
- table = catalog_.getOrLoadTable("functional_rc", "rcfile_lazy_binary_serde");
+ table = catalog_.getOrLoadTable("functional_rc", "rcfile_lazy_binary_serde", "test");
assertTrue(table instanceof IncompleteTable);
incompleteTable = (IncompleteTable) table;
assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
index 4410ea5..e20566e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
@@ -64,7 +64,7 @@ public class CatalogdTableInvalidatorTest {
2, /*invalidateTablesOnMemoryPressure=*/false, /*oldGenFullThreshold=*/
0.6, /*gcInvalidationFraction=*/0.1));
Assert.assertFalse(catalog_.getDb(dbName).getTable(tblName).isLoaded());
- Table table = catalog_.getOrLoadTable(dbName, tblName);
+ Table table = catalog_.getOrLoadTable(dbName, tblName, "test");
Assert.assertTrue(table.isLoaded());
Assert.assertEquals(ticker.now_, table.getLastUsedTime());
long previousTriggerCount = catalog_.getCatalogdTableInvalidator().scanCount_.get();
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index d9bce89..9d1ddbe 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -682,7 +682,7 @@ public class MetastoreEventsProcessorTest {
eventsProcessor_.processEvents();
// Simulate a load table
- Table tbl = catalog_.getOrLoadTable(dbName, tblName);
+ Table tbl = catalog_.getOrLoadTable(dbName, tblName, "test");
Partition partition = null;
if (isPartitionInsert) {
// Get the partition from metastore. This should now contain the new file.
@@ -2622,7 +2622,7 @@ public class MetastoreEventsProcessorTest {
}
private Table loadTable(String dbName, String tblName) throws CatalogException {
- Table loadedTable = catalog_.getOrLoadTable(dbName, tblName);
+ Table loadedTable = catalog_.getOrLoadTable(dbName, tblName, "test");
assertFalse("Table should have been loaded after getOrLoadTable call",
loadedTable instanceof IncompleteTable);
return loadedTable;
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index 81bb9bd..336fc88 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -125,7 +125,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
// The table was not yet loaded. Load it in to the catalog now.
Table newTbl = null;
try {
- newTbl = srcCatalog_.getOrLoadTable(dbName, tblName);
+ newTbl = srcCatalog_.getOrLoadTable(dbName, tblName, "test");
} catch (CatalogException e) {
throw new IllegalStateException("Unexpected table loading failure.", e);
}