You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2018/05/29 19:14:04 UTC

[01/15] impala git commit: IMPALA-4025: Part 1: Generalize and cleanup StmtRewriter

Repository: impala
Updated Branches:
  refs/heads/2.x 228f07722 -> 0e7b07592


http://git-wip-us.apache.org/repos/asf/impala/blob/41d7cd90/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
index 6cfbd20..4206e99 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
@@ -24,8 +24,6 @@ import java.util.List;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.analysis.UnionStmt.UnionOperand;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.ColumnAliasGenerator;
-import org.apache.impala.common.TableAliasGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,8 +33,9 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 /**
- * Class representing a statement rewriter. A statement rewriter performs subquery
- * unnesting on an analyzed parse tree.
+ * Class representing a statement rewriter. The base class traverses the stmt tree and
+ * the specific rewrite rules are implemented in the subclasses and are called by the
+ * hooks in the base class.
  * TODO: Now that we have a nested-loop join supporting all join modes we could
  * allow more rewrites, although it is not clear we would always want to.
  */
@@ -47,13 +46,12 @@ public class StmtRewriter {
    * Rewrite the statement of an analysis result in-place. Assumes that BetweenPredicates
    * have already been rewritten.
    */
-  public static void rewrite(AnalysisResult analysisResult)
-      throws AnalysisException {
+  public void rewrite(AnalysisResult analysisResult) throws AnalysisException {
     // Analyzed stmt that contains a query statement with subqueries to be rewritten.
     StatementBase stmt = analysisResult.getStmt();
     Preconditions.checkState(stmt.isAnalyzed());
     // Analyzed query statement to be rewritten.
-    QueryStmt queryStmt = null;
+    QueryStmt queryStmt;
     if (stmt instanceof QueryStmt) {
       queryStmt = (QueryStmt) analysisResult.getStmt();
     } else if (stmt instanceof InsertStmt) {
@@ -65,8 +63,7 @@ public class StmtRewriter {
     } else if (analysisResult.isDeleteStmt()) {
       queryStmt = ((DeleteStmt) analysisResult.getStmt()).getQueryStmt();
     } else {
-      throw new AnalysisException("Unsupported statement containing subqueries: " +
-          stmt.toSql());
+      throw new AnalysisException("Unsupported statement: " + stmt.toSql());
     }
     rewriteQueryStatement(queryStmt, queryStmt.getAnalyzer());
   }
@@ -75,45 +72,31 @@ public class StmtRewriter {
    * Calls the appropriate rewrite method based on the specific type of query stmt. See
    * rewriteSelectStatement() and rewriteUnionStatement() documentation.
    */
-  public static void rewriteQueryStatement(QueryStmt stmt, Analyzer analyzer)
+  protected void rewriteQueryStatement(QueryStmt stmt, Analyzer analyzer)
       throws AnalysisException {
     Preconditions.checkNotNull(stmt);
-    Preconditions.checkNotNull(stmt.isAnalyzed());
+    Preconditions.checkState(stmt.isAnalyzed());
     if (stmt instanceof SelectStmt) {
-      rewriteSelectStatement((SelectStmt)stmt, analyzer);
+      rewriteSelectStatement((SelectStmt) stmt, analyzer);
     } else if (stmt instanceof UnionStmt) {
-      rewriteUnionStatement((UnionStmt)stmt, analyzer);
+      rewriteUnionStatement((UnionStmt) stmt);
     } else {
-      throw new AnalysisException("Subqueries not supported for " +
-          stmt.getClass().getSimpleName() + " statements");
+      throw new AnalysisException(
+          "Subqueries not supported for " + stmt.getClass().getSimpleName() +
+              " statements");
     }
   }
 
-  /**
-   * Rewrite all the subqueries of a SelectStmt in place. Subqueries
-   * are currently supported in FROM and WHERE clauses. The rewrite is performed in
-   * place and not in a clone of SelectStmt because it requires the stmt to be analyzed.
-   */
-  private static void rewriteSelectStatement(SelectStmt stmt, Analyzer analyzer)
+  protected void rewriteSelectStatement(SelectStmt stmt, Analyzer analyzer)
       throws AnalysisException {
-    // Rewrite all the subqueries in the FROM clause.
-    for (TableRef tblRef: stmt.fromClause_) {
+    for (TableRef tblRef : stmt.fromClause_) {
       if (!(tblRef instanceof InlineViewRef)) continue;
-      InlineViewRef inlineViewRef = (InlineViewRef)tblRef;
+      InlineViewRef inlineViewRef = (InlineViewRef) tblRef;
       rewriteQueryStatement(inlineViewRef.getViewStmt(), inlineViewRef.getAnalyzer());
     }
-    // Rewrite all the subqueries in the WHERE clause.
-    if (stmt.hasWhereClause()) {
-      // Push negation to leaf operands.
-      stmt.whereClause_ = Expr.pushNegationToOperands(stmt.whereClause_);
-      // Check if we can rewrite the subqueries in the WHERE clause. OR predicates with
-      // subqueries are not supported.
-      if (hasSubqueryInDisjunction(stmt.whereClause_)) {
-        throw new AnalysisException("Subqueries in OR predicates are not supported: " +
-            stmt.whereClause_.toSql());
-      }
-      rewriteWhereClauseSubqueries(stmt, analyzer);
-    }
+    // Currently only SubqueryRewriter touches the where clause. Recurse into the where
+    // clause when the need arises.
+    rewriteSelectStmtHook(stmt, analyzer);
     stmt.sqlString_ = null;
     if (LOG.isTraceEnabled()) LOG.trace("rewritten stmt: " + stmt.toSql());
   }
@@ -122,954 +105,987 @@ public class StmtRewriter {
    * Rewrite all operands in a UNION. The conditions that apply to SelectStmt rewriting
    * also apply here.
    */
-  private static void rewriteUnionStatement(UnionStmt stmt, Analyzer analyzer)
-      throws AnalysisException {
-    for (UnionOperand operand: stmt.getOperands()) {
+  private void rewriteUnionStatement(UnionStmt stmt) throws AnalysisException {
+    for (UnionOperand operand : stmt.getOperands()) {
       Preconditions.checkState(operand.getQueryStmt() instanceof SelectStmt);
-      StmtRewriter.rewriteSelectStatement(
-          (SelectStmt)operand.getQueryStmt(), operand.getAnalyzer());
+      rewriteSelectStatement((SelectStmt) operand.getQueryStmt(), operand.getAnalyzer());
     }
   }
 
-  /**
-   * Returns true if the Expr tree rooted at 'expr' has at least one subquery
-   * that participates in a disjunction.
-   */
-  private static boolean hasSubqueryInDisjunction(Expr expr) {
-    if (!(expr instanceof CompoundPredicate)) return false;
-    if (Expr.IS_OR_PREDICATE.apply(expr)) {
-      return expr.contains(Subquery.class);
-    }
-    for (Expr child: expr.getChildren()) {
-      if (hasSubqueryInDisjunction(child)) return true;
-    }
-    return false;
-  }
-
-  /**
-   * Rewrite all subqueries of a stmt's WHERE clause. Initially, all the
-   * conjuncts containing subqueries are extracted from the WHERE clause and are
-   * replaced with true BoolLiterals. Subsequently, each extracted conjunct is
-   * merged into its parent select block by converting it into a join.
-   * Conjuncts with subqueries that themselves contain conjuncts with subqueries are
-   * recursively rewritten in a bottom up fashion.
-   *
-   * The following example illustrates the bottom up rewriting of nested queries.
-   * Suppose we have the following three level nested query Q0:
-   *
-   * SELECT *
-   * FROM T1                                            : Q0
-   * WHERE T1.a IN (SELECT a
-   *                FROM T2 WHERE T2.b IN (SELECT b
-   *                                       FROM T3))
-   * AND T1.c < 10;
-   *
-   * This query will be rewritten as follows. Initially, the IN predicate
-   * T1.a IN (SELECT a FROM T2 WHERE T2.b IN (SELECT b FROM T3)) is extracted
-   * from the top level block (Q0) since it contains a subquery and is
-   * replaced by a true BoolLiteral, resulting in the following query Q1:
-   *
-   * SELECT * FROM T1 WHERE TRUE : Q1
-   *
-   * Since the stmt in the extracted predicate contains a conjunct with a subquery,
-   * it is also rewritten. As before, rewriting stmt SELECT a FROM T2
-   * WHERE T2.b IN (SELECT b FROM T3) works by first extracting the conjunct that
-   * contains the subquery (T2.b IN (SELECT b FROM T3)) and substituting it with
-   * a true BoolLiteral, producing the following stmt Q2:
-   *
-   * SELECT a FROM T2 WHERE TRUE : Q2
-   *
-   * The predicate T2.b IN (SELECT b FROM T3) is then merged with Q2,
-   * producing the following unnested query Q3:
-   *
-   * SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b : Q3
-   *
-   * The extracted IN predicate becomes:
-   *
-   * T1.a IN (SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b)
-   *
-   * Finally, the rewritten IN predicate is merged with query block Q1,
-   * producing the following unnested query (WHERE clauses that contain only
-   * conjunctions of true BoolLiterals are eliminated):
-   *
-   * SELECT *
-   * FROM T1 LEFT SEMI JOIN (SELECT a
-   *                         FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1
-   *                         ON T2.b = $a$1.b) $a$1
-   * ON $a$1.a = T1.a
-   * WHERE T1.c < 10;
-   *
-   */
-  private static void rewriteWhereClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
-     throws AnalysisException {
-    int numTableRefs = stmt.fromClause_.size();
-    ArrayList<Expr> exprsWithSubqueries = Lists.newArrayList();
-    ExprSubstitutionMap smap = new ExprSubstitutionMap();
-    // Check if all the conjuncts in the WHERE clause that contain subqueries
-    // can currently be rewritten as a join.
-    for (Expr conjunct: stmt.whereClause_.getConjuncts()) {
-      List<Subquery> subqueries = Lists.newArrayList();
-      conjunct.collectAll(Predicates.instanceOf(Subquery.class), subqueries);
-      if (subqueries.size() == 0) continue;
-      if (subqueries.size() > 1) {
-        throw new AnalysisException("Multiple subqueries are not supported in " +
-            "expression: " + conjunct.toSql());
+  protected void rewriteSelectStmtHook(SelectStmt stmt, Analyzer analyzer)
+      throws AnalysisException {}
+
+  static class SubqueryRewriter extends StmtRewriter {
+    /**
+     * Returns true if the Expr tree rooted at 'expr' has at least one subquery
+     * that participates in a disjunction.
+     */
+    private static boolean hasSubqueryInDisjunction(Expr expr) {
+      if (!(expr instanceof CompoundPredicate)) return false;
+      if (Expr.IS_OR_PREDICATE.apply(expr)) {
+        return expr.contains(Subquery.class);
       }
-      if (!(conjunct instanceof InPredicate) && !(conjunct instanceof ExistsPredicate) &&
-          !(conjunct instanceof BinaryPredicate) &&
-          !conjunct.getSubquery().getType().isScalarType()) {
-        throw new AnalysisException("Non-scalar subquery is not supported in " +
-            "expression: " + conjunct.toSql());
+      for (Expr child : expr.getChildren()) {
+        if (hasSubqueryInDisjunction(child)) return true;
       }
+      return false;
+    }
 
-      Expr rewrittenConjunct = conjunct;
-      if (conjunct instanceof InPredicate && conjunct.getChild(0).isConstant()) {
-        Expr newConjunct = rewriteInConstant(stmt, (InPredicate)conjunct);
-        if (newConjunct != null) {
-          newConjunct.analyze(analyzer);
-          rewrittenConjunct = newConjunct;
-        }
+    /**
+     * Replace an ExistsPredicate that contains a subquery with a BoolLiteral if we
+     * can determine its result without evaluating it. Return null if the result of the
+     * ExistsPredicate can only be determined at run-time.
+     */
+    private static BoolLiteral replaceExistsPredicate(ExistsPredicate predicate) {
+      Subquery subquery = predicate.getSubquery();
+      Preconditions.checkNotNull(subquery);
+      SelectStmt subqueryStmt = (SelectStmt) subquery.getStatement();
+      BoolLiteral boolLiteral = null;
+      if (subqueryStmt.getAnalyzer().hasEmptyResultSet()) {
+        boolLiteral = new BoolLiteral(predicate.isNotExists());
+      } else if (subqueryStmt.hasAggInfo() &&
+          subqueryStmt.getAggInfo().hasAggregateExprs() &&
+          !subqueryStmt.hasAnalyticInfo() && subqueryStmt.getHavingPred() == null) {
+        boolLiteral = new BoolLiteral(!predicate.isNotExists());
       }
+      return boolLiteral;
+    }
 
-      if (rewrittenConjunct instanceof ExistsPredicate) {
-        // Check if we can determine the result of an ExistsPredicate during analysis.
-        // If so, replace the predicate with a BoolLiteral predicate and remove it from
-        // the list of predicates to be rewritten.
-        BoolLiteral boolLiteral =
-            replaceExistsPredicate((ExistsPredicate) rewrittenConjunct);
-        if (boolLiteral != null) {
-          boolLiteral.analyze(analyzer);
-          smap.put(conjunct, boolLiteral);
-          continue;
-        }
-      }
 
-      // Replace all the supported exprs with subqueries with true BoolLiterals
-      // using an smap.
-      BoolLiteral boolLiteral = new BoolLiteral(true);
-      boolLiteral.analyze(analyzer);
-      smap.put(conjunct, boolLiteral);
-      exprsWithSubqueries.add(rewrittenConjunct);
-    }
-    stmt.whereClause_ = stmt.whereClause_.substitute(smap, analyzer, false);
-
-    boolean hasNewVisibleTuple = false;
-    // Recursively rewrite all the exprs that contain subqueries and merge them
-    // with 'stmt'.
-    for (Expr expr: exprsWithSubqueries) {
-      if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer)) {
-        hasNewVisibleTuple = true;
+    /**
+     * Rewrites [NOT] IN predicate when the LHS is a constant and RHS is a subquery.
+     * If 'inPred' is not rewritten, null is returned. If 'inPred' is rewritten, the
+     * resulting expression is not analyzed (caller must analyze). 'outerBlock' is the
+     * parent block of 'inPred'.
+     *
+     * Example: SELECT * FROM t WHERE 1 IN (SELECT id FROM s)
+     *
+     * The rewrite transforms 'inPred' using the following cases. C refers to the LHS
+     * constant and RHS is the subquery. Cases 1 and 2 apply to both correlated and
+     * uncorrelated subqueries; case 3 is skipped (null returned) when RHS is correlated.
+     *
+     * 1) Predicate is IN: No rewrite since it can be evaluated using the existing
+     *                     NestedLoop based Left Semijoin.
+     *
+     * 2) Predicate is NOT IN and RHS returns a single row.
+     *
+     *    Example: 10 NOT IN (SELECT 1)
+     *    Example: 10 NOT IN (SELECT MAX(b) FROM t)
+     *    Example: 10 NOT IN (SELECT x FROM t LIMIT 1)
+     *
+     *    REWRITE: C NOT IN RHS: => C != (RHS)
+     *
+     * 3) Predicate is NOT IN and RHS returns multiple rows.
+     *
+     *    Example: SELECT * FROM t WHERE 1 NOT IN (SELECT id FROM s)
+     *
+     *    Assume RHS is of the form SELECT expr FROM T WHERE ...
+     *
+     *    REWRITE:
+     *     C NOT IN (RHS)
+     *       Rewrites to:
+     *     NOT EXISTS (SELECT x FROM (SELECT x FROM RHS) tmp
+     *                 WHERE C IS NULL OR tmp.x IS NULL OR tmp.x = C)
+     *
+     *    Example:
+     *     ... 10 NOT IN (SELECT x FROM t WHERE t.y > 3)
+     *       Rewrites to:
+     *     ... NOT EXISTS (SELECT x FROM (SELECT x FROM t WHERE t.y > 3) tmp
+     *                     WHERE 10 IS NULL OR tmp.x IS NULL OR tmp.x = 10)
+     *
+     *    The rewrite wraps the RHS subquery in an inline view and filters it with a
+     *    condition using the LHS constant. The inline view ensures that the filter is
+     *    logically evaluated over the RHS result. Alternatively, injecting the filter
+     *    into the RHS is generally incorrect so requires push-down analysis to preserve
+     *    correctness (consider cases such as limit, aggregation, and analytic functions).
+     *    Such special cases are avoided here by using the inline view.
+     *    TODO: Correlated NOT IN subqueries require that column resolution be extended to
+     *    handle references to an outer block that is more than one nesting level away.
+     *
+     *    The filter constructed from the LHS constant is subtle, so warrants further
+     *    explanation. Consider the cases where the LHS is NULL vs. NOT NULL and the RHS
+     *    is empty vs. not-empty. When RHS subquery evaluates to the empty result set, the
+     *    NOT EXISTS passes for all LHS values. When the RHS subquery is not-empty, it is
+     *    useful to think of C NOT IN (RHS) as the boolean expansion:
+     *          C != x_1 & C != x_2 & C != x_3 & ... where each x_i is bound to a result
+     *          from the RHS subquery.
+     *
+     *    So, if C is equal to any x_i, the expression is false. Similarly, if any
+     *    x_i is null or if C is null, then the overall expression also is false.
+     */
+    private static Expr rewriteInConstant(SelectStmt outerBlock, InPredicate inPred) {
+      Expr lhs = inPred.getChild(0);
+      Preconditions.checkArgument(lhs.isConstant());
+
+      Expr rhs = inPred.getChild(1);
+      QueryStmt subquery = inPred.getSubquery().getStatement();
+      Preconditions.checkState(subquery instanceof SelectStmt);
+      SelectStmt rhsQuery = (SelectStmt) subquery;
+
+      // CASE 1, IN:
+      if (!inPred.isNotIn()) return null;
+
+      // CASE 2, NOT IN and RHS returns a single row:
+      if (rhsQuery.returnsSingleRow()) {
+        return new BinaryPredicate(BinaryPredicate.Operator.NE, lhs, rhs);
       }
-    }
-    if (canEliminate(stmt.whereClause_)) stmt.whereClause_ = null;
-    if (hasNewVisibleTuple) replaceUnqualifiedStarItems(stmt, numTableRefs);
-  }
 
-  /**
-   * Replace an ExistsPredicate that contains a subquery with a BoolLiteral if we
-   * can determine its result without evaluating it. Return null if the result of the
-   * ExistsPredicate can only be determined at run-time.
-   */
-  private static BoolLiteral replaceExistsPredicate(ExistsPredicate predicate) {
-    Subquery subquery = predicate.getSubquery();
-    Preconditions.checkNotNull(subquery);
-    SelectStmt subqueryStmt = (SelectStmt) subquery.getStatement();
-    BoolLiteral boolLiteral = null;
-    if (subqueryStmt.getAnalyzer().hasEmptyResultSet()) {
-      boolLiteral = new BoolLiteral(predicate.isNotExists());
-    } else if (subqueryStmt.hasAggInfo() && subqueryStmt.getAggInfo().hasAggregateExprs()
-          && !subqueryStmt.hasAnalyticInfo() && subqueryStmt.getHavingPred() == null) {
-      boolLiteral = new BoolLiteral(!predicate.isNotExists());
+      // CASE 3, NOT IN, RHS returns multiple rows.
+      Preconditions.checkState(rhsQuery.getResultExprs().size() == 1);
+      // Do not rewrite NOT IN when the RHS is correlated.
+      if (isCorrelated(rhsQuery)) return null;
+
+      // Wrap RHS in an inline view: (select wrapperColumnAlias from RHS) wrapperTableAlias.
+      // Use outerBlock (parent block of subquery) to generate aliases. Doing so guarantees
+      // that the wrapper view does not produce the same alias if further rewritten.
+      String wrapperTableAlias = outerBlock.getTableAliasGenerator().getNextAlias();
+      String wrapperColumnAlias = outerBlock.getColumnAliasGenerator().getNextAlias();
+      InlineViewRef wrapperView = new InlineViewRef(wrapperTableAlias, rhsQuery,
+          Lists.newArrayList(wrapperColumnAlias));
+      SlotRef wrapperResult =
+          new SlotRef(Lists.newArrayList(wrapperTableAlias, wrapperColumnAlias));
+
+      // Build: lhs IS NULL OR rhsResultExpr IS NULL OR lhs = rhs
+      Expr rewritePredicate = new CompoundPredicate(CompoundPredicate.Operator.OR,
+          new IsNullPredicate(lhs, false),
+          new CompoundPredicate(CompoundPredicate.Operator.OR,
+              new IsNullPredicate(wrapperResult, false),
+              new BinaryPredicate(BinaryPredicate.Operator.EQ, wrapperResult, lhs)));
+
+      List<TableRef> fromList = Lists.newArrayList();
+      fromList.add(wrapperView);
+      SelectStmt rewriteQuery = new SelectStmt(
+          new SelectList(Lists.newArrayList(new SelectListItem(wrapperResult, null))),
+          new FromClause(fromList), rewritePredicate, null, null, null, null);
+      Subquery newSubquery = new Subquery(rewriteQuery);
+      rhsQuery.reset();
+
+      // Build: NOT EXISTS(newSubquery)
+      return new ExistsPredicate(newSubquery, true);
     }
-    return boolLiteral;
-  }
-
-  /**
-   * Rewrites [NOT] IN predicate when the LHS is a constant and RHS is a subquery.
-   * If 'inPred' is not rewritten, null is returned. If 'inPred' is rewritten, the
-   * resulting expression is not analyzed (caller must analyze). 'outerBlock' is the
-   * parent block of 'inPred'.
-   *
-   * Example: SELECT * FROM t WHERE 1 IN (SELECT id FROM s)
-   *
-   * The rewrite transforms 'inPred' using the following cases. C refers to the LHS
-   * constant and RHS is the subquery. All cases apply to both correlated and
-   * uncorrelated subqueries.
-   *
-   * 1) Predicate is IN: No rewrite since it can be evaluated using the existing
-   *                     NestedLoop based Left Semijoin.
-   *
-   * 2) Predicate is NOT IN and RHS returns a single row.
-   *
-   *    Example: 10 NOT IN (SELECT 1)
-   *    Example: 10 NOT IN (SELECT MAX(b) FROM t)
-   *    Example: 10 NOT IN (SELECT x FROM t LIMIT 1)
-   *
-   *    REWRITE: C NOT IN RHS: => C != (RHS)
-   *
-   * 3) Predicate is NOT IN and RHS returns multiple rows.
-   *
-   *    Example: SELECT * FROM t WHERE 1 NOT IN (SELECT id FROM s)
-   *
-   *    Assume RHS is of the form SELECT expr FROM T WHERE ...
-   *
-   *    REWRITE:
-   *     C NOT IN (RHS)
-   *       Rewrites to:
-   *     NOT EXISTS (SELECT x FROM (SELECT x FROM RHS) tmp
-   *                 WHERE C IS NULL OR tmp.x IS NULL OR tmp.x = C)
-   *
-   *    Example:
-   *     ... 10 NOT IN (SELECT x FROM t WHERE t.y > 3)
-   *       Rewrites to:
-   *     ... NOT EXISTS (SELECT x (SELECT x FROM t WHERE t.y > 3) tmp
-   *                     WHERE 10 IS NULL OR tmp.x IS NULL OR tmp.x = 10)
-   *
-   *    The rewrite wraps the RHS subquery in an inline view and filters it with a
-   *    condition using the LHS constant. The inline view ensures that the filter is
-   *    logically evaluated over the RHS result. Alternatively, injecting the filter into
-   *    the RHS is generally incorrect so requires push-down analysis to preserve
-   *    correctness (consider cases such as limit, aggregation, and analytic functions).
-   *    Such special cases are avoided here by using the inline view.
-   *    TODO: Correlated NOT IN subqueries require that column resolution be extended to
-   *    handle references to an outer block that is more than one nesting level away.
-   *
-   *    The filter constructed from the LHS constant is subtle, so warrants further
-   *    explanation. Consider the cases where the LHS is NULL vs. NOT NULL and the RHS
-   *    is empty vs. not-empty. When RHS subquery evaluates to the empty result set, the
-   *    NOT EXISTS passes for all LHS values. When the RHS subquery is not-empty, it is
-   *    useful to think of C NOT IN (RHS) as the boolean expansion:
-   *          C != x_1 & C != x_2 & C != x_3 & ... where each x_i is bound to a result
-   *          from the RHS subquery.
-   *
-   *    So, if C is equal to any x_i, the expression is false. Similarly, if any
-   *    x_i is null or if C is null, then the overall expression also is false.
-   */
-  private static Expr rewriteInConstant(SelectStmt outerBlock,
-      InPredicate inPred) throws AnalysisException {
-    Expr lhs = inPred.getChild(0);
-    Preconditions.checkArgument(lhs.isConstant());
-
-    Expr rhs = inPred.getChild(1);
-    QueryStmt subquery = inPred.getSubquery().getStatement();
-    Preconditions.checkState(subquery instanceof SelectStmt);
-    SelectStmt rhsQuery = (SelectStmt) subquery;
-
-    // CASE 1, IN:
-    if (!inPred.isNotIn()) return null;
-
-    // CASE 2, NOT IN and RHS returns a single row:
-    if (rhsQuery.returnsSingleRow()) {
-      return new BinaryPredicate(BinaryPredicate.Operator.NE, lhs, rhs);
-    }
-
-    // CASE 3, NOT IN, RHS returns multiple rows.
-    Preconditions.checkState(rhsQuery.getResultExprs().size() == 1);
-    // Do not rewrite NOT IN when the RHS is correlated.
-    if (isCorrelated(rhsQuery)) return null;
-
-    // Wrap RHS in an inline view: (select wrapperColumnAlias from RHS) wrapperTableAlias.
-    // Use outerBlock (parent block of subquery) to generate aliases. Doing so guarantees
-    // that the wrapper view does not produce the same alias if further rewritten.
-    String wrapperTableAlias = outerBlock.getTableAliasGenerator().getNextAlias();
-    String wrapperColumnAlias = outerBlock.getColumnAliasGenerator().getNextAlias();
-    InlineViewRef wrapperView = new InlineViewRef(wrapperTableAlias, rhsQuery,
-        Lists.newArrayList(wrapperColumnAlias));
-    SlotRef wrapperResult = new SlotRef(
-        Lists.newArrayList(wrapperTableAlias, wrapperColumnAlias));
-
-    // Build: lhs IS NULL OR rhsResultExpr IS NULL OR lhs = rhs
-    Expr rewritePredicate = new CompoundPredicate(CompoundPredicate.Operator.OR,
-        new IsNullPredicate(lhs, false),
-        new CompoundPredicate(CompoundPredicate.Operator.OR,
-            new IsNullPredicate(wrapperResult, false),
-            new BinaryPredicate(BinaryPredicate.Operator.EQ, wrapperResult, lhs)));
-
-    List<TableRef> fromList = Lists.newArrayList();
-    fromList.add(wrapperView);
-    SelectStmt rewriteQuery = new SelectStmt(
-        new SelectList(Lists.newArrayList(new SelectListItem(wrapperResult, null))),
-        new FromClause(fromList),
-        rewritePredicate,
-        null, null, null, null);
-    Subquery newSubquery = new Subquery(rewriteQuery);
-    rhsQuery.reset();
-
-    // Build: NOT EXISTS(newSubquery)
-    return new ExistsPredicate(newSubquery, true);
-  }
-
-  /**
-   * Tests if a subquery is correlated to its outer block.
-   */
-  private static boolean isCorrelated(SelectStmt subqueryStmt) {
-    if (!subqueryStmt.hasWhereClause()) return false;
-    return containsCorrelatedPredicate(subqueryStmt.getWhereClause(),
-        subqueryStmt.getTableRefIds());
-  }
 
-  /**
-   * Modifies in place an expr that contains a subquery by rewriting its
-   * subquery stmt. The modified analyzed expr is returned.
-   */
-  private static Expr rewriteExpr(Expr expr, Analyzer analyzer)
-      throws AnalysisException {
-    // Extract the subquery and rewrite it.
-    Subquery subquery = expr.getSubquery();
-    Preconditions.checkNotNull(subquery);
-    rewriteSelectStatement((SelectStmt) subquery.getStatement(), subquery.getAnalyzer());
-    // Create a new Subquery with the rewritten stmt and use a substitution map
-    // to replace the original subquery from the expr.
-    QueryStmt rewrittenStmt = subquery.getStatement().clone();
-    rewrittenStmt.reset();
-    Subquery newSubquery = new Subquery(rewrittenStmt);
-    newSubquery.analyze(analyzer);
-    ExprSubstitutionMap smap = new ExprSubstitutionMap();
-    smap.put(subquery, newSubquery);
-    return expr.substitute(smap, analyzer, false);
-  }
-
-  /**
-   * Merge an expr containing a subquery with a SelectStmt 'stmt' by
-   * converting the subquery stmt of the former into an inline view and
-   * creating a join between the new inline view and the right-most table
-   * from 'stmt'. Return true if the rewrite introduced a new visible tuple
-   * due to a CROSS JOIN or a LEFT OUTER JOIN.
-   *
-   * This process works as follows:
-   * 1. Create a new inline view with the subquery as the view's stmt. Changes
-   *    made to the subquery's stmt will affect the inline view.
-   * 2. Extract all correlated predicates from the subquery's WHERE
-   *    clause; the subquery's select list may be extended with new items and a
-   *    GROUP BY clause may be added.
-   * 3. Add the inline view to stmt's tableRefs and create a
-   *    join (left semi join, anti-join, left outer join for agg functions
-   *    that return a non-NULL value for an empty input, or cross-join) with
-   *    stmt's right-most table.
-   * 4. Initialize the ON clause of the new join from the original subquery
-   *    predicate and the new inline view.
-   * 5. Apply expr substitutions such that the extracted correlated predicates
-   *    refer to columns of the new inline view.
-   * 6. Add all extracted correlated predicates to the ON clause.
-   */
-  private static boolean mergeExpr(SelectStmt stmt, Expr expr,
-      Analyzer analyzer) throws AnalysisException {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkNotNull(analyzer);
-    boolean updateSelectList = false;
-    SelectStmt subqueryStmt = (SelectStmt)expr.getSubquery().getStatement();
-    boolean isScalarSubquery = expr.getSubquery().isScalarSubquery();
-    boolean isRuntimeScalar = subqueryStmt.isRuntimeScalar();
-    // Create a new inline view from the subquery stmt. The inline view will be added
-    // to the stmt's table refs later. Explicitly set the inline view's column labels
-    // to eliminate any chance that column aliases from the parent query could reference
-    // select items from the inline view after the rewrite.
-    List<String> colLabels = Lists.newArrayList();
-    for (int i = 0; i < subqueryStmt.getColLabels().size(); ++i) {
-      colLabels.add(subqueryStmt.getColumnAliasGenerator().getNextAlias());
-    }
-    InlineViewRef inlineView = new InlineViewRef(
-        stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels);
-
-    // Extract all correlated predicates from the subquery.
-    List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
-    if (!onClauseConjuncts.isEmpty()) {
-      validateCorrelatedSubqueryStmt(expr);
-      // For correlated subqueries, a LIMIT clause has no effect on the results, so we can
-      // safely remove it.
-      subqueryStmt.limitElement_ = new LimitElement(null, null);
-    }
-    // If runtime scalar, we need to prevent the propagation of predicates into the
-    // inline view by setting a limit on the statement.
-    if (isRuntimeScalar) subqueryStmt.setLimit(2);
-
-    // Update the subquery's select list and/or its GROUP BY clause by adding
-    // exprs from the extracted correlated predicates.
-    boolean updateGroupBy = isScalarSubquery
-        || (expr instanceof ExistsPredicate
-            && !subqueryStmt.getSelectList().isDistinct()
-            && subqueryStmt.hasAggInfo());
-    List<Expr> lhsExprs = Lists.newArrayList();
-    List<Expr> rhsExprs = Lists.newArrayList();
-    for (Expr conjunct: onClauseConjuncts) {
-      updateInlineView(inlineView, conjunct, stmt.getTableRefIds(),
-          lhsExprs, rhsExprs, updateGroupBy);
+    /**
+     * Tests if a subquery is correlated to its outer block.
+     */
+    private static boolean isCorrelated(SelectStmt subqueryStmt) {
+      if (!subqueryStmt.hasWhereClause()) return false;
+      return containsCorrelatedPredicate(subqueryStmt.getWhereClause(),
+          subqueryStmt.getTableRefIds());
     }
 
-    // Analyzing the inline view triggers reanalysis of the subquery's select statement.
-    // However the statement is already analyzed and since statement analysis is not
-    // idempotent, the analysis needs to be reset.
-    inlineView.reset();
-    inlineView.analyze(analyzer);
-    inlineView.setLeftTblRef(stmt.fromClause_.get(stmt.fromClause_.size() - 1));
-    stmt.fromClause_.add(inlineView);
-    JoinOperator joinOp = JoinOperator.LEFT_SEMI_JOIN;
-
-    // Create a join conjunct from the expr that contains a subquery.
-    Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer,
-        !onClauseConjuncts.isEmpty());
-    if (joinConjunct != null) {
-      SelectListItem firstItem =
-          ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
-      if (!onClauseConjuncts.isEmpty() &&
-          firstItem.getExpr() != null &&
-          firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
-        // Correlated subqueries with an aggregate function that returns non-null on
-        // an empty input are rewritten using a LEFT OUTER JOIN because we
-        // need to ensure that there is one agg value for every tuple of 'stmt'
-        // (parent select block), even for those tuples of 'stmt' that get rejected
-        // by the subquery due to some predicate. The new join conjunct is added to
-        // stmt's WHERE clause because it needs to be applied to the result of the
-        // LEFT OUTER JOIN (both matched and unmatched tuples).
-        //
-        // TODO Handle other aggregate functions and UDAs that return a non-NULL value
-        // on an empty set.
-        // TODO Handle count aggregate functions in an expression in subqueries
-        // select list.
-        stmt.whereClause_ =
-            CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause_);
-        joinConjunct = null;
-        joinOp = JoinOperator.LEFT_OUTER_JOIN;
-        updateSelectList = true;
+    /**
+     * Merge an expr containing a subquery with a SelectStmt 'stmt' by
+     * converting the subquery stmt of the former into an inline view and
+     * creating a join between the new inline view and the right-most table
+     * from 'stmt'. Return true if the rewrite introduced a new visible tuple
+     * due to a CROSS JOIN or a LEFT OUTER JOIN.
+     * <p>
+     * This process works as follows:
+     * 1. Create a new inline view with the subquery as the view's stmt. Changes
+     * made to the subquery's stmt will affect the inline view.
+     * 2. Extract all correlated predicates from the subquery's WHERE
+     * clause; the subquery's select list may be extended with new items and a
+     * GROUP BY clause may be added.
+     * 3. Add the inline view to stmt's tableRefs and create a
+     * join (left semi join, anti-join, left outer join for agg functions
+     * that return a non-NULL value for an empty input, or cross-join) with
+     * stmt's right-most table.
+     * 4. Initialize the ON clause of the new join from the original subquery
+     * predicate and the new inline view.
+     * 5. Apply expr substitutions such that the extracted correlated predicates
+     * refer to columns of the new inline view.
+     * 6. Add all extracted correlated predicates to the ON clause.
+     */
+    private static boolean mergeExpr(SelectStmt stmt, Expr expr, Analyzer analyzer)
+        throws AnalysisException {
+      Preconditions.checkNotNull(expr);
+      Preconditions.checkNotNull(analyzer);
+      boolean updateSelectList = false;
+      SelectStmt subqueryStmt = (SelectStmt) expr.getSubquery().getStatement();
+      boolean isScalarSubquery = expr.getSubquery().isScalarSubquery();
+      boolean isRuntimeScalar = subqueryStmt.isRuntimeScalar();
+      // Create a new inline view from the subquery stmt. The inline view will be added
+      // to the stmt's table refs later. Explicitly set the inline view's column labels
+      // to eliminate any chance that column aliases from the parent query could reference
+      // select items from the inline view after the rewrite.
+      List<String> colLabels = Lists.newArrayList();
+      for (int i = 0; i < subqueryStmt.getColLabels().size(); ++i) {
+        colLabels.add(subqueryStmt.getColumnAliasGenerator().getNextAlias());
+      }
+      InlineViewRef inlineView =
+          new InlineViewRef(stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt,
+              colLabels);
+
+      // Extract all correlated predicates from the subquery.
+      List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
+      if (!onClauseConjuncts.isEmpty()) {
+        validateCorrelatedSubqueryStmt(expr);
+        // For correlated subqueries, a LIMIT clause has no effect on the results, so we can
+        // safely remove it.
+        subqueryStmt.limitElement_ = new LimitElement(null, null);
+      }
+      // If runtime scalar, we need to prevent the propagation of predicates into the
+      // inline view by setting a limit on the statement.
+      if (isRuntimeScalar) subqueryStmt.setLimit(2);
+
+      // Update the subquery's select list and/or its GROUP BY clause by adding
+      // exprs from the extracted correlated predicates.
+      boolean updateGroupBy = isScalarSubquery || (expr instanceof ExistsPredicate &&
+          !subqueryStmt.getSelectList().isDistinct() && subqueryStmt.hasAggInfo());
+      List<Expr> lhsExprs = Lists.newArrayList();
+      List<Expr> rhsExprs = Lists.newArrayList();
+      for (Expr conjunct : onClauseConjuncts) {
+        updateInlineView(inlineView, conjunct, stmt.getTableRefIds(), lhsExprs, rhsExprs,
+            updateGroupBy);
       }
 
-      if (joinConjunct != null) onClauseConjuncts.add(joinConjunct);
-    }
-
-    // Ensure that all the extracted correlated predicates can be added to the ON-clause
-    // of the generated join.
-    if (!onClauseConjuncts.isEmpty()) {
-      validateCorrelatedPredicates(expr, inlineView, onClauseConjuncts);
-    }
+      // Analyzing the inline view triggers reanalysis of the subquery's select statement.
+      // However the statement is already analyzed and since statement analysis is not
+      // idempotent, the analysis needs to be reset.
+      inlineView.reset();
+      inlineView.analyze(analyzer);
+      inlineView.setLeftTblRef(stmt.fromClause_.get(stmt.fromClause_.size() - 1));
+      stmt.fromClause_.add(inlineView);
+      JoinOperator joinOp = JoinOperator.LEFT_SEMI_JOIN;
+
+      // Create a join conjunct from the expr that contains a subquery.
+      Expr joinConjunct =
+          createJoinConjunct(expr, inlineView, analyzer, !onClauseConjuncts.isEmpty());
+      if (joinConjunct != null) {
+        SelectListItem firstItem =
+            ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
+        if (!onClauseConjuncts.isEmpty() && firstItem.getExpr() != null &&
+            firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
+          // Correlated subqueries with an aggregate function that returns non-null on
+          // an empty input are rewritten using a LEFT OUTER JOIN because we
+          // need to ensure that there is one agg value for every tuple of 'stmt'
+          // (parent select block), even for those tuples of 'stmt' that get rejected
+          // by the subquery due to some predicate. The new join conjunct is added to
+          // stmt's WHERE clause because it needs to be applied to the result of the
+          // LEFT OUTER JOIN (both matched and unmatched tuples).
+          //
+          // TODO Handle other aggregate functions and UDAs that return a non-NULL value
+          // on an empty set.
+          // TODO Handle count aggregate functions in an expression in subqueries
+          // select list.
+          stmt.whereClause_ =
+              CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause_);
+          joinConjunct = null;
+          joinOp = JoinOperator.LEFT_OUTER_JOIN;
+          updateSelectList = true;
+        }
 
-    // Create the ON clause from the extracted correlated predicates.
-    Expr onClausePredicate =
-        CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
-
-    if (onClausePredicate == null) {
-      Preconditions.checkState(expr instanceof ExistsPredicate);
-      ExistsPredicate existsPred = (ExistsPredicate) expr;
-      // TODO This is very expensive if uncorrelated. Remove it when we implement
-      // independent subquery evaluation.
-      if (existsPred.isNotExists()) {
-        inlineView.setJoinOp(JoinOperator.LEFT_ANTI_JOIN);
-      } else {
-        inlineView.setJoinOp(JoinOperator.LEFT_SEMI_JOIN);
-      }
-      // Note that the concept of a 'correlated inline view' is similar but not the same
-      // as a 'correlated subquery', i.e., a subquery with a correlated predicate.
-      if (!inlineView.isCorrelated()) {
-        // For uncorrelated subqueries, we limit the number of rows returned by the
-        // subquery.
-        subqueryStmt.setLimit(1);
-        inlineView.setOnClause(new BoolLiteral(true));
+        if (joinConjunct != null) onClauseConjuncts.add(joinConjunct);
       }
-      return false;
-    }
 
-    // Create an smap from the original select-list exprs of the select list to
-    // the corresponding inline-view columns.
-    ExprSubstitutionMap smap = new ExprSubstitutionMap();
-    Preconditions.checkState(lhsExprs.size() == rhsExprs.size());
-    for (int i = 0; i < lhsExprs.size(); ++i) {
-      Expr lhsExpr = lhsExprs.get(i);
-      Expr rhsExpr = rhsExprs.get(i);
-      rhsExpr.analyze(analyzer);
-      smap.put(lhsExpr, rhsExpr);
-    }
-    onClausePredicate = onClausePredicate.substitute(smap, analyzer, false);
+      // Ensure that all the extracted correlated predicates can be added to the ON-clause
+      // of the generated join.
+      if (!onClauseConjuncts.isEmpty()) {
+        validateCorrelatedPredicates(expr, inlineView, onClauseConjuncts);
+      }
 
-    // Check for references to ancestor query blocks (cycles in the dependency
-    // graph of query blocks are not supported).
-    if (!onClausePredicate.isBoundByTupleIds(stmt.getTableRefIds())) {
-      throw new AnalysisException("Unsupported correlated subquery: " +
-          subqueryStmt.toSql());
-    }
+      // Create the ON clause from the extracted correlated predicates.
+      Expr onClausePredicate =
+          CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
+
+      if (onClausePredicate == null) {
+        Preconditions.checkState(expr instanceof ExistsPredicate);
+        ExistsPredicate existsPred = (ExistsPredicate) expr;
+        // TODO This is very expensive if uncorrelated. Remove it when we implement
+        // independent subquery evaluation.
+        if (existsPred.isNotExists()) {
+          inlineView.setJoinOp(JoinOperator.LEFT_ANTI_JOIN);
+        } else {
+          inlineView.setJoinOp(JoinOperator.LEFT_SEMI_JOIN);
+        }
+        // Note that the concept of a 'correlated inline view' is similar but not the same
+        // as a 'correlated subquery', i.e., a subquery with a correlated predicate.
+        if (!inlineView.isCorrelated()) {
+          // For uncorrelated subqueries, we limit the number of rows returned by the
+          // subquery.
+          subqueryStmt.setLimit(1);
+          inlineView.setOnClause(new BoolLiteral(true));
+        }
+        return false;
+      }
 
-    // Check if we have a valid ON clause for an equi-join.
-    boolean hasEqJoinPred = false;
-    for (Expr conjunct: onClausePredicate.getConjuncts()) {
-      if (!(conjunct instanceof BinaryPredicate)) continue;
-      BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp();
-      if (!operator.isEquivalence()) continue;
-      List<TupleId> lhsTupleIds = Lists.newArrayList();
-      conjunct.getChild(0).getIds(lhsTupleIds, null);
-      // Allows for constants to be a join predicate.
-      if (lhsTupleIds.isEmpty() && !conjunct.getChild(0).isConstant()) continue;
-      List<TupleId> rhsTupleIds = Lists.newArrayList();
-      conjunct.getChild(1).getIds(rhsTupleIds, null);
-      if (rhsTupleIds.isEmpty()) continue;
-      // Check if columns from the outer query block (stmt) appear in both sides
-      // of the binary predicate.
-      if ((lhsTupleIds.contains(inlineView.getDesc().getId()) && lhsTupleIds.size() > 1)
-          || (rhsTupleIds.contains(inlineView.getDesc().getId())
-              && rhsTupleIds.size() > 1)) {
-        continue;
+      // Create an smap from the original select-list exprs of the select list to
+      // the corresponding inline-view columns.
+      ExprSubstitutionMap smap = new ExprSubstitutionMap();
+      Preconditions.checkState(lhsExprs.size() == rhsExprs.size());
+      for (int i = 0; i < lhsExprs.size(); ++i) {
+        Expr lhsExpr = lhsExprs.get(i);
+        Expr rhsExpr = rhsExprs.get(i);
+        rhsExpr.analyze(analyzer);
+        smap.put(lhsExpr, rhsExpr);
       }
-      hasEqJoinPred = true;
-      break;
-    }
+      onClausePredicate = onClausePredicate.substitute(smap, analyzer, false);
 
-    if (!hasEqJoinPred && !inlineView.isCorrelated()) {
-      // TODO: Remove this when independent subquery evaluation is implemented.
-      // TODO: Requires support for non-equi joins.
-      boolean hasGroupBy = ((SelectStmt) inlineView.getViewStmt()).hasGroupByClause();
-      Subquery subquery = expr.getSubquery();
-      if ((!isScalarSubquery && !isRuntimeScalar) ||
-          (hasGroupBy && !stmt.selectList_.isDistinct())) {
-        throw new AnalysisException("Unsupported predicate with subquery: " +
-            expr.toSql());
+      // Check for references to ancestor query blocks (cycles in the dependency
+      // graph of query blocks are not supported).
+      if (!onClausePredicate.isBoundByTupleIds(stmt.getTableRefIds())) {
+        throw new AnalysisException(
+            "Unsupported correlated subquery: " + subqueryStmt.toSql());
       }
 
-      // TODO: Requires support for null-aware anti-join mode in nested-loop joins
-      if (isScalarSubquery && expr instanceof InPredicate
-          && ((InPredicate) expr).isNotIn()) {
-        throw new AnalysisException("Unsupported NOT IN predicate with subquery: " +
-            expr.toSql());
+      // Check if we have a valid ON clause for an equi-join.
+      boolean hasEqJoinPred = false;
+      for (Expr conjunct : onClausePredicate.getConjuncts()) {
+        if (!(conjunct instanceof BinaryPredicate)) continue;
+        BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp();
+        if (!operator.isEquivalence()) continue;
+        List<TupleId> lhsTupleIds = Lists.newArrayList();
+        conjunct.getChild(0).getIds(lhsTupleIds, null);
+        // Allows for constants to be a join predicate.
+        if (lhsTupleIds.isEmpty() && !conjunct.getChild(0).isConstant()) continue;
+        List<TupleId> rhsTupleIds = Lists.newArrayList();
+        conjunct.getChild(1).getIds(rhsTupleIds, null);
+        if (rhsTupleIds.isEmpty()) continue;
+        // Check if columns from the outer query block (stmt) appear in both sides
+        // of the binary predicate.
+        if ((lhsTupleIds.contains(inlineView.getDesc().getId()) &&
+            lhsTupleIds.size() > 1) ||
+            (rhsTupleIds.contains(inlineView.getDesc().getId()) &&
+                rhsTupleIds.size() > 1)) {
+          continue;
+        }
+        hasEqJoinPred = true;
+        break;
       }
 
-      // We can rewrite the aggregate subquery using a cross join. All conjuncts
-      // that were extracted from the subquery are added to stmt's WHERE clause.
-      stmt.whereClause_ =
-          CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause_);
-      inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
-      // Indicate that the CROSS JOIN may add a new visible tuple to stmt's
-      // select list (if the latter contains an unqualified star item '*')
-      return true;
-    }
+      if (!hasEqJoinPred && !inlineView.isCorrelated()) {
+        // TODO: Remove this when independent subquery evaluation is implemented.
+        // TODO: Requires support for non-equi joins.
+        boolean hasGroupBy = ((SelectStmt) inlineView.getViewStmt()).hasGroupByClause();
+        if ((!isScalarSubquery && !isRuntimeScalar) ||
+            (hasGroupBy && !stmt.selectList_.isDistinct())) {
+          throw new AnalysisException(
+              "Unsupported predicate with subquery: " + expr.toSql());
+        }
 
-    // We have a valid equi-join conjunct or the inline view is correlated.
-    if (expr instanceof InPredicate && ((InPredicate)expr).isNotIn() ||
-        expr instanceof ExistsPredicate && ((ExistsPredicate)expr).isNotExists()) {
-      // For the case of a NOT IN with an eq join conjunct, replace the join
-      // conjunct with a conjunct that uses the null-matching eq operator.
-      if (expr instanceof InPredicate) {
-        joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
-        List<TupleId> tIds = Lists.newArrayList();
-        joinConjunct.getIds(tIds, null);
-
-        if (tIds.size() <= 1 || !tIds.contains(inlineView.getDesc().getId())) {
-          throw new AnalysisException("Unsupported NOT IN predicate with subquery: " +
-              expr.toSql());
+        // TODO: Requires support for null-aware anti-join mode in nested-loop joins
+        if (isScalarSubquery && expr instanceof InPredicate &&
+            ((InPredicate) expr).isNotIn()) {
+          throw new AnalysisException(
+              "Unsupported NOT IN predicate with subquery: " + expr.toSql());
         }
-        // Replace the EQ operator in the generated join conjunct with a
-        // null-matching EQ operator.
-        for (Expr conjunct: onClausePredicate.getConjuncts()) {
-          if (conjunct.equals(joinConjunct)) {
-            Preconditions.checkState(conjunct instanceof BinaryPredicate);
-            BinaryPredicate binaryPredicate = (BinaryPredicate)conjunct;
-            Preconditions.checkState(binaryPredicate.getOp().isEquivalence());
-            binaryPredicate.setOp(BinaryPredicate.Operator.NULL_MATCHING_EQ);
-            break;
+
+        // We can rewrite the aggregate subquery using a cross join. All conjuncts
+        // that were extracted from the subquery are added to stmt's WHERE clause.
+        stmt.whereClause_ =
+            CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause_);
+        inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
+        // Indicate that the CROSS JOIN may add a new visible tuple to stmt's
+        // select list (if the latter contains an unqualified star item '*')
+        return true;
+      }
+
+      // We have a valid equi-join conjunct or the inline view is correlated.
+      if (expr instanceof InPredicate && ((InPredicate) expr).isNotIn() ||
+          expr instanceof ExistsPredicate && ((ExistsPredicate) expr).isNotExists()) {
+        // For the case of a NOT IN with an eq join conjunct, replace the join
+        // conjunct with a conjunct that uses the null-matching eq operator.
+        if (expr instanceof InPredicate) {
+          joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
+          List<TupleId> tIds = Lists.newArrayList();
+          joinConjunct.getIds(tIds, null);
+
+          if (tIds.size() <= 1 || !tIds.contains(inlineView.getDesc().getId())) {
+            throw new AnalysisException(
+                "Unsupported NOT IN predicate with subquery: " + expr.toSql());
+          }
+          // Replace the EQ operator in the generated join conjunct with a
+          // null-matching EQ operator.
+          for (Expr conjunct : onClausePredicate.getConjuncts()) {
+            if (conjunct.equals(joinConjunct)) {
+              Preconditions.checkState(conjunct instanceof BinaryPredicate);
+              BinaryPredicate binaryPredicate = (BinaryPredicate) conjunct;
+              Preconditions.checkState(binaryPredicate.getOp().isEquivalence());
+              binaryPredicate.setOp(BinaryPredicate.Operator.NULL_MATCHING_EQ);
+              break;
+            }
           }
+        } else {
+          joinOp = JoinOperator.LEFT_ANTI_JOIN;
         }
-      } else {
-        joinOp = JoinOperator.LEFT_ANTI_JOIN;
       }
+      inlineView.setJoinOp(joinOp);
+      inlineView.setOnClause(onClausePredicate);
+      return updateSelectList;
     }
-    inlineView.setJoinOp(joinOp);
-    inlineView.setOnClause(onClausePredicate);
-    return updateSelectList;
-  }
 
-  /**
-   * Replace all unqualified star exprs ('*') from stmt's select list with qualified
-   * ones, i.e. tbl_1.*,...,tbl_n.*, where tbl_1,...,tbl_n are the visible tablerefs
-   * in stmt. 'tableIdx' indicates the maximum tableRef ordinal to consider when
-   * replacing an unqualified star item.
-   */
-  private static void replaceUnqualifiedStarItems(SelectStmt stmt, int tableIdx) {
-    Preconditions.checkState(tableIdx < stmt.fromClause_.size());
-    ArrayList<SelectListItem> newItems = Lists.newArrayList();
-    for (int i = 0; i < stmt.selectList_.getItems().size(); ++i) {
-      SelectListItem item = stmt.selectList_.getItems().get(i);
-      if (!item.isStar() || item.getRawPath() != null) {
-        newItems.add(item);
-        continue;
-      }
-      // '*' needs to be replaced by tbl1.*,...,tbln.*, where
-      // tbl1,...,tbln are the visible tableRefs in stmt.
-      for (int j = 0; j < tableIdx; ++j) {
-        TableRef tableRef = stmt.fromClause_.get(j);
-        if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN ||
-            tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN) {
+    /**
+     * Replace all unqualified star exprs ('*') from stmt's select list with qualified
+     * ones, i.e. tbl_1.*,...,tbl_n.*, where tbl_1,...,tbl_n are the visible tablerefs
+     * in stmt. 'tableIdx' indicates the maximum tableRef ordinal to consider when
+     * replacing an unqualified star item.
+     */
+    private static void replaceUnqualifiedStarItems(SelectStmt stmt, int tableIdx) {
+      Preconditions.checkState(tableIdx < stmt.fromClause_.size());
+      ArrayList<SelectListItem> newItems = Lists.newArrayList();
+      for (int i = 0; i < stmt.selectList_.getItems().size(); ++i) {
+        SelectListItem item = stmt.selectList_.getItems().get(i);
+        if (!item.isStar() || item.getRawPath() != null) {
+          newItems.add(item);
           continue;
         }
-        newItems.add(SelectListItem.createStarItem(
-            Lists.newArrayList(tableRef.getUniqueAlias())));
+        // '*' needs to be replaced by tbl1.*,...,tbln.*, where
+        // tbl1,...,tbln are the visible tableRefs in stmt.
+        for (int j = 0; j < tableIdx; ++j) {
+          TableRef tableRef = stmt.fromClause_.get(j);
+          if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN ||
+              tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN) {
+            continue;
+          }
+          newItems.add(SelectListItem
+              .createStarItem(Lists.newArrayList(tableRef.getUniqueAlias())));
+        }
       }
+      Preconditions.checkState(!newItems.isEmpty());
+      boolean isDistinct = stmt.selectList_.isDistinct();
+      stmt.selectList_ =
+          new SelectList(newItems, isDistinct, stmt.selectList_.getPlanHints());
     }
-    Preconditions.checkState(!newItems.isEmpty());
-    boolean isDistinct = stmt.selectList_.isDistinct();
-    stmt.selectList_ =
-        new SelectList(newItems, isDistinct, stmt.selectList_.getPlanHints());
-  }
-
-  /**
-   * Return true if the Expr tree rooted at 'expr' can be safely
-   * eliminated, i.e. it only consists of conjunctions of true BoolLiterals.
-   */
-  private static boolean canEliminate(Expr expr) {
-    for (Expr conjunct: expr.getConjuncts()) {
-      if (!Expr.IS_TRUE_LITERAL.apply(conjunct)) return false;
-    }
-    return true;
-  }
 
-  /**
-   * Extract all correlated predicates of a subquery.
-   *
-   * TODO Handle correlated predicates in a HAVING clause.
-   */
-  private static ArrayList<Expr> extractCorrelatedPredicates(SelectStmt subqueryStmt)
-      throws AnalysisException {
-    List<TupleId> subqueryTupleIds = subqueryStmt.getTableRefIds();
-    ArrayList<Expr> correlatedPredicates = Lists.newArrayList();
-
-    if (subqueryStmt.hasWhereClause()) {
-      if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(),
-          subqueryTupleIds)) {
-        throw new AnalysisException("Disjunctions with correlated predicates " +
-            "are not supported: " + subqueryStmt.getWhereClause().toSql());
+    /**
+     * Return true if the Expr tree rooted at 'expr' can be safely
+     * eliminated, i.e. it only consists of conjunctions of true BoolLiterals.
+     */
+    private static boolean canEliminate(Expr expr) {
+      for (Expr conjunct : expr.getConjuncts()) {
+        if (!Expr.IS_TRUE_LITERAL.apply(conjunct)) return false;
       }
-      // Extract the correlated predicates from the subquery's WHERE clause and
-      // replace them with true BoolLiterals.
-      Expr newWhereClause = extractCorrelatedPredicates(subqueryStmt.getWhereClause(),
-          subqueryTupleIds, correlatedPredicates);
-      if (canEliminate(newWhereClause)) newWhereClause = null;
-      subqueryStmt.setWhereClause(newWhereClause);
+      return true;
     }
 
-    // Process all correlated predicates from subquery's ON clauses.
-    for (TableRef tableRef: subqueryStmt.getTableRefs()) {
-      if (tableRef.getOnClause() == null) continue;
-
-      ArrayList<Expr> onClauseCorrelatedPreds = Lists.newArrayList();
-      Expr newOnClause = extractCorrelatedPredicates(tableRef.getOnClause(),
-          subqueryTupleIds, onClauseCorrelatedPreds);
-      if (onClauseCorrelatedPreds.isEmpty()) continue;
-
-      correlatedPredicates.addAll(onClauseCorrelatedPreds);
-      if (canEliminate(newOnClause)) {
-        // After the extraction of correlated predicates from an ON clause,
-        // the latter may only contain conjunctions of True BoolLiterals. In
-        // this case, we can eliminate the ON clause and set the join type to
-        // CROSS JOIN.
-        tableRef.setJoinOp(JoinOperator.CROSS_JOIN);
-        tableRef.setOnClause(null);
-      } else {
-        tableRef.setOnClause(newOnClause);
+    /**
+     * Extract all correlated predicates of a subquery.
+     * <p>
+     * TODO Handle correlated predicates in a HAVING clause.
+     */
+    private static ArrayList<Expr> extractCorrelatedPredicates(SelectStmt subqueryStmt)
+        throws AnalysisException {
+      List<TupleId> subqueryTupleIds = subqueryStmt.getTableRefIds();
+      ArrayList<Expr> correlatedPredicates = Lists.newArrayList();
+
+      if (subqueryStmt.hasWhereClause()) {
+        if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(),
+            subqueryTupleIds)) {
+          throw new AnalysisException(
+              "Disjunctions with correlated predicates " + "are not supported: " +
+                  subqueryStmt.getWhereClause().toSql());
+        }
+        // Extract the correlated predicates from the subquery's WHERE clause and
+        // replace them with true BoolLiterals.
+        Expr newWhereClause =
+            extractCorrelatedPredicates(subqueryStmt.getWhereClause(), subqueryTupleIds,
+                correlatedPredicates);
+        if (canEliminate(newWhereClause)) newWhereClause = null;
+        subqueryStmt.setWhereClause(newWhereClause);
       }
-    }
-    return correlatedPredicates;
-  }
 
-  /**
-   * Extract all correlated predicates from the expr tree rooted at 'root' and
-   * replace them with true BoolLiterals. The modified expr tree is returned
-   * and the extracted correlated predicates are added to 'matches'.
-   */
-  private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds,
-      ArrayList<Expr> matches) {
-    if (isCorrelatedPredicate(root, tupleIds)) {
-      matches.add(root);
-      return new BoolLiteral(true);
-    }
-    for (int i = 0; i < root.getChildren().size(); ++i) {
-      root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds,
-          matches));
+      // Process all correlated predicates from subquery's ON clauses.
+      for (TableRef tableRef : subqueryStmt.getTableRefs()) {
+        if (tableRef.getOnClause() == null) continue;
+
+        ArrayList<Expr> onClauseCorrelatedPreds = Lists.newArrayList();
+        Expr newOnClause =
+            extractCorrelatedPredicates(tableRef.getOnClause(), subqueryTupleIds,
+                onClauseCorrelatedPreds);
+        if (onClauseCorrelatedPreds.isEmpty()) continue;
+
+        correlatedPredicates.addAll(onClauseCorrelatedPreds);
+        if (canEliminate(newOnClause)) {
+          // After the extraction of correlated predicates from an ON clause,
+          // the latter may only contain conjunctions of True BoolLiterals. In
+          // this case, we can eliminate the ON clause and set the join type to
+          // CROSS JOIN.
+          tableRef.setJoinOp(JoinOperator.CROSS_JOIN);
+          tableRef.setOnClause(null);
+        } else {
+          tableRef.setOnClause(newOnClause);
+        }
+      }
+      return correlatedPredicates;
     }
-    return root;
-  }
 
-  /**
-   * Checks if an expr containing a correlated subquery is eligible for rewrite by
-   * transforming into a join. Throws an AnalysisException if 'expr' is not eligible for
-   * rewrite.
-   * TODO: Merge all the rewrite eligibility tests into a single function.
-   */
-  private static void validateCorrelatedSubqueryStmt(Expr expr) throws AnalysisException {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkState(expr.contains(Subquery.class));
-    SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
-    Preconditions.checkNotNull(stmt);
-    // Grouping and/or aggregation is not allowed on correlated scalar and IN subqueries
-    if ((expr instanceof BinaryPredicate
-          && (stmt.hasGroupByClause() || stmt.hasAnalyticInfo()))
-        || (expr instanceof InPredicate
-            && (stmt.hasAggInfo() || stmt.hasAnalyticInfo()))) {
-      throw new AnalysisException("Unsupported correlated subquery with grouping " +
-          "and/or aggregation: " + stmt.toSql());
-    }
-    // TODO: instead of this check, implement IMPALA-6315
-    if (!expr.getSubquery().isScalarSubquery() &&
-        !(expr instanceof InPredicate || expr instanceof ExistsPredicate)) {
-      throw new AnalysisException(
-          "Unsupported correlated subquery with runtime scalar check: " + stmt.toSql());
-    }
-    // The following correlated subqueries with a limit clause are supported:
-    // 1. EXISTS subqueries
-    // 2. Scalar subqueries with aggregation
-    if (stmt.hasLimit() &&
-        (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo() ||
-         stmt.selectList_.isDistinct()) &&
-        !(expr instanceof ExistsPredicate)) {
-      throw new AnalysisException("Unsupported correlated subquery with a " +
-          "LIMIT clause: " + stmt.toSql());
+    /**
+     * Extract all correlated predicates from the expr tree rooted at 'root' and
+     * replace them with true BoolLiterals. The modified expr tree is returned
+     * and the extracted correlated predicates are added to 'matches'.
+     */
+    private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds,
+        ArrayList<Expr> matches) {
+      if (isCorrelatedPredicate(root, tupleIds)) {
+        matches.add(root);
+        return new BoolLiteral(true);
+      }
+      for (int i = 0; i < root.getChildren().size(); ++i) {
+        root.getChildren()
+            .set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds, matches));
+      }
+      return root;
     }
-  }
 
-  /**
-   * Checks if all the 'correlatedPredicates' extracted from the subquery of 'expr' can be
-   * added to the ON-clause of the join that results from the subquery rewrite. It throws
-   * an AnalysisException if this is not the case. 'inlineView' is the generated inline
-   * view that will replace the subquery in the rewritten statement.
-   */
-  private static void validateCorrelatedPredicates(Expr expr, InlineViewRef inlineView,
-      List<Expr> correlatedPredicates) throws AnalysisException {
-    Preconditions.checkNotNull(expr);
-    Preconditions.checkNotNull(correlatedPredicates);
-    Preconditions.checkState(inlineView.isAnalyzed());
-    SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
-    final com.google.common.base.Predicate<Expr> isSingleSlotRef =
-        new com.google.common.base.Predicate<Expr>() {
-      @Override
-      public boolean apply(Expr arg) { return arg.unwrapSlotRef(false) != null; }
-    };
-
-    // A HAVING clause is only allowed on correlated EXISTS subqueries with
-    // correlated binary predicates of the form Slot = Slot (see IMPALA-2734)
-    // TODO Handle binary predicates with IS NOT DISTINCT op
-    if (expr instanceof ExistsPredicate && stmt.hasHavingClause()
-        && !correlatedPredicates.isEmpty()
-        && (!stmt.hasAggInfo()
-            || !Iterables.all(correlatedPredicates,
-                Predicates.or(Expr.IS_EQ_BINARY_PREDICATE, isSingleSlotRef)))) {
-      throw new AnalysisException("Unsupported correlated EXISTS subquery with a " +
-          "HAVING clause: " + stmt.toSql());
+    /**
+     * Checks if an expr containing a correlated subquery is eligible for rewrite by
+     * transforming into a join. Throws an AnalysisException if 'expr' is not eligible for
+     * rewrite.
+     * TODO: Merge all the rewrite eligibility tests into a single function.
+     */
+    private static void validateCorrelatedSubqueryStmt(Expr expr)
+        throws AnalysisException {
+      Preconditions.checkNotNull(expr);
+      Preconditions.checkState(expr.contains(Subquery.class));
+      SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
+      Preconditions.checkNotNull(stmt);
+      // Grouping and/or aggregation is not allowed on correlated scalar and IN subqueries
+      if ((expr instanceof BinaryPredicate &&
+          (stmt.hasGroupByClause() || stmt.hasAnalyticInfo())) ||
+          (expr instanceof InPredicate &&
+              (stmt.hasAggInfo() || stmt.hasAnalyticInfo()))) {
+        throw new AnalysisException(
+            "Unsupported correlated subquery with grouping " + "and/or aggregation: " +
+                stmt.toSql());
+      }
+      // TODO: instead of this check, implement IMPALA-6315
+      if (!expr.getSubquery().isScalarSubquery() &&
+          !(expr instanceof InPredicate || expr instanceof ExistsPredicate)) {
+        throw new AnalysisException(
+            "Unsupported correlated subquery with runtime scalar check: " + stmt.toSql());
+      }
+      // The following correlated subqueries with a limit clause are supported:
+      // 1. EXISTS subqueries
+      // 2. Scalar subqueries with aggregation
+      if (stmt.hasLimit() && (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo() ||
+          stmt.selectList_.isDistinct()) && !(expr instanceof ExistsPredicate)) {
+        throw new AnalysisException(
+            "Unsupported correlated subquery with a " + "LIMIT clause: " + stmt.toSql());
+      }
     }
 
-    // We only support equality correlated predicates in aggregate subqueries
-    // (see IMPALA-5531). This check needs to be performed after the inline view
-    // has been analyzed to make sure we don't incorrectly reject non-equality correlated
-    // predicates from nested collections.
-    if (expr instanceof BinaryPredicate && !inlineView.isCorrelated()
-        && !correlatedPredicates.isEmpty()) {
-      final List<TupleId> subqueryTblIds = stmt.getTableRefIds();
-      final com.google.common.base.Predicate<Expr> isBoundBySubqueryTids =
+    /**
+     * Checks if all the 'correlatedPredicates' extracted from the subquery of 'expr' can be
+     * added to the ON-clause of the join that results from the subquery rewrite. It throws
+     * an AnalysisException if this is not the case. 'inlineView' is the generated inline
+     * view that will replace the subquery in the rewritten statement.
+     */
+    private static void validateCorrelatedPredicates(Expr expr, InlineViewRef inlineView,
+        List<Expr> correlatedPredicates) throws AnalysisException {
+      Preconditions.checkNotNull(expr);
+      Preconditions.checkNotNull(correlatedPredicates);
+      Preconditions.checkState(inlineView.isAnalyzed());
+      SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
+      final com.google.common.base.Predicate<Expr> isSingleSlotRef =
           new com.google.common.base.Predicate<Expr>() {
             @Override
-            public boolean apply(Expr arg) {
-              List<TupleId> tids = Lists.newArrayList();
-              arg.getIds(tids, null);
-              return !Collections.disjoint(tids, subqueryTblIds);
-            }
-        };
-
-      List<Expr> unsupportedPredicates = Lists.newArrayList(Iterables.filter(
-          correlatedPredicates, Predicates.and(Expr.IS_NOT_EQ_BINARY_PREDICATE,
-              isBoundBySubqueryTids)));
-      if (!unsupportedPredicates.isEmpty()) {
-        throw new AnalysisException("Unsupported aggregate subquery with " +
-            "non-equality correlated predicates: " +
-            Expr.listToSql(unsupportedPredicates));
+            public boolean apply(Expr arg) { return arg.unwrapSlotRef(false) != null; }
+          };
+
+      // A HAVING clause is only allowed on correlated EXISTS subqueries with
+      // correlated binary predicates of the form Slot = Slot (see IMPALA-2734)
+      // TODO Handle binary predicates with IS NOT DISTINCT op
+      if (expr instanceof ExistsPredicate && stmt.hasHavingClause() &&
+          !correlatedPredicates.isEmpty() && (!stmt.hasAggInfo() || !Iterables
+          .all(correlatedPredicates,
+              Predicates.or(Expr.IS_EQ_BINARY_PREDICATE, isSingleSlotRef)))) {
+        throw new AnalysisException(
+            "Unsupported correlated EXISTS subquery with a " + "HAVING clause: " +
+                stmt.toSql());
       }
-    }
-  }
 
-  /**
-   * Update the subquery within an inline view by expanding its select list with exprs
-   * from a correlated predicate 'expr' that will be 'moved' to an ON clause in the
-   * subquery's parent query block. We need to make sure that every expr extracted from
-   * the subquery references an item in the subquery's select list. If 'updateGroupBy'
-   * is true, the exprs extracted from 'expr' are also added in stmt's GROUP BY clause.
-   * Throws an AnalysisException if we need to update the GROUP BY clause but
-   * both the lhs and rhs of 'expr' reference a tuple of the subquery stmt.
-   */
-  private static void updateInlineView(InlineViewRef inlineView, Expr expr,
-      List<TupleId> parentQueryTids, List<Expr> lhsExprs, List<Expr> rhsExprs,
-      boolean updateGroupBy) throws AnalysisException {
-    SelectStmt stmt = (SelectStmt)inlineView.getViewStmt();
-    List<TupleId> subqueryTblIds = stmt.getTableRefIds();
-    ArrayList<Expr> groupByExprs = null;
-    if (updateGroupBy) groupByExprs = Lists.newArrayList();
-
-    List<SelectListItem> items = stmt.selectList_.getItems();
-    // Collect all the SlotRefs from 'expr' and identify those that are bound by
-    // subquery tuple ids.
-    ArrayList<Expr> slotRefs = Lists.newArrayList();
-    expr.collectAll(Predicates.instanceOf(SlotRef.class), slotRefs);
-    List<Expr> exprsBoundBySubqueryTids = Lists.newArrayList();
-    for (Expr slotRef: slotRefs) {
-      if (slotRef.isBoundByTupleIds(subqueryTblIds)) {
-        exprsBoundBySubqueryTids.add(slotRef);
+      // We only support equality correlated predicates in aggregate subqueries
+      // (see IMPALA-5531). This check needs to be performed after the inline view
+      // has been analyzed to make sure we don't incorrectly reject non-equality
+      // correlated predicates from nested collections.
+      if (expr instanceof BinaryPredicate && !inlineView.isCorrelated() &&
+          !correlatedPredicates.isEmpty()) {
+        final List<TupleId> subqueryTblIds = stmt.getTableRefIds();
+        final com.google.common.base.Predicate<Expr> isBoundBySubqueryTids =
+            new com.google.common.base.Predicate<Expr>() {
+              @Override
+              public boolean apply(Expr arg) {
+                List<TupleId> tids = Lists.newArrayList();
+                arg.getIds(tids, null);
+                return !Collections.disjoint(tids, subqueryTblIds);
+              }
+            };
+
+        List<Expr> unsupportedPredicates = Lists.newArrayList(Iterables
+            .filter(correlatedPredicates,
+                Predicates.and(Expr.IS_NOT_EQ_BINARY_PREDICATE, isBoundBySubqueryTids)));
+        if (!unsupportedPredicates.isEmpty()) {
+          throw new AnalysisException("Unsupported aggregate subquery with " +
+              "non-equality correlated predicates: " +
+              Expr.listToSql(unsupportedPredicates));
+        }
       }
     }
-    // The correlated predicate only references slots from a parent block,
-    // no need to update the subquery's select or group by list.
-    if (exprsBoundBySubqueryTids.isEmpty()) return;
-    if (updateGroupBy) {
-      Preconditions.checkState(expr instanceof BinaryPredicate);
-      Expr exprBoundBySubqueryTids = null;
-      if (exprsBoundBySubqueryTids.size() > 1) {
-        // If the predicate contains multiple SlotRefs bound by subquery tuple
-        // ids, they must all be on the same side of that predicate.
-        if (expr.getChild(0).isBoundByTupleIds(subqueryTblIds) &&
-           expr.getChild(1).isBoundByTupleIds(parentQueryTids)) {
-          exprBoundBySubqueryTids = expr.getChild(0);
-        } else if (expr.getChild(0).isBoundByTupleIds(parentQueryTids) &&
-            expr.getChild(1).isBoundByTupleIds(subqueryTblIds)) {
-          exprBoundBySubqueryTids = expr.getChild(1);
+
+    /**
+     * Update the subquery within an inline view by expanding its select list with exprs
+     * from a correlated predicate 'expr' that will be 'moved' to an ON clause in the
+     * subquery's parent query block. We need to make sure that every expr extracted from
+     * the subquery references an item in the subquery's select list. If 'updateGroupBy'
+     * is true, the exprs extracted from 'expr' are also added in stmt's GROUP BY clause.
+     * Throws an AnalysisException if we need to update the GROUP BY clause but
+     * both the lhs and rhs of 'expr' reference a tuple of the subquery stmt.
+     */
+    private static void updateInlineView(InlineViewRef inlineView, Expr expr,
+        List<TupleId> parentQueryTids, List<Expr> lhsExprs, List<Expr> rhsExprs,
+        boolean updateGroupBy) throws AnalysisException {
+      SelectStmt stmt = (SelectStmt) inlineView.getViewStmt();
+      List<TupleId> subqueryTblIds = stmt.getTableRefIds();
+      ArrayList<Expr> groupByExprs = null;
+      if (updateGroupBy) groupByExprs = Lists.newArrayList();
+
+      List<SelectListItem> items = stmt.selectList_.getItems();
+      // Collect all the SlotRefs from 'expr' and identify those that are bound by
+      // subquery tuple ids.
+      ArrayList<Expr> slotRefs = Lists.newArrayList();
+      expr.collectAll(Predicates.instanceOf(SlotRef.class), slotRefs);
+      List<Expr> exprsBoundBySubqueryTids = Lists.newArrayList();
+      for (Expr slotRef : slotRefs) {
+        if (slotRef.isBoundByTupleIds(subqueryTblIds)) {
+          exprsBoundBySubqueryTids.add(slotRef);
+        }
+      }
+      // The correlated predicate only references slots from a parent block,
+      // no need to update the subquery's select or group by list.
+      if (exprsBoundBySubqueryTids.isEmpty()) return;
+      if (updateGroupBy) {
+        Preconditions.checkState(expr instanceof BinaryPredicate);
+        Expr exprBoundBySubqueryTids;
+        if (exprsBoundBySubqueryTids.size() > 1) {
+          // If the predicate contains multiple SlotRefs bound by subquery tuple
+          // ids, they must all be on the same side of that predicate.
+          if (expr.getChild(0).isBoundByTupleIds(subqueryTblIds) &&
+              expr.getChild(1).isBoundByTupleIds(parentQueryTids)) {
+            exprBoundBySubqueryTids = expr.getChild(0);
+          } else if (expr.getChild(0).isBoundByTupleIds(parentQueryTids) &&
+              expr.getChild(1).isBoundByTupleIds(subqueryTblIds)) {
+            exprBoundBySubqueryTids = expr.getChild(1);
+          } else {
+            throw new AnalysisException("All subquery columns " +
+                "that participate in a predicate must be on the same side of " +
+                "that predicate: " + expr.toSql());
+          }
         } else {
-          throw new AnalysisException("All subquery columns " +
-              "that participate in a predicate must be on the same side of " +
-              "that predicate: " + expr.toSql());
+          Preconditions.checkState(exprsBoundBySubqueryTids.size() == 1);
+          exprBoundBySubqueryTids = exprsBoundBySubqueryTids.get(0);
         }
-      } else {
-        Preconditions.checkState(exprsBoundBySubqueryTids.size() == 1);
-        exprBoundBySubqueryTids = exprsBoundBySubqueryTids.get(0);
+        exprsBoundBySubqueryTids.clear();
+        exprsBoundBySubqueryTids.add(exprBoundBySubqueryTids);
       }
-      exprsBoundBySubqueryTids.clear();
-      exprsBoundBySubqueryTids.add(exprBoundBySubqueryTids);
-    }
 
-    // Add the exprs bound by subquery tuple ids to the select list and
-    // register it for substitution. We use a temporary substitution map
-    // because we cannot at this point analyze the new select list expr. Once
-    // the new inline view is analyzed, the entries from this map will be
-    // added to an ExprSubstitutionMap.
-    for (Expr boundExpr: exprsBoundBySubqueryTids) {
-      String colAlias = stmt.getColumnAliasGenerator().getNextAlias();
-      items.add(new SelectListItem(boundExpr, null));
-      inlineView.getExplicitColLabels().add(colAlias);
-      lhsExprs.add(boundExpr);
-      rhsExprs.add(new SlotRef(Lists.newArrayList(inlineView.getUniqueAlias(), colAlias)));
-      if (groupByExprs != null) groupByExprs.add(boundExpr);
-    }
+      // Add the exprs bound by subquery tuple ids to the select list and
+      // register it for substitution. We use a temporary substitution map
+      // because we cannot at this point analyze the new select list expr. Once
+      // the new inline view is analyzed, the entries from this map will be
+      // added to an ExprSubstitutionMap.
+      for (Expr boundExpr : exprsBoundBySubqueryTids) {
+        String colAlias = stmt.getColumnAliasGenerator().getNextAlias();
+        items.add(new SelectListItem(boundExpr, null));
+        inlineView.getExplicitColLabels().add(colAlias);
+        lhsExprs.add(boundExpr);
+        rhsExprs
+            .add(new SlotRef(Lists.newArrayList(inlineView.getUniqueAlias(), colAlias)));
+        if (groupByExprs != null) groupByExprs.add(boundExpr);
+      }
 
-    // Update the subquery's select list.
-    boolean isDistinct = stmt.selectList_.isDistinct();
-    stmt.selectList_ = new SelectList(
-        items, isDistinct, stmt.selectList_.getPlanHints());
-    // Update subquery's GROUP BY clause
-    if (groupByExprs != null && !groupByExprs.isEmpty()) {
-      if (stmt.hasGroupByClause()) {
-        stmt.groupingExprs_.addAll(groupByExprs);
-      } else {
-        stmt.groupingExprs_ = groupByExprs;
+      // Update the subquery's select list.
+      boolean isDistinct = stmt.selectList_.isDistinct();
+      stmt.selectList_ =
+          new SelectList(items, isDistinct, stmt.selectList_.getPlanHints());
+      // Update subquery's GROUP BY clause
+      if (groupByExprs != null && !groupByExprs.isEmpty()) {
+        if (stmt.hasGroupByClause()) {
+          stmt.groupingExprs_.addAll(groupByExprs);
+        } else {
+          stmt.groupingExprs_ = groupByExprs;
+        }
       }
     }
-  }
 
-  /**
-   * Returns true if we can extract the correlated predicates from 'expr'. A
-   * correlated predicate cannot be extracted if it is part of a disjunction.
-   */
-  private static boolean canExtractCorrelatedPredicates(Expr expr,
-      List<TupleId> subqueryTupleIds) {
-    if (!(expr instanceof CompoundPredicate)) return true;
-    if (Expr.IS_OR_PREDICATE.apply(expr)) {
-      return !containsCorrelatedPredicate(expr, subqueryTupleIds);
+    /**
+     * Returns true if we can extract the correlated predicates from 'expr'. A
+     * correlated predicate cannot be extracted if it is part of a disjunction.
+     */
+    private static boolean canExtractCorrelatedPredicates(Expr expr,
+        List<TupleId> subqueryTupleIds) {
+      if (!(expr instanceof CompoundPredicate)) return true;
+      if (Expr.IS_OR_PREDICATE.apply(expr)) {
+        return !containsCorrelatedPredicate(expr, subqueryTupleIds);
+      }
+      for (Expr child : expr.getChildren()) {
+        if (!canExtractCorrelatedPredicates(child, subqueryTupleIds)) {
+          return false;
+        }
+      }
+      return true;
     }
-    for (Expr child: expr.getChildren()) {
-      if (!canExtractCorrelatedPredicates(child, subqueryTupleIds)) {
-        return false;
+
+    /**
+     * Return true if the expr tree rooted at 'root' contains a correlated
+     * predicate.
+     */
+    private static boolean containsCorrelatedPredicate(Expr root,
+        List<TupleId> tupleIds) {
+      if (isCorrelatedPredicate(root, tupleIds)) return true;
+      for (Expr child : root.getChildren()) {
+        if (containsCorrelatedPredicate(child, tupleIds)) return true;
       }
+      return false;
     }
-    return true;
-  }
 
-  /**
-   * Return true if the expr tree rooted at 'root' contains a correlated
-   * predicate.
-   */
-  private static boolean containsCorrelatedPredicate(Expr root, List<TupleId> tupleIds) {
-    if (isCorrelatedPredicate(root, tupleIds)) return true;
-    for (Expr child: root.getChildren()) {
-      if (containsCorrelatedPredicate(child, tupleIds)) return true;
+    /**
+     * Returns true if 'expr' is a correlated predicate. A predicate is
+     * correlated if at least one of its SlotRefs belongs to an ancestor
+     * query block (i.e. is not bound by the given 'tupleIds').
+     */
+    private static boolean isCorrelatedPredicate(Expr expr, List<TupleId> tupleIds) {
+      return (expr instanceof BinaryPredicate || expr instanceof SlotRef) &&
+          !expr.isBoundByTupleIds(tupleIds);
     }
-    return false;
-  }
 
-  /**
-   * Returns true if 'expr' is a correlated predicate. A predicate is
-   * correlated if at least one of its SlotRefs belongs to an ancestor
-   * query block (i.e. is not bound by the given 'tupleIds').
-   */
-  private static boolean isCorrelatedPredicate(Expr expr, List<TupleId> tupleIds) {
-    return (expr instanceof BinaryPredicate || expr instanceof SlotRef)
-        && !expr.isBoundByTupleIds(tupleIds);
-  }
+    /**
+     * Converts an expr containing a subquery into an analyzed conjunct to be
+     * used in a join. The conversion is performed in place by replacing the
+     * subquery with the first expr from the select list of 'inlineView'.
+     * If 'isCorrelated' is true and the first expr from the inline view contains
+     * an aggregate function that returns non-null on an empty input,
+     * the aggregate function is wrapped into a 'zeroifnull' function.
+     */
+    private static Expr createJoinConjunct(Expr exprWithSubquery,
+        InlineViewRef inlineView, Analyzer analyzer, boolean isCorrelated)
+        throws AnalysisException {
+      Preconditions.checkNotNull(exprWithSubquery);
+      Preconditions.checkNotNull(inlineView);
+      Preconditions.checkState(exprWithSubquery.contains(Subquery.class));
+      if (exprWithSubquery instanceof ExistsPredicate) return null;
+      // Create a SlotRef from the first item of inlineView's select list
+      SlotRef slotRef = new SlotRef(Lists
+          .newArrayList(inlineView.getUniqueAlias(), inlineView.getColLabels().get(0)));
+      slotRef.analyze(analyzer);
+      Expr subquerySubstitute = slotRef;
+      if (exprWithSubquery instanceof InPredicate) {
+        BinaryPredicate pred =
+            new BinaryPredicate(BinaryPredicate.Operator.EQ, exprWithSubquery.getChild(0),
+                slotRef);
+        pred.analyze(analyzer);
+        return pred;
+      }
+      Subquery subquery = exprWithSubquery.getSubquery();
+      Preconditions.checkState(subquery.getType().isScalarType());
+      ExprSubstitutionMap smap = new ExprSubstitutionMap();
+      SelectListItem item =
+          ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
+      if (isCorrelated && item.getExpr().contains(Expr.IS_UDA_FN)) {
+        throw new AnalysisException(
+            "UDAs are not supported in the select list of " + "correlated subqueries: " +
+                subquery.toSql());
+      }
+      if (isCorrelated && item.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
+        // TODO: Add support for multiple agg functions that return non-null on an
+        // empty input, by wrapping them with zeroifnull functions before the inline
+        // view is analyzed.
+        if (!Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr()) &&
+            (!(item.getExpr() instanceof CastExpr) ||
+                !Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr().getChild(0)))) {
+          throw new AnalysisException("Aggregate function that returns non-null on " +
+              "an empty input cannot be used in an expression in a " +
+              "correlated subquery's select list: " + subquery.toSql());
+        }
 
-  /**
-   * Converts an expr containing a subquery into an analyzed conjunct to be
-   * used in a join. The conversion is performed in place by replacing the
-   * subquery with the first expr from the select list of 'inlineView'.
-   * If 'isCorrelated' is true and the first expr from the inline view contains
-   * an aggregate function that returns non-null on an empty input,
-   * the aggregate function is wrapped into a 'zeroifnull' function.
-   */
-  private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView,
-      Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
-    Preconditions.checkNotNull(exprWithSubquery);
-    Preconditions.checkNotNull(inlineView);
-    Preconditions.checkState(exprWithSubquery.contains(Subquery.class));
-    if (exprWithSubquery instanceof ExistsPredicate) return null;
-    // Create a SlotRef from the first item of inlineView's select list
-    SlotRef slotRef = new SlotRef(Lists.newArrayList(
-        inlineView.getUniqueAlias(), inlineView.getColLabels().get(0)));
-    slotRef.analyze(analyzer);
-    Expr subquerySubstitute = slotRef;
-    if (exprWithSubquery instanceof InPredicate) {
-      BinaryPredicate pred = new BinaryPredicate(BinaryPredicate.Operator.EQ,
-          exprWithSubquery.getChild(0), slotRef);
-      pred.analyze(analyzer);
-      return pred;
-    }
-    Subquery subquery = exprWithSubquery.getSubquery();
-    Preconditions.checkState(subquery.getType().isScalarType());
-    ExprSubstitutionMap smap = new ExprSubstitutionMap();
-    SelectListItem item =
-      ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
-    if (isCorrelated && item.getExpr().contains(Expr.IS_UDA_FN)) {
-      throw new AnalysisException("UDAs are not supported in the select list of " +
-          "correlated subqueries: " + subquery.toSql());
+        List<Expr> aggFns = Lists.newArrayList();
+        item.getExpr().collectAll(Expr.NON_NULL_EMPTY_AGG, aggFns);
+        // TODO Generalize this by making the aggregate functions aware of the
+        // literal expr that they return on empty input, e.g. max returns a
+        // NullLiteral whereas count returns a NumericLiteral.
+        if (((FunctionCallExpr) aggFns.get(0)).getReturnType().isNumericType()) {
+          FunctionCallExpr zeroIfNull =
+              new FunctionCallExpr("zeroifnull", Lists.newArrayList((Expr) slotRef));
+          zeroIfNull.analyze(analyzer);
+          subquerySubstitute = zeroIfNull;
+        } else if (((FunctionCallExpr) aggFns.get(0)).getReturnType().isStringType()) {
+          List<Expr> params = Lists.newArrayList();
+          params.add(slotRef);
+          params.add(new StringLiteral(""));
+          FunctionCallExpr ifnull = new FunctionCallExpr("ifnull", params);
+          ifnull.analyze(analyzer);
+          subquerySubstitute = ifnull;
+        } else {
+          throw new AnalysisException("Unsupported aggregate function used in " +
+              "a correlated subquery's select list: " + subquery.toSql());
+        }
+      }
+      smap.put(subquery, subquerySubstitute);
+      return exprWithSubquery.substitute(smap, analyzer, false);
     }
-    if (isCorrelated && item.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
-      // TODO: Add support for multiple agg functions that return non-null on an
-      // empty input, by wrapping them with zeroifnull functions before the inline
-      // view is analyzed.
-      if (!Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr()) &&
-        (!(item.getExpr() instanceof CastExpr) ||
-         !Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr().getChild(0)))) {
-        throw new AnalysisException("Aggregate function that returns non-null on " +
-          "an empty input cannot be used in an expression in a " +
-          "correlated subquery's select list: " + subquery.toSql());
+
+    /**
+     * Rewrite all the subqueries of a SelectStmt in place. Subqueries are currently
+     * supported in FROM and WHERE clauses. The rewrite is performed in place and not in a
+     * clone of SelectStmt because it requires the stmt to be analyzed.
+     */
+    @Override
+    prote

<TRUNCATED>

[10/15] impala git commit: [DOCS] Added a link to impala kerberos doc in impala-shell options doc

Posted by tm...@apache.org.
[DOCS] Added a link to impala kerberos doc in impala-shell options doc

Change-Id: Ifbd542dc0e9051e022636dd1b6bebb69b9b22b08
Reviewed-on: http://gerrit.cloudera.org:8080/10482
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/71e22a59
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/71e22a59
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/71e22a59

Branch: refs/heads/2.x
Commit: 71e22a59a4e8e67fbf10f74f0b6e054989238e50
Parents: 4760680
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Tue May 22 18:21:02 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_shell_options.xml | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/71e22a59/docs/topics/impala_shell_options.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_shell_options.xml b/docs/topics/impala_shell_options.xml
index 5db5bcc..80e9d77 100644
--- a/docs/topics/impala_shell_options.xml
+++ b/docs/topics/impala_shell_options.xml
@@ -345,6 +345,10 @@ under the License.
                   is not enabled on the instance of <codeph>impalad</codeph> to which you are connecting, errors
                   are displayed.
                 </p>
+                <p>
+                  See <keyword keyref="kerberos"/> for the steps to set up and
+                  use Kerberos authentication in Impala.
+                </p>
               </entry>
             </row>
             <row>


[13/15] impala git commit: [DOCS] Sentry is required for Impala to enable delegation

Posted by tm...@apache.org.
[DOCS] Sentry is required for Impala to enable delegation

Change-Id: I002d3d33eee6a9b9336f21c81a4de75ed3bd5efb
Reviewed-on: http://gerrit.cloudera.org:8080/10451
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f9158657
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f9158657
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f9158657

Branch: refs/heads/2.x
Commit: f91586571db826d6f12663e1dc11ab5ce3253f24
Parents: 41d7cd9
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Fri May 18 12:24:02 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:17 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_delegation.xml | 93 +++++++++++++++-------------------
 1 file changed, 42 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f9158657/docs/topics/impala_delegation.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_delegation.xml b/docs/topics/impala_delegation.xml
index 73ae658..c524bf5 100644
--- a/docs/topics/impala_delegation.xml
+++ b/docs/topics/impala_delegation.xml
@@ -36,67 +36,58 @@ under the License.
   </prolog>
 
   <conbody>
-
     <p>
-<!--
+      <!--
       When users connect to Impala directly through the <cmdname>impala-shell</cmdname> interpreter, the Sentry
       authorization framework determines what actions they can take and what data they can see.
 -->
-      When users submit Impala queries through a separate application, such as Hue or a business intelligence tool,
-      typically all requests are treated as coming from the same user. In Impala 1.2 and higher, authentication is
-      extended by a new feature that allows applications to pass along credentials for the users that connect to
-      them (known as <q>delegation</q>), and issue Impala queries with the privileges for those users. Currently,
-      the delegation feature is available only for Impala queries submitted through application interfaces such as
-      Hue and BI tools; for example, Impala cannot issue queries using the privileges of the HDFS user.
-    </p>
-
-    <p>
-      The delegation feature is enabled by a startup option for <cmdname>impalad</cmdname>:
-      <codeph>--authorized_proxy_user_config</codeph>. When you specify this option, users whose names you specify
-      (such as <codeph>hue</codeph>) can delegate the execution of a query to another user. The query runs with the
-      privileges of the delegated user, not the original user such as <codeph>hue</codeph>. The name of the
-      delegated user is passed using the HiveServer2 configuration property <codeph>impala.doas.user</codeph>.
-    </p>
-
-    <p>
-      You can specify a list of users that the application user can delegate to, or <codeph>*</codeph> to allow a
-      superuser to delegate to any other user. For example:
-    </p>
-
-<codeblock>impalad --authorized_proxy_user_config 'hue=user1,user2;admin=*' ...</codeblock>
-
-    <note>
-      Make sure to use single quotes or escape characters to ensure that any <codeph>*</codeph> characters do not
-      undergo wildcard expansion when specified in command-line arguments.
-    </note>
-
-    <p>
-      See <xref href="impala_config_options.xml#config_options"/> for details about adding or changing
-      <cmdname>impalad</cmdname> startup options. See
-      <xref keyref="how-hiveserver2-brings-security-and-concurrency-to-apache-hive">this
-      blog post</xref> for background information about the delegation capability in HiveServer2.
-    </p>
-    <p>
-      To set up authentication for the delegated users:
-    </p>
-
+      When users submit Impala queries through a separate application, such as
+      Hue or a business intelligence tool, typically all requests are treated as
+      coming from the same user. In Impala 1.2 and higher,Impala supports
+      applications to pass along credentials for the users that connect to them,
+      known as <q>delegation</q>, and to issue Impala queries with the
+      privileges for those users. Currently, the delegation feature is available
+      only for Impala queries submitted through application interfaces such as
+      Hue and BI tools. For example, Impala cannot issue queries using the
+      privileges of the HDFS user. </p>
+    <note type="attention">Impala requires Apache Sentry on the cluster to
+      enable delegation. Without Apache Sentry installed, the delegation feature
+      will fail with the following error: User <i>user1</i> is not authorized to
+      delegate to <i>user2</i> User delegation is disabled.</note>
+    <p> The delegation feature is enabled by a startup option for
+        <cmdname>impalad</cmdname>:
+        <codeph>--authorized_proxy_user_config</codeph>. When you specify this
+      option, users whose names you specify (such as <codeph>hue</codeph>) can
+      delegate the execution of a query to another user. The query runs with the
+      privileges of the delegated user, not the original user such as
+        <codeph>hue</codeph>. The name of the delegated user is passed using the
+      HiveServer2 configuration property <codeph>impala.doas.user</codeph>. </p>
+    <p> You can specify a list of users that the application user can delegate
+      to, or <codeph>*</codeph> to allow a superuser to delegate to any other
+      user. For example: </p>
+    <codeblock>impalad --authorized_proxy_user_config 'hue=user1,user2;admin=*' ...</codeblock>
+    <note> Make sure to use single quotes or escape characters to ensure that
+      any <codeph>*</codeph> characters do not undergo wildcard expansion when
+      specified in command-line arguments. </note>
+    <p> See <xref href="impala_config_options.xml#config_options"/> for details
+      about adding or changing <cmdname>impalad</cmdname> startup options. See
+        <xref
+        keyref="how-hiveserver2-brings-security-and-concurrency-to-apache-hive"
+        >this blog post</xref> for background information about the delegation
+      capability in HiveServer2. </p>
+    <p> To set up authentication for the delegated users: </p>
     <ul>
       <li>
-        <p>
-          On the server side, configure either user/password authentication through LDAP, or Kerberos
-          authentication, for all the delegated users. See <xref href="impala_ldap.xml#ldap"/> or
-          <xref href="impala_kerberos.xml#kerberos"/> for details.
-        </p>
+        <p> On the server side, configure either user/password authentication
+          through LDAP, or Kerberos authentication, for all the delegated users.
+          See <xref href="impala_ldap.xml#ldap"/> or <xref
+            href="impala_kerberos.xml#kerberos"/> for details. </p>
       </li>
-
       <li>
-        <p>
-          On the client side, to learn how to enable delegation, consult the documentation
-          for the ODBC driver you are using.
-        </p>
+        <p> On the client side, to learn how to enable delegation, consult the
+          documentation for the ODBC driver you are using. </p>
       </li>
     </ul>
-
   </conbody>
 
 </concept>


[06/15] impala git commit: [DOCS] Fixed misleading documentation on Impala + HDFS caching

Posted by tm...@apache.org.
[DOCS] Fixed misleading documentation on Impala + HDFS caching

Change-Id: I63cd1ff7b885a094a4a3e91c31101d25414b4db7
Reviewed-on: http://gerrit.cloudera.org:8080/10454
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fd27b170
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fd27b170
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fd27b170

Branch: refs/heads/2.x
Commit: fd27b1708f8605e8c91396c8f6e4a657dd1b218c
Parents: 86c61bc
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Fri May 18 14:32:02 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_perf_hdfs_caching.xml | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fd27b170/docs/topics/impala_perf_hdfs_caching.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_hdfs_caching.xml b/docs/topics/impala_perf_hdfs_caching.xml
index e1f25a6..8f0fbb9 100644
--- a/docs/topics/impala_perf_hdfs_caching.xml
+++ b/docs/topics/impala_perf_hdfs_caching.xml
@@ -270,14 +270,11 @@ show table stats census;
         location, dropping the table, and so on.
       </p>
 
-      <p>
-        When data is requested to be pinned in memory, that process happens in the background without blocking
-        access to the data while the caching is in progress. Loading the data from disk could take some time.
-        Impala reads each HDFS data block from memory if it has been pinned already, or from disk if it has not
-        been pinned yet. When files are added to a table or partition whose contents are cached, Impala
-        automatically detects those changes and performs a <codeph>REFRESH</codeph> automatically once the relevant
-        data is cached.
-      </p>
+      <p> When data is requested to be pinned in memory, that process happens in
+        the background without blocking access to the data while the caching is
+        in progress. Loading the data from disk could take some time. Impala
+        reads each HDFS data block from memory if it has been pinned already, or
+        from disk if it has not been pinned yet.</p>
 
       <p>
         The amount of data that you can pin on each node through the HDFS caching mechanism is subject to a quota


[02/15] impala git commit: IMPALA-4025: Part 1: Generalize and cleanup StmtRewriter

Posted by tm...@apache.org.
IMPALA-4025: Part 1: Generalize and cleanup StmtRewriter

This patch generalizes StmtRewriter, allowing it to be subclassed. The
base class traverses the stmt tree while the subclasses can install
hooks to execute specific rewrite rules at certain places. Existing
rewriting rules are moved into SubqueryRewriter.

Change-Id: I9e7a6108d3d49be12ae032fdb54b5c3c23152a47
Reviewed-on: http://gerrit.cloudera.org:8080/10495
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/41d7cd90
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/41d7cd90
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/41d7cd90

Branch: refs/heads/2.x
Commit: 41d7cd908a05dabe31775dabf188d3b2136c25d2
Parents: 71e22a5
Author: Tianyi Wang <ti...@apache.org>
Authored: Thu May 17 18:38:34 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AnalysisContext.java |   81 +-
 .../apache/impala/analysis/StmtRewriter.java    | 1830 +++++++++---------
 2 files changed, 961 insertions(+), 950 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/41d7cd90/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 791e528..3e7f0cc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -426,52 +426,47 @@ public class AnalysisContext {
   private void analyze(StmtTableCache stmtTableCache) throws AnalysisException {
     Preconditions.checkNotNull(analysisResult_);
     Preconditions.checkNotNull(analysisResult_.stmt_);
-    try {
+    analysisResult_.analyzer_ = createAnalyzer(stmtTableCache);
+    analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
+    boolean isExplain = analysisResult_.isExplainStmt();
+
+    // Apply expr and subquery rewrites.
+    boolean reAnalyze = false;
+    ExprRewriter rewriter = analysisResult_.analyzer_.getExprRewriter();
+    if (analysisResult_.requiresExprRewrite()) {
+      rewriter.reset();
+      analysisResult_.stmt_.rewriteExprs(rewriter);
+      reAnalyze = rewriter.changed();
+    }
+    if (analysisResult_.requiresSubqueryRewrite()) {
+      new StmtRewriter.SubqueryRewriter().rewrite(analysisResult_);
+      reAnalyze = true;
+    }
+    if (reAnalyze) {
+      // The rewrites should have no user-visible effect. Remember the original result
+      // types and column labels to restore them after the rewritten stmt has been
+      // reset() and re-analyzed. For a CTAS statement, the types represent column types
+      // of the table that will be created, including the partition columns, if any.
+      List<Type> origResultTypes = Lists.newArrayList();
+      for (Expr e: analysisResult_.stmt_.getResultExprs()) {
+        origResultTypes.add(e.getType());
+      }
+      List<String> origColLabels =
+          Lists.newArrayList(analysisResult_.stmt_.getColLabels());
+
+      // Re-analyze the stmt with a new analyzer.
       analysisResult_.analyzer_ = createAnalyzer(stmtTableCache);
+      analysisResult_.stmt_.reset();
       analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
-      boolean isExplain = analysisResult_.isExplainStmt();
-
-      // Apply expr and subquery rewrites.
-      boolean reAnalyze = false;
-      ExprRewriter rewriter = analysisResult_.analyzer_.getExprRewriter();
-      if (analysisResult_.requiresExprRewrite()) {
-        rewriter.reset();
-        analysisResult_.stmt_.rewriteExprs(rewriter);
-        reAnalyze = rewriter.changed();
-      }
-      if (analysisResult_.requiresSubqueryRewrite()) {
-        StmtRewriter.rewrite(analysisResult_);
-        reAnalyze = true;
-      }
-      if (reAnalyze) {
-        // The rewrites should have no user-visible effect. Remember the original result
-        // types and column labels to restore them after the rewritten stmt has been
-        // reset() and re-analyzed. For a CTAS statement, the types represent column types
-        // of the table that will be created, including the partition columns, if any.
-        List<Type> origResultTypes = Lists.newArrayList();
-        for (Expr e: analysisResult_.stmt_.getResultExprs()) {
-          origResultTypes.add(e.getType());
-        }
-        List<String> origColLabels =
-            Lists.newArrayList(analysisResult_.stmt_.getColLabels());
-
-        // Re-analyze the stmt with a new analyzer.
-        analysisResult_.analyzer_ = createAnalyzer(stmtTableCache);
-        analysisResult_.stmt_.reset();
-        analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
-
-        // Restore the original result types and column labels.
-        analysisResult_.stmt_.castResultExprs(origResultTypes);
-        analysisResult_.stmt_.setColLabels(origColLabels);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("rewrittenStmt: " + analysisResult_.stmt_.toSql());
-        }
-        if (isExplain) analysisResult_.stmt_.setIsExplain();
-        Preconditions.checkState(!analysisResult_.requiresSubqueryRewrite());
+
+      // Restore the original result types and column labels.
+      analysisResult_.stmt_.castResultExprs(origResultTypes);
+      analysisResult_.stmt_.setColLabels(origColLabels);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("rewrittenStmt: " + analysisResult_.stmt_.toSql());
       }
-    } catch (AnalysisException e) {
-      // Don't wrap AnalysisExceptions in another AnalysisException
-      throw e;
+      if (isExplain) analysisResult_.stmt_.setIsExplain();
+      Preconditions.checkState(!analysisResult_.requiresSubqueryRewrite());
     }
   }
 


[03/15] impala git commit: IMPALA-7058: disable fuzz test for RC and Seq

Posted by tm...@apache.org.
IMPALA-7058: disable fuzz test for RC and Seq

There appear to still be some rare crashes. Let's disable the
test until we can sort those out.

Change-Id: I10eb184ab2f27ca9b2d286630ceb37b71affcc27
Reviewed-on: http://gerrit.cloudera.org:8080/10485
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/47606806
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/47606806
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/47606806

Branch: refs/heads/2.x
Commit: 47606806a478ea003d6487d375bf683682c16298
Parents: a48bbfd
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed May 23 09:59:26 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 tests/query_test/test_scanners_fuzz.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/47606806/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 4886c4a..9501775 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -71,7 +71,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # TODO(IMPALA-6772): enable for ORC formats once a new version after release-1.4.3
     # of ORC library is released.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('avro', 'parquet', 'rc', 'seq') or
+        v.get_value('table_format').file_format in ('avro', 'parquet') or
         (v.get_value('table_format').file_format == 'text' and
           v.get_value('table_format').compression_codec in ('none', 'lzo')))
 


[12/15] impala git commit: IMPALA-6813: Hedged reads metrics broken when scanning non-HDFS based table

Posted by tm...@apache.org.
IMPALA-6813: Hedged reads metrics broken when scanning non-HDFS based table

We realized that the libHDFS API call hdfsGetHedgedReadMetrics() crashes
when the 'fs' argument passed to it is not an HDFS filesystem.

There is an open bug for it on the HDFS side: HDFS-13417.
However, it looks like we won't be getting a fix for it in the short term,
so our only option at this point is to skip it.

Testing: Made sure that enabling preads and scanning from S3 doesn't
cause a crash.
Also, added a custom cluster test to exercise the pread code path. We
are unable to verify hedged reads in a minicluster, but we can at least
exercise the code path to make sure that nothing breaks.

Change-Id: I48fe80dfd9a1ed68a8f2b7038e5f42b5a3df3baa
Reviewed-on: http://gerrit.cloudera.org:8080/9966
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a3efde84
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a3efde84
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a3efde84

Branch: refs/heads/2.x
Commit: a3efde84a5e0ef17357d24c3e69aa3f255eb4865
Parents: 466188b
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Mon Apr 9 15:26:06 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/io/scan-range.cc           |  4 +++-
 tests/custom_cluster/test_hedged_reads.py | 30 ++++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a3efde84/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index c868c3d..409e743 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -498,11 +498,13 @@ void ScanRange::Close() {
       closed_file = true;
     }
 
-    if (FLAGS_use_hdfs_pread) {
+    if (FLAGS_use_hdfs_pread && IsHdfsPath(file())) {
       // Update Hedged Read Metrics.
       // We call it only if the --use_hdfs_pread flag is set, to avoid having the
       // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
       // otherwise. 'hedged_metrics' is only set upon success.
+      // We also avoid calling hdfsGetHedgedReadMetrics() when the file is not on HDFS
+      // (see HDFS-13417).
       struct hdfsHedgedReadMetrics* hedged_metrics;
       int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
       if (success == 0) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a3efde84/tests/custom_cluster/test_hedged_reads.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hedged_reads.py b/tests/custom_cluster/test_hedged_reads.py
new file mode 100644
index 0000000..b24fd92
--- /dev/null
+++ b/tests/custom_cluster/test_hedged_reads.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIf
+
+@SkipIf.not_hdfs
+class TestHedgedReads(CustomClusterTestSuite):
+  """ Exercises the hedged reads code path.
+      NOTE: We unfortunately cannot force hedged reads on a minicluster, but we enable
+      this test to at least make sure that the code path doesn't break."""
+  @CustomClusterTestSuite.with_args("--use_hdfs_pread=true")
+  def test_hedged_reads(self, vector):
+    QUERY = "select * from tpch_parquet.lineitem limit 100"
+    self.client.execute(QUERY)


[14/15] impala git commit: IMPALA-7067: deflake test_cancellation

Posted by tm...@apache.org.
IMPALA-7067: deflake test_cancellation

Tweak the query so that it still runs for a long time but
can cancel the fragment quicker instead of being
stuck in a long sleep() call.

Change-Id: I0c90d4f5c277f7b0d5561637944b454f7a44c76e
Reviewed-on: http://gerrit.cloudera.org:8080/10499
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0e7b0759
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0e7b0759
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0e7b0759

Branch: refs/heads/2.x
Commit: 0e7b075923cbecce4db2fd2e4fa3edf63afef06f
Parents: 1ba8581
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed May 23 21:56:14 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:17 2018 +0000

----------------------------------------------------------------------
 tests/shell/test_shell_commandline.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0e7b0759/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index e1a4b61..94722e4 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -318,9 +318,12 @@ class TestImpalaShell(ImpalaTestSuite):
     assert 0 == impalad_service.get_num_in_flight_queries()
 
   def test_cancellation(self):
-    """Test cancellation (Ctrl+C event)."""
-    args = '-q "select sleep(100000)"'
-    p = ImpalaShell(args)
+    """Test cancellation (Ctrl+C event). Run a query that sleeps 10ms per row so will run
+    for 110s if not cancelled, but will detect cancellation quickly because of the small
+    batch size."""
+    query = "set num_nodes=1; set mt_dop=1; set batch_size=1; \
+             select sleep(10) from functional_parquet.alltypesagg"
+    p = ImpalaShell('-q "{0}"'.format(query))
     sleep(6)
     os.kill(p.pid(), signal.SIGINT)
     result = p.get_result()


[04/15] impala git commit: IMPALA-7048: Failed test: test_write_index_many_columns_tables

Posted by tm...@apache.org.
IMPALA-7048: Failed test: test_write_index_many_columns_tables

The test in the title fails when the local filesystem is used.

Looking at the error message it seems that the determined
Parquet file size is too small when the local filesystem
is used. There is already an annotation for that:
'SkipIfLocal.parquet_file_size'

I added this annotation to the TestHdfsParquetTableIndexWriter
class, so these tests won't be executed when the
test-warehouse directory of Impala resides on the local
filesystem.

Change-Id: Idd3be70fb654a49dda44309a8914fe1f2b48a1af
Reviewed-on: http://gerrit.cloudera.org:8080/10476
Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a48bbfdf
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a48bbfdf
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a48bbfdf

Branch: refs/heads/2.x
Commit: a48bbfdf4692eb68f06a4cd192a98947bcc04aba
Parents: a3efde8
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue May 22 18:13:45 2018 +0200
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 tests/query_test/test_parquet_page_index.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a48bbfdf/tests/query_test/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
index 51632e5..0ee5d37 100644
--- a/tests/query_test/test_parquet_page_index.py
+++ b/tests/query_test/test_parquet_page_index.py
@@ -24,6 +24,7 @@ from subprocess import check_call
 from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
 
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import get_fs_path
 from tests.util.get_parquet_metadata import (
     decode_stats_value,
@@ -34,6 +35,7 @@ from tests.util.get_parquet_metadata import (
 PAGE_INDEX_MAX_STRING_LENGTH = 64
 
 
+@SkipIfLocal.parquet_file_size
 class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
   """Since PARQUET-922 page statistics can be written before the footer.
   The tests in this class checks if Impala writes the page indices correctly.


[08/15] impala git commit: IMPALA-7051: Serialize Maven invocations.

Posted by tm...@apache.org.
IMPALA-7051: Serialize Maven invocations.

I've observed some rare cases where Impala fails to build. I believe
it's because two Maven targets (yarn-extras and ext-data-source) are
being executed simultaneously. Maven's handling of ~/.m2/repository,
for example, is known not to be safe under concurrent use.

This patch serializes the Maven builds with the following
dependency graph:
  fe -> yarn-extras -> ext-data-source -> impala-parent
The ordering of yarn-extras -> ext-data-source is arbitrary.

I decided that this artificial dependency was the clearest
way to prevent parallel executions. Having mvn-quiet.sh
take a lock seemed considerably more complex.

Change-Id: Ie24f34f421bc7dcf9140938464d43400da95275e
Reviewed-on: http://gerrit.cloudera.org:8080/10460
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/98052e07
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/98052e07
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/98052e07

Branch: refs/heads/2.x
Commit: 98052e078a77f86a21a55aec234a269fd2a0c19e
Parents: 3b8a964
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Fri May 18 16:36:58 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 common/yarn-extras/CMakeLists.txt | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/98052e07/common/yarn-extras/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/yarn-extras/CMakeLists.txt b/common/yarn-extras/CMakeLists.txt
index 2b5f005..4f46ba5 100644
--- a/common/yarn-extras/CMakeLists.txt
+++ b/common/yarn-extras/CMakeLists.txt
@@ -15,6 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_custom_target(yarn-extras ALL DEPENDS impala-parent
+# The dependency on ext-data-source here is fictional, but Maven does not like
+# concurrent invocations. These lead to opaque, non-deterministic errors due to
+# races in how Maven handles its ~/.m2/repository doc.
+add_custom_target(yarn-extras ALL DEPENDS impala-parent ext-data-source
   COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh -B install -DskipTests
 )


[09/15] impala git commit: IMPALA-3134: Support different proc mem limits among impalads for admission control checks

Posted by tm...@apache.org.
IMPALA-3134: Support different proc mem limits among impalads for
admission control checks

Currently the admission controller assumes that all backends have the
same process mem limit as the impalad it itself is running on. With
this patch the proc mem limit for each impalad is available to the
admission controller and it uses it for making correct admission
decisions. It currently works under the assumption that the
per-process memory limit does not change dynamically.

Testing:
Added an e2e test.

IMPALA-5662: Log the queuing reason for a query

The queuing reason is now logged both while queuing for the first
time and while trying to dequeue.

Change-Id: Idb72eee790cc17466bbfa82e30f369a65f2b060e
Reviewed-on: http://gerrit.cloudera.org:8080/10396
Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/466188b3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/466188b3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/466188b3

Branch: refs/heads/2.x
Commit: 466188b3970595e2e04d7ecf6a5141a7d3012909
Parents: b07bb27
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Fri May 4 14:42:48 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc       | 70 +++++++++++---------
 be/src/scheduling/admission-controller.h        |  3 -
 be/src/scheduling/query-schedule.h              |  4 ++
 be/src/scheduling/scheduler.cc                  | 11 ++-
 be/src/scheduling/scheduler.h                   |  3 +-
 be/src/service/impala-server.cc                 |  3 +
 bin/start-impala-cluster.py                     | 12 ++++
 common/thrift/StatestoreService.thrift          |  3 +
 .../custom_cluster/test_admission_controller.py | 56 ++++++++++++++++
 9 files changed, 127 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index eef18ef..c9a2139 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -46,10 +46,6 @@ std::string PrintBytes(int64_t value) {
   return PrettyPrinter::Print(value, TUnit::BYTES);
 }
 
-int64_t GetProcMemLimit() {
-  return ExecEnv::GetInstance()->process_mem_tracker()->limit();
-}
-
 // Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
 // "!" is used because the backend id contains a colon, but it should not contain "!".
 // When parsing the topic key we need to be careful to find the last instance in
@@ -136,7 +132,7 @@ const string REASON_REQ_OVER_POOL_MEM =
     "The total memory needed is the per-node MEM_LIMIT times the number of nodes "
     "executing the query. See the Admission Control documentation for more information.";
 const string REASON_REQ_OVER_NODE_MEM =
-    "request memory needed $0 per node is greater than process mem limit $1.\n\n"
+    "request memory needed $0 per node is greater than process mem limit $1 of $2.\n\n"
     "Use the MEM_LIMIT query option to indicate how much memory is required per node.";
 
 // Queue decision details
@@ -150,7 +146,7 @@ const string POOL_MEM_NOT_AVAILABLE = "Not enough aggregate memory available in
     "with max mem resources $1. Needed $2 but only $3 was available.";
 // $0 = host name, $1 = host mem needed, $2 = host mem available, $3 = host mem limit
 const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0."
-    "Needed $1 but only $2 was available.";
+    "Needed $1 but only $2 out of $3 was available.";
 
 // Parses the pool name and backend_id from the topic key if it is valid.
 // Returns true if the topic key is valid and pool_name and backend_id are set.
@@ -348,10 +344,10 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   }
 
   // Case 2:
-  int64_t proc_mem_limit = GetProcMemLimit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
+    int64_t proc_mem_limit = entry.second.proc_mem_limit;
     int64_t mem_reserved = host_mem_reserved_[host_id];
     int64_t mem_admitted = host_mem_admitted_[host_id];
     VLOG_ROW << "Checking memory on host=" << host_id
@@ -363,7 +359,8 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     if (effective_host_mem_reserved + per_node_mem_needed > proc_mem_limit) {
       *mem_unavailable_reason = Substitute(HOST_MEM_NOT_AVAILABLE, host_id,
           PrintBytes(per_node_mem_needed),
-          PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)));
+          PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)),
+          PrintBytes(proc_mem_limit));
       return false;
     }
   }
@@ -403,15 +400,22 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
   // the checks isn't particularly important, though some thought was given to ordering
   // them in a way that might make sense for a user.
 
-  // Compute the max (over all backends) min_mem_reservation_bytes and the cluster total
-  // (across all backends) min_mem_reservation_bytes.
+  // Compute the max (over all backends) min_mem_reservation_bytes, the cluster total
+  // (across all backends) min_mem_reservation_bytes and the min (over all backends)
+  // proc_mem_limit, along with the address of the backend that has that limit.
   int64_t max_min_mem_reservation_bytes = -1;
   int64_t cluster_min_mem_reservation_bytes = 0;
+  pair<const TNetworkAddress*, int64_t> min_proc_mem_limit(
+      nullptr, std::numeric_limits<int64_t>::max());
   for (const auto& e : schedule->per_backend_exec_params()) {
     cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
     if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) {
       max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes;
     }
+    if (e.second.proc_mem_limit < min_proc_mem_limit.second) {
+      min_proc_mem_limit.first = &e.first;
+      min_proc_mem_limit.second = e.second.proc_mem_limit;
+    }
   }
 
   // Checks related to the min buffer reservation against configured query memory limits:
@@ -447,25 +451,26 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
     *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
     return true;
   }
-  if (pool_cfg.max_mem_resources > 0
-      && cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) {
-    *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
-        PrintBytes(pool_cfg.max_mem_resources),
-        PrintBytes(cluster_min_mem_reservation_bytes));
-    return true;
-  }
-  if (pool_cfg.max_mem_resources > 0
-      && schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
-    *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
-        PrintBytes(schedule->GetClusterMemoryEstimate()),
-        PrintBytes(pool_cfg.max_mem_resources));
-    return true;
-  }
-  if (pool_cfg.max_mem_resources > 0 &&
-      schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) {
-    *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-        PrintBytes(schedule->GetPerHostMemoryEstimate()), PrintBytes(GetProcMemLimit()));
-    return true;
+  if (pool_cfg.max_mem_resources > 0) {
+    if (cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) {
+      *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
+          PrintBytes(pool_cfg.max_mem_resources),
+          PrintBytes(cluster_min_mem_reservation_bytes));
+      return true;
+    }
+    if (schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
+      *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
+          PrintBytes(schedule->GetClusterMemoryEstimate()),
+          PrintBytes(pool_cfg.max_mem_resources));
+      return true;
+    }
+    int64_t perHostMemoryEstimate = schedule->GetPerHostMemoryEstimate();
+    if (perHostMemoryEstimate > min_proc_mem_limit.second) {
+      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+          PrintBytes(perHostMemoryEstimate), PrintBytes(min_proc_mem_limit.second),
+          TNetworkAddressToString(*min_proc_mem_limit.first));
+      return true;
+    }
   }
 
   // Checks related to the pool queue size:
@@ -537,7 +542,8 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) {
     }
 
     // We cannot immediately admit but do not need to reject, so queue the request
-    VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id());
+    VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id())
+               << " reason: " << not_admitted_reason;
     stats->Queue(*schedule);
     queue->Enqueue(&queue_node);
   }
@@ -875,8 +881,8 @@ void AdmissionController::DequeueLoop() {
         // TODO: Requests further in the queue may be blocked unnecessarily. Consider a
         // better policy once we have better test scenarios.
         if (!CanAdmitRequest(schedule, pool_config, true, &not_admitted_reason)) {
-          VLOG_RPC << "Could not dequeue query id=" << PrintId(schedule.query_id())
-                   << " reason: " << not_admitted_reason;
+          VLOG_QUERY << "Could not dequeue query id=" << PrintId(schedule.query_id())
+                     << " reason: " << not_admitted_reason;
           break;
         }
         VLOG_RPC << "Dequeuing query=" << PrintId(schedule.query_id());

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 3341b1b..406e09e 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -173,9 +173,6 @@ class ExecEnv;
 /// above. Note the pool's max_mem_resources (#1) is not contended.
 /// TODO: Improve the dequeuing policy. IMPALA-2968.
 ///
-/// TODO: Assumes all impalads have the same proc mem limit. Should send proc mem limit
-///       via statestore (e.g. ideally in TBackendDescriptor) and check per-node
-///       reservations against this value.
 /// TODO: Remove less important debug logging after more cluster testing. Should have a
 ///       better idea of what is perhaps unnecessary.
 class AdmissionController {

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index fa200c6..e5d8f6a 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -65,6 +65,10 @@ struct BackendExecParams {
   // operators in all fragment instances that execute on this backend. This is used for
   // an optimization in InitialReservation. Measured in bytes.
   int64_t initial_mem_reservation_total_claims;
+
+  // The process memory limit of this backend. Obtained from the scheduler's executors
+  // configuration which is updated by membership updates from the statestore.
+  int64_t proc_mem_limit;
 };
 
 /// map from an impalad host address to the list of assigned fragment instance params.

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index e25b777..20c3210 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -692,7 +692,7 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
   ExecutorsConfigPtr config_ptr = GetExecutorsConfig();
   RETURN_IF_ERROR(ComputeScanRangeAssignment(*config_ptr, schedule));
   ComputeFragmentExecParams(*config_ptr, schedule);
-  ComputeBackendExecParams(schedule);
+  ComputeBackendExecParams(*config_ptr, schedule);
 #ifndef NDEBUG
   schedule->Validate();
 #endif
@@ -709,7 +709,8 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
   return Status::OK();
 }
 
-void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) {
+void Scheduler::ComputeBackendExecParams(
+    const BackendConfig& executor_config, QuerySchedule* schedule) {
   PerBackendExecParams per_backend_params;
   for (const FragmentExecParams& f : schedule->fragment_exec_params()) {
     for (const FInstanceExecParams& i : f.instance_exec_params) {
@@ -726,6 +727,12 @@ void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) {
           f.fragment.initial_mem_reservation_total_claims;
     }
   }
+
+  for (auto& backend: per_backend_params) {
+    const TNetworkAddress& host = backend.first;
+    backend.second.proc_mem_limit =
+        LookUpBackendDesc(executor_config, host).proc_mem_limit;
+  }
   schedule->set_per_backend_exec_params(per_backend_params);
 
   stringstream min_mem_reservation_ss;

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index b87b239..d0302e6 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -411,7 +411,8 @@ class Scheduler {
 
   /// Computes BackendExecParams for all backends assigned in the query. Must be called
   /// after ComputeFragmentExecParams().
-  void ComputeBackendExecParams(QuerySchedule* schedule);
+  void ComputeBackendExecParams(
+      const BackendConfig& executor_config, QuerySchedule* schedule);
 
   /// Compute the FragmentExecParams for all plans in the schedule's
   /// TQueryExecRequest.plan_exec_info.

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 36224d4..3625a7d 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -53,6 +53,7 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
@@ -1654,6 +1655,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
   local_backend_descriptor.__set_address(exec_env_->backend_address());
   local_backend_descriptor.ip_address = exec_env_->ip_address();
+  local_backend_descriptor.__set_proc_mem_limit(
+      exec_env_->process_mem_tracker()->limit());
   if (FLAGS_use_krpc) {
     const TNetworkAddress& krpc_address = exec_env_->krpc_address();
     DCHECK(IsResolvedAddress(krpc_address));

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index cd926da..1242ab9 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -84,6 +84,10 @@ parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS,
 # replica initialization. The ith delay is applied to the ith impalad.
 parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="",
                   help=SUPPRESS_HELP)
+# For testing: Semi-colon separated list of startup arguments to be passed per impalad.
+# The ith group of options is applied to the ith impalad.
+parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
+                  ,default="", help=SUPPRESS_HELP)
 
 options, args = parser.parse_args()
 
@@ -236,6 +240,10 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
   if options.catalog_init_delays != "":
     delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")]
 
+  per_impalad_args = []
+  if options.per_impalad_args != "":
+    per_impalad_args = [args.strip() for args in options.per_impalad_args.split(";")]
+
   # Start each impalad instance and optionally redirect the output to a log file.
   for i in range(cluster_size):
     if i == 0:
@@ -273,6 +281,10 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
     if options.disable_krpc:
       args = "-use_krpc=false %s" % (args)
 
+    # Appended at the end so they can override previous args.
+    if i < len(per_impalad_args):
+      args = "%s %s" % (args, per_impalad_args[i])
+
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 3702932..4f2dada 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -71,6 +71,9 @@ struct TBackendDescriptor {
 
   // IP address + port of KRPC based ImpalaInternalService on this backend
   7: optional Types.TNetworkAddress krpc_address;
+
+  // The process memory limit of this backend (in bytes).
+  8: required i64 proc_mem_limit;
 }
 
 // Description of a single entry in a topic

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index a8070a6..9aa74a5 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -24,6 +24,7 @@ import pytest
 import re
 import sys
 import threading
+from copy import copy
 from time import sleep, time
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -426,6 +427,61 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
     self.execute_query_expect_success(self.client, query, exec_options)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
+      pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
+      queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),
+      start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G",
+      statestored_args=_STATESTORED_ARGS)
+  def test_heterogeneous_proc_mem_limit(self, vector):
+    """ Test to ensure that the admission controller takes into account the actual proc
+    mem limits of each impalad. Starts a cluster where the last impalad has a smaller
+    proc mem limit than other impalads and runs queries where admission/rejection decision
+    depends on the coordinator knowing the other impalad's mem limits.
+    The queue_wait_timeout_ms has been set to be more than the prioritized statestore
+    update time, so that the queries don't time out before receiving updates to pool
+    stats"""
+    # Choose a query that runs on all 3 backends.
+    query = "select * from functional.alltypesagg, (select 1) B limit 1"
+    # Successfully run a query with mem limit equal to the lowest process memory among
+    # impalads
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['mem_limit'] = "2G"
+    self.execute_query_expect_success(self.client, query, exec_options)
+    # Test that a query scheduled to run on a single node and submitted to the impalad
+    # with higher proc mem limit succeeds.
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['mem_limit'] = "3G"
+    exec_options['num_nodes'] = "1"
+    self.execute_query_expect_success(self.client, query, exec_options)
+    # Exercise rejection checks in admission controller.
+    try:
+      exec_options = copy(vector.get_value('exec_option'))
+      exec_options['mem_limit'] = "3G"
+      self.execute_query(query, exec_options)
+    except ImpalaBeeswaxException as e:
+      assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB per "
+          "node is greater than process mem limit 2.00 GB of \S+", str(e)), str(e)
+    # Exercise queuing checks in admission controller.
+    try:
+      impalad_with_2g_mem = self.cluster.impalads[2].service.create_beeswax_client()
+      impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
+      impalad_with_2g_mem.execute_async("select sleep(1000)")
+      # Wait for statestore update to update the mem admitted in each node.
+      sleep(STATESTORE_RPC_FREQUENCY_MS/1000)
+      exec_options = copy(vector.get_value('exec_option'))
+      exec_options['mem_limit'] = "2G"
+      # Since Queuing is synchronous and we can't close the previous query till this
+      # returns, we wait for this to timeout instead.
+      self.execute_query(query, exec_options)
+    except ImpalaBeeswaxException as e:
+      assert re.search("Queued reason: Not enough memory available on host \S+.Needed "
+          "2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e)
+    finally:
+      if impalad_with_2g_mem is not None:
+        impalad_with_2g_mem.close()
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin


[15/15] impala git commit: IMPALA-7055: fix race with DML errors

Posted by tm...@apache.org.
IMPALA-7055: fix race with DML errors

Error statuses could be lost because backend_exec_complete_barrier_
went to 0 before the query was transitioned to an error state.
Reordering the UpdateExecState() and backend_exec_complete_barrier_
calls prevents this race.

Change-Id: Idafd0b342e77a065be7cc28fa8c8a9df445622c2
Reviewed-on: http://gerrit.cloudera.org:8080/10491
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1ba8581c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1ba8581c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1ba8581c

Branch: refs/heads/2.x
Commit: 1ba8581ceeac4f3c8dbf2b56139dec420de6e967
Parents: f915865
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed May 23 14:03:12 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:17 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc | 24 ++++++++++++++++--------
 be/src/runtime/coordinator.h  |  3 +++
 2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1ba8581c/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 995d747..28e4d6c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -669,25 +669,33 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
     // This backend execution has completed.
+    if (VLOG_QUERY_IS_ON) {
+      // Don't log backend completion if the query has already been cancelled.
+      int pending_backends = backend_exec_complete_barrier_->pending();
+      if (pending_backends >= 1) {
+        VLOG_QUERY << "Backend completed:"
+                   << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+                   << " remaining=" << pending_backends
+                   << " query_id=" << PrintId(query_id());
+        BackendState::LogFirstInProgress(backend_states_);
+      }
+    }
     bool is_fragment_failure;
     TUniqueId failed_instance_id;
     Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
-    int pending_backends = backend_exec_complete_barrier_->Notify();
-    if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
-      VLOG_QUERY << "Backend completed:"
-                 << " host=" << TNetworkAddressToString(backend_state->impalad_address())
-                 << " remaining=" << pending_backends
-                 << " query_id=" << PrintId(query_id());
-      BackendState::LogFirstInProgress(backend_states_);
-    }
     if (!status.ok()) {
       // We may start receiving status reports before all exec rpcs are complete.
       // Can't apply state transition until no more exec rpcs will be sent.
       exec_rpcs_complete_barrier_->Wait();
+      // Transition the status if we're not already in a terminal state. This won't block
+      // because either this transitions to an ERROR state or the query is already in
+      // a terminal state.
       discard_result(UpdateExecState(status,
               is_fragment_failure ? &failed_instance_id : nullptr,
               TNetworkAddressToString(backend_state->impalad_address())));
     }
+    // We've applied all changes from the final status report - notify waiting threads.
+    backend_exec_complete_barrier_->Notify();
   }
   // If all results have been returned, return a cancelled status to force the fragment
   // instance to stop executing.

http://git-wip-us.apache.org/repos/asf/impala/blob/1ba8581c/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index ae85bcd..5bb399f 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -350,6 +350,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// the Coordinator object), then finalizes execution (cancels remaining backends if
   /// transitioning to CANCELLED; in all cases releases resources and calls
   /// ComputeQuerySummary()). Must not be called if exec RPCs are pending.
+  /// Will block waiting for backends to complete if transitioning to the
+  /// RETURNED_RESULTS terminal state. Does not block if already in terminal state or
+  /// transitioning to ERROR or CANCELLED.
   void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
 
   /// Return true if 'exec_state_' is RETURNED_RESULTS.


[11/15] impala git commit: IMPALA-7039: Ignore the port in HBase planner tests

Posted by tm...@apache.org.
IMPALA-7039: Ignore the port in HBase planner tests

Before this patch, we used to check the HBase port in the HBase planner
tests. This caused a failure when HBase was running on a different port
than expected. We fix the problem in this patch by not checking the
HBase port.

Testing: ran the FE tests and they passed.

Change-Id: I8eb7628061b2ebaf84323b37424925e9a64f70a0
Reviewed-on: http://gerrit.cloudera.org:8080/10459
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b07bb272
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b07bb272
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b07bb272

Branch: refs/heads/2.x
Commit: b07bb2729df4aa92d68626f88afa7cd09733ec23
Parents: 98052e0
Author: Taras Bobrovytsky <ta...@apache.org>
Authored: Fri May 18 15:42:15 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/planner/PlannerTestBase.java  |  3 --
 .../queries/PlannerTest/hbase.test              | 38 ++++++++++----------
 .../queries/PlannerTest/joins.test              |  2 +-
 3 files changed, 20 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b07bb272/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index bcd1fc3..e7e228f 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -288,10 +288,7 @@ public class PlannerTestBase extends FrontendTestBase {
           }
           if (locations.scan_range.isSetHbase_key_range()) {
             THBaseKeyRange keyRange = locations.scan_range.getHbase_key_range();
-            Integer hostIdx = locations.locations.get(0).host_idx;
-            TNetworkAddress networkAddress = execRequest.getHost_list().get(hostIdx);
             result.append("HBASE KEYRANGE ");
-            result.append("port=" + networkAddress.port + " ");
             if (keyRange.isSetStartKey()) {
               result.append(HBaseScanNode.printKey(keyRange.getStartKey().getBytes()));
             } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/b07bb272/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test b/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
index e0728be..2119e2c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/hbase.test
@@ -16,9 +16,9 @@ PLAN-ROOT SINK
    predicates: id < 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16201 <unbounded>:3
-  HBASE KEYRANGE port=16202 3:7
-  HBASE KEYRANGE port=16203 7:<unbounded>
+  HBASE KEYRANGE <unbounded>:3
+  HBASE KEYRANGE 3:7
+  HBASE KEYRANGE 7:<unbounded>
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -40,7 +40,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 5:5\0
+  HBASE KEYRANGE 5:5\0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -62,8 +62,8 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 5\0:7
-  HBASE KEYRANGE port=16203 7:<unbounded>
+  HBASE KEYRANGE 5\0:7
+  HBASE KEYRANGE 7:<unbounded>
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -84,8 +84,8 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 5:7
-  HBASE KEYRANGE port=16203 7:<unbounded>
+  HBASE KEYRANGE 5:7
+  HBASE KEYRANGE 7:<unbounded>
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -106,8 +106,8 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16201 <unbounded>:3
-  HBASE KEYRANGE port=16202 3:5
+  HBASE KEYRANGE <unbounded>:3
+  HBASE KEYRANGE 3:5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -139,7 +139,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 4\0:5
+  HBASE KEYRANGE 4\0:5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -162,7 +162,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 4:5
+  HBASE KEYRANGE 4:5
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -185,7 +185,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 4\0:5\0
+  HBASE KEYRANGE 4\0:5\0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -208,7 +208,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 4:5\0
+  HBASE KEYRANGE 4:5\0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -395,7 +395,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5, string_col = '4'
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 4:5\0
+  HBASE KEYRANGE 4:5\0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -449,7 +449,7 @@ PLAN-ROOT SINK
    predicates: tinyint_col = 5, string_col = '4'
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16202 4:5\0
+  HBASE KEYRANGE 4:5\0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -548,9 +548,9 @@ PLAN-ROOT SINK
    predicates: bigint_col IS NOT NULL, bool_col = TRUE
 ---- SCANRANGELOCATIONS
 NODE 0:
-  HBASE KEYRANGE port=16201 <unbounded>:3
-  HBASE KEYRANGE port=16202 3:7
-  HBASE KEYRANGE port=16203 7:<unbounded>
+  HBASE KEYRANGE <unbounded>:3
+  HBASE KEYRANGE 3:7
+  HBASE KEYRANGE 7:<unbounded>
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |

http://git-wip-us.apache.org/repos/asf/impala/blob/b07bb272/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index 26d4d2f..4c5d7ab 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -358,7 +358,7 @@ NODE 0:
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=8/100108.txt 0:76263
   HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=9/100109.txt 0:76263
 NODE 1:
-  HBASE KEYRANGE port=16202 5:5\0
+  HBASE KEYRANGE 5:5\0
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |


[05/15] impala git commit: IMPALA-6317: Add -cmake_only option to buildall.sh

Posted by tm...@apache.org.
IMPALA-6317: Add -cmake_only option to buildall.sh

It's sometimes useful to be able to build a complete Impala dev
environment without necessarily building the Impala binary itself
-- e.g., when one wants to use the internal test framework to run
tests against an instance of Impala running on a remote cluster.

- This patch adds a -cmake_only flag to buildall.sh, which then
  gets propagated to the make_impala.sh.

- Added a missing line to the help text re: passing the -ninja
  command line option

Change-Id: If31a4e29425a6a20059cba2f43b72e4fb908018f
Reviewed-on: http://gerrit.cloudera.org:8080/10455
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/86c61bc3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/86c61bc3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/86c61bc3

Branch: refs/heads/2.x
Commit: 86c61bc3466ab9c181a133d428194f4b7bf5b082
Parents: 228f077
Author: David Knupp <dk...@cloudera.com>
Authored: Mon Mar 5 17:14:36 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 buildall.sh | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/86c61bc3/buildall.sh
----------------------------------------------------------------------
diff --git a/buildall.sh b/buildall.sh
index 56cdb9a..4e1866e 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -183,6 +183,9 @@ do
       LZO_CMAKE_ARGS+=" -GNinja"
       MAKE_CMD=ninja
       ;;
+    -cmake_only)
+      MAKE_IMPALA_ARGS+=" -cmake_only"
+      ;;
     -help|*)
       echo "buildall.sh - Builds Impala and runs all tests."
       echo "[-noclean] : Omits cleaning all packages before building. Will not kill"\
@@ -217,6 +220,8 @@ do
       echo "[-so|-build_shared_libs] : Dynamically link executables (default is static)"
       echo "[-kerberize] : Enable kerberos on the cluster"
       echo "[-fe_only] : Build just the frontend"
+      echo "[-ninja] : Use ninja instead of make"
+      echo "[-cmake_only] : Generate makefiles only, instead of doing a full build"
       echo "-----------------------------------------------------------------------------
 Examples of common tasks:
 


[07/15] impala git commit: IMPALA-7011: Simplify PlanRootSink control logic

Posted by tm...@apache.org.
IMPALA-7011: Simplify PlanRootSink control logic

1) The eos_ and sender_done_ bits really encode three possible states
   that the sender can be in. Make this explicit using an enum with
   three values.

2) The purpose of CloseConsumer() has changed over time and we can clean
   this up now:

 a) Originally, it looks like it was used to unblock the sender when the
   consumer finishes before eos, but also keep the sink alive long
   enough for the coordinator. This is no longer necessary now that
   control structures are owned by the QueryState whose lifetime is
   controlled by a reference count taken by the coordinator. So, we don't
   need the coordinator to tell the sink it's done calling it and we
   don't need the consumer_done_ state.

 b) Later on, CloseConsumer() was used as a cancellation mechanism.
   We need to keep this around (or use timeouts on the condvars) to kick
   both the consumer and producer on cancellation. But let's make the
   cancellation logic similar to the exec nodes and other sinks by
   driving the cancellation using the RuntimeState's cancellation
   flag. Now that CloseConsumer() is only about cancellation, rename it
   to Cancel() (later we may promote it to DataSink and implement in the
   data stream sender as well).

Testing:
- Exhaustive
- Minicluster concurrent_select.py stress

Change-Id: Ifc75617a253fd43a6122baa4b4dc7aeb1dbe633f
Reviewed-on: http://gerrit.cloudera.org:8080/10449
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3b8a9648
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3b8a9648
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3b8a9648

Branch: refs/heads/2.x
Commit: 3b8a96485e36336c8e823479559cd81edc2a231d
Parents: fd27b17
Author: Dan Hecht <dh...@cloudera.com>
Authored: Thu May 17 17:03:54 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 be/src/exec/plan-root-sink.cc             | 39 +++++++-------
 be/src/exec/plan-root-sink.h              | 72 +++++++++++++-------------
 be/src/runtime/coordinator.cc             | 11 +---
 be/src/runtime/fragment-instance-state.cc |  6 +--
 4 files changed, 60 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 836a376..a64dbb9 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -72,11 +72,10 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   // written clients may not cope correctly with them. See IMPALA-4335.
   while (current_batch_row < batch->num_rows()) {
     unique_lock<mutex> l(lock_);
-    while (results_ == nullptr && !consumer_done_) sender_cv_.Wait(l);
-    if (consumer_done_ || batch == nullptr) {
-      eos_ = true;
-      return Status::OK();
-    }
+    // Wait until the consumer gives us a result set to fill in, or the fragment
+    // instance has been cancelled.
+    while (results_ == nullptr && !state->is_cancelled()) sender_cv_.Wait(l);
+    RETURN_IF_CANCELLED(state);
 
     // Otherwise the consumer is ready. Fill out the rows.
     DCHECK(results_ != nullptr);
@@ -107,29 +106,26 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
 
 Status PlanRootSink::FlushFinal(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
-  sender_done_ = true;
-  eos_ = true;
+  sender_state_ = SenderState::EOS;
   consumer_cv_.NotifyAll();
   return Status::OK();
 }
 
 void PlanRootSink::Close(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
-  // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
-  // well.
-  // TODO: shouldn't this also set eos to true? do we need both eos and sender_done_?
-  sender_done_ = true;
+  // FlushFinal() won't have been called when the fragment instance encounters an error
+  // before sending all rows.
+  if (sender_state_ == SenderState::ROWS_PENDING) {
+    sender_state_ = SenderState::CLOSED_NOT_EOS;
+  }
   consumer_cv_.NotifyAll();
-  // Wait for consumer to be done, in case sender tries to tear-down this sink while the
-  // sender is still reading from it.
-  while (!consumer_done_) sender_cv_.Wait(l);
   DataSink::Close(state);
 }
 
-void PlanRootSink::CloseConsumer() {
-  unique_lock<mutex> l(lock_);
-  consumer_done_ = true;
+void PlanRootSink::Cancel(RuntimeState* state) {
+  DCHECK(state->is_cancelled());
   sender_cv_.NotifyAll();
+  consumer_cv_.NotifyAll();
 }
 
 Status PlanRootSink::GetNext(
@@ -140,9 +136,14 @@ Status PlanRootSink::GetNext(
   num_rows_requested_ = num_results;
   sender_cv_.NotifyAll();
 
-  while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.Wait(l);
+  // Wait while the sender is still producing rows and hasn't filled in the current
+  // result set.
+  while (sender_state_ == SenderState::ROWS_PENDING && results_ != nullptr &&
+      !state->is_cancelled()) {
+    consumer_cv_.Wait(l);
+  }
 
-  *eos = eos_;
+  *eos = sender_state_ == SenderState::EOS;
   return state->GetQueryStatus();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 87ab3ef..1d64b21 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -36,19 +36,25 @@ class ScalarExprEvaluator;
 /// The consumer calls GetNext() with a QueryResultSet and a requested fetch
 /// size. GetNext() shares these fields with Send(), and then signals Send() to begin
 /// populating the result set. GetNext() returns when a) the sender has sent all of its
-/// rows b) the requested fetch size has been satisfied or c) the sender calls Close().
+/// rows b) the requested fetch size has been satisfied or c) the sender fragment
+/// instance was cancelled.
 ///
-/// Send() fills in as many rows as are requested from the current batch. When the batch
-/// is exhausted - which may take several calls to GetNext() - control is returned to the
-/// sender to produce another row batch.
+/// The sender uses Send() to fill in as many rows as are requested from the current
+/// batch. When the batch is exhausted - which may take several calls to GetNext() -
+/// Send() returns so that the fragment instance can produce another row batch.
 ///
-/// When the consumer is finished, CloseConsumer() must be called to allow the sender to
-/// exit Send(). Senders must call Close() to signal to the consumer that no more batches
-/// will be produced. CloseConsumer() may be called concurrently with GetNext(). Senders
-/// should ensure that the consumer is not blocked in GetNext() before destroying the
-/// PlanRootSink.
+/// FlushFinal() should be called by the sender to signal it has finished calling
+/// Send() for all rows. Close() should be called by the sender to release resources.
 ///
-/// The sink is thread safe up to a single producer and single consumer.
+/// When the fragment instance is cancelled, Cancel() is called to unblock both the
+/// sender and consumer. Cancel() may be called concurrently with Send(), GetNext() and
+/// Close().
+///
+/// The sink is thread safe up to a single sender and single consumer.
+///
+/// Lifetime: The sink is owned by the QueryState and has the same lifetime as
+/// QueryState. The QueryState references from the fragment instance and the Coordinator
+/// ensures that this outlives any calls to Send() and GetNext(), respectively.
 ///
 /// TODO: The consumer drives the sender in lock-step with GetNext() calls, forcing a
 /// context-switch on every invocation. Measure the impact of this, and consider moving to
@@ -62,25 +68,23 @@ class PlanRootSink : public DataSink {
   /// consumer has consumed 'batch' by calling GetNext().
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Sets eos and notifies consumer.
+  /// Indicates eos and notifies consumer.
   virtual Status FlushFinal(RuntimeState* state);
 
-  /// To be called by sender only. Signals to the consumer that no more batches will be
-  /// produced, then blocks until someone calls CloseConsumer().
+  /// To be called by sender only. Release resources and unblocks consumer.
   virtual void Close(RuntimeState* state);
 
-  /// Populates 'result_set' with up to 'num_rows' rows produced by the fragment instance
-  /// that calls Send(). *eos is set to 'true' when there are no more rows to consume. If
-  /// CloseConsumer() is called concurrently, GetNext() will return and may not populate
-  /// 'result_set'. All subsequent calls after CloseConsumer() will do no work.
+  /// To be called by the consumer only. 'result_set' with up to 'num_rows' rows
+  /// produced by the fragment instance that calls Send(). *eos is set to 'true' when
+  /// there are no more rows to consume. If Cancel() or Close() are called concurrently,
+  /// GetNext() will return and may not populate 'result_set'. All subsequent calls
+  /// after Cancel() or Close() are no-ops.
   Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos);
 
-  /// Signals to the producer that the sink will no longer be used. GetNext() may be
-  /// safely called after this returns (it does nothing), but consumers should consider
-  /// that the PlanRootSink may be undergoing destruction. May be called more than once;
-  /// only the first call has any effect.
-  void CloseConsumer();
+  /// Unblocks both the consumer and sender so they can check the cancellation flag in
+  /// the RuntimeState. The cancellation flag should be set prior to calling this.
+  void Cancel(RuntimeState* state);
 
   static const std::string NAME;
 
@@ -90,21 +94,22 @@ class PlanRootSink : public DataSink {
 
   /// Waited on by the sender only. Signalled when the consumer has written results_ and
   /// num_rows_requested_, and so the sender may begin satisfying that request for rows
-  /// from its current batch. Also signalled when CloseConsumer() is called, to unblock
-  /// the sender.
+  /// from its current batch. Also signalled when Cancel() is called, to unblock the
+  /// sender.
   ConditionVariable sender_cv_;
 
   /// Waited on by the consumer only. Signalled when the sender has finished serving a
-  /// request for rows. Also signalled by Close() and FlushFinal() to signal to the
-  /// consumer that no more rows are coming.
+  /// request for rows. Also signalled by FlushFinal(), Close() and Cancel() to unblock
+  /// the consumer.
   ConditionVariable consumer_cv_;
 
-  /// Signals to producer that the consumer is done, and the sink may be torn down.
-  bool consumer_done_ = false;
-
-  /// Signals to consumer that the sender is done, and that there are no more row batches
-  /// to consume.
-  bool sender_done_ = false;
+  /// State of the sender:
+  /// - ROWS_PENDING: the sender is still producing rows; the only non-terminal state
+  /// - EOS: the sender has passed all rows to Send()
+  /// - CLOSED_NOT_EOS: the sender (i.e. sink) was closed before all rows were passed to
+  ///   Send()
+  enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
+  SenderState sender_state_ = SenderState::ROWS_PENDING;
 
   /// The current result set passed to GetNext(), to fill in Send(). Not owned by this
   /// sink. Reset to nullptr after Send() completes the request to signal to the consumer
@@ -114,9 +119,6 @@ class PlanRootSink : public DataSink {
   /// Set by GetNext() to indicate to Send() how many rows it should write to results_.
   int num_rows_requested_ = 0;
 
-  /// Set to true in Send() and FlushFinal() when the Sink() has finished producing rows.
-  bool eos_ = false;
-
   /// Writes a single row into 'result' and 'scales' by evaluating
   /// output_expr_evals_ over 'row'.
   void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 042605d..995d747 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -148,14 +148,8 @@ Status Coordinator::Exec() {
       DCHECK(!prepare_status.ok());
       return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
     }
-
-    // When GetFInstanceState() returns the coordinator instance, the Prepare phase
-    // is done and the FragmentInstanceState's root sink will be set up. At that point,
-    // the coordinator must be sure to call root_sink()->CloseConsumer(); the
-    // fragment instance's executor will not complete until that point.
-    // TODO: what does this mean?
-    // TODO: Consider moving this to Wait().
-    // TODO: clarify need for synchronization on this event
+    // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
+    // done and the FragmentInstanceState's root sink will be set up.
     DCHECK(coord_instance_->IsPrepared() && coord_instance_->WaitForPrepare().ok());
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
@@ -527,7 +521,6 @@ void Coordinator::HandleExecStateTransition(
       exec_rpcs_complete_barrier_->pending() <= 0) << "exec rpcs not completed";
 
   query_events_->MarkEvent(exec_state_to_event.at(new_state));
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // This thread won the race to transitioning into a terminal state - terminate
   // execution and release resources.
   ReleaseExecResources();

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index a14bf31..c61cb81 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -103,13 +103,9 @@ void FragmentInstanceState::Cancel() {
   // being cancelled.
   discard_result(WaitForPrepare());
 
-  // Ensure that the sink is closed from both sides. Although in ordinary executions we
-  // rely on the consumer to do this, in error cases the consumer may not be able to send
-  // CloseConsumer() (see IMPALA-4348 for an example).
-  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
-
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
+  if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
   runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }