You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2017/01/11 09:47:01 UTC

[5/5] hive git commit: HIVE-15539: Optimize complex multi-insert queries in Calcite (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-15539: Optimize complex multi-insert queries in Calcite (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/886978db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/886978db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/886978db

Branch: refs/heads/master
Commit: 886978db5ccb43fd0473b0f0f80643b29482a4b0
Parents: e56b60f
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed Jan 4 15:45:29 2017 +0000
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Jan 11 09:41:55 2017 +0000

----------------------------------------------------------------------
 .../calcite/CalciteSemanticException.java       |    2 +-
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  211 ++-
 .../hadoop/hive/ql/parse/QBParseInfo.java       |   21 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   35 +-
 .../queries/clientpositive/multi_insert_gby4.q  |   26 +
 .../clientpositive/multi_insert_with_join2.q    |   29 +-
 .../clientpositive/auto_sortmerge_join_13.q.out |   57 +-
 .../clientpositive/groupby_sort_1_23.q.out      |    2 +-
 .../clientpositive/groupby_sort_skew_1_23.q.out |    2 +-
 .../llap/auto_smb_mapjoin_14.q.out              |  116 +-
 .../llap/auto_sortmerge_join_13.q.out           |  114 +-
 .../clientpositive/llap/explainuser_2.q.out     | 1319 +++++++++---------
 .../results/clientpositive/llap/lineage3.q.out  |    6 +-
 .../llap/tez_union_multiinsert.q.out            |  218 ++-
 .../clientpositive/llap/unionDistinct_1.q.out   |   30 +-
 .../llap/vector_auto_smb_mapjoin_14.q.out       |   78 +-
 .../clientpositive/multi_insert_gby4.q.out      |  279 ++++
 .../clientpositive/multi_insert_union_src.q.out |    4 +-
 .../multi_insert_with_join2.q.out               |  519 ++++++-
 .../spark/auto_smb_mapjoin_14.q.out             |  108 +-
 .../spark/auto_sortmerge_join_13.q.out          |   76 +-
 .../spark/groupby_sort_1_23.q.out               |    2 +-
 .../spark/groupby_sort_skew_1_23.q.out          |    2 +-
 .../spark/multi_insert_with_join2.q.out         | 1006 +++++++++++++
 .../results/clientpositive/spark/union17.q.out  |    6 +-
 .../results/clientpositive/spark/union18.q.out  |    6 +-
 .../results/clientpositive/spark/union19.q.out  |    6 +-
 .../results/clientpositive/spark/union31.q.out  |   12 +-
 .../clientpositive/spark/union_remove_6.q.out   |    4 +-
 .../spark/union_remove_6_subq.q.out             |    4 +-
 .../clientpositive/tez/explainanalyze_2.q.out   |  972 ++++++-------
 .../test/results/clientpositive/union17.q.out   |    6 +-
 .../test/results/clientpositive/union18.q.out   |    6 +-
 .../test/results/clientpositive/union19.q.out   |    6 +-
 .../test/results/clientpositive/union31.q.out   |   12 +-
 .../results/clientpositive/union_remove_6.q.out |    8 +-
 .../clientpositive/union_remove_6_subq.q.out    |    8 +-
 37 files changed, 3659 insertions(+), 1659 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/CalciteSemanticException.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/CalciteSemanticException.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/CalciteSemanticException.java
index 0038f73..5b2c9c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/CalciteSemanticException.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/CalciteSemanticException.java
@@ -32,7 +32,7 @@ public class CalciteSemanticException extends SemanticException {
   public enum UnsupportedFeature {
     Distinct_without_an_aggreggation, Duplicates_in_RR, Filter_expression_with_non_boolean_return_type,
     Having_clause_without_any_groupby, Hint, Invalid_column_reference, Invalid_decimal,
-    Less_than_equal_greater_than, Multi_insert, Others, Same_name_in_multiple_expressions,
+    Less_than_equal_greater_than, Others, Same_name_in_multiple_expressions,
     Schema_less_table, Select_alias_in_having_clause, Select_transform, Subquery,
     Table_sample_clauses, UDTF, Union_type, Unique_join
   };

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index cc357c5..9f1b9d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -23,10 +23,11 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.util.AbstractMap.SimpleEntry;
-import java.util.ArrayList;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.EnumSet;
@@ -38,9 +39,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.CommonToken;
 import org.antlr.runtime.tree.TreeVisitor;
 import org.antlr.runtime.tree.TreeVisitorAction;
 import org.apache.calcite.adapter.druid.DruidQuery;
@@ -138,7 +141,6 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HivePlannerContext;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
-import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelDecorrelator;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexExecutorImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -186,6 +188,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectOverIntersec
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsWithStatsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelDecorrelator;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelFieldTrimmer;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSemiJoinRule;
@@ -241,10 +244,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.joda.time.Interval;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 
 public class CalcitePlanner extends SemanticAnalyzer {
 
@@ -328,9 +333,13 @@ public class CalcitePlanner extends SemanticAnalyzer {
         queryForCbo = cboCtx.nodeOfInterest; // nodeOfInterest is the query
       }
       runCBO = canCBOHandleAst(queryForCbo, getQB(), cboCtx);
-      profilesCBO = obtainCBOProfiles(queryProperties);
+      if (queryProperties.hasMultiDestQuery()) {
+        handleMultiDestQuery(ast, cboCtx);
+      }
 
       if (runCBO) {
+        profilesCBO = obtainCBOProfiles(queryProperties);
+
         disableJoinMerge = true;
         boolean reAnalyzeAST = false;
         final boolean materializedView = getQB().isMaterializedView();
@@ -454,6 +463,167 @@ public class CalcitePlanner extends SemanticAnalyzer {
     return sinkOp;
   }
 
+  /*
+   * Tries to optimize FROM clause of multi-insert. No attempt to optimize insert clauses of the query.
+   * Does not return a value: sets runCBO to false if the FROM clause cannot be rewritten or is not supported.
+   */
+  private void handleMultiDestQuery(ASTNode ast, PreCboCtx cboCtx) throws SemanticException {
+    // Not supported by CBO
+    if (!runCBO) {
+      return;
+    }
+    // Currently, we only optimize the content of the FROM clause
+    // for multi-insert queries. Thus, nodeOfInterest is the FROM clause
+    if (isJoinToken(cboCtx.nodeOfInterest)) {
+      // Join clause: rewriting is needed
+      ASTNode subq = rewriteASTForMultiInsert(ast, cboCtx.nodeOfInterest);
+      if (subq != null) {
+        // We could rewrite into a subquery
+        cboCtx.nodeOfInterest = (ASTNode) subq.getChild(0);
+        QB newQB = new QB(null, "", false);
+        Phase1Ctx ctx_1 = initPhase1Ctx();
+        doPhase1(cboCtx.nodeOfInterest, newQB, ctx_1, null);
+        setQB(newQB);
+        getMetaData(getQB());
+      } else {
+        runCBO = false;
+      }
+    } else if (cboCtx.nodeOfInterest.getToken().getType() == HiveParser.TOK_SUBQUERY) {
+      // Subquery: no rewriting needed
+      ASTNode subq = cboCtx.nodeOfInterest;
+      // First child is subquery, second child is alias
+      // We set the node of interest and QB to the subquery
+      // We do not need to generate the QB again, but rather we use it directly
+      cboCtx.nodeOfInterest = (ASTNode) subq.getChild(0);
+      String subQAlias = unescapeIdentifier(subq.getChild(1).getText());
+      final QB newQB = getQB().getSubqForAlias(subQAlias).getQB();
+      newQB.getParseInfo().setAlias("");
+      newQB.getParseInfo().setIsSubQ(false);
+      setQB(newQB);
+    } else {
+      // No need to run CBO (table ref or virtual table) or not supported
+      runCBO = false;
+    }
+  }
+
+  private ASTNode rewriteASTForMultiInsert(ASTNode query, ASTNode nodeOfInterest) {
+    // 1. gather references from original query
+    // This is a map from aliases to references.
+    // We keep all references as we will need to modify them after creating
+    // the subquery
+    final Multimap<String, Object> aliasNodes = ArrayListMultimap.create();
+    // To know if we need to bail out
+    final AtomicBoolean notSupported = new AtomicBoolean(false);
+    TreeVisitorAction action = new TreeVisitorAction() {
+      @Override
+      public Object pre(Object t) {
+        if (!notSupported.get()) {
+          if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_ALLCOLREF) {
+            // TODO: this is a limitation of the AST rewriting approach that we will
+            // not be able to overcome till proper integration of full multi-insert
+            // queries with Calcite is implemented.
+            // The current rewriting gathers references from insert clauses and then
+            // updates them with the new subquery references. However, if insert
+            // clauses use * or tab.*, we cannot resolve the columns that we are
+            // referring to. Thus, we just bail out and those queries will not be
+            // currently optimized by Calcite.
+            // An example of such query is:
+            // FROM T_A a LEFT JOIN T_B b ON a.id = b.id
+            // INSERT OVERWRITE TABLE join_result_1
+            // SELECT a.*, b.*
+            // INSERT OVERWRITE TABLE join_result_3
+            // SELECT a.*, b.*;
+            notSupported.set(true);
+          } else if (ParseDriver.adaptor.getType(t) == HiveParser.DOT) {
+            Object c = ParseDriver.adaptor.getChild(t, 0);
+            if (c != null && ParseDriver.adaptor.getType(c) == HiveParser.TOK_TABLE_OR_COL) {
+              aliasNodes.put(((ASTNode) t).toStringTree(), t);
+            }
+          } else if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_TABLE_OR_COL) {
+            Object p = ParseDriver.adaptor.getParent(t);
+            if (p == null || ParseDriver.adaptor.getType(p) != HiveParser.DOT) {
+              aliasNodes.put(((ASTNode) t).toStringTree(), t);
+            }
+          }
+        }
+        return t;
+      }
+      @Override
+      public Object post(Object t) {
+        return t;
+      }
+    };
+    TreeVisitor tv = new TreeVisitor(ParseDriver.adaptor);
+    // We will iterate through the children: if it is an INSERT, we will traverse
+    // the subtree to gather the references
+    for (int i = 0; i < query.getChildCount(); i++) {
+      ASTNode child = (ASTNode) query.getChild(i);
+      if (ParseDriver.adaptor.getType(child) != HiveParser.TOK_INSERT) {
+        // If it is not an INSERT, we do not need to do anything
+        continue;
+      }
+      tv.visit(child, action);
+    }
+    if (notSupported.get()) {
+      // Bail out
+      return null;
+    }
+    // 2. rewrite into query
+    //  TOK_QUERY
+    //     TOK_FROM
+    //        join
+    //     TOK_INSERT
+    //        TOK_DESTINATION
+    //           TOK_DIR
+    //              TOK_TMP_FILE
+    //        TOK_SELECT
+    //           refs
+    ASTNode from = new ASTNode(new CommonToken(HiveParser.TOK_FROM, "TOK_FROM"));
+    from.addChild((ASTNode) ParseDriver.adaptor.dupTree(nodeOfInterest));
+    ASTNode destination = new ASTNode(new CommonToken(HiveParser.TOK_DESTINATION, "TOK_DESTINATION"));
+    ASTNode dir = new ASTNode(new CommonToken(HiveParser.TOK_DIR, "TOK_DIR"));
+    ASTNode tmpFile = new ASTNode(new CommonToken(HiveParser.TOK_TMP_FILE, "TOK_TMP_FILE"));
+    dir.addChild(tmpFile);
+    destination.addChild(dir);
+    ASTNode select = new ASTNode(new CommonToken(HiveParser.TOK_SELECT, "TOK_SELECT"));
+    int num = 0;
+    for (Collection<Object> selectIdentifier : aliasNodes.asMap().values()) {
+      Iterator<Object> it = selectIdentifier.iterator();
+      ASTNode node = (ASTNode) it.next();
+      // Add select expression
+      ASTNode selectExpr = new ASTNode(new CommonToken(HiveParser.TOK_SELEXPR, "TOK_SELEXPR"));
+      selectExpr.addChild((ASTNode) ParseDriver.adaptor.dupTree(node)); // Identifier
+      String colAlias = "col" + num;
+      selectExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, colAlias))); // Alias
+      select.addChild(selectExpr);
+      // Rewrite all INSERT references (all the node values for this key)
+      ASTNode colExpr = new ASTNode(new CommonToken(HiveParser.TOK_TABLE_OR_COL, "TOK_TABLE_OR_COL"));
+      colExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, colAlias)));
+      replaceASTChild(node, colExpr);
+      while (it.hasNext()) {
+        // Loop to rewrite rest of INSERT references
+        node = (ASTNode) it.next();
+        colExpr = new ASTNode(new CommonToken(HiveParser.TOK_TABLE_OR_COL, "TOK_TABLE_OR_COL"));
+        colExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, colAlias)));
+        replaceASTChild(node, colExpr);
+      }
+      num++;
+    }
+    ASTNode insert = new ASTNode(new CommonToken(HiveParser.TOK_INSERT, "TOK_INSERT"));
+    insert.addChild(destination);
+    insert.addChild(select);
+    ASTNode newQuery = new ASTNode(new CommonToken(HiveParser.TOK_QUERY, "TOK_QUERY"));
+    newQuery.addChild(from);
+    newQuery.addChild(insert);
+    // 3. create subquery
+    ASTNode subq = new ASTNode(new CommonToken(HiveParser.TOK_SUBQUERY, "TOK_SUBQUERY"));
+    subq.addChild(newQuery);
+    subq.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, "subq")));
+    replaceASTChild(nodeOfInterest, subq);
+    // 4. return subquery
+    return subq;
+  }
+
   /**
    * Can CBO handle the given AST?
    *
@@ -476,7 +646,8 @@ public class CalcitePlanner extends SemanticAnalyzer {
         || qb.isCTAS() || qb.isMaterializedView();
     // Queries without a source table currently are not supported by CBO
     boolean isSupportedType = (qb.getIsQuery() && !qb.containsQueryWithoutSourceTable())
-        || qb.isCTAS() || qb.isMaterializedView() || cboCtx.type == PreCboCtx.Type.INSERT;
+        || qb.isCTAS() || qb.isMaterializedView() || cboCtx.type == PreCboCtx.Type.INSERT
+        || cboCtx.type == PreCboCtx.Type.MULTI_INSERT;
     boolean noBadTokens = HiveCalciteUtil.validateASTForUnsupportedTokens(ast);
     boolean result = isSupportedRoot && isSupportedType
         && (getCreateViewDesc() == null || getCreateViewDesc().isMaterialized())
@@ -542,7 +713,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
 
     if (!queryProperties.hasClusterBy() && !queryProperties.hasDistributeBy()
         && !queryProperties.hasSortBy() && !queryProperties.hasPTF() && !queryProperties.usesScript()
-        && !queryProperties.hasMultiDestQuery() && !queryProperties.hasLateralViews()) {
+        && !queryProperties.hasLateralViews()) {
       // Ok to run CBO.
       return null;
     }
@@ -560,8 +731,6 @@ public class CalcitePlanner extends SemanticAnalyzer {
         msg += "has PTF; ";
       if (queryProperties.usesScript())
         msg += "uses scripts; ";
-      if (queryProperties.hasMultiDestQuery())
-        msg += "is a multi-destination query; ";
       if (queryProperties.hasLateralViews())
         msg += "has lateral views; ";
 
@@ -664,7 +833,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
    */
   static class PreCboCtx extends PlannerContext {
     enum Type {
-      NONE, INSERT, CTAS_OR_MV, UNEXPECTED
+      NONE, INSERT, MULTI_INSERT, CTAS_OR_MV, UNEXPECTED
     }
 
     private ASTNode nodeOfInterest;
@@ -692,6 +861,17 @@ public class CalcitePlanner extends SemanticAnalyzer {
         set(PreCboCtx.Type.INSERT, ast);
       }
     }
