You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/13 07:09:56 UTC

[22/22] hive git commit: HIVE-16602: Implement shared scans with Tez (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-16602: Implement shared scans with Tez (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/59f65772
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/59f65772
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/59f65772

Branch: refs/heads/master
Commit: 59f6577296bfcdb5d8e74657f7eb6d6294630b23
Parents: f2fa83c
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu May 4 20:27:40 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Sat May 13 08:09:14 2017 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |    9 +-
 .../hive/ql/optimizer/MapJoinProcessor.java     |   11 +-
 .../ql/optimizer/ReduceSinkMapJoinProc.java     |    6 +-
 .../hive/ql/optimizer/SharedScanOptimizer.java  |  625 ++++++
 .../physical/GenMRSkewJoinProcessor.java        |    2 +-
 .../physical/GenSparkSkewJoinProcessor.java     |    2 +-
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |   10 +-
 .../hadoop/hive/ql/parse/TezCompiler.java       |   75 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    |   12 +
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java |    3 +-
 .../test/queries/clientpositive/perf/query88.q  |    2 +
 .../clientpositive/llap/auto_join0.q.out        |   15 +-
 .../clientpositive/llap/auto_join30.q.out       |   67 +-
 .../llap/auto_sortmerge_join_9.q.out            |   70 +-
 .../llap/bucket_map_join_tez1.q.out             |   11 +-
 .../llap/correlationoptimizer2.q.out            |   26 +-
 .../llap/correlationoptimizer3.q.out            |  188 +-
 .../llap/correlationoptimizer6.q.out            |  104 +-
 .../llap/dynamic_partition_pruning.q.out        |   92 +-
 .../clientpositive/llap/except_distinct.q.out   |   92 +-
 .../clientpositive/llap/explainuser_1.q.out     |  137 +-
 .../clientpositive/llap/explainuser_2.q.out     |  772 ++++---
 .../clientpositive/llap/intersect_merge.q.out   |   83 +-
 .../results/clientpositive/llap/join46.q.out    |   38 +-
 .../llap/limit_join_transpose.q.out             |  242 +--
 .../clientpositive/llap/limit_pushdown.q.out    |   17 +-
 .../clientpositive/llap/llap_nullscan.q.out     |   15 +-
 .../results/clientpositive/llap/mapjoin46.q.out |   43 +-
 .../test/results/clientpositive/llap/mrr.q.out  |   82 +-
 .../clientpositive/llap/multiMapJoin2.q.out     |  209 +-
 .../llap/offset_limit_ppd_optimizer.q.out       |   17 +-
 .../clientpositive/llap/subquery_in.q.out       |  173 +-
 .../clientpositive/llap/subquery_multi.q.out    |  617 +++---
 .../clientpositive/llap/subquery_notin.q.out    | 1539 ++++++-------
 .../clientpositive/llap/subquery_null_agg.q.out |   26 +-
 .../clientpositive/llap/subquery_scalar.q.out   |  359 ++--
 .../clientpositive/llap/subquery_select.q.out   |  957 ++++-----
 .../clientpositive/llap/subquery_views.q.out    |  274 +--
 .../clientpositive/llap/tez_join_tests.q.out    |   29 +-
 .../clientpositive/llap/tez_joins_explain.q.out |   29 +-
 .../clientpositive/llap/unionDistinct_1.q.out   |  261 +--
 .../clientpositive/llap/union_top_level.q.out   |   50 +-
 .../llap/vector_groupby_grouping_sets4.q.out    |   43 +-
 .../llap/vector_groupby_mapjoin.q.out           |   26 +-
 .../clientpositive/llap/vector_join30.q.out     |  122 +-
 .../vectorized_dynamic_partition_pruning.q.out  |  108 +-
 .../results/clientpositive/perf/query1.q.out    |   88 +-
 .../results/clientpositive/perf/query14.q.out   | 2032 +++++++++---------
 .../results/clientpositive/perf/query16.q.out   |   27 +-
 .../results/clientpositive/perf/query17.q.out   |   68 +-
 .../results/clientpositive/perf/query23.q.out   |  477 ++--
 .../results/clientpositive/perf/query25.q.out   |   68 +-
 .../results/clientpositive/perf/query28.q.out   |   87 +-
 .../results/clientpositive/perf/query29.q.out   |   19 +-
 .../results/clientpositive/perf/query30.q.out   |  114 +-
 .../results/clientpositive/perf/query31.q.out   |  270 ++-
 .../results/clientpositive/perf/query32.q.out   |   34 +-
 .../results/clientpositive/perf/query33.q.out   |  252 ++-
 .../results/clientpositive/perf/query38.q.out   |   94 +-
 .../results/clientpositive/perf/query39.q.out   |   58 +-
 .../results/clientpositive/perf/query46.q.out   |   75 +-
 .../results/clientpositive/perf/query5.q.out    |   70 +-
 .../results/clientpositive/perf/query51.q.out   |   83 +-
 .../results/clientpositive/perf/query56.q.out   |  252 ++-
 .../results/clientpositive/perf/query58.q.out   |  120 +-
 .../results/clientpositive/perf/query6.q.out    |   19 +-
 .../results/clientpositive/perf/query60.q.out   |  252 ++-
 .../results/clientpositive/perf/query64.q.out   |  443 ++--
 .../results/clientpositive/perf/query65.q.out   |   86 +-
 .../results/clientpositive/perf/query66.q.out   |   90 +-
 .../results/clientpositive/perf/query68.q.out   |   75 +-
 .../results/clientpositive/perf/query69.q.out   |   76 +-
 .../results/clientpositive/perf/query70.q.out   |   44 +-
 .../results/clientpositive/perf/query75.q.out   |  258 ++-
 .../results/clientpositive/perf/query76.q.out   |  126 +-
 .../results/clientpositive/perf/query80.q.out   |  106 +-
 .../results/clientpositive/perf/query81.q.out   |   57 +-
 .../results/clientpositive/perf/query83.q.out   |  225 +-
 .../results/clientpositive/perf/query85.q.out   |   11 +-
 .../results/clientpositive/perf/query87.q.out   |   94 +-
 .../results/clientpositive/perf/query88.q.out   |  358 ++-
 .../results/clientpositive/perf/query9.q.out    |  170 +-
 .../results/clientpositive/perf/query90.q.out   |   58 +-
 .../results/clientpositive/perf/query92.q.out   |   35 +-
 .../results/clientpositive/perf/query95.q.out   |   70 +-
 .../results/clientpositive/perf/query97.q.out   |   35 +-
 .../clientpositive/tez/explainanalyze_2.q.out   |  412 ++--
 .../clientpositive/tez/explainanalyze_3.q.out   |    8 +-
 .../clientpositive/tez/explainuser_3.q.out      |    8 +-
 90 files changed, 7086 insertions(+), 8012 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 73e0290..d6a80ae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1645,6 +1645,10 @@ public class HiveConf extends Configuration {
         "If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" +
         "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),
 
+    HIVE_SHARED_SCAN_OPTIMIZATION("hive.optimize.shared.scan", true,
+        "Whether to enable shared scan optimizer. The optimizer finds scan operator over the same table\n" +
+        "in the query plan and merges them if they meet some preconditions."),
+
     // CTE
     HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1,
         "If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index d0fdb52..0eec78e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -281,7 +281,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
                   joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
                   null, joinDesc.getExprs(), null, null,
                   joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
-                  joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize());
+                  joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null,
+                  joinDesc.getNoConditionalTaskSize(), joinDesc.getInMemoryDataSize());
       mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
       mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
       mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());
@@ -419,7 +420,6 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // each side better have 0 or more RS. if either side is unbalanced, cannot convert.
       // This is a workaround for now. Right fix would be to refactor code in the
       // MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
-      @SuppressWarnings({"rawtypes","unchecked"})
       Set<ReduceSinkOperator> set =
           OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
               ReduceSinkOperator.class);
@@ -719,6 +719,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     }
 
+    // We store the total memory that this MapJoin is going to use,
+    // which is calculated as totalSize/buckets, with totalSize
+    // equal to sum of small tables size.
+    joinOp.getConf().setInMemoryDataSize(totalSize/buckets);
+
     return bigTablePosition;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 85d46f3..f01fb9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -78,6 +76,8 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of one of the rule-based map join optimization. User passes hints to specify
@@ -432,7 +432,7 @@ public class MapJoinProcessor extends Transform {
         smbJoinDesc.getOutputColumnNames(),
         bigTablePos, smbJoinDesc.getConds(),
         smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(),
-        smbJoinDesc.getNoConditionalTaskSize());
+        smbJoinDesc.getNoConditionalTaskSize(), smbJoinDesc.getInMemoryDataSize());
 
     mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
 
@@ -1184,8 +1184,9 @@ public class MapJoinProcessor extends Transform {
     JoinCondDesc[] joinCondns = op.getConf().getConds();
     MapJoinDesc mapJoinDescriptor =
         new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
-            valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
-                .getConf().getNoOuterJoin(), dumpFilePrefix, op.getConf().getNoConditionalTaskSize());
+            valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters,
+            op.getConf().getNoOuterJoin(), dumpFilePrefix,
+            op.getConf().getNoConditionalTaskSize(), op.getConf().getInMemoryDataSize());
     mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
     mapJoinDescriptor.setTagOrder(tagOrder);
     mapJoinDescriptor.setNullSafes(desc.getNullSafes());

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index 3a6baca..ac234d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -219,8 +219,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
     if (tableSize == 0) {
       tableSize = 1;
     }
