Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 17:22:17 UTC
[2/3] impala git commit: IMPALA-5152: Introduce metadata loading phase
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
new file mode 100644
index 0000000..be71161
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.impala.catalog.Db;
+import org.apache.impala.catalog.ImpaladCatalog;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.View;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.service.Frontend;
+import org.apache.impala.util.EventSequence;
+import org.apache.impala.util.TUniqueIdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Loads all table and view metadata relevant for a single SQL statement and returns the
+ * loaded tables in a StmtTableCache. Optionally marks important loading events in an
+ * EventSequence.
+ */
+public class StmtMetadataLoader {
+ private final static Logger LOG = LoggerFactory.getLogger(StmtMetadataLoader.class);
+
+ // Debug logging and load-request retries are triggered once at least this many
+ // catalog updates have been received.
+ private final long DEBUG_LOGGING_NUM_CATALOG_UPDATES = 10;
+ private final long RETRY_LOAD_NUM_CATALOG_UPDATES = 20;
+
+ private final Frontend fe_;
+ private final String sessionDb_;
+ private final EventSequence timeline_;
+
+ // Results of the loading process. See StmtTableCache.
+ private final Set<String> dbs_ = Sets.newHashSet();
+ private final Map<TableName, Table> loadedTbls_ = Maps.newHashMap();
+
+ // Metrics for the metadata load.
+ // Number of prioritizedLoad() RPCs issued to the catalogd.
+ private int numLoadRequestsSent_ = 0;
+ // Number of catalog topic updates received from the statestore.
+ private int numCatalogUpdatesReceived_ = 0;
+
+ /**
+ * Contains all statement-relevant tables and database names as well as the latest
+ * ImpaladCatalog. An entry in the tables map is guaranteed to point to a loaded
+ * table. This could mean the table was loaded successfully or a load was attempted
+ * but failed. The absence of a table or database name indicates that object was not
+ * in the Catalog at the time this StmtTableCache was generated.
+ */
+ public static final class StmtTableCache {
+ public final ImpaladCatalog catalog;
+ public final Set<String> dbs;
+ public final Map<TableName, Table> tables;
+
+ public StmtTableCache(ImpaladCatalog catalog, Set<String> dbs,
+ Map<TableName, Table> tables) {
+ this.catalog = Preconditions.checkNotNull(catalog);
+ this.dbs = Preconditions.checkNotNull(dbs);
+ this.tables = Preconditions.checkNotNull(tables);
+ validate();
+ }
+
+ private void validate() {
+ // Checks that all entries in 'tables' have a matching entry in 'dbs'.
+ for (TableName tbl: tables.keySet()) {
+ Preconditions.checkState(dbs.contains(tbl.getDb()));
+ }
+ }
+ }
+
+ /**
+ * The 'fe' and 'sessionDb' arguments must be non-null. A null 'timeline' may be passed
+ * if no events should be marked.
+ */
+ public StmtMetadataLoader(Frontend fe, String sessionDb, EventSequence timeline) {
+ fe_ = Preconditions.checkNotNull(fe);
+ sessionDb_ = Preconditions.checkNotNull(sessionDb);
+ timeline_ = timeline;
+ }
+
+ // Getters for testing
+ public EventSequence getTimeline() { return timeline_; }
+ public int getNumLoadRequestsSent() { return numLoadRequestsSent_; }
+ public int getNumCatalogUpdatesReceived() { return numCatalogUpdatesReceived_; }
+
+ /**
+ * Collects and loads all tables and views required to analyze the given statement.
+ * Marks the start and end of metadata loading in 'timeline_' if it is non-NULL.
+ * Must only be called once for a single statement.
+ */
+ public StmtTableCache loadTables(StatementBase stmt) throws InternalException {
+ Set<TableName> requiredTables = collectTableCandidates(stmt);
+ return loadTables(requiredTables);
+ }
+
+ /**
+ * Loads the tables/views with the given names and returns them. As views become
+ * loaded, the set of table/views still to be loaded is expanded based on the view
+ * definitions. For tables/views missing metadata this function issues a loading
+ * request to the catalog server and then waits for the metadata to arrive through
+ * a statestore topic update.
+ * This function succeeds even across catalog restarts for the following reasons:
+ * - The loading process is strictly additive, i.e., a new loaded table may be added
+ * to the 'loadedTbls_' map, but an existing entry is never removed, even if the
+ * equivalent table in the impalad catalog is different.
+ * - Tables on the impalad side are not modified in place. This means that an entry
+ *   in the 'loadedTbls_' map will always remain in the loaded state.
+ * Tables/views that are already loaded are simply included in the result.
+ * Marks the start and end of metadata loading in 'timeline_' if it is non-NULL.
+ * Must only be called once for a single statement.
+ */
+ public StmtTableCache loadTables(Set<TableName> tbls) throws InternalException {
+ Preconditions.checkState(dbs_.isEmpty() && loadedTbls_.isEmpty());
+ Preconditions.checkState(numLoadRequestsSent_ == 0);
+ Preconditions.checkState(numCatalogUpdatesReceived_ == 0);
+ ImpaladCatalog catalog = fe_.getCatalog();
+ Set<TableName> missingTbls = getMissingTables(catalog, tbls);
+ // If there are no missing tables, return early to avoid making an RPC to the
+ // CatalogServer and adding events to the timeline.
+ if (missingTbls.isEmpty()) {
+ if (timeline_ != null) {
+ timeline_.markEvent(
+ String.format("Metadata of all %d tables cached", loadedTbls_.size()));
+ }
+ return new StmtTableCache(catalog, dbs_, loadedTbls_);
+ }
+
+ if (timeline_ != null) timeline_.markEvent("Metadata load started");
+ long startTimeMs = System.currentTimeMillis();
+
+ // All tables for which we have requested a prioritized load.
+ Set<TableName> requestedTbls = Sets.newHashSet();
+
+ // Loading a fixed set of tables happens in two steps:
+ // 1) Issue a loading request RPC to the catalogd.
+ // 2) Wait for the loaded tables to arrive via the statestore.
+ // The second step could take a while and we should avoid repeatedly issuing
+ // redundant RPCs to the catalogd. This flag indicates whether a loading RPC
+ // should be issued. See below for the circumstances under which this flag is
+ // set to true.
+ boolean issueLoadRequest = true;
+ // Loop until all the missing tables are loaded in the Impalad's catalog cache.
+ // In every iteration of this loop we wait for one catalog update to arrive.
+ while (!missingTbls.isEmpty()) {
+ if (issueLoadRequest) {
+ catalog.prioritizeLoad(missingTbls);
+ ++numLoadRequestsSent_;
+ requestedTbls.addAll(missingTbls);
+ }
+
+ // Catalog may have been restarted, always use the latest reference.
+ ImpaladCatalog currCatalog = fe_.getCatalog();
+ boolean hasCatalogRestarted = currCatalog != catalog;
+ if (hasCatalogRestarted && LOG.isWarnEnabled()) {
+ LOG.warn(String.format(
+ "Catalog restart detected while waiting for table metadata. " +
+ "Current catalog service id: %s. Previous catalog service id: %s",
+ TUniqueIdUtil.PrintId(currCatalog.getCatalogServiceId()),
+ TUniqueIdUtil.PrintId(catalog.getCatalogServiceId())));
+ }
+ catalog = currCatalog;
+
+ // Log progress and wait time for debugging.
+ if (hasCatalogRestarted
+ || (numCatalogUpdatesReceived_ > 0
+ && numCatalogUpdatesReceived_ % DEBUG_LOGGING_NUM_CATALOG_UPDATES == 0)) {
+ if (LOG.isInfoEnabled()) {
+ long endTimeMs = System.currentTimeMillis();
+ LOG.info(String.format("Waiting for table metadata. " +
+ "Waited for %d catalog updates and %dms. Tables remaining: %s",
+ numCatalogUpdatesReceived_, endTimeMs - startTimeMs, missingTbls));
+ }
+ }
+
+ // Wait for the next catalog update and then revise the loaded/missing tables.
+ catalog.waitForCatalogUpdate(Frontend.MAX_CATALOG_UPDATE_WAIT_TIME_MS);
+ Set<TableName> newMissingTbls = getMissingTables(catalog, missingTbls);
+ // Issue a load request for the new missing tables in these cases:
+ // 1) Catalog has restarted so all in-flight loads have been lost
+ // 2) There are new missing tables due to view expansion
+ issueLoadRequest = hasCatalogRestarted || !missingTbls.containsAll(newMissingTbls);
+ // 3) Periodically retry to avoid a hang due to anomalies/bugs, e.g.,
+ // a previous load request was somehow lost on the catalog side, or the table
+ // was invalidated after being loaded but before being sent to this impalad
+ if (!issueLoadRequest && numCatalogUpdatesReceived_ > 0
+ && numCatalogUpdatesReceived_ % RETRY_LOAD_NUM_CATALOG_UPDATES == 0) {
+ issueLoadRequest = true;
+ if (LOG.isInfoEnabled()) {
+ long endTimeMs = System.currentTimeMillis();
+ LOG.info(String.format("Re-sending prioritized load request. " +
+ "Waited for %d catalog updates and %dms.",
+ numCatalogUpdatesReceived_, endTimeMs - startTimeMs));
+ }
+ }
+ missingTbls = newMissingTbls;
+ ++numCatalogUpdatesReceived_;
+ }
+ if (timeline_ != null) {
+ timeline_.markEvent(String.format("Metadata load finished. " +
+ "loaded-tables=%d/%d load-requests=%d catalog-updates=%d",
+ requestedTbls.size(), loadedTbls_.size(), numLoadRequestsSent_,
+ numCatalogUpdatesReceived_));
+ }
+
+ return new StmtTableCache(catalog, dbs_, loadedTbls_);
+ }
+
+ /**
+ * Determines whether the 'tbls' are loaded in the given catalog or not. Adds the names
+ * of referenced databases that exist to 'dbs_', and loaded tables to 'loadedTbls_'.
+ * Returns the set of tables that are not loaded. Recursively collects loaded/missing
+ * tables from views. Uses 'sessionDb_' to construct table candidates from views with
+ * Path.getCandidateTables(). Non-existent tables are ignored and not returned or
+ * added to 'loadedTbls_'.
+ */
+ private Set<TableName> getMissingTables(ImpaladCatalog catalog, Set<TableName> tbls) {
+ Set<TableName> missingTbls = Sets.newHashSet();
+ Set<TableName> viewTbls = Sets.newHashSet();
+ for (TableName tblName: tbls) {
+ if (loadedTbls_.containsKey(tblName)) continue;
+ Db db = catalog.getDb(tblName.getDb());
+ if (db == null) continue;
+ dbs_.add(tblName.getDb());
+ Table tbl = db.getTable(tblName.getTbl());
+ if (tbl == null) continue;
+ if (!tbl.isLoaded()) {
+ missingTbls.add(tblName);
+ continue;
+ }
+ loadedTbls_.put(tblName, tbl);
+ if (tbl instanceof View) {
+ viewTbls.addAll(collectTableCandidates(((View) tbl).getQueryStmt()));
+ }
+ }
+ // Recursively collect loaded/missing tables from loaded views.
+ if (!viewTbls.isEmpty()) missingTbls.addAll(getMissingTables(catalog, viewTbls));
+ return missingTbls;
+ }
+
+ /**
+ * Returns the set of tables whose metadata needs to be loaded for the analysis of the
+ * given 'stmt' to succeed. This is done by collecting all table references from 'stmt'
+ * and generating all possible table-path resolutions considered during analysis.
+ * Uses 'sessionDb_' to construct the candidate tables with Path.getCandidateTables().
+ */
+ private Set<TableName> collectTableCandidates(StatementBase stmt) {
+ Preconditions.checkNotNull(stmt);
+ List<TableRef> tblRefs = Lists.newArrayList();
+ stmt.collectTableRefs(tblRefs);
+ Set<TableName> tableNames = Sets.newHashSet();
+ for (TableRef ref: tblRefs) {
+ tableNames.addAll(Path.getCandidateTables(ref.getPath(), sessionDb_));
+ }
+ return tableNames;
+ }
+}
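
For orientation, a minimal usage sketch of the new loader follows. The Frontend instance 'fe', the session database "default", and the table name are placeholders; the entry points (Frontend.parse() and the two loadTables() overloads) are the ones introduced by this commit:

    // Sketch only: assumes an initialized Frontend 'fe' with a ready local catalog.
    // Both loadTables() overloads throw InternalException; parse() throws
    // AnalysisException.
    EventSequence timeline = new EventSequence("Query Compilation");
    StmtMetadataLoader loader = new StmtMetadataLoader(fe, "default", timeline);

    // Load from a parsed statement, as Frontend.createExecRequest() does...
    StatementBase stmt = fe.parse("select count(*) from functional.alltypes");
    StmtTableCache stmtTableCache = loader.loadTables(stmt);

    // ...or from an explicit set of names, as MetadataOp.getColumns() does. A
    // StmtMetadataLoader instance must only be used for a single statement.
    Set<TableName> tbls = Sets.newHashSet(new TableName("functional", "alltypes"));
    StmtTableCache cache = new StmtMetadataLoader(fe, "default", null).loadTables(tbls);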
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index 714e9b1..4f5dbfb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -17,6 +17,8 @@
package org.apache.impala.analysis;
+import java.util.List;
+
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.Table;
@@ -47,12 +49,17 @@ public class TruncateStmt extends StatementBase {
}
@Override
+ public void collectTableRefs(List<TableRef> tblRefs) {
+ tblRefs.add(new TableRef(tableName_.toPath(), null));
+ }
+
+ @Override
public void analyze(Analyzer analyzer) throws AnalysisException {
tableName_ = analyzer.getFqTableName(tableName_);
try {
table_ = analyzer.getTable(tableName_, Privilege.INSERT);
} catch (AnalysisException e) {
- if (ifExists_ && analyzer.getMissingTbls().isEmpty()) return;
+ if (ifExists_) return;
throw e;
}
// We only support truncating hdfs tables now.
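
The collectTableRefs() override above is the hook that feeds the new loading phase: before analysis, each statement reports its raw table references, and StmtMetadataLoader.collectTableCandidates() (in the new file above) expands them into candidate names. Condensed from that method:

    List<TableRef> tblRefs = Lists.newArrayList();
    stmt.collectTableRefs(tblRefs);  // e.g. TruncateStmt adds its target table
    Set<TableName> candidates = Sets.newHashSet();
    for (TableRef ref : tblRefs) {
      // Generates every table-path resolution considered during analysis, e.g.
      // qualifying unqualified names with the session database.
      candidates.addAll(Path.getCandidateTables(ref.getPath(), sessionDb));
    }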
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index f811e49..bb472a2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -162,6 +162,8 @@ public class UnionStmt extends QueryStmt {
public UnionStmt(List<UnionOperand> operands,
ArrayList<OrderByElement> orderByElements, LimitElement limitElement) {
super(orderByElements, limitElement);
+ Preconditions.checkNotNull(operands);
+ Preconditions.checkState(operands.size() > 0);
operands_ = operands;
}
@@ -207,12 +209,7 @@ public class UnionStmt extends QueryStmt {
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (isAnalyzed()) return;
- try {
- super.analyze(analyzer);
- } catch (AnalysisException e) {
- if (analyzer.getMissingTbls().isEmpty()) throw e;
- }
- Preconditions.checkState(operands_.size() > 0);
+ super.analyze(analyzer);
// Propagates DISTINCT from right to left.
propagateDistinct();
@@ -279,25 +276,18 @@ public class UnionStmt extends QueryStmt {
*/
private void analyzeOperands(Analyzer analyzer) throws AnalysisException {
for (int i = 0; i < operands_.size(); ++i) {
- try {
- operands_.get(i).analyze(analyzer);
- QueryStmt firstQuery = operands_.get(0).getQueryStmt();
- List<Expr> firstExprs = operands_.get(0).getQueryStmt().getResultExprs();
- QueryStmt query = operands_.get(i).getQueryStmt();
- List<Expr> exprs = query.getResultExprs();
- if (firstExprs.size() != exprs.size()) {
- throw new AnalysisException("Operands have unequal number of columns:\n" +
- "'" + queryStmtToSql(firstQuery) + "' has " +
- firstExprs.size() + " column(s)\n" +
- "'" + queryStmtToSql(query) + "' has " + exprs.size() + " column(s)");
- }
- } catch (AnalysisException e) {
- if (analyzer.getMissingTbls().isEmpty()) throw e;
+ operands_.get(i).analyze(analyzer);
+ QueryStmt firstQuery = operands_.get(0).getQueryStmt();
+ List<Expr> firstExprs = operands_.get(0).getQueryStmt().getResultExprs();
+ QueryStmt query = operands_.get(i).getQueryStmt();
+ List<Expr> exprs = query.getResultExprs();
+ if (firstExprs.size() != exprs.size()) {
+ throw new AnalysisException("Operands have unequal number of columns:\n" +
+ "'" + queryStmtToSql(firstQuery) + "' has " +
+ firstExprs.size() + " column(s)\n" +
+ "'" + queryStmtToSql(query) + "' has " + exprs.size() + " column(s)");
}
}
- if (!analyzer.getMissingTbls().isEmpty()) {
- throw new AnalysisException("Found missing tables. Aborting analysis.");
- }
}
/**
@@ -552,8 +542,11 @@ public class UnionStmt extends QueryStmt {
}
@Override
- public void collectTableRefs(List<TableRef> tblRefs) {
- for (UnionOperand op: operands_) op.getQueryStmt().collectTableRefs(tblRefs);
+ public void collectTableRefs(List<TableRef> tblRefs, boolean fromClauseOnly) {
+ super.collectTableRefs(tblRefs, fromClauseOnly);
+ for (UnionOperand op: operands_) {
+ op.getQueryStmt().collectTableRefs(tblRefs, fromClauseOnly);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/analysis/WithClause.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/WithClause.java b/fe/src/main/java/org/apache/impala/analysis/WithClause.java
index 3e70764..1a771cd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/WithClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/WithClause.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.impala.authorization.PrivilegeRequest;
import org.apache.impala.catalog.View;
import org.apache.impala.common.AnalysisException;
+
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -74,30 +75,23 @@ public class WithClause implements ParseNode {
Analyzer withClauseAnalyzer = Analyzer.createWithNewGlobalState(analyzer);
withClauseAnalyzer.setIsWithClause();
if (analyzer.isExplain()) withClauseAnalyzer.setIsExplain();
- try {
- for (View view: views_) {
- Analyzer viewAnalyzer = new Analyzer(withClauseAnalyzer);
- view.getQueryStmt().analyze(viewAnalyzer);
- // Register this view so that the next view can reference it.
- withClauseAnalyzer.registerLocalView(view);
- }
- // Register all local views with the analyzer.
- for (View localView: withClauseAnalyzer.getLocalViews().values()) {
- analyzer.registerLocalView(localView);
- }
- // Record audit events because the resolved table references won't generate any
- // when a view is referenced.
- analyzer.getAccessEvents().addAll(withClauseAnalyzer.getAccessEvents());
+ for (View view: views_) {
+ Analyzer viewAnalyzer = new Analyzer(withClauseAnalyzer);
+ view.getQueryStmt().analyze(viewAnalyzer);
+ // Register this view so that the next view can reference it.
+ withClauseAnalyzer.registerLocalView(view);
+ }
+ // Register all local views with the analyzer.
+ for (View localView: withClauseAnalyzer.getLocalViews().values()) {
+ analyzer.registerLocalView(localView);
+ }
+ // Record audit events because the resolved table references won't generate any
+ // when a view is referenced.
+ analyzer.getAccessEvents().addAll(withClauseAnalyzer.getAccessEvents());
- // Register all privilege requests made from the root analyzer.
- for (PrivilegeRequest req: withClauseAnalyzer.getPrivilegeReqs()) {
- analyzer.registerPrivReq(req);
- }
- } finally {
- // Record missing tables in the original analyzer.
- if (analyzer.isRootAnalyzer()) {
- analyzer.getMissingTbls().addAll(withClauseAnalyzer.getMissingTbls());
- }
+ // Register all privilege requests made from the root analyzer.
+ for (PrivilegeRequest req: withClauseAnalyzer.getPrivilegeReqs()) {
+ analyzer.registerPrivReq(req);
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index ff7b1e4..0cd1eda 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -141,21 +141,27 @@ public abstract class Catalog {
}
/**
- * Returns the Table object for the given dbName/tableName. If 'throwIfError' is true,
- * an exception is thrown if the associated database does not exist. Otherwise, null is
- * returned.
+ * Returns the Table object for the given dbName/tableName or null if the database or
+ * table does not exist.
*/
- public Table getTable(String dbName, String tableName, boolean throwIfError)
- throws CatalogException {
+ public Table getTableNoThrow(String dbName, String tableName) {
Db db = getDb(dbName);
- if (db == null && throwIfError) {
- throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
- }
+ if (db == null) return null;
return db.getTable(tableName);
}
- public Table getTable(String dbName, String tableName) throws CatalogException {
- return getTable(dbName, tableName, true);
+ /**
+ * Returns the Table object for the given dbName/tableName. Throws if the database
+ * does not exist. Returns null if the table does not exist.
+ * TODO: Clean up the inconsistent error behavior (throwing vs. returning null).
+ */
+ public Table getTable(String dbName, String tableName)
+ throws DatabaseNotFoundException {
+ Db db = getDb(dbName);
+ if (db == null) {
+ throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
+ }
+ return db.getTable(tableName);
}
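
The split leaves two lookup flavors with distinct failure behavior; a short sketch of when each is appropriate (database and table names are placeholders):

    // Existence probe, e.g. for the IF NOT EXISTS checks in CatalogOpExecutor:
    Table existing = catalog.getTableNoThrow("mydb", "mytbl");  // null if db or tbl absent

    // Lookup that distinguishes a missing database from a missing table:
    try {
      Table tbl = catalog.getTable("mydb", "mytbl");  // null only if the table is absent
    } catch (DatabaseNotFoundException e) {
      // the database itself does not exist
    }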
/**
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 99bd23e..1da5d0b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -19,11 +19,13 @@ package org.apache.impala.catalog;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
+import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TCatalogObject;
@@ -219,6 +221,13 @@ public class ImpaladCatalog extends Catalog {
/**
+ * Issues a load request to the catalogd for the given tables.
+ */
+ public void prioritizeLoad(Set<TableName> tableNames) throws InternalException {
+ FeSupport.PrioritizeLoad(tableNames);
+ }
+
+ /**
* Causes the calling thread to wait until a catalog update notification has been sent
* or the given timeout has been reached. A timeout value of 0 indicates an indefinite
* wait. Does not protect against spurious wakeups, so this should be called in a loop.
@@ -234,27 +243,6 @@ public class ImpaladCatalog extends Catalog {
}
}
- /**
- * Returns the Table object for the given dbName/tableName. Returns null
- * if the table does not exist. Will throw a TableLoadingException if the table's
- * metadata was not able to be loaded successfully and DatabaseNotFoundException
- * if the parent database does not exist.
- */
- @Override
- public Table getTable(String dbName, String tableName)
- throws CatalogException {
- Table table = super.getTable(dbName, tableName);
- if (table == null) return null;
-
- if (table.isLoaded() && table instanceof IncompleteTable) {
- // If there were problems loading this table's metadata, throw an exception
- // when it is accessed.
- ImpalaException cause = ((IncompleteTable) table).getCause();
- if (cause instanceof TableLoadingException) throw (TableLoadingException) cause;
- throw new TableLoadingException("Missing metadata for table: " + tableName, cause);
- }
- return table;
- }
/**
* Returns the HDFS path where the metastore would create the given table. If the table
@@ -542,4 +530,5 @@ public class ImpaladCatalog extends Catalog {
LOG.error("LibCacheRemoveEntry(" + hdfsLibFile + ") failed.");
}
}
+ public TUniqueId getCatalogServiceId() { return catalogServiceId_; }
}
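
prioritizeLoad() and waitForCatalogUpdate() are the two primitives the new loader builds on. Because waitForCatalogUpdate() may return on timeouts or spurious wakeups, callers must re-check their condition in a loop, as StmtMetadataLoader.loadTables() does. A stripped-down sketch, where recheckMissing() is a hypothetical stand-in for the caller's re-check logic:

    catalog.prioritizeLoad(missingTbls);  // issue the load request RPC to the catalogd
    while (!missingTbls.isEmpty()) {
      // Returns after one catalog topic update, or after the timeout elapses.
      catalog.waitForCatalogUpdate(Frontend.MAX_CATALOG_UPDATE_WAIT_TIME_MS);
      missingTbls = recheckMissing(catalog, missingTbls);  // hypothetical helper
    }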
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/common/AnalysisException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/AnalysisException.java b/fe/src/main/java/org/apache/impala/common/AnalysisException.java
index cbc4f00..2add32e 100644
--- a/fe/src/main/java/org/apache/impala/common/AnalysisException.java
+++ b/fe/src/main/java/org/apache/impala/common/AnalysisException.java
@@ -29,4 +29,8 @@ public class AnalysisException extends ImpalaException {
public AnalysisException(String msg) {
super(msg);
}
+
+ public AnalysisException(Throwable cause) {
+ super(cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/common/ImpalaException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/ImpalaException.java b/fe/src/main/java/org/apache/impala/common/ImpalaException.java
index 100f682..9302626 100644
--- a/fe/src/main/java/org/apache/impala/common/ImpalaException.java
+++ b/fe/src/main/java/org/apache/impala/common/ImpalaException.java
@@ -30,4 +30,8 @@ abstract public class ImpalaException extends java.lang.Exception {
protected ImpalaException(String msg) {
super(msg);
}
+
+ protected ImpalaException(Throwable cause) {
+ super(cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 71182f5..c320eb4 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.impala.analysis.AnalysisContext;
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.ColumnLineageGraph;
import org.apache.impala.analysis.Expr;
@@ -44,6 +45,7 @@ import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.EventSequence;
import org.apache.impala.util.KuduUtil;
import org.apache.impala.util.MaxRowsProcessedVisitor;
import org.slf4j.Logger;
@@ -69,8 +71,9 @@ public class Planner {
private final PlannerContext ctx_;
- public Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx) {
- ctx_ = new PlannerContext(analysisResult, queryCtx);
+ public Planner(AnalysisResult analysisResult, TQueryCtx queryCtx,
+ EventSequence timeline) {
+ ctx_ = new PlannerContext(analysisResult, queryCtx, timeline);
}
public TQueryCtx getQueryCtx() { return ctx_.getQueryCtx(); }
@@ -96,7 +99,7 @@ public class Planner {
SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
- ctx_.getAnalysisResult().getTimeline().markEvent("Single node plan created");
+ ctx_.getTimeline().markEvent("Single node plan created");
ArrayList<PlanFragment> fragments = null;
checkForSmallQueryOptimization(singleNodePlan);
@@ -120,7 +123,7 @@ public class Planner {
PlanFragment rootFragment = fragments.get(fragments.size() - 1);
if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
RuntimeFilterGenerator.generateRuntimeFilters(ctx_, rootFragment.getPlanRoot());
- ctx_.getAnalysisResult().getTimeline().markEvent("Runtime filters computed");
+ ctx_.getTimeline().markEvent("Runtime filters computed");
}
rootFragment.verifyTree();
@@ -169,7 +172,7 @@ public class Planner {
}
Collections.reverse(fragments);
- ctx_.getAnalysisResult().getTimeline().markEvent("Distributed plan created");
+ ctx_.getTimeline().markEvent("Distributed plan created");
ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
@@ -212,7 +215,7 @@ public class Planner {
graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
}
if (LOG.isTraceEnabled()) LOG.trace("lineage: " + graph.debugString());
- ctx_.getAnalysisResult().getTimeline().markEvent("Lineage info computed");
+ ctx_.getTimeline().markEvent("Lineage info computed");
}
return fragments;
@@ -231,7 +234,7 @@ public class Planner {
// Only use one scanner thread per scan-node instance since intra-node
// parallelism is achieved via multiple fragment instances.
ctx_.getQueryOptions().setNum_scanner_threads(1);
- ctx_.getAnalysisResult().getTimeline().markEvent("Parallel plans created");
+ ctx_.getTimeline().markEvent("Parallel plans created");
return parallelPlans;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index b2881b7..135f7d0 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -19,12 +19,13 @@ package org.apache.impala.planner;
import java.util.LinkedList;
-import org.apache.impala.analysis.AnalysisContext;
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.QueryStmt;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.EventSequence;
import com.google.common.collect.Lists;
@@ -56,14 +57,16 @@ public class PlannerContext {
// Keeps track of subplan nesting. Maintained with push/popSubplan().
private final LinkedList<SubplanNode> subplans_ = Lists.newLinkedList();
+ private final AnalysisResult analysisResult_;
+ private final EventSequence timeline_;
private final TQueryCtx queryCtx_;
- private final AnalysisContext.AnalysisResult analysisResult_;
private final QueryStmt queryStmt_;
- public PlannerContext (AnalysisContext.AnalysisResult analysisResult,
- TQueryCtx queryCtx) {
+ public PlannerContext (AnalysisResult analysisResult, TQueryCtx queryCtx,
+ EventSequence timeline) {
analysisResult_ = analysisResult;
queryCtx_ = queryCtx;
+ timeline_ = timeline;
if (isInsertOrCtas()) {
queryStmt_ = analysisResult.getInsertStmt().getQueryStmt();
} else if (analysisResult.isUpdateStmt()) {
@@ -78,7 +81,8 @@ public class PlannerContext {
public QueryStmt getQueryStmt() { return queryStmt_; }
public TQueryCtx getQueryCtx() { return queryCtx_; }
public TQueryOptions getQueryOptions() { return getRootAnalyzer().getQueryOptions(); }
- public AnalysisContext.AnalysisResult getAnalysisResult() { return analysisResult_; }
+ public AnalysisResult getAnalysisResult() { return analysisResult_; }
+ public EventSequence getTimeline() { return timeline_; }
public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); }
public boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; }
public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); }
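
With the timeline threaded explicitly through PlannerContext, one EventSequence now spans compilation from parse to plan. A sketch of the flow, with event labels taken from the diffs in this commit:

    EventSequence timeline = new EventSequence("Query Compilation");
    // ... parsing, metadata loading and analysis mark their events ...
    timeline.markEvent("Analysis finished");
    Planner planner = new Planner(analysisResult, queryCtx, timeline);
    // ... planning marks "Single node plan created", "Distributed plan created" ...
    result.setTimeline(timeline.toThrift());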
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 5d933a3..27d293d 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -31,13 +31,12 @@ import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.AnalyticInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BaseTableRef;
-import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.analysis.CollectionTableRef;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprId;
import org.apache.impala.analysis.ExprSubstitutionMap;
-import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.InlineViewRef;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.analysis.NullLiteral;
@@ -48,12 +47,11 @@ import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotId;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TableRef;
-import org.apache.impala.analysis.TableSampleClause;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.analysis.TupleIsNullPredicate;
-import org.apache.impala.analysis.UnionStmt.UnionOperand;
import org.apache.impala.analysis.UnionStmt;
+import org.apache.impala.analysis.UnionStmt.UnionOperand;
import org.apache.impala.catalog.ColumnStats;
import org.apache.impala.catalog.DataSourceTable;
import org.apache.impala.catalog.HBaseTable;
@@ -62,7 +60,6 @@ import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.NotImplementedException;
@@ -129,7 +126,7 @@ public class SingleNodePlanner {
// to detect empty result sets.
Analyzer analyzer = queryStmt.getAnalyzer();
analyzer.computeValueTransferGraph();
- ctx_.getAnalysisResult().getTimeline().markEvent("Value transfer graph computed");
+ ctx_.getTimeline().markEvent("Value transfer graph computed");
// Mark slots referenced by output exprs as materialized, prior to generating the
// plan tree.
@@ -231,7 +228,7 @@ public class SingleNodePlanner {
*/
private void unmarkCollectionSlots(QueryStmt stmt) {
List<TableRef> tblRefs = Lists.newArrayList();
- stmt.collectTableRefs(tblRefs);
+ stmt.collectFromClauseTableRefs(tblRefs);
for (TableRef ref: tblRefs) {
if (!ref.isRelative()) continue;
Preconditions.checkState(ref instanceof CollectionTableRef);
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 69137ef..c3b15a1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -53,10 +53,8 @@ import org.apache.impala.analysis.AlterTableSortByStmt;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.User;
-import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
-import org.apache.impala.catalog.CatalogUsageMonitor;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ColumnNotFoundException;
import org.apache.impala.catalog.DataSource;
@@ -117,7 +115,6 @@ import org.apache.impala.thrift.TCreateFunctionParams;
import org.apache.impala.thrift.TCreateOrAlterViewParams;
import org.apache.impala.thrift.TCreateTableLikeParams;
import org.apache.impala.thrift.TCreateTableParams;
-import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TDdlExecResponse;
import org.apache.impala.thrift.TDropDataSourceParams;
@@ -1544,7 +1541,7 @@ public class CatalogOpExecutor {
Preconditions.checkState(params.getColumns() != null,
"Null column list given as argument to Catalog.createTable");
- Table existingTbl = catalog_.getTable(tableName.getDb(), tableName.getTbl(), false);
+ Table existingTbl = catalog_.getTableNoThrow(tableName.getDb(), tableName.getTbl());
if (params.if_not_exists && existingTbl != null) {
LOG.trace(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tableName));
@@ -1760,7 +1757,7 @@ public class CatalogOpExecutor {
Preconditions.checkState(tblName != null && tblName.isFullyQualified());
Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified());
- Table existingTbl = catalog_.getTable(tblName.getDb(), tblName.getTbl(), false);
+ Table existingTbl = catalog_.getTableNoThrow(tblName.getDb(), tblName.getTbl());
if (params.if_not_exists && existingTbl != null) {
LOG.trace(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tblName));
@@ -2131,9 +2128,6 @@ public class CatalogOpExecutor {
"The partitions being dropped don't exist any more");
}
- org.apache.hadoop.hive.metastore.api.Table msTbl =
- tbl.getMetaStoreTable().deepCopy();
-
PartitionDropOptions dropOptions = PartitionDropOptions.instance();
dropOptions.purgeData(purge);
long numTargetedPartitions = 0L;
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 5bc1d87..714f686 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -35,13 +35,13 @@ import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TColumnValue;
+import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TExprBatch;
import org.apache.impala.thrift.TPrioritizeLoadRequest;
import org.apache.impala.thrift.TPrioritizeLoadResponse;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TResultRow;
-import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TSymbolLookupParams;
import org.apache.impala.thrift.TSymbolLookupResult;
import org.apache.impala.thrift.TTable;
@@ -53,6 +53,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
/**
@@ -263,10 +264,13 @@ public class FeSupport {
return NativePrioritizeLoad(thriftReq);
}
- public static TStatus PrioritizeLoad(Set<TableName> tableNames)
+ public static void PrioritizeLoad(Set<TableName> tableNames)
throws InternalException {
Preconditions.checkNotNull(tableNames);
+ LOG.info(String.format("Requesting prioritized load of table(s): %s",
+ Joiner.on(", ").join(tableNames)));
+
List<TCatalogObject> objectDescs = new ArrayList<TCatalogObject>(tableNames.size());
for (TableName tableName: tableNames) {
TCatalogObject catalogObject = new TCatalogObject();
@@ -286,7 +290,10 @@ public class FeSupport {
TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
TPrioritizeLoadResponse response = new TPrioritizeLoadResponse();
deserializer.deserialize(response, result);
- return response.getStatus();
+ if (response.getStatus().getStatus_code() != TErrorCode.OK) {
+ throw new InternalException("Error requesting prioritized load: " +
+ Joiner.on("\n").join(response.getStatus().getError_msgs()));
+ }
} catch (TException e) {
// this should never happen
throw new InternalException("Error processing request: " + e.getMessage(), e);
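
PrioritizeLoad() previously returned a TStatus that every caller had to inspect; it now throws on a non-OK status, so the calling pattern collapses to:

    try {
      FeSupport.PrioritizeLoad(tableNames);  // returns normally only on TErrorCode.OK
    } catch (InternalException e) {
      // non-OK status from the catalogd, or an RPC/serialization failure
    }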
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 318b248..f03feb0 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -18,11 +18,11 @@
package org.apache.impala.service;
import java.io.IOException;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.impala.analysis.AnalysisContext;
+import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.analysis.CreateDataSrcStmt;
import org.apache.impala.analysis.CreateDropRoleStmt;
import org.apache.impala.analysis.CreateUdaStmt;
@@ -54,6 +55,11 @@ import org.apache.impala.analysis.ResetMetadataStmt;
import org.apache.impala.analysis.ShowFunctionsStmt;
import org.apache.impala.analysis.ShowGrantRoleStmt;
import org.apache.impala.analysis.ShowRolesStmt;
+import org.apache.impala.analysis.SqlParser;
+import org.apache.impala.analysis.SqlScanner;
+import org.apache.impala.analysis.StatementBase;
+import org.apache.impala.analysis.StmtMetadataLoader;
+import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
import org.apache.impala.analysis.TableName;
import org.apache.impala.analysis.TruncateStmt;
import org.apache.impala.authorization.AuthorizationChecker;
@@ -63,7 +69,6 @@ import org.apache.impala.authorization.ImpalaInternalAdminUser;
import org.apache.impala.authorization.PrivilegeRequest;
import org.apache.impala.authorization.PrivilegeRequestBuilder;
import org.apache.impala.authorization.User;
-import org.apache.impala.catalog.AuthorizationException;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.Column;
@@ -83,8 +88,8 @@ import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.NotImplementedException;
-import org.apache.impala.planner.HdfsScanNode;
import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.planner.HdfsScanNode;
import org.apache.impala.planner.PlanFragment;
import org.apache.impala.planner.Planner;
import org.apache.impala.planner.ScanNode;
@@ -98,7 +103,6 @@ import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TDdlType;
import org.apache.impala.thrift.TDescribeOutputStyle;
import org.apache.impala.thrift.TDescribeResult;
-import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TExecRequest;
import org.apache.impala.thrift.TExplainResult;
import org.apache.impala.thrift.TFinalizeParams;
@@ -120,7 +124,6 @@ import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TShowFilesParams;
import org.apache.impala.thrift.TShowStatsOp;
-import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TStmtType;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
@@ -135,7 +138,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
@@ -149,16 +151,14 @@ import com.google.common.collect.Sets;
*/
public class Frontend {
private final static Logger LOG = LoggerFactory.getLogger(Frontend.class);
- // Time to wait for missing tables to be loaded before timing out.
- private final long MISSING_TBL_LOAD_WAIT_TIMEOUT_MS = 2 * 60 * 1000;
// Max time to wait for a catalog update notification.
- private final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000;
+ public static final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000;
//TODO: Make the reload interval configurable.
private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;
- private AtomicReference<ImpaladCatalog> impaladCatalog_ =
+ private final AtomicReference<ImpaladCatalog> impaladCatalog_ =
new AtomicReference<ImpaladCatalog>();
private final AuthorizationConfig authzConfig_;
private final AtomicReference<AuthorizationChecker> authzChecker_;
@@ -247,7 +247,7 @@ public class Frontend {
* Constructs a TCatalogOpRequest and attaches it, plus any metadata, to the
* result argument.
*/
- private void createCatalogOpRequest(AnalysisContext.AnalysisResult analysis,
+ private void createCatalogOpRequest(AnalysisResult analysis,
TExecRequest result) throws InternalException {
TCatalogOpRequest ddl = new TCatalogOpRequest();
TResultSetMetadata metadata = new TResultSetMetadata();
@@ -798,69 +798,6 @@ public class Frontend {
}
/**
- * Given a set of table names, returns the set of table names that are missing
- * metadata (are not yet loaded).
- */
- private Set<TableName> getMissingTbls(Set<TableName> tableNames) {
- Set<TableName> missingTbls = new HashSet<TableName>();
- for (TableName tblName: tableNames) {
- Db db = getCatalog().getDb(tblName.getDb());
- if (db == null) continue;
- Table tbl = db.getTable(tblName.getTbl());
- if (tbl == null) continue;
- if (!tbl.isLoaded()) missingTbls.add(tblName);
- }
- return missingTbls;
- }
-
- /**
- * Requests the catalog server load the given set of tables and waits until
- * these tables show up in the local catalog, or the given timeout has been reached.
- * The timeout is specified in milliseconds, with a value <= 0 indicating no timeout.
- * The exact steps taken are:
- * 1) Collect the tables that are missing (not yet loaded locally).
- * 2) Make an RPC to the CatalogServer to prioritize the loading of these tables.
- * 3) Wait until the local catalog contains all missing tables by (re)checking the
- * catalog each time a new catalog update is received.
- *
- * Returns true if all missing tables were received before timing out and false if
- * the timeout was reached before all tables were received.
- */
- private boolean requestTblLoadAndWait(Set<TableName> requestedTbls, long timeoutMs)
- throws InternalException {
- Set<TableName> missingTbls = getMissingTbls(requestedTbls);
- // There are no missing tables, return and avoid making an RPC to the CatalogServer.
- if (missingTbls.isEmpty()) return true;
-
- // Call into the CatalogServer and request the required tables be loaded.
- LOG.info(String.format("Requesting prioritized load of table(s): %s",
- Joiner.on(", ").join(missingTbls)));
- TStatus status = FeSupport.PrioritizeLoad(missingTbls);
- if (status.getStatus_code() != TErrorCode.OK) {
- throw new InternalException("Error requesting prioritized load: " +
- Joiner.on("\n").join(status.getError_msgs()));
- }
-
- long startTimeMs = System.currentTimeMillis();
- // Wait until all the required tables are loaded in the Impalad's catalog cache.
- while (!missingTbls.isEmpty()) {
- // Check if the timeout has been reached.
- if (timeoutMs > 0 && System.currentTimeMillis() - startTimeMs > timeoutMs) {
- return false;
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Waiting for table(s) to complete loading: %s",
- Joiner.on(", ").join(missingTbls)));
- }
- getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
- missingTbls = getMissingTbls(missingTbls);
- // TODO: Check for query cancellation here.
- }
- return true;
- }
-
- /**
* Waits indefinitely for the local catalog to be ready. The catalog is "ready" after
* the first catalog update is received from the statestore.
*
@@ -883,78 +820,6 @@ public class Frontend {
}
/**
- * Overload of requestTblLoadAndWait that uses the default timeout.
- */
- public boolean requestTblLoadAndWait(Set<TableName> requestedTbls)
- throws InternalException {
- return requestTblLoadAndWait(requestedTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS);
- }
-
- /**
- * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
- * Authorizes all catalog object accesses and throws an AuthorizationException
- * if the user does not have privileges to access one or more objects.
- * If a statement fails analysis because table/view metadata was not loaded, an
- * RPC to the CatalogServer will be executed to request loading the missing metadata
- * and analysis will be restarted once the required tables have been loaded
- * in the local Impalad Catalog or the MISSING_TBL_LOAD_WAIT_TIMEOUT_MS timeout
- * is reached.
- * The goal of this timeout is not to fail analysis, but to restart the
- * analysis/missing table collection process. This helps ensure a statement never
- * waits indefinitely for a table to be loaded in the event the table metadata was
- * invalidated.
- * TODO: Also consider adding an overall timeout that fails analysis.
- */
- private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
- throws AnalysisException, InternalException, AuthorizationException {
- Preconditions.checkState(getCatalog().isReady(),
- "Local catalog has not been initialized. Aborting query analysis.");
-
- AnalysisContext analysisCtx = new AnalysisContext(impaladCatalog_.get(), queryCtx,
- authzConfig_);
- LOG.info("Compiling query: " + queryCtx.client_request.stmt);
-
- // Run analysis in a loop until any of the following events occur:
- // 1) Analysis completes successfully.
- // 2) Analysis fails with an AnalysisException AND there are no missing tables.
- // 3) Analysis fails with an AuthorizationException.
- try {
- while (true) {
- // Ensure that catalog snapshot reflects any recent changes.
- analysisCtx.setCatalog(impaladCatalog_.get());
- try {
- analysisCtx.analyze(queryCtx.client_request.stmt);
- Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
- return analysisCtx.getAnalysisResult();
- } catch (AnalysisException e) {
- Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
- // Only re-throw the AnalysisException if there were no missing tables.
- if (missingTbls.isEmpty()) throw e;
-
- // Record that analysis needs table metadata
- analysisCtx.getTimeline().markEvent("Metadata load started");
-
- // Some tables/views were missing, request and wait for them to load.
- if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
- if (LOG.isWarnEnabled()) {
- LOG.warn(String.format("Missing tables were not received in %dms. Load " +
- "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
- }
- analysisCtx.getTimeline().markEvent("Metadata load timeout");
- } else {
- analysisCtx.getTimeline().markEvent("Metadata load finished");
- }
- }
- }
- } finally {
- // Authorize all accesses.
- // AuthorizationExceptions must take precedence over any AnalysisException
- // that has been thrown, so perform the authorization first.
- analysisCtx.authorize(getAuthzChecker());
- LOG.info("Compiled query.");
- }
- }
-
- /**
* Return a TPlanExecInfo corresponding to the plan with root fragment 'planRoot'.
*/
private TPlanExecInfo createPlanExecInfo(PlanFragment planRoot, Planner planner,
@@ -1015,7 +880,7 @@ public class Frontend {
private TQueryExecRequest createExecRequest(
Planner planner, StringBuilder explainString) throws ImpalaException {
TQueryCtx queryCtx = planner.getQueryCtx();
- AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
+ AnalysisResult analysisResult = planner.getAnalysisResult();
boolean isMtExec = analysisResult.isQueryStmt()
&& queryCtx.client_request.query_options.isSetMt_dop()
&& queryCtx.client_request.query_options.mt_dop > 0;
@@ -1065,16 +930,40 @@ public class Frontend {
return result;
}
+ public StatementBase parse(String stmt) throws AnalysisException {
+ SqlScanner input = new SqlScanner(new StringReader(stmt));
+ SqlParser parser = new SqlParser(input);
+ try {
+ return (StatementBase) parser.parse().value;
+ } catch (Exception e) {
+ throw new AnalysisException(parser.getErrorMsg(stmt), e);
+ }
+ }
+
/**
* Create a populated TExecRequest corresponding to the supplied TQueryCtx.
*/
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
throws ImpalaException {
- // Analyze the statement
- AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
- EventSequence timeline = analysisResult.getTimeline();
+ // Timeline of important events in the planning process, used for debugging
+ // and profiling.
+ EventSequence timeline = new EventSequence("Query Compilation");
+ LOG.info("Analyzing query: " + queryCtx.client_request.stmt);
+
+ // Parse stmt and collect/load metadata to populate a stmt-local table cache
+ StatementBase stmt = parse(queryCtx.client_request.stmt);
+ StmtMetadataLoader metadataLoader =
+ new StmtMetadataLoader(this, queryCtx.session.database, timeline);
+ StmtTableCache stmtTableCache = metadataLoader.loadTables(stmt);
+
+ // Analyze and authorize stmt
+ AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzConfig_, timeline);
+ AnalysisResult analysisResult =
+ analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache, authzChecker_.get());
+ LOG.info("Analysis finished.");
timeline.markEvent("Analysis finished");
Preconditions.checkNotNull(analysisResult.getStmt());
+
TExecRequest result = new TExecRequest();
result.setQuery_options(queryCtx.client_request.getQuery_options());
result.setAccess_events(analysisResult.getAccessEvents());
@@ -1123,7 +1012,7 @@ public class Frontend {
|| analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
|| analysisResult.isDeleteStmt());
- Planner planner = new Planner(analysisResult, queryCtx);
+ Planner planner = new Planner(analysisResult, queryCtx, timeline);
TQueryExecRequest queryExecRequest = createExecRequest(planner, explainString);
queryCtx.setDesc_tbl(
planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
@@ -1187,7 +1076,7 @@ public class Frontend {
}
timeline.markEvent("Planning finished");
- result.setTimeline(analysisResult.getTimeline().toThrift());
+ result.setTimeline(timeline.toThrift());
return result;
}
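
Taken together, createExecRequest() now runs a strictly phased pipeline instead of the old analyze/catch/retry loop. Condensed from the diff above:

    EventSequence timeline = new EventSequence("Query Compilation");
    // 1) Parse.
    StatementBase stmt = parse(queryCtx.client_request.stmt);
    // 2) Collect and load all referenced tables/views into a stmt-local cache.
    StmtMetadataLoader metadataLoader =
        new StmtMetadataLoader(this, queryCtx.session.database, timeline);
    StmtTableCache stmtTableCache = metadataLoader.loadTables(stmt);
    // 3) Analyze and authorize against that fixed cache.
    AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzConfig_, timeline);
    AnalysisResult analysisResult =
        analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache, authzChecker_.get());
    // 4) Plan, sharing the same timeline.
    Planner planner = new Planner(analysisResult, queryCtx, timeline);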
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/service/MetadataOp.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/MetadataOp.java b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
index c541cc5..f1cb077 100644
--- a/fe/src/main/java/org/apache/impala/service/MetadataOp.java
+++ b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
@@ -22,11 +22,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.impala.analysis.StmtMetadataLoader;
import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Function;
@@ -34,7 +34,6 @@ import org.apache.impala.catalog.ImpaladCatalog;
import org.apache.impala.catalog.PrimitiveType;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TColumn;
@@ -43,6 +42,9 @@ import org.apache.impala.thrift.TResultRow;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.util.PatternMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -277,12 +279,7 @@ public class MetadataOp {
List<String> tableComments = Lists.newArrayList();
List<String> tableTypes = Lists.newArrayList();
for (String tabName: fe.getTableNames(db.getName(), tablePatternMatcher, user)) {
- Table table = null;
- try {
- table = catalog.getTable(db.getName(), tabName);
- } catch (TableLoadingException e) {
- // Ignore exception (this table will be skipped).
- }
+ Table table = catalog.getTable(db.getName(), tabName);
if (table == null) continue;
String comment = null;
@@ -356,25 +353,24 @@ public class MetadataOp {
* The parameters catalogName, schemaName, tableName and columnName are JDBC search
* patterns.
*/
- public static TResultSet getColumns(Frontend fe,
- String catalogName, String schemaName, String tableName, String columnName,
- User user)
+ public static TResultSet getColumns(Frontend fe, String catalogName, String schemaName,
+ String tableName, String columnName, User user)
throws ImpalaException {
- TResultSet result = createEmptyResultSet(GET_COLUMNS_MD);
-
// Get the list of schemas, tables, and columns that satisfy the search conditions.
- DbsMetadata dbsMetadata = null;
PatternMatcher schemaMatcher = PatternMatcher.createJdbcPatternMatcher(schemaName);
PatternMatcher tableMatcher = PatternMatcher.createJdbcPatternMatcher(tableName);
PatternMatcher columnMatcher = PatternMatcher.createJdbcPatternMatcher(columnName);
- while (dbsMetadata == null || !dbsMetadata.missingTbls.isEmpty()) {
- dbsMetadata = getDbsMetadata(fe, catalogName, schemaMatcher, tableMatcher,
- columnMatcher, PatternMatcher.MATCHER_MATCH_NONE, user);
- if (!fe.requestTblLoadAndWait(dbsMetadata.missingTbls)) {
- LOG.info("Timed out waiting for missing tables. Load request will be retried.");
- }
+ DbsMetadata dbsMetadata = getDbsMetadata(fe, catalogName, schemaMatcher,
+ tableMatcher, columnMatcher, PatternMatcher.MATCHER_MATCH_NONE, user);
+ if (!dbsMetadata.missingTbls.isEmpty()) {
+ // Need to load tables for column metadata.
+ StmtMetadataLoader mdLoader = new StmtMetadataLoader(fe, Catalog.DEFAULT_DB, null);
+ mdLoader.loadTables(dbsMetadata.missingTbls);
+ dbsMetadata = getDbsMetadata(fe, catalogName, schemaMatcher,
+ tableMatcher, columnMatcher, PatternMatcher.MATCHER_MATCH_NONE, user);
}
+ TResultSet result = createEmptyResultSet(GET_COLUMNS_MD);
for (int i = 0; i < dbsMetadata.dbs.size(); ++i) {
String dbName = dbsMetadata.dbs.get(i);
for (int j = 0; j < dbsMetadata.tableNames.get(i).size(); ++j) {
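The old retry loop in getColumns() is gone: one pass collects missingTbls, StmtMetadataLoader.loadTables() waits for the requested tables to become loaded (re-issuing the load request as catalog updates arrive, per the constants in StmtMetadataLoader), and a single re-fetch then sees fully loaded tables. A minimal sketch of the pattern; the null EventSequence, as passed here, simply skips timeline bookkeeping:

    DbsMetadata md = getDbsMetadata(fe, catalogName, schemaMatcher,
        tableMatcher, columnMatcher, PatternMatcher.MATCHER_MATCH_NONE, user);
    if (!md.missingTbls.isEmpty()) {
      // Synchronously load the tables needed for column metadata ...
      new StmtMetadataLoader(fe, Catalog.DEFAULT_DB, null)
          .loadTables(md.missingTbls);
      // ... then gather the metadata once more against loaded tables.
      md = getDbsMetadata(fe, catalogName, schemaMatcher,
          tableMatcher, columnMatcher, PatternMatcher.MATCHER_MATCH_NONE, user);
    }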
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/main/java/org/apache/impala/util/EventSequence.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/EventSequence.java b/fe/src/main/java/org/apache/impala/util/EventSequence.java
index 8ce286a..92552d1 100644
--- a/fe/src/main/java/org/apache/impala/util/EventSequence.java
+++ b/fe/src/main/java/org/apache/impala/util/EventSequence.java
@@ -48,6 +48,9 @@ public class EventSequence {
labels_.add(label);
}
+ // For testing
+ public int getNumEvents() { return labels_.size(); }
+
public TEventSequence toThrift() {
TEventSequence ret = new TEventSequence();
ret.timestamps = timestamps_;
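The new getNumEvents() is a test-only accessor over the label list, letting tests assert that compilation marked the expected number of timeline events. A hypothetical JUnit-style check (the assertion is illustrative, not part of this patch):

    EventSequence timeline = new EventSequence("Query Compilation");
    timeline.markEvent("Analysis finished");
    timeline.markEvent("Planning finished");
    // The timeline now carries exactly the two events marked above.
    Assert.assertEquals(2, timeline.getNumEvents());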
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/test/java/org/apache/impala/analysis/AnalyzeAuthStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeAuthStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeAuthStmtsTest.java
index 1fe11bc..ddb95f1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeAuthStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeAuthStmtsTest.java
@@ -19,14 +19,14 @@ package org.apache.impala.analysis;
import java.util.HashSet;
-import org.junit.Test;
-
import org.apache.impala.authorization.AuthorizationConfig;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.Role;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.testutil.TestUtils;
import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.util.EventSequence;
+import org.junit.Test;
public class AnalyzeAuthStmtsTest extends AnalyzerTest {
public AnalyzeAuthStmtsTest() throws AnalysisException {
@@ -35,18 +35,23 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
}
@Override
- protected Analyzer createAnalyzer(String defaultDb) {
- TQueryCtx queryCtx =
- TestUtils.createQueryContext(defaultDb, System.getProperty("user.name"));
- return new Analyzer(catalog_, queryCtx,
- AuthorizationConfig.createHadoopGroupAuthConfig("server1", null, null));
+ protected AnalysisContext createAnalysisCtx(String defaultDb) {
+ TQueryCtx queryCtx = TestUtils.createQueryContext(
+ defaultDb, System.getProperty("user.name"));
+ EventSequence timeline = new EventSequence("Authorization Test");
+ AnalysisContext analysisCtx = new AnalysisContext(queryCtx,
+ AuthorizationConfig.createHadoopGroupAuthConfig("server1", null, null),
+ timeline);
+ return analysisCtx;
}
- private Analyzer createAuthDisabledAnalyzer(String defaultDb) {
- TQueryCtx queryCtx =
- TestUtils.createQueryContext(defaultDb, System.getProperty("user.name"));
- return new Analyzer(catalog_, queryCtx,
- AuthorizationConfig.createAuthDisabledConfig());
+ private AnalysisContext createAuthDisabledAnalysisCtx() {
+ TQueryCtx queryCtx = TestUtils.createQueryContext(
+ Catalog.DEFAULT_DB, System.getProperty("user.name"));
+ EventSequence timeline = new EventSequence("Authorization Test");
+ AnalysisContext analysisCtx = new AnalysisContext(queryCtx,
+ AuthorizationConfig.createAuthDisabledConfig(), timeline);
+ return analysisCtx;
}
@Test
@@ -55,12 +60,12 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
AnalyzesOk("SHOW ROLE GRANT GROUP myGroup");
AnalyzesOk("SHOW CURRENT ROLES");
- Analyzer authDisabledAnalyzer = createAuthDisabledAnalyzer(Catalog.DEFAULT_DB);
- AnalysisError("SHOW ROLES", authDisabledAnalyzer,
+ AnalysisContext authDisabledCtx = createAuthDisabledAnalysisCtx();
+ AnalysisError("SHOW ROLES", authDisabledCtx,
"Authorization is not enabled.");
- AnalysisError("SHOW ROLE GRANT GROUP myGroup", authDisabledAnalyzer,
+ AnalysisError("SHOW ROLE GRANT GROUP myGroup", authDisabledCtx,
"Authorization is not enabled.");
- AnalysisError("SHOW CURRENT ROLES", authDisabledAnalyzer,
+ AnalysisError("SHOW CURRENT ROLES", authDisabledCtx,
"Authorization is not enabled.");
}
@@ -76,10 +81,10 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
AnalysisError("SHOW GRANT ROLE does_not_exist ON SERVER",
"Role 'does_not_exist' does not exist.");
- Analyzer authDisabledAnalyzer = createAuthDisabledAnalyzer(Catalog.DEFAULT_DB);
- AnalysisError("SHOW GRANT ROLE myRole", authDisabledAnalyzer,
+ AnalysisContext authDisabledCtx = createAuthDisabledAnalysisCtx();
+ AnalysisError("SHOW GRANT ROLE myRole", authDisabledCtx,
"Authorization is not enabled.");
- AnalysisError("SHOW GRANT ROLE myRole ON SERVER", authDisabledAnalyzer,
+ AnalysisError("SHOW GRANT ROLE myRole ON SERVER", authDisabledCtx,
"Authorization is not enabled.");
}
@@ -95,10 +100,10 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
AnalyzesOk("DROP ROLE MYrole");
AnalysisError("CREATE ROLE MYrole", "Role 'MYrole' already exists.");
- Analyzer authDisabledAnalyzer = createAuthDisabledAnalyzer(Catalog.DEFAULT_DB);
- AnalysisError("DROP ROLE myRole", authDisabledAnalyzer,
+ AnalysisContext authDisabledCtx = createAuthDisabledAnalysisCtx();
+ AnalysisError("DROP ROLE myRole", authDisabledCtx,
"Authorization is not enabled.");
- AnalysisError("CREATE ROLE doesNotExist", authDisabledAnalyzer,
+ AnalysisError("CREATE ROLE doesNotExist", authDisabledCtx,
"Authorization is not enabled.");
}
@@ -111,10 +116,10 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
AnalysisError("REVOKE ROLE doesNotExist FROM GROUP abc",
"Role 'doesNotExist' does not exist.");
- Analyzer authDisabledAnalyzer = createAuthDisabledAnalyzer(Catalog.DEFAULT_DB);
- AnalysisError("GRANT ROLE myrole TO GROUP abc", authDisabledAnalyzer,
+ AnalysisContext authDisabledCtx = createAuthDisabledAnalysisCtx();
+ AnalysisError("GRANT ROLE myrole TO GROUP abc", authDisabledCtx,
"Authorization is not enabled.");
- AnalysisError("REVOKE ROLE myrole FROM GROUP abc", authDisabledAnalyzer,
+ AnalysisError("REVOKE ROLE myrole FROM GROUP abc", authDisabledCtx,
"Authorization is not enabled.");
}
@@ -126,7 +131,7 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
if (isGrant) formatArgs = new String[] {"GRANT", "TO"};
// ALL privileges
AnalyzesOk(String.format("%s ALL ON TABLE alltypes %s myrole", formatArgs),
- createAnalyzer("functional"));
+ createAnalysisCtx("functional"));
AnalyzesOk(String.format("%s ALL ON TABLE functional.alltypes %s myrole",
formatArgs));
AnalyzesOk(String.format("%s ALL ON TABLE functional_kudu.alltypes %s myrole",
@@ -152,7 +157,7 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
// INSERT privilege
AnalyzesOk(String.format("%s INSERT ON TABLE alltypesagg %s myrole", formatArgs),
- createAnalyzer("functional"));
+ createAnalysisCtx("functional"));
AnalyzesOk(String.format(
"%s INSERT ON TABLE functional_kudu.alltypessmall %s myrole", formatArgs));
AnalyzesOk(String.format("%s INSERT ON TABLE functional.alltypesagg %s myrole",
@@ -167,7 +172,7 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
// SELECT privilege
AnalyzesOk(String.format("%s SELECT ON TABLE alltypessmall %s myrole", formatArgs),
- createAnalyzer("functional"));
+ createAnalysisCtx("functional"));
AnalyzesOk(String.format("%s SELECT ON TABLE functional.alltypessmall %s myrole",
formatArgs));
AnalyzesOk(String.format(
@@ -187,7 +192,7 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
"%s myrole", formatArgs));
// SELECT privilege on both regular and partition columns
AnalyzesOk(String.format("%s SELECT (id, int_col, year, month) ON TABLE " +
- "alltypes %s myrole", formatArgs), createAnalyzer("functional"));
+ "alltypes %s myrole", formatArgs), createAnalysisCtx("functional"));
AnalyzesOk(String.format("%s SELECT (id, bool_col) ON TABLE " +
"functional_kudu.alltypessmall %s myrole", formatArgs));
// Empty column list
@@ -216,16 +221,20 @@ public class AnalyzeAuthStmtsTest extends AnalyzerTest {
"exists and that you have permissions to issue a GRANT/REVOKE statement.");
}
- Analyzer authDisabledAnalyzer = createAuthDisabledAnalyzer(Catalog.DEFAULT_DB);
- AnalysisError("GRANT ALL ON SERVER TO myRole", authDisabledAnalyzer,
+ AnalysisContext authDisabledCtx = createAuthDisabledAnalysisCtx();
+ AnalysisError("GRANT ALL ON SERVER TO myRole", authDisabledCtx,
"Authorization is not enabled.");
- AnalysisError("REVOKE ALL ON SERVER FROM myRole", authDisabledAnalyzer,
+ AnalysisError("REVOKE ALL ON SERVER FROM myRole", authDisabledCtx,
"Authorization is not enabled.");
- TQueryCtx queryCtxNoUsername = TestUtils.createQueryContext("default", "");
- Analyzer noUsernameAnalyzer = new Analyzer(catalog_, queryCtxNoUsername,
- AuthorizationConfig.createHadoopGroupAuthConfig("server1", null, null));
- AnalysisError("GRANT ALL ON SERVER TO myRole", noUsernameAnalyzer,
+
+ TQueryCtx noUserNameQueryCtx = TestUtils.createQueryContext(
+ Catalog.DEFAULT_DB, "");
+ EventSequence timeline = new EventSequence("Authorization Test");
+ AnalysisContext noUserNameCtx = new AnalysisContext(noUserNameQueryCtx,
+ AuthorizationConfig.createHadoopGroupAuthConfig("server1", null, null),
+ timeline);
+ AnalysisError("GRANT ALL ON SERVER TO myRole", noUserNameCtx,
"Cannot execute authorization statement with an empty username.");
}
}
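The test refactor follows the same shape throughout: helpers now hand back an AnalysisContext bundling the query context, authorization config, and an EventSequence, instead of a pre-built Analyzer, so each AnalyzesOk/AnalysisError call drives the full analyze-and-authorize path. The pattern, condensed from the diff above:

    AnalysisContext ctx = new AnalysisContext(
        TestUtils.createQueryContext(Catalog.DEFAULT_DB,
            System.getProperty("user.name")),
        AuthorizationConfig.createAuthDisabledConfig(),
        new EventSequence("Authorization Test"));
    AnalysisError("SHOW ROLES", ctx, "Authorization is not enabled.");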
http://git-wip-us.apache.org/repos/asf/impala/blob/e0c09181/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 80c6916..ee4ed0e 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.impala.catalog.ArrayType;
-import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ColumnStats;
@@ -366,7 +365,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Cannot ALTER TABLE a nested collection.
AnalysisError("alter table allcomplextypes.int_array_col " +
"add columns (c1 string comment 'hi')",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
// Cannot ALTER TABLE produced by a data source.
AnalysisError("alter table functional.alltypes_datasource " +
@@ -405,7 +404,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"ALTER TABLE not allowed on a view: functional.alltypes_view");
// Cannot ALTER TABLE a nested collection.
AnalysisError("alter table allcomplextypes.int_array_col drop column int_col",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
// Cannot ALTER TABLE produced by a data source.
AnalysisError("alter table functional.alltypes_datasource drop column int_col",
@@ -457,7 +456,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Cannot ALTER TABLE a nested collection.
AnalysisError("alter table allcomplextypes.int_array_col " +
"change column int_col int_col2 int",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
// Cannot ALTER TABLE produced by a data source.
AnalysisError("alter table functional.alltypes_datasource " +
@@ -681,7 +680,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"ALTER TABLE not allowed on a view: functional.alltypes_view");
// Cannot ALTER TABLE a nested collection.
AnalysisError("alter table allcomplextypes.int_array_col set fileformat sequencefile",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
// Cannot ALTER TABLE produced by a data source.
AnalysisError("alter table functional.alltypes_datasource set fileformat parquet",
@@ -721,7 +720,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
AnalysisError("alter table functional.view_view set cached in 'testPool'",
"ALTER TABLE not allowed on a view: functional.view_view");
AnalysisError("alter table allcomplextypes.int_array_col set cached in 'testPool'",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
AnalysisError("alter table functional.alltypes set cached in 'badPool'",
@@ -841,7 +840,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
AnalysisError(
"alter table allcomplextypes.int_array_col " +
"set column stats int_col ('numNulls'='2')",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
// Cannot set column stats of partition columns.
AnalysisError(
@@ -1024,7 +1023,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"ALTER TABLE not allowed on a view: functional.alltypes_view");
// Cannot ALTER TABLE a nested collection.
AnalysisError("alter table allcomplextypes.int_array_col rename to new_alltypes",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"Database does not exist: allcomplextypes");
// It should be okay to rename an HBase table.
@@ -1046,7 +1045,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
AnalysisError("alter table functional.view_view recover partitions",
"ALTER TABLE not allowed on a view: functional.view_view");
AnalysisError("alter table allcomplextypes.int_array_col recover partitions",
- createAnalyzer("functional"),
+ createAnalysisCtx("functional"),
"ALTER TABLE not allowed on a nested collection: allcomplextypes.int_array_col");
AnalysisError("alter table functional_hbase.alltypes recover partitions",
"ALTER TABLE RECOVER PARTITIONS must target an HDFS table: " +
@@ -1176,21 +1175,21 @@ public class AnalyzeDDLTest extends FrontendTestBase {
}
ComputeStatsStmt checkComputeStatsStmt(String stmt) throws AnalysisException {
- return checkComputeStatsStmt(stmt, createAnalyzer(Catalog.DEFAULT_DB));
+ return checkComputeStatsStmt(stmt, createAnalysisCtx());
}
- ComputeStatsStmt checkComputeStatsStmt(String stmt, Analyzer analyzer)
+ ComputeStatsStmt checkComputeStatsStmt(String stmt, AnalysisContext ctx)
throws AnalysisException {
- return checkComputeStatsStmt(stmt, analyzer, null);
+ return checkComputeStatsStmt(stmt, ctx, null);
}
/**
* Analyzes 'stmt' and checks that the table-level and column-level SQL that is used
* to compute the stats is valid. Returns the analyzed statement.
*/
- ComputeStatsStmt checkComputeStatsStmt(String stmt, Analyzer analyzer,
+ ComputeStatsStmt checkComputeStatsStmt(String stmt, AnalysisContext ctx,
String expectedWarning) throws AnalysisException {
- ParseNode parseNode = AnalyzesOk(stmt, analyzer, expectedWarning);
+ ParseNode parseNode = AnalyzesOk(stmt, ctx, expectedWarning);
assertTrue(parseNode instanceof ComputeStatsStmt);
ComputeStatsStmt parsedStmt = (ComputeStatsStmt)parseNode;
AnalyzesOk(parsedStmt.getTblStatsQuery());
@@ -1349,7 +1348,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
queryOpts.compute_stats_min_sample_size == 1024 * 1024 * 1024);
ComputeStatsStmt noSamplingStmt = checkComputeStatsStmt(
"compute stats functional.alltypes tablesample system (10) repeatable(1)",
- createAnalyzer(queryOpts),
+ createAnalysisCtx(queryOpts),
"Ignoring TABLESAMPLE because the effective sampling rate is 100%");
Assert.assertTrue(noSamplingStmt.getEffectiveSamplingPerc() == 1.0);
String tblStatsQuery = noSamplingStmt.getTblStatsQuery().toUpperCase();
@@ -1362,10 +1361,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// No minimum sample bytes.
queryOpts.setCompute_stats_min_sample_size(0);
checkComputeStatsStmt("compute stats functional.alltypes tablesample system (10)",
- createAnalyzer(queryOpts));
+ createAnalysisCtx(queryOpts));
checkComputeStatsStmt(
"compute stats functional.alltypes tablesample system (55) repeatable(1)",
- createAnalyzer(queryOpts));
+ createAnalysisCtx(queryOpts));
// Sample is adjusted based on the minimum sample bytes.
// Assumes that functional.alltypes has 24 files of roughly 20KB each.
@@ -1374,7 +1373,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
queryOpts.setCompute_stats_min_sample_size(0);
ComputeStatsStmt baselineStmt = checkComputeStatsStmt(
"compute stats functional.alltypes tablesample system (1) repeatable(1)",
- createAnalyzer(queryOpts));
+ createAnalysisCtx(queryOpts));
// Approximate validation of effective sampling rate.
Assert.assertTrue(baselineStmt.getEffectiveSamplingPerc() > 0.03);
Assert.assertTrue(baselineStmt.getEffectiveSamplingPerc() < 0.05);
@@ -1383,7 +1382,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
queryOpts.setCompute_stats_min_sample_size(100 * 1024);
ComputeStatsStmt adjustedStmt = checkComputeStatsStmt(
"compute stats functional.alltypes tablesample system (1) repeatable(1)",
- createAnalyzer(queryOpts));
+ createAnalysisCtx(queryOpts));
// Approximate validation to avoid flakiness due to sampling and file size
// changes. Expect a sample between 4 and 6 of the 24 total files.
Assert.assertTrue(adjustedStmt.getEffectiveSamplingPerc() >= 4.0 / 24);
@@ -3592,7 +3591,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
addTestTable("create table ambig.ambig (ambig struct<ambig:array<int>>)");
// Single element path can only be resolved as <table>.
DescribeTableStmt describe = (DescribeTableStmt)AnalyzesOk("describe ambig",
- createAnalyzer("ambig"));
+ createAnalysisCtx("ambig"));
TDescribeTableParams tdesc = (TDescribeTableParams) describe.toThrift();
Assert.assertTrue(tdesc.isSetTable_name());
Assert.assertEquals("ambig", tdesc.table_name.getDb_name());
@@ -3600,14 +3599,14 @@ public class AnalyzeDDLTest extends FrontendTestBase {
Assert.assertFalse(tdesc.isSetResult_struct());
// Path could be resolved as either <db>.<table> or <table>.<complex field>
- AnalysisError("describe ambig.ambig", createAnalyzer("ambig"),
+ AnalysisError("describe ambig.ambig", createAnalysisCtx("ambig"),
"Path is ambiguous: 'ambig.ambig'");
// Path could be resolved as either <db>.<table>.<field> or <table>.<field>.<field>
- AnalysisError("describe ambig.ambig.ambig", createAnalyzer("ambig"),
+ AnalysisError("describe ambig.ambig.ambig", createAnalysisCtx("ambig"),
"Path is ambiguous: 'ambig.ambig.ambig'");
// 4 element path can only be resolved to nested array.
describe = (DescribeTableStmt) AnalyzesOk(
- "describe ambig.ambig.ambig.ambig", createAnalyzer("ambig"));
+ "describe ambig.ambig.ambig.ambig", createAnalysisCtx("ambig"));
tdesc = (TDescribeTableParams) describe.toThrift();
Type expectedType =
org.apache.impala.analysis.Path.getTypeAsStruct(new ArrayType(Type.INT));
@@ -3660,7 +3659,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
partition),
"SHOW FILES not applicable to a non hdfs table: functional.alltypes_view");
AnalysisError(String.format("show files in allcomplextypes.int_array_col %s",
- partition), createAnalyzer("functional"),
+ partition), createAnalysisCtx("functional"),
"SHOW FILES not applicable to a non hdfs table: allcomplextypes.int_array_col");
}