+
+    @Override
+    void setMultiInsertToken(ASTNode child) {
+      set(PreCboCtx.Type.MULTI_INSERT, child);
+    }
+
+    @Override
+    void resetToken() {
+      this.type = Type.NONE;
+      this.nodeOfInterest = null;
+    }
   }
 
   ASTNode fixUpAfterCbo(ASTNode originalAst, ASTNode newAst, PreCboCtx cboCtx)
@@ -722,6 +902,12 @@ public class CalcitePlanner extends SemanticAnalyzer {
       return newAst;
     }
 
+    case MULTI_INSERT: {
+      // Patch the optimized query back into original FROM clause.
+      replaceASTChild(cboCtx.nodeOfInterest, newAst);
+      return originalAst;
+    }
+
     default:
       throw new AssertionError("Unexpected type " + cboCtx.type);
     }
@@ -3803,14 +3989,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
     }
 
     private QBParseInfo getQBParseInfo(QB qb) throws CalciteSemanticException {
-      QBParseInfo qbp = qb.getParseInfo();
-      if (qbp.getClauseNames().size() > 1) {
-        String msg = String.format("Multi Insert is currently not supported in CBO,"
-            + " turn off cbo to use Multi Insert.");
-        LOG.debug(msg);
-        throw new CalciteSemanticException(msg, UnsupportedFeature.Multi_insert);
-      }
-      return qbp;
+      return qb.getParseInfo();
     }
 
     private List<String> getTabAliases(RowResolver inputRR) {

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index f549dff..7bf1c59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -40,8 +40,8 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
  **/
 public class QBParseInfo {
 
-  private final boolean isSubQ;
-  private final String alias;
+  private boolean isSubQ;
+  private String alias;
   private ASTNode joinExpr;
   private ASTNode hints;
   private final HashMap<String, ASTNode> aliasToSrc;
@@ -66,6 +66,7 @@ public class QBParseInfo {
   // insertIntoTables/insertOverwriteTables map a table's fullName to its ast;
   private final Map<String, ASTNode> insertIntoTables;
   private final Map<String, ASTNode> insertOverwriteTables;
+  private ASTNode queryFromExpr;
 
   private boolean isAnalyzeCommand; // used for the analyze command (statistics)
   private boolean isNoScanAnalyzeCommand; // used for the analyze command (statistics) (noscan)
@@ -235,6 +236,10 @@ public class QBParseInfo {
     destToSelExpr.put(clause, ast);
   }
 
+  public void setQueryFromExpr(ASTNode ast) {
+    queryFromExpr = ast;
+  }
+
   public void setWhrExprForClause(String clause, ASTNode ast) {
     destToWhereExpr.put(clause, ast);
   }
@@ -354,6 +359,10 @@ public class QBParseInfo {
     return destToSelExpr.get(clause);
   }
 
+  public ASTNode getQueryFrom() {
+    return queryFromExpr;
+  }
+
   /**
    * Get the Cluster By AST for the clause.
    *
@@ -415,10 +424,18 @@ public class QBParseInfo {
     return alias;
   }
 
+  public void setAlias(String alias) {
+    this.alias = alias;
+  }
+
   public boolean getIsSubQ() {
     return isSubQ;
   }
 
+  public void setIsSubQ(boolean isSubQ) {
+    this.isSubQ = isSubQ;
+  }
+
   public ASTNode getJoinExpr() {
     return joinExpr;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d0131b7..f275f6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1482,15 +1482,31 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           throw new SemanticException(ErrorMsg.NO_INSERT_INSUBQUERY.getMsg(ast));
         }
 
-        if (plannerCtx != null) {
-          plannerCtx.setInsertToken(ast, isTmpFileDest);
-        }
-
         qbp.setDestForClause(ctx_1.dest, (ASTNode) ast.getChild(0));
         handleInsertStatementSpecPhase1(ast, qbp, ctx_1);
-        if (qbp.getClauseNamesForDest().size() > 1) {
+
+        if (qbp.getClauseNamesForDest().size() == 2) {
+          // From the moment that we have two destination clauses,
+          // we know that this is a multi-insert query.
+          // Thus, set property to right value.
+          // Using qbp.getClauseNamesForDest().size() >= 2 would be
+          // equivalent, but we use == to avoid setting the property
+          // multiple times
           queryProperties.setMultiDestQuery(true);
         }
+
+        if (plannerCtx != null && !queryProperties.hasMultiDestQuery()) {
+          plannerCtx.setInsertToken(ast, isTmpFileDest);
+        } else if (plannerCtx != null && qbp.getClauseNamesForDest().size() == 2) {
+          // For multi-insert query, currently we only optimize the FROM clause.
+          // Hence, introduce multi-insert token on top of it.
+          // However, first we need to reset existing token (insert).
+          // Using qbp.getClauseNamesForDest().size() >= 2 would be
+          // equivalent, but we use == to avoid setting the property
+          // multiple times
+          plannerCtx.resetToken();
+          plannerCtx.setMultiInsertToken((ASTNode) qbp.getQueryFrom().getChild(0));
+        }
         break;
 
       case HiveParser.TOK_FROM:
@@ -1500,6 +1516,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
               "Multiple Children " + child_count));
         }
 
+        if (!qbp.getIsSubQ()) {
+          qbp.setQueryFromExpr(ast);
+        }
+
         // Check if this is a subquery / lateral view
         ASTNode frm = (ASTNode) ast.getChild(0);
         if (frm.getToken().getType() == HiveParser.TOK_TABREF) {
@@ -10662,6 +10682,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     void setInsertToken(ASTNode ast, boolean isTmpFileDest) {
     }
 
+    void setMultiInsertToken(ASTNode child) {
+    }
+
+    void resetToken() {
+    }
   }
 
   private Table getTableObjectByName(String tableName) throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/queries/clientpositive/multi_insert_gby4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/multi_insert_gby4.q b/ql/src/test/queries/clientpositive/multi_insert_gby4.q
new file mode 100644
index 0000000..2e22096
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/multi_insert_gby4.q
@@ -0,0 +1,26 @@
+-- SORT_QUERY_RESULTS
+
+create table e1 (key string, count int);
+create table e2 (key string, count int);
+create table e3 (key string, count int);
+
+explain
+FROM (SELECT key, value FROM src) a
+INSERT OVERWRITE TABLE e1
+    SELECT key, COUNT(*) WHERE key>450 GROUP BY key
+INSERT OVERWRITE TABLE e2
+    SELECT key, COUNT(*) WHERE key>500 GROUP BY key
+INSERT OVERWRITE TABLE e3
+    SELECT key, COUNT(*) WHERE key>490 GROUP BY key;
+
+FROM (SELECT key, value FROM src) a
+INSERT OVERWRITE TABLE e1
+    SELECT key, COUNT(*) WHERE key>450 GROUP BY key
+INSERT OVERWRITE TABLE e2
+    SELECT key, COUNT(*) WHERE key>500 GROUP BY key
+INSERT OVERWRITE TABLE e3
+    SELECT key, COUNT(*) WHERE key>490 GROUP BY key;
+
+select * from e1;
+select * from e2;
+select * from e3;

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/queries/clientpositive/multi_insert_with_join2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/multi_insert_with_join2.q b/ql/src/test/queries/clientpositive/multi_insert_with_join2.q
index 1529fa2..37d1678 100644
--- a/ql/src/test/queries/clientpositive/multi_insert_with_join2.q
+++ b/ql/src/test/queries/clientpositive/multi_insert_with_join2.q
@@ -1,4 +1,4 @@
-set hive.cbo.enable=false;
+set hive.strict.checks.cartesian.product=false;
 
 CREATE TABLE T_A ( id STRING, val STRING ); 
 CREATE TABLE T_B ( id STRING, val STRING ); 
@@ -49,3 +49,30 @@ WHERE b.id = 'Id_1' AND b.val = 'val_103'
 INSERT OVERWRITE TABLE join_result_3
 SELECT a.*, b.*
 WHERE b.val = 'val_104' AND b.id = 'Id_2';
+
+explain
+FROM T_A a JOIN T_B b ON a.id = b.id
+INSERT OVERWRITE TABLE join_result_1
+SELECT *
+WHERE b.id = 'Id_1' AND b.val = 'val_103'
+INSERT OVERWRITE TABLE join_result_3
+SELECT *
+WHERE b.val = 'val_104' AND b.id = 'Id_2';
+
+explain
+FROM T_A a JOIN T_B b ON a.id = b.id
+INSERT OVERWRITE TABLE join_result_1
+SELECT a.id, a.val, b.id, b.val
+WHERE b.id = 'Id_1' AND b.val = 'val_103'
+INSERT OVERWRITE TABLE join_result_3
+SELECT a.id, a.val, b.id, b.val
+WHERE b.val = 'val_104' AND b.id = 'Id_2';
+
+explain
+FROM T_A a JOIN T_B b ON a.id = b.id
+INSERT OVERWRITE TABLE join_result_1
+SELECT a.val, a.id, b.id, b.val
+WHERE b.id = 'Id_1' AND b.val = 'val_103'
+INSERT OVERWRITE TABLE join_result_3
+SELECT a.id, b.val, b.id, a.val
+WHERE b.val = 'val_104' AND b.id = 'Id_2';

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/results/clientpositive/auto_sortmerge_join_13.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/auto_sortmerge_join_13.q.out b/ql/src/test/results/clientpositive/auto_sortmerge_join_13.q.out
index b45411c..90493ff 100644
--- a/ql/src/test/results/clientpositive/auto_sortmerge_join_13.q.out
+++ b/ql/src/test/results/clientpositive/auto_sortmerge_join_13.q.out
@@ -89,15 +89,16 @@ STAGE PLANS:
             Filter Operator
               predicate: key is not null (type: boolean)
               Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-              Sorted Merge Bucket Map Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 key (type: int)
-                  1 key (type: int)
-                outputColumnNames: _col0, _col1, _col5, _col6
-                Select Operator
-                  expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                Sorted Merge Bucket Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
                   outputColumnNames: _col0, _col1, _col2, _col3
                   Select Operator
                     expressions: _col0 (type: int), _col2 (type: int)
@@ -267,15 +268,16 @@ STAGE PLANS:
             Filter Operator
               predicate: key is not null (type: boolean)
               Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-              Sorted Merge Bucket Map Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 key (type: int)
-                  1 key (type: int)
-                outputColumnNames: _col0, _col1, _col5, _col6
-                Select Operator
-                  expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                Sorted Merge Bucket Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
                   outputColumnNames: _col0, _col1, _col2, _col3
                   Select Operator
                     expressions: _col0 (type: int), _col2 (type: int)
@@ -445,15 +447,16 @@ STAGE PLANS:
             Filter Operator
               predicate: key is not null (type: boolean)
               Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-              Sorted Merge Bucket Map Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 key (type: int)
-                  1 key (type: int)
-                outputColumnNames: _col0, _col1, _col5, _col6
-                Select Operator
-                  expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+              Select Operator
+                expressions: key (type: int), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                Sorted Merge Bucket Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 _col0 (type: int)
                   outputColumnNames: _col0, _col1, _col2, _col3
                   Select Operator
                     expressions: _col0 (type: int), _col2 (type: int)

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/results/clientpositive/groupby_sort_1_23.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/groupby_sort_1_23.q.out b/ql/src/test/results/clientpositive/groupby_sort_1_23.q.out
index 6572f6c..008b796 100644
--- a/ql/src/test/results/clientpositive/groupby_sort_1_23.q.out
+++ b/ql/src/test/results/clientpositive/groupby_sort_1_23.q.out
@@ -5808,7 +5808,7 @@ STAGE PLANS:
             alias: t2
             Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
-              predicate: (key = 8) (type: boolean)
+              predicate: (UDFToDouble(key) = 8.0) (type: boolean)
               Statistics: Num rows: 3 Data size: 12 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: key (type: string), val (type: string)

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/results/clientpositive/groupby_sort_skew_1_23.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/groupby_sort_skew_1_23.q.out b/ql/src/test/results/clientpositive/groupby_sort_skew_1_23.q.out
index ce71354..35b38ca 100644
--- a/ql/src/test/results/clientpositive/groupby_sort_skew_1_23.q.out
+++ b/ql/src/test/results/clientpositive/groupby_sort_skew_1_23.q.out
@@ -6296,7 +6296,7 @@ STAGE PLANS:
             alias: t2
             Statistics: Num rows: 6 Data size: 24 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
-              predicate: (key = 8) (type: boolean)
+              predicate: (UDFToDouble(key) = 8.0) (type: boolean)
               Statistics: Num rows: 3 Data size: 12 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: key (type: string), val (type: string)

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out b/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
index e999077..3083291 100644
--- a/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_smb_mapjoin_14.q.out
@@ -1599,6 +1599,10 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
             Map Operator Tree:
                 TableScan
                   alias: a
@@ -1606,22 +1610,34 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Merge Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col6
-                      Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col0 (type: int), _col1 (type: string), _col6 (type: string)
-                        outputColumnNames: _col0, _col1, _col2
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Merge Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1, _col3
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
                         Select Operator
-                          expressions: _col0 (type: int), _col1 (type: string)
-                          outputColumnNames: _col0, _col1
+                          expressions: _col0 (type: int), _col1 (type: string), _col3 (type: string)
+                          outputColumnNames: _col0, _col1, _col2
                           Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                          Select Operator
+                            expressions: _col0 (type: int), _col1 (type: string)
+                            outputColumnNames: _col0, _col1
+                            Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                            File Output Operator
+                              compressed: false
+                              Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                              table:
+                                  input format: org.apache.hadoop.mapred.TextInputFormat
+                                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                                  name: default.dest1
                           File Output Operator
                             compressed: false
                             Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
@@ -1629,15 +1645,7 @@ STAGE PLANS:
                                 input format: org.apache.hadoop.mapred.TextInputFormat
                                 output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                                 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                                name: default.dest1
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                              name: default.dest2
+                                name: default.dest2
             Execution mode: llap
 
   Stage: Stage-3
@@ -1812,6 +1820,10 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
             Map Operator Tree:
                 TableScan
                   alias: a
@@ -1819,38 +1831,42 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Merge Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
                       outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                            name: default.dest1
-                      Select Operator
-                        expressions: _col0 (type: int)
-                        outputColumnNames: _col0
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Merge Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col0, _col1
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                        Group By Operator
-                          aggregations: count()
-                          keys: _col0 (type: int)
-                          mode: hash
-                          outputColumnNames: _col0, _col1
+                        File Output Operator
+                          compressed: false
                           Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                          Reduce Output Operator
-                            key expressions: _col0 (type: int)
-                            sort order: +
-                            Map-reduce partition columns: _col0 (type: int)
+                          table:
+                              input format: org.apache.hadoop.mapred.TextInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                              name: default.dest1
+                        Select Operator
+                          expressions: _col0 (type: int)
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                          Group By Operator
+                            aggregations: count()
+                            keys: _col0 (type: int)
+                            mode: hash
+                            outputColumnNames: _col0, _col1
                             Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                            value expressions: _col1 (type: bigint)
+                            Reduce Output Operator
+                              key expressions: _col0 (type: int)
+                              sort order: +
+                              Map-reduce partition columns: _col0 (type: int)
+                              Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                              value expressions: _col1 (type: bigint)
             Execution mode: llap
         Reducer 2 
             Execution mode: llap

http://git-wip-us.apache.org/repos/asf/hive/blob/886978db/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_13.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_13.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_13.q.out
index a539e03..633abff 100644
--- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_13.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_13.q.out
@@ -96,19 +96,19 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col5, _col6
-                      input vertices:
-                        1 Map 2
-                      Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
                         outputColumnNames: _col0, _col1, _col2, _col3
+                        input vertices:
+                          1 Map 2
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
                         Select Operator
                           expressions: _col0 (type: int), _col2 (type: int)
@@ -144,12 +144,16 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: int)
-                      sort order: +
-                      Map-reduce partition columns: key (type: int)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: value (type: string)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
             Execution mode: llap
             LLAP IO: no inputs
 
@@ -310,19 +314,19 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col5, _col6
-                      input vertices:
-                        1 Map 2
-                      Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
                         outputColumnNames: _col0, _col1, _col2, _col3
+                        input vertices:
+                          1 Map 2
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
                         Select Operator
                           expressions: _col0 (type: int), _col2 (type: int)
@@ -358,12 +362,16 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: int)
-                      sort order: +
-                      Map-reduce partition columns: key (type: int)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: value (type: string)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
             Execution mode: llap
             LLAP IO: no inputs
 
@@ -524,19 +532,19 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col5, _col6
-                      input vertices:
-                        1 Map 2
-                      Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: string)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
                         outputColumnNames: _col0, _col1, _col2, _col3
+                        input vertices:
+                          1 Map 2
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
                         Select Operator
                           expressions: _col0 (type: int), _col2 (type: int)
@@ -572,12 +580,16 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: int)
-                      sort order: +
-                      Map-reduce partition columns: key (type: int)
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: value (type: string)
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
             Execution mode: llap
             LLAP IO: no inputs