-    LOG.info("Mapjoin " + mapJoinOp + "(bucket map join = )" + joinConf.isBucketMapJoin()
-    + ", pos: " + pos + " --> " + parentWork.getName() + " (" + keyCount
+    LOG.info("Mapjoin " + mapJoinOp + "(bucket map join = " + joinConf.isBucketMapJoin()
+    + "), pos: " + pos + " --> " + parentWork.getName() + " (" + keyCount
     + " keys estimated from " + rowCount + " rows, " + bucketCount + " buckets)");
     joinConf.getParentToInput().put(pos, parentWork.getName());
     if (keyCount != Long.MAX_VALUE) {
@@ -290,7 +290,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
 
         ReduceSinkOperator r = null;
         if (context.connectedReduceSinks.contains(parentRS)) {
-          LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+          LOG.debug("Cloning reduce sink " + parentRS + " for multi-child broadcast edge");
           // we've already set this one up. Need to clone for the next work.
           r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
               parentRS.getCompilationOpContext(),

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
new file mode 100644
index 0000000..d04fc64
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Shared scan optimizer. This rule finds scan operator over the same table
+ * in the query plan and merges them if they meet some preconditions.
+ *
+ *  TS   TS             TS
+ *  |    |     ->      /  \
+ *  Op   Op           Op  Op
+ *
+ * <p>Currently it only works with the Tez execution engine.
+ */
+public class SharedScanOptimizer extends Transform {
+
+  private final static Logger LOG = LoggerFactory.getLogger(SharedScanOptimizer.class);
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    final Map<String, TableScanOperator> topOps = pctx.getTopOps();
+    if (topOps.size() < 2) {
+      // Nothing to do, bail out
+      return pctx;
+    }
+
+    // Cache to use during optimization
+    SharedScanOptimizerCache optimizerCache = new SharedScanOptimizerCache();
+
+    // We will not apply this optimization on some table scan operators.
+    Set<TableScanOperator> excludeTableScanOps = gatherNotValidTableScanOps(pctx, optimizerCache);
+    LOG.debug("Exclude TableScan ops: {}", excludeTableScanOps);
+
+    // Map of dbName.TblName -> Pair(tableAlias, TSOperator)
+    Multimap<String, Entry<String, TableScanOperator>> tableNameToOps = splitTableScanOpsByTable(pctx);
+
+    // We enforce a certain order when we do the reutilization.
+    // In particular, we use size of table x number of reads to
+    // rank the tables.
+    List<Entry<String, Long>> sortedTables = rankTablesByAccumulatedSize(pctx, excludeTableScanOps);
+    LOG.debug("Sorted tables by size: {}", sortedTables);
+
+    // Execute optimization
+    Multimap<String, TableScanOperator> existingOps = ArrayListMultimap.create();
+    Set<String> entriesToRemove = new HashSet<>();
+    for (Entry<String, Long> tablePair : sortedTables) {
+      for (Entry<String, TableScanOperator> tableScanOpPair : tableNameToOps.get(tablePair.getKey())) {
+        TableScanOperator tsOp = tableScanOpPair.getValue();
+        if (excludeTableScanOps.contains(tsOp)) {
+          // Skip operator, currently we do not merge
+          continue;
+        }
+        String tableName = tablePair.getKey();
+        Collection<TableScanOperator> prevTsOps = existingOps.get(tableName);
+        if (!prevTsOps.isEmpty()) {
+          for (TableScanOperator prevTsOp : prevTsOps) {
+
+            // First we check if the two table scan operators can actually be merged
+            // If schemas do not match, we currently do not merge
+            List<String> prevTsOpNeededColumns = prevTsOp.getNeededColumns();
+            List<String> tsOpNeededColumns = tsOp.getNeededColumns();
+            if (prevTsOpNeededColumns.size() != tsOpNeededColumns.size()) {
+              // Skip
+              continue;
+            }
+            boolean notEqual = false;
+            for (int i = 0; i < prevTsOpNeededColumns.size(); i++) {
+              if (!prevTsOpNeededColumns.get(i).equals(tsOpNeededColumns.get(i))) {
+                notEqual = true;
+                break;
+              }
+            }
+            if (notEqual) {
+              // Skip
+              continue;
+            }
+            // If row limit does not match, we currently do not merge
+            if (prevTsOp.getConf().getRowLimit() != tsOp.getConf().getRowLimit()) {
+              // Skip
+              continue;
+            }
+
+            // It seems these two operators can be merged.
+            // Check that plan meets some preconditions before doing it.
+            // In particular, in the presence of map joins in the upstream plan:
+            // - we cannot exceed the noconditional task size, and
+            // - if we already merged the big table, we cannot merge the broadcast
+            // tables.
+            if (!validPreConditions(pctx, optimizerCache, prevTsOp, tsOp)) {
+              // Skip
+              LOG.debug("{} does not meet preconditions", tsOp);
+              continue;
+            }
+
+            // We can merge
+            ExprNodeGenericFuncDesc exprNode = null;
+            if (prevTsOp.getConf().getFilterExpr() != null) {
+              // Push filter on top of children
+              pushFilterToTopOfTableScan(optimizerCache, prevTsOp);
+              // Clone to push to table scan
+              exprNode = (ExprNodeGenericFuncDesc) prevTsOp.getConf().getFilterExpr();
+            }
+            if (tsOp.getConf().getFilterExpr() != null) {
+              // Push filter on top
+              pushFilterToTopOfTableScan(optimizerCache, tsOp);
+              ExprNodeGenericFuncDesc tsExprNode = tsOp.getConf().getFilterExpr();
+              if (exprNode != null && !exprNode.isSame(tsExprNode)) {
+                // We merge filters from previous scan by ORing with filters from current scan
+                if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) {
+                  List<ExprNodeDesc> newChildren = new ArrayList<>(exprNode.getChildren().size() + 1);
+                  for (ExprNodeDesc childExprNode : exprNode.getChildren()) {
+                    if (childExprNode.isSame(tsExprNode)) {
+                      // We do not need to do anything, it is in the OR expression
+                      break;
+                    }
+                    newChildren.add(childExprNode);
+                  }
+                  if (exprNode.getChildren().size() == newChildren.size()) {
+                    newChildren.add(tsExprNode);
+                    exprNode = ExprNodeGenericFuncDesc.newInstance(
+                            new GenericUDFOPOr(),
+                            newChildren);
+                  }
+                } else {
+                  exprNode = ExprNodeGenericFuncDesc.newInstance(
+                          new GenericUDFOPOr(),
+                          Arrays.<ExprNodeDesc>asList(exprNode, tsExprNode));
+                }
+              }
+            }
+            // Replace filter
+            prevTsOp.getConf().setFilterExpr(exprNode);
+            // Replace table scan operator
+            List<Operator<? extends OperatorDesc>> allChildren =
+                    Lists.newArrayList(tsOp.getChildOperators());
+            for (Operator<? extends OperatorDesc> op : allChildren) {
+              tsOp.getChildOperators().remove(op);
+              op.replaceParent(tsOp, prevTsOp);
+              prevTsOp.getChildOperators().add(op);
+            }
+            entriesToRemove.add(tableScanOpPair.getKey());
+            // Remove and combine
+            optimizerCache.removeOpAndCombineWork(tsOp, prevTsOp);
+
+            LOG.debug("Merged {} into {}", tsOp, prevTsOp);
+
+            break;
+          }
+          if (!entriesToRemove.contains(tableScanOpPair.getKey())) {
+            existingOps.put(tableName, tsOp);
+          }
+        } else {
+          // Add to existing ops
+          existingOps.put(tableName, tsOp);
+        }
+      }
+    }
+    // Remove unused operators
+    for (String key : entriesToRemove) {
+      topOps.remove(key);
+    }
+
+    return pctx;
+  }
+
+  private static Set<TableScanOperator> gatherNotValidTableScanOps(
+          ParseContext pctx, SharedScanOptimizerCache optimizerCache) {
+    // Find TS operators with partition pruning enabled in plan
+    // because these TS may potentially read different data for
+    // different pipeline.
+    // These can be:
+    // 1) TS with static partitioning.
+    //    TODO: Check partition list of different TS and do not add if they are identical
+    // 2) TS with DPP.
+    //    TODO: Check if dynamic filters are identical and do not add.
+    // 3) TS with semijoin DPP.
+    //    TODO: Check for dynamic filters.
+    Set<TableScanOperator> notValidTableScanOps = new HashSet<>();
+    // 1) TS with static partitioning.
+    Map<String, TableScanOperator> topOps = pctx.getTopOps();
+    for (TableScanOperator tsOp : topOps.values()) {
+      if (tsOp.getConf().getPartColumns() != null &&
+              !tsOp.getConf().getPartColumns().isEmpty()) {
+        notValidTableScanOps.add(tsOp);
+      }
+    }
+    // 2) TS with DPP.
+    Collection<Operator<? extends OperatorDesc>> tableScanOps =
+            Lists.<Operator<?>>newArrayList(topOps.values());
+    Set<AppMasterEventOperator> s =
+            OperatorUtils.findOperators(tableScanOps, AppMasterEventOperator.class);
+    for (AppMasterEventOperator a : s) {
+      if (a.getConf() instanceof DynamicPruningEventDesc) {
+        DynamicPruningEventDesc dped = (DynamicPruningEventDesc) a.getConf();
+        notValidTableScanOps.add(dped.getTableScan());
+        optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a);
+      }
+    }
+    // 3) TS with semijoin DPP.
+    for (Entry<ReduceSinkOperator, SemiJoinBranchInfo> e
+            : pctx.getRsToSemiJoinBranchInfo().entrySet()) {
+      notValidTableScanOps.add(e.getValue().getTsOp());
+      optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey());
+    }
+    return notValidTableScanOps;
+  }
+
+  private static Multimap<String, Entry<String, TableScanOperator>> splitTableScanOpsByTable(
+          ParseContext pctx) {
+    Multimap<String, Entry<String, TableScanOperator>> tableNameToOps = ArrayListMultimap.create();
+    for (Entry<String, TableScanOperator> e : pctx.getTopOps().entrySet()) {
+      TableScanOperator tsOp = e.getValue();
+      tableNameToOps.put(
+              tsOp.getConf().getTableMetadata().getDbName() + "."
+                      + tsOp.getConf().getTableMetadata().getTableName(), e);
+    }
+    return tableNameToOps;
+  }
+
+  private static List<Entry<String, Long>> rankTablesByAccumulatedSize(ParseContext pctx,
+          Set<TableScanOperator> excludeTables) {
+    Map<String, Long> tableToTotalSize = new HashMap<>();
+    for (Entry<String, TableScanOperator> e : pctx.getTopOps().entrySet()) {
+      TableScanOperator tsOp = e.getValue();
+      if (excludeTables.contains(tsOp)) {
+        // Skip operator, currently we do not merge
+        continue;
+      }
+      String tableName = tsOp.getConf().getTableMetadata().getDbName() + "."
+              + tsOp.getConf().getTableMetadata().getTableName();
+      long tableSize = tsOp.getStatistics() != null ?
+              tsOp.getStatistics().getDataSize() : 0L;
+      Long totalSize = tableToTotalSize.get(tableName);
+      if (totalSize != null) {
+        tableToTotalSize.put(tableName,
+                StatsUtils.safeAdd(totalSize, tableSize));
+      } else {
+        tableToTotalSize.put(tableName, tableSize);
+      }
+    }
+    List<Entry<String, Long>> sortedTables =
+            new LinkedList<>(tableToTotalSize.entrySet());
+    Collections.sort(sortedTables, Collections.reverseOrder(
+            new Comparator<Map.Entry<String, Long>>() {
+              public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
+                return (o1.getValue()).compareTo(o2.getValue());
+              }
+            }));
+    return sortedTables;
+  }
+
+  private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizerCache optimizerCache,
+          TableScanOperator prevTsOp, TableScanOperator tsOp) {
+    // 1) The set of operators in the works of the TS operators need to meet
+    // some requirements. In particular:
+    // 1.1. None of the works that contain the TS operators can contain a Union
+    // operator. This is not supported yet as we might end up with cycles in
+    // the Tez DAG.
+    // 1.2. There cannot be more than one DummyStore operator in the new resulting
+    // work when the TS operators are merged. This is due to an assumption in
+    // MergeJoinProc that needs to be further explored.
+    // If any of these conditions are not met, we cannot merge.
+    // TODO: Extend rule so it can be apply for these cases.
+    final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, prevTsOp);
+    final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, tsOp);
+    boolean foundDummyStoreOp = false;
+    for (Operator<?> op : workOps1) {
+      if (op instanceof UnionOperator) {
+        // We cannot merge (1.1)
+        return false;
+      }
+      if (op instanceof DummyStoreOperator) {
+        foundDummyStoreOp = true;
+      }
+    }
+    for (Operator<?> op : workOps2) {
+      if (op instanceof UnionOperator) {
+        // We cannot merge (1.1)
+        return false;
+      }
+      if (foundDummyStoreOp && op instanceof DummyStoreOperator) {
+        // We cannot merge (1.2)
+        return false;
+      }
+    }
+    // 2) We check whether output works when we merge the operators will collide.
+    //
+    //   Work1   Work2    (merge TS in W1 & W2)        Work1
+    //       \   /                  ->                  | |       X
+    //       Work3                                     Work3
+    //
+    // If we do, we cannot merge. The reason is that Tez currently does
+    // not support parallel edges, i.e., multiple edges from same work x
+    // into same work y.
+    final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, prevTsOp);
+    final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp);
+    if (!Collections.disjoint(outputWorksOps1, outputWorksOps2)) {
+      // We cannot merge
+      return false;
+    }
+    // 3) We check whether we will end up with same operators inputing on same work.
+    //
+    //       Work1        (merge TS in W2 & W3)        Work1
+    //       /   \                  ->                  | |       X
+    //   Work2   Work3                                 Work2
+    //
+    // If we do, we cannot merge. The reason is the same as above, currently
+    // Tez currently does not support parallel edges.
+    final Set<Operator<?>> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, prevTsOp);
+    final Set<Operator<?>> inputWorksOps2 = findParentWorkOperators(pctx, optimizerCache, tsOp);
+    if (!Collections.disjoint(inputWorksOps1, inputWorksOps2)) {
+      // We cannot merge
+      return false;
+    }
+    // 4) We check whether one of the operators is part of a work that is an input for
+    // the work of the other operator.
+    //
+    //   Work1            (merge TS in W1 & W3)        Work1
+    //     |                        ->                   |        X
+    //   Work2                                         Work2
+    //     |                                             |
+    //   Work3                                         Work1
+    //
+    // If we do, we cannot merge, as we would end up with a cycle in the DAG.
+    final Set<Operator<?>> descendantWorksOps1 =
+            findDescendantWorkOperators(pctx, optimizerCache, prevTsOp);
+    final Set<Operator<?>> descendantWorksOps2 =
+            findDescendantWorkOperators(pctx, optimizerCache, tsOp);
+    if (!Collections.disjoint(descendantWorksOps1, workOps2)
+            || !Collections.disjoint(workOps1, descendantWorksOps2)) {
+      return false;
+    }
+    // 5) We check whether merging the works would cause the size of
+    // the data in memory grow too large.
+    // TODO: Currently ignores GBY and PTF which may also buffer data in memory.
+    final Set<Operator<?>> newWorkOps = workOps1;
+    newWorkOps.addAll(workOps2);
+    long dataSize = 0L;
+    for (Operator<?> op : newWorkOps) {
+      if (op instanceof MapJoinOperator) {
+        MapJoinOperator mop = (MapJoinOperator) op;
+        dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize());
+        if (dataSize > mop.getConf().getNoConditionalTaskSize()) {
+          // Size surpasses limit, we cannot convert
+          LOG.debug("accumulated data size: {} / max size: {}",
+                  dataSize, mop.getConf().getNoConditionalTaskSize());
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private static Set<Operator<?>> findParentWorkOperators(ParseContext pctx,
+          SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+    // Find operators in work
+    Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
+    // Gather input works operators
+    Set<Operator<?>> set = new HashSet<Operator<?>>();
+    for (Operator<?> op : workOps) {
+      if (op.getParentOperators() != null) {
+        for (Operator<?> parent : op.getParentOperators()) {
+          if (parent instanceof ReduceSinkOperator) {
+            set.addAll(findWorkOperators(optimizerCache, parent));
+          }
+        }
+      } else if (op instanceof TableScanOperator) {
+        // Check for DPP and semijoin DPP
+        for (Operator<?> parent : optimizerCache.tableScanToDPPSource.get((TableScanOperator) op)) {
+          set.addAll(findWorkOperators(optimizerCache, parent));
+        }
+      }
+    }
+    return set;
+  }
+
+  private static Set<Operator<?>> findChildWorkOperators(ParseContext pctx,
+          SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+    // Find operators in work
+    Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
+    // Gather output works operators
+    Set<Operator<?>> set = new HashSet<Operator<?>>();
+    for (Operator<?> op : workOps) {
+      if (op instanceof ReduceSinkOperator) {
+        if (op.getChildOperators() != null) {
+          // All children of RS are descendants
+          for (Operator<?> child : op.getChildOperators()) {
+            set.addAll(findWorkOperators(optimizerCache, child));
+          }
+        }
+        // Semijoin DPP work is considered an child because work needs
+        // to finish for it to execute
+        SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+        if (sjbi != null) {
+          set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp()));
+        }
+      } else if(op.getConf() instanceof DynamicPruningEventDesc) {
+        // DPP work is considered an child because work needs
+        // to finish for it to execute
+        set.addAll(findWorkOperators(
+                optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan()));
+      }
+    }
+    return set;
+  }
+
+  private static Set<Operator<?>> findDescendantWorkOperators(ParseContext pctx,
+          SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+    // Find operators in work
+    Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
+    // Gather output works operators
+    Set<Operator<?>> result = new HashSet<Operator<?>>();
+    Set<Operator<?>> set;
+    while (!workOps.isEmpty()) {
+      set = new HashSet<Operator<?>>();
+      for (Operator<?> op : workOps) {
+        if (op instanceof ReduceSinkOperator) {
+          if (op.getChildOperators() != null) {
+            // All children of RS are descendants
+            for (Operator<?> child : op.getChildOperators()) {
+              set.addAll(findWorkOperators(optimizerCache, child));
+            }
+          }
+          // Semijoin DPP work is considered a descendant because work needs
+          // to finish for it to execute
+          SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+          if (sjbi != null) {
+            set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp()));
+          }
+        } else if(op.getConf() instanceof DynamicPruningEventDesc) {
+          // DPP work is considered a descendant because work needs
+          // to finish for it to execute
+          set.addAll(findWorkOperators(
+                  optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan()));
+        }
+      }
+      workOps = set;
+      result.addAll(set);
+    }
+    return result;
+  }
+
+  // Stores result in cache
+  private static Set<Operator<?>> findWorkOperators(
+          SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+    Set<Operator<?>> c = optimizerCache.operatorToWorkOperators.get(start);
+    if (!c.isEmpty()) {
+      return c;
+    }
+    c = findWorkOperators(start, new HashSet<Operator<?>>());
+    for (Operator<?> op : c) {
+      optimizerCache.operatorToWorkOperators.putAll(op, c);
+    }
+    return c;
+  }
+
+  private static Set<Operator<?>> findWorkOperators(Operator<?> start, Set<Operator<?>> found) {
+    found.add(start);
+    if (start.getParentOperators() != null) {
+      for (Operator<?> parent : start.getParentOperators()) {
+        if (parent instanceof ReduceSinkOperator) {
+          continue;
+        }
+        if (!found.contains(parent)) {
+          findWorkOperators(parent, found);
+        }
+      }
+    }
+    if (start instanceof ReduceSinkOperator) {
+      return found;
+    }
+    if (start.getChildOperators() != null) {
+      for (Operator<?> child : start.getChildOperators()) {
+        if (!found.contains(child)) {
+          findWorkOperators(child, found);
+        }
+      }
+    }
+    return found;
+  }
+
+  private static void pushFilterToTopOfTableScan(
+          SharedScanOptimizerCache optimizerCache, TableScanOperator tsOp)
+                  throws UDFArgumentException {
+    ExprNodeGenericFuncDesc tableScanExprNode = tsOp.getConf().getFilterExpr();
+    List<Operator<? extends OperatorDesc>> allChildren =
+            Lists.newArrayList(tsOp.getChildOperators());
+    for (Operator<? extends OperatorDesc> op : allChildren) {
+      if (op instanceof FilterOperator) {
+        FilterOperator filterOp = (FilterOperator) op;
+        ExprNodeDesc filterExprNode  = filterOp.getConf().getPredicate();
+        if (tableScanExprNode.isSame(filterExprNode)) {
+          // We do not need to do anything
+          return;
+        }
+        if (tableScanExprNode.getGenericUDF() instanceof GenericUDFOPOr) {
+          for (ExprNodeDesc childExprNode : tableScanExprNode.getChildren()) {
+            if (childExprNode.isSame(filterExprNode)) {
+              // We do not need to do anything, it is in the OR expression
+              // so probably we pushed previously
+              return;
+            }
+          }
+        }
+        ExprNodeGenericFuncDesc newPred = ExprNodeGenericFuncDesc.newInstance(
+                new GenericUDFOPAnd(),
+                Arrays.<ExprNodeDesc>asList(tableScanExprNode.clone(), filterExprNode));
+        filterOp.getConf().setPredicate(newPred);
+      } else {
+        Operator<FilterDesc> newOp = OperatorFactory.get(tsOp.getCompilationOpContext(),
+                new FilterDesc(tableScanExprNode.clone(), false),
+                new RowSchema(tsOp.getSchema().getSignature()));
+        tsOp.replaceChild(op, newOp);
+        newOp.getParentOperators().add(tsOp);
+        op.replaceParent(tsOp, newOp);
+        newOp.getChildOperators().add(op);
+        // Add to cache (same group as tsOp)
+        optimizerCache.putIfWorkExists(newOp, tsOp);
+      }
+    }
+  }
+
+  /** Cache to accelerate optimization */
+  private static class SharedScanOptimizerCache {
+    // Operators that belong to each work
+    final HashMultimap<Operator<?>, Operator<?>> operatorToWorkOperators =
+            HashMultimap.<Operator<?>, Operator<?>>create();
+    // Table scan operators to DPP sources
+    final Multimap<TableScanOperator, Operator<?>> tableScanToDPPSource =
+            HashMultimap.<TableScanOperator, Operator<?>>create();
+
+    // Add new operator to cache work group of existing operator (if group exists)
+    void putIfWorkExists(Operator<?> opToAdd, Operator<?> existingOp) {
+      List<Operator<?>> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp));
+      if (!c.isEmpty()) {
+        for (Operator<?> op : c) {
+          operatorToWorkOperators.get(op).add(opToAdd);
+        }
+        operatorToWorkOperators.putAll(opToAdd, c);
+        operatorToWorkOperators.put(opToAdd, opToAdd);
+      }
+    }
+
+    // Remove operator and combine
+    void removeOpAndCombineWork(Operator<?> opToRemove, Operator<?> replacementOp) {
+      Set<Operator<?>> s = operatorToWorkOperators.get(opToRemove);
+      s.remove(opToRemove);
+      List<Operator<?>> c1 = ImmutableList.copyOf(s);
+      List<Operator<?>> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp));
+      if (!c1.isEmpty() && !c2.isEmpty()) {
+        for (Operator<?> op1 : c1) {
+          operatorToWorkOperators.remove(op1, opToRemove); // Remove operator
+          operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection
+        }
+        operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator
+        for (Operator<?> op2 : c2) {
+          operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index f78bd7c..53abb21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -282,7 +282,7 @@ public final class GenMRSkewJoinProcessor {
           newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
           joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
-          joinDescriptor.getNoConditionalTaskSize());
+          joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
       mapJoinDescriptor.setTagOrder(tags);
       mapJoinDescriptor.setHandleSkewJoin(false);
       mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index c970611..a5f0b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -241,7 +241,7 @@ public class GenSparkSkewJoinProcessor {
           newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc, joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
           joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
-          joinDescriptor.getNoConditionalTaskSize());
+          joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
       mapJoinDescriptor.setTagOrder(tags);
       mapJoinDescriptor.setHandleSkewJoin(false);
       mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index c87de16..188590f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -291,8 +291,14 @@ public class GenTezWork implements NodeProcessor {
               // of the downstream work
               for (ReduceSinkOperator r:
                      context.linkWorkWithReduceSinkMap.get(parentWork)) {
+                if (!context.mapJoinParentMap.get(mj).contains(r)) {
+                  // We might be visiting twice because of reutilization of intermediary results.
+                  // If that is the case, we do not need to do anything because either we have
+                  // already connected this RS operator or we will connect it at subsequent pass.
+                  continue;
+                }
                 if (r.getConf().getOutputName() != null) {
-                  LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+                  LOG.debug("Cloning reduce sink " + r + " for multi-child broadcast edge");
                   // we've already set this one up. Need to clone for the next work.
                   r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
                       r.getCompilationOpContext(), (ReduceSinkDesc)r.getConf().clone(),
@@ -370,7 +376,7 @@ public class GenTezWork implements NodeProcessor {
       long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
 
       LOG.debug("Second pass. Leaf operator: "+operator
-        +" has common downstream work:"+followingWork);
+        +" has common downstream work: "+followingWork);
 
       if (operator instanceof DummyStoreOperator) {
         // this is the small table side.

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index f469cd2..7e156f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -17,26 +17,55 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
-
-import com.google.common.base.Preconditions;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hive.ql.exec.*;
-import org.apache.hadoop.hive.ql.lib.*;
-import org.apache.hadoop.hive.ql.plan.*;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
@@ -47,9 +76,11 @@ import org.apache.hadoop.hive.ql.optimizer.MergeJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.SharedScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapPreVectorizationPass;
 import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
@@ -60,10 +91,25 @@ import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TezCompiler translates the operator plan into TezTasks.
@@ -92,6 +138,7 @@ public class TezCompiler extends TaskCompiler {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     // Create the context for the walker
     OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
+
     perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
     // setup dynamic partition pruning where possible
     runDynamicPartitionPruning(procCtx, inputs, outputs);
@@ -136,6 +183,12 @@ public class TezCompiler extends TaskCompiler {
     runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run cycle analysis for partition pruning");
 
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+    if(procCtx.conf.getBoolVar(ConfVars.HIVE_SHARED_SCAN_OPTIMIZATION)) {
+      new SharedScanOptimizer().transform(procCtx.parseContext);
+    }
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Shared scans optimization");
+
     // need a new run of the constant folding because we might have created lots
     // of "and true and true" conditions.
     // Rather than run the full constant folding just need to shortcut AND/OR expressions

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index c4fb3f3..12e1ff5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -107,6 +107,9 @@ public class JoinDesc extends AbstractOperatorDesc {
   private transient boolean leftInputJoin;
   private transient List<String> streamAliases;
 
+  // represents the total memory that this Join operator will use if it is a MapJoin operator
+  protected transient long inMemoryDataSize;
+
   // non-transient field, used at runtime to kill a task if it exceeded memory limits when running in LLAP
   protected long noConditionalTaskSize;
 
@@ -202,6 +205,7 @@ public class JoinDesc extends AbstractOperatorDesc {
     this.residualFilterExprs = clone.residualFilterExprs;
     this.statistics = clone.statistics;
     this.noConditionalTaskSize = clone.noConditionalTaskSize;
+    this.inMemoryDataSize = clone.inMemoryDataSize;
   }
 
   public Map<Byte, List<ExprNodeDesc>> getExprs() {
@@ -696,4 +700,12 @@ public class JoinDesc extends AbstractOperatorDesc {
   public void setNoConditionalTaskSize(final long noConditionalTaskSize) {
     this.noConditionalTaskSize = noConditionalTaskSize;
   }
+
+  public long getInMemoryDataSize() {
+    return inMemoryDataSize;
+  }
+
+  public void setInMemoryDataSize(final long inMemoryDataSize) {
+    this.inMemoryDataSize = inMemoryDataSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 8da85d2..f387e6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -113,7 +113,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames,
     final int posBigTable, final JoinCondDesc[] conds,
     final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix,
-    final long noConditionalTaskSize) {
+    final long noConditionalTaskSize, final long inMemoryDataSize) {
     super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize);
     vectorDesc = null;
     this.keys = keys;
@@ -123,6 +123,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     this.posBigTable = posBigTable;
     this.bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
     this.dumpFilePrefix = dumpFilePrefix;
+    this.inMemoryDataSize = inMemoryDataSize;
     initRetainExprList();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/queries/clientpositive/perf/query88.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/perf/query88.q b/ql/src/test/queries/clientpositive/perf/query88.q
index 2be814e..bb6ef6d 100644
--- a/ql/src/test/queries/clientpositive/perf/query88.q
+++ b/ql/src/test/queries/clientpositive/perf/query88.q
@@ -1,3 +1,5 @@
+set hive.strict.checks.cartesian.product=false;
+
 explain
 select  *
 from

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/auto_join0.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join0.q.out b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
index cba6001..6d051ea 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join0.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
@@ -30,10 +30,10 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
-        Reducer 6 <- Map 5 (SIMPLE_EDGE)
+        Reducer 5 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -52,13 +52,6 @@ STAGE PLANS:
                         key expressions: _col0 (type: string), _col1 (type: string)
                         sort order: ++
                         Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: (key < 10) (type: boolean)
                     Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
@@ -87,7 +80,7 @@ STAGE PLANS:
                     1 
                   outputColumnNames: _col0, _col1, _col2, _col3
                   input vertices:
-                    1 Reducer 6
+                    1 Reducer 5
                   Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
                     key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
@@ -124,7 +117,7 @@ STAGE PLANS:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 6 
+        Reducer 5 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/auto_join30.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join30.q.out b/ql/src/test/results/clientpositive/llap/auto_join30.q.out
index a26db55..cc59c5c 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join30.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join30.q.out
@@ -457,9 +457,9 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE)
+        Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (BROADCAST_EDGE), Reducer 6 (BROADCAST_EDGE)
         Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
-        Reducer 7 <- Map 6 (SIMPLE_EDGE)
+        Reducer 6 <- Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -497,13 +497,6 @@ STAGE PLANS:
                         sort order: +
                         Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col0 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
@@ -548,7 +541,7 @@ STAGE PLANS:
                   outputColumnNames: _col2, _col3
                   input vertices:
                     0 Reducer 2
-                    2 Reducer 7
+                    2 Reducer 6
                   Statistics: Num rows: 2974 Data size: 529372 Basic stats: COMPLETE Column stats: COMPLETE
                   Group By Operator
                     aggregations: sum(hash(_col2,_col3))
@@ -574,7 +567,7 @@ STAGE PLANS:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 7 
+        Reducer 6 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
@@ -650,10 +643,10 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
-        Reducer 8 <- Map 7 (SIMPLE_EDGE)
+        Reducer 7 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -685,13 +678,6 @@ STAGE PLANS:
                       sort order: +
                       Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col0 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 7 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: _col0, _col1
@@ -765,7 +751,7 @@ STAGE PLANS:
                   Map-reduce partition columns: _col0 (type: string)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col1 (type: string)
-        Reducer 8 
+        Reducer 7 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
@@ -841,10 +827,10 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
-        Reducer 8 <- Map 7 (SIMPLE_EDGE)
+        Reducer 7 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -876,13 +862,6 @@ STAGE PLANS:
                       sort order: +
                       Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col0 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 7 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: _col0, _col1
@@ -956,7 +935,7 @@ STAGE PLANS:
                   Map-reduce partition columns: _col0 (type: string)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col1 (type: string)
-        Reducer 8 
+        Reducer 7 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
@@ -1032,10 +1011,10 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
-        Reducer 8 <- Map 7 (SIMPLE_EDGE)
+        Reducer 7 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1067,13 +1046,6 @@ STAGE PLANS:
                       sort order: +
                       Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col0 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 7 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: _col0, _col1
@@ -1147,7 +1119,7 @@ STAGE PLANS:
                   Map-reduce partition columns: _col0 (type: string)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col1 (type: string)
-        Reducer 8 
+        Reducer 7 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator
@@ -1223,10 +1195,10 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
         Reducer 6 <- Map 5 (SIMPLE_EDGE)
-        Reducer 8 <- Map 7 (SIMPLE_EDGE)
+        Reducer 7 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1258,13 +1230,6 @@ STAGE PLANS:
                       sort order: +
                       Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                       value expressions: _col0 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 7 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: _col0, _col1
@@ -1338,7 +1303,7 @@ STAGE PLANS:
                   Map-reduce partition columns: _col0 (type: string)
                   Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col1 (type: string)
-        Reducer 8 
+        Reducer 7 
             Execution mode: llap
             Reduce Operator Tree:
               Select Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
index b69d0bd..bdb30d7 100644
--- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
@@ -475,7 +475,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Map 1 <- Map 3 (BROADCAST_EDGE)
-        Map 4 <- Map 6 (BROADCAST_EDGE)
+        Map 4 <- Map 3 (BROADCAST_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -533,6 +533,18 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
             LLAP IO: no inputs
         Map 4 
@@ -555,7 +567,7 @@ STAGE PLANS:
                           1 _col0 (type: int)
                         outputColumnNames: _col0
                         input vertices:
-                          1 Map 6
+                          1 Map 3
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
                         Group By Operator
                           aggregations: count()
@@ -571,25 +583,6 @@ STAGE PLANS:
                             value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: b
-                  Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: int)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
-            LLAP IO: no inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -2306,7 +2299,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Map 1 <- Map 3 (BROADCAST_EDGE)
-        Map 4 <- Map 6 (BROADCAST_EDGE)
+        Map 4 <- Map 3 (BROADCAST_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE)
         Reducer 5 <- Map 4 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -2364,6 +2357,18 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
             LLAP IO: no inputs
         Map 4 
@@ -2386,7 +2391,7 @@ STAGE PLANS:
                           1 _col0 (type: int)
                         outputColumnNames: _col0
                         input vertices:
-                          1 Map 6
+                          1 Map 3
                         Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
                         Group By Operator
                           aggregations: count()
@@ -2402,25 +2407,6 @@ STAGE PLANS:
                             value expressions: _col1 (type: bigint)
             Execution mode: llap
             LLAP IO: no inputs
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: b
-                  Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: int)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: llap
-            LLAP IO: no inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
index 964d058..042c60b 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
@@ -757,7 +757,7 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Map 1 <- Map 3 (CUSTOM_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (CUSTOM_SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -818,13 +818,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: int)
                         Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: b
-                  Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
@@ -861,7 +854,7 @@ STAGE PLANS:
                       1 _col0 (type: int)
                     outputColumnNames: _col0, _col1, _col3
                     input vertices:
-                      1 Map 4
+                      1 Map 3
                     Statistics: Num rows: 302 Data size: 5633 Basic stats: COMPLETE Column stats: NONE
                     Select Operator
                       expressions: _col1 (type: int), _col0 (type: double), _col3 (type: string)

http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out b/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
index b628cb1..291a1f2 100644
--- a/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
+++ b/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
@@ -1738,9 +1738,9 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
-        Reducer 7 <- Map 6 (SIMPLE_EDGE)
+        Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1780,13 +1780,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: z
-                  Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
@@ -1864,7 +1857,7 @@ STAGE PLANS:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 7 
+        Reducer 6 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
@@ -1929,9 +1922,9 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
         Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
-        Reducer 7 <- Map 6 (SIMPLE_EDGE)
+        Reducer 6 <- Map 5 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1971,13 +1964,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col1 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: z
-                  Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
@@ -2055,7 +2041,7 @@ STAGE PLANS:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 7 
+        Reducer 6 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator