Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/14 21:07:05 UTC

svn commit: r1631841 [12/42] - in /hive/branches/llap: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/ accumulo-handler/src/java/org/apache/hadoop/hiv...

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,12 +30,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -42,12 +48,16 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
@@ -60,39 +70,46 @@ public class ConvertJoinMapJoin implemen
 
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
 
+  @SuppressWarnings("unchecked")
   @Override
-    /*
-     * (non-Javadoc)
-     * we should ideally not modify the tree we traverse.
-     * However, since we need to walk the tree at any time when we modify the
-     * operator, we might as well do it here.
-     */
-    public Object process(Node nd, Stack<Node> stack,
-        NodeProcessorCtx procCtx, Object... nodeOutputs)
-    throws SemanticException {
+  /*
+   * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+   * since we need to walk the tree at any time when we modify the operator, we
+   * might as well do it here.
+   */
+  public Object
+      process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
 
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
-    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+    JoinOperator joinOp = (JoinOperator) nd;
+
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
+        && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
-    JoinOperator joinOp = (JoinOperator) nd;
-    // if we have traits, and table info is present in the traits, we know the 
+    // if we have traits, and table info is present in the traits, we know the
     // exact number of buckets. Else choose the largest number of estimated
     // reducers from the parent operators.
     int numBuckets = -1;
     int estimatedBuckets = -1;
+    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
       for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
         if (parentOp.getOpTraits().getNumBuckets() > 0) {
-          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? 
-              parentOp.getOpTraits().getNumBuckets() : numBuckets; 
+          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+              parentOp.getOpTraits().getNumBuckets() : numBuckets;
         }
 
         if (parentOp instanceof ReduceSinkOperator) {
           ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
-          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? 
+          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
               rs.getConf().getNumReducers() : estimatedBuckets;
         }
       }
@@ -107,29 +124,80 @@ public class ConvertJoinMapJoin implemen
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
     if (mapJoinConversionPos < 0) {
-      // we cannot convert to bucket map join, we cannot convert to 
-      // map join either based on the size
+      // we cannot convert to bucket map join, we cannot convert to
+      // map join either based on the size. Check if we can convert to SMB join.
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+        convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+        return null;
+      }
+      Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+      try {
+        bigTableMatcherClass =
+            (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+                context.parseContext.getConf(),
+                HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+      } catch (ClassNotFoundException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      BigTableSelectorForAutoSMJ bigTableMatcher =
+          ReflectionUtils.newInstance(bigTableMatcherClass, null);
+      JoinDesc joinDesc = joinOp.getConf();
+      JoinCondDesc[] joinCondns = joinDesc.getConds();
+      Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+      if (joinCandidates.isEmpty()) {
+        // This is a full outer join. This can never be a map-join
+        // of any type. So return false.
+        return false;
+      }
+      mapJoinConversionPos =
+          bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+      if (mapJoinConversionPos < 0) {
+        // contains aliases from sub-query
+        // we are just converting to a common merge join operator. The shuffle
+        // join in map-reduce case.
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+        return null;
+      }
+
+      if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+        convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+            tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+      } else {
+        // we are just converting to a common merge join operator. The shuffle
+        // join in map-reduce case.
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      }
       return null;
     }
 
-    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-      if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
-        return null;
+    if (numBuckets > 1) {
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+        if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+          return null;
+        }
       }
     }
 
     LOG.info("Convert to non-bucketed map join");
     // check if we can convert to map join no bucket scaling.
-    mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
+    mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
     if (mapJoinConversionPos < 0) {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     // map join operator by default has no bucket cols
-    mapJoinOp.setOpTraits(new OpTraits(null, -1));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraitsToNull(childOp);
@@ -138,11 +206,107 @@ public class ConvertJoinMapJoin implemen
     return null;
   }
 
+  // replaces the join operator with a new CommonJoinOperator, removes the
+  // parent reduce sinks
+  private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
+      throws SemanticException {
+    ParseContext parseContext = context.parseContext;
+    MapJoinDesc mapJoinDesc = null;
+    if (adjustParentsChildren) {
+        mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+            joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+    } else {
+      JoinDesc joinDesc = joinOp.getConf();
+      // retain the original join desc in the map join.
+      mapJoinDesc =
+          new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
+              joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
+              joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+    }
+
+    @SuppressWarnings("unchecked")
+    CommonMergeJoinOperator mergeJoinOp =
+        (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
+            isSubQuery, mapJoinConversionPos, mapJoinDesc));
+    OpTraits opTraits =
+        new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
+            .getSortCols());
+    mergeJoinOp.setOpTraits(opTraits);
+    mergeJoinOp.setStatistics(joinOp.getStatistics());
+
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      int pos = parentOp.getChildOperators().indexOf(joinOp);
+      parentOp.getChildOperators().remove(pos);
+      parentOp.getChildOperators().add(pos, mergeJoinOp);
+    }
+
+    for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
+      int pos = childOp.getParentOperators().indexOf(joinOp);
+      childOp.getParentOperators().remove(pos);
+      childOp.getParentOperators().add(pos, mergeJoinOp);
+    }
+
+    List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
+    if (childOperators == null) {
+      childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+      mergeJoinOp.setChildOperators(childOperators);
+    }
+
+    List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
+    if (parentOperators == null) {
+      parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+      mergeJoinOp.setParentOperators(parentOperators);
+    }
+
+    childOperators.clear();
+    parentOperators.clear();
+    childOperators.addAll(joinOp.getChildOperators());
+    parentOperators.addAll(joinOp.getParentOperators());
+    mergeJoinOp.getConf().setGenJoinKeys(false);
+
+    if (adjustParentsChildren) {
+      mergeJoinOp.getConf().setGenJoinKeys(true);
+      List<Operator<? extends OperatorDesc>> newParentOpList =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
+        for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
+          grandParentOp.getChildOperators().remove(parentOp);
+          grandParentOp.getChildOperators().add(mergeJoinOp);
+          newParentOpList.add(grandParentOp);
+        }
+      }
+      mergeJoinOp.getParentOperators().clear();
+      mergeJoinOp.getParentOperators().addAll(newParentOpList);
+      List<Operator<? extends OperatorDesc>> parentOps =
+          new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
+      for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+        int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
+        if (parentIndex == mapJoinConversionPos) {
+          continue;
+        }
+
+        // insert the dummy store operator here
+        DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+        dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        dummyStoreOp.getChildOperators().add(mergeJoinOp);
+        int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
+        parentOp.getChildOperators().remove(index);
+        parentOp.getChildOperators().add(index, dummyStoreOp);
+        dummyStoreOp.getParentOperators().add(parentOp);
+        mergeJoinOp.getParentOperators().remove(parentIndex);
+        mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+      }
+    }
+    mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
+  }
+
   private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
     if (currentOp instanceof ReduceSinkOperator) {
       return;
     }
-    currentOp.setOpTraits(new OpTraits(null, -1));
+    currentOp.setOpTraits(new OpTraits(null, -1, null));
     for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
         break;
@@ -151,28 +315,26 @@ public class ConvertJoinMapJoin implemen
     }
   }
 
-  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
-      int bigTablePosition) throws SemanticException {
-
-    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
 
     if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
       LOG.info("Check conversion to bucket map join failed.");
       return false;
     }
 
-    MapJoinOperator mapJoinOp = 
-      convertJoinMapJoin(joinOp, context, bigTablePosition);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
 
     // we can set the traits for this join operator
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
-        tezBucketJoinProcCtx.getNumBuckets());
+        tezBucketJoinProcCtx.getNumBuckets(), null);
     mapJoinOp.setOpTraits(opTraits);
+    mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);
 
-    // Once the conversion is done, we can set the partitioner to bucket cols on the small table    
+    // Once the conversion is done, we can set the partitioner to bucket cols on the small table
     Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
     bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
     joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -182,6 +344,54 @@ public class ConvertJoinMapJoin implemen
     return true;
   }
 
+  /*
+   * This method tries to convert a join to an SMB. This is done based on
+   * traits. If the sorted by columns are the same as the join columns then, we
+   * can convert the join to an SMB. Otherwise retain the bucket map join as it
+   * is still more efficient than a regular join.
+   */
+  private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+
+    ReduceSinkOperator bigTableRS =
+        (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
+            .getNumBuckets();
+
+    // the sort and bucket cols have to match on both sides for this
+    // transformation of the join operation
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      if (!(parentOp instanceof ReduceSinkOperator)) {
+        // could be mux/demux operators. Currently not supported
+        LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
+        return false;
+      }
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
+      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
+          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
+        LOG.info("We cannot convert to SMB because the sort column names do not match.");
+        return false;
+      }
+
+      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
+          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
+          == false) {
+        LOG.info("We cannot convert to SMB because bucket column names do not match.");
+        return false;
+      }
+    }
+
+    boolean isSubQuery = false;
+    if (numBuckets < 0) {
+      isSubQuery = true;
+      numBuckets = bigTableRS.getConf().getNumReducers();
+    }
+    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+    LOG.info("We can convert the join to an SMB join.");
+    return true;
+  }
+
   private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
     int numBuckets = currentOp.getOpTraits().getNumBuckets();
     for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -193,15 +403,13 @@ public class ConvertJoinMapJoin implemen
   }
 
   /*
-   *  We perform the following checks to see if we can convert to a bucket map join
-   *  1. If the parent reduce sink of the big table side has the same emit key cols as 
-   *  its parent, we can create a bucket map join eliminating the reduce sink.
-   *  2. If we have the table information, we can check the same way as in Mapreduce to 
-   *  determine if we can perform a Bucket Map Join.
+   * If the parent reduce sink of the big table side has the same emit key cols
+   * as its parent, we can create a bucket map join eliminating the reduce sink.
    */
-  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, 
-      OptimizeTezProcContext context, int bigTablePosition, 
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
+      OptimizeTezProcContext context, int bigTablePosition,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx)
+  throws SemanticException {
     // bail on mux-operator because mux operator masks the emit keys of the
     // constituent reduce sinks
     if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -211,14 +419,41 @@ public class ConvertJoinMapJoin implemen
     }
 
     ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
+    Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
+    List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
+    int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+    // all keys matched.
+    if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
+        tezBucketJoinProcCtx) == false) {
+      LOG.info("No info available to check for bucket map join. Cannot convert");
+      return false;
+    }
+
     /*
      * this is the case when the big table is a sub-query and is probably
-     * already bucketed by the join column in say a group by operation 
+     * already bucketed by the join column in say a group by operation
      */
-    List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
-    if ((colNames != null) && (colNames.isEmpty() == false)) {
-      Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
-      for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
+    boolean isSubQuery = false;
+    if (numBuckets < 0) {
+      isSubQuery = true;
+      numBuckets = rs.getConf().getNumReducers();
+    }
+    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+    return true;
+  }
+
+  private boolean checkColEquality(List<List<String>> grandParentColNames,
+      List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+
+    if ((grandParentColNames == null) || (parentColNames == null)) {
+      return false;
+    }
+
+    if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
+      for (List<String> listBucketCols : grandParentColNames) {
         // can happen if this operator does not carry forward the previous bucketing columns
         // for e.g. another join operator which does not carry one of the sides' key columns
         if (listBucketCols.isEmpty()) {
@@ -226,9 +461,9 @@ public class ConvertJoinMapJoin implemen
         }
         int colCount = 0;
         // parent op is guaranteed to have a single list because it is a reduce sink
-        for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
+        for (String colName : parentColNames.get(0)) {
           // all columns need to be at least a subset of the parentOfParent's bucket cols
-          ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
+          ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
           if (exprNodeDesc instanceof ExprNodeColumnDesc) {
             if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
               colCount++;
@@ -236,32 +471,21 @@ public class ConvertJoinMapJoin implemen
               break;
             }
           }
-          
-          if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
-            // all keys matched.
-            int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
-            boolean isSubQuery = false;
-            if (numBuckets < 0) {
-              isSubQuery = true;
-              numBuckets = rs.getConf().getNumReducers();
-            }
-            tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-            tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+
+          if (colCount == parentColNames.get(0).size()) {
             return true;
           }
         }
       }
       return false;
     }
-
-    LOG.info("No info available to check for bucket map join. Cannot convert");
     return false;
   }
 
-  public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, 
+  public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
       int buckets) {
-    Set<Integer> bigTableCandidateSet = MapJoinProcessor.
-      getBigTableCandidates(joinOp.getConf().getConds());
+    Set<Integer> bigTableCandidateSet =
+        MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
 
     long maxSize = context.conf.getLongVar(
         HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -287,7 +511,7 @@ public class ConvertJoinMapJoin implemen
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null) ||
           ((bigInputStat != null) &&
-           (inputSize > bigInputStat.getDataSize()))) {
+          (inputSize > bigInputStat.getDataSize()))) {
 
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
@@ -347,9 +571,9 @@ public class ConvertJoinMapJoin implemen
    * for tez.
    */
 
-  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
+  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int bigTablePosition) throws SemanticException {
-    // bail on mux operator because currently the mux operator masks the emit keys 
+    // bail on mux operator because currently the mux operator masks the emit keys
     // of the constituent reduce sinks.
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
       if (parentOp instanceof MuxOperator) {
@@ -359,12 +583,12 @@ public class ConvertJoinMapJoin implemen
 
     //can safely convert the join to a map join.
     ParseContext parseContext = context.parseContext;
-    MapJoinOperator mapJoinOp = MapJoinProcessor.
-      convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
-          joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+    MapJoinOperator mapJoinOp =
+        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
+            parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
 
-    Operator<? extends OperatorDesc> parentBigTableOp
-      = mapJoinOp.getParentOperators().get(bigTablePosition);
+    Operator<? extends OperatorDesc> parentBigTableOp =
+        mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
       for (Operator<?> p : parentBigTableOp.getParentOperators()) {
         // we might have generated a dynamic partition operator chain. Since
@@ -380,11 +604,10 @@ public class ConvertJoinMapJoin implemen
         }
       }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
-      if (!(mapJoinOp.getParentOperators().contains(
-              parentBigTableOp.getParentOperators().get(0)))) {
+      if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
             parentBigTableOp.getParentOperators().get(0));
-              }
+      }
       parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
       for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
         if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -397,15 +620,31 @@ public class ConvertJoinMapJoin implemen
     return mapJoinOp;
   }
 
-  private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
-    if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
-      return true;
-    }
-    for (Operator<?> c : op.getChildOperators()) {
-      if (hasDynamicPartitionBroadcast(c)) {
-        return true;
+  private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
+    boolean hasDynamicPartitionPruning = false;
+
+    for (Operator<?> op: parent.getChildOperators()) {
+      while (op != null) {
+        if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+          // found dynamic partition pruning operator
+          hasDynamicPartitionPruning = true;
+          break;
+        }
+      
+        if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
+          // crossing reduce sink or file sink means the pruning isn't for this parent.
+          break;
+        }
+
+        if (op.getChildOperators().size() != 1) {
+          // dynamic partition pruning pipeline doesn't have multiple children
+          break;
+        }
+
+        op = op.getChildOperators().get(0);
       }
     }
-    return false;
+
+    return hasDynamicPartitionPruning;
   }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import com.google.common.collect.Interner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.No
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
-import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -99,7 +98,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 
 import java.io.Serializable;
@@ -578,8 +576,6 @@ public final class GenMapRedUtils {
     //This read entity is a direct read entity and not an indirect read (that is when
     // this is being read because it is a dependency of a view).
     boolean isDirectRead = (parentViewInfo == null);
-    PlanUtils.addInput(inputs,
-        new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead));
 
     for (Partition part : parts) {
       if (part.getTable().isPartitioned()) {
@@ -873,6 +869,30 @@ public final class GenMapRedUtils {
     }
   }
 
+  public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) {
+
+    if (task instanceof ConditionalTask) {
+      for (Task tsk : ((ConditionalTask) task).getListTasks()) {
+        internTableDesc(tsk, interner);
+      }
+    } else if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      work.getMapWork().internTable(interner);
+    } else if (task != null && (task.getWork() instanceof TezWork)) {
+      TezWork work = (TezWork)task.getWork();
+      for (BaseWork w : work.getAllWorkUnsorted()) {
+        if (w instanceof MapWork) {
+          ((MapWork)w).internTable(interner);
+        }
+      }
+    }
+    if (task.getNumChild() > 0) {
+      for (Task childTask : task.getChildTasks()) {
+        internTableDesc(childTask, interner);
+      }
+    }
+  }
+
   /**
    * create a new plan and return.
    *
@@ -1485,7 +1505,7 @@ public final class GenMapRedUtils {
    *
    * @param fsInputDesc
    * @param finalName
-   * @param inputFormatClass 
+   * @param inputFormatClass
    * @return MergeWork if table is stored as RCFile or ORCFile,
    *         null otherwise
    */
@@ -1689,7 +1709,7 @@ public final class GenMapRedUtils {
           // There are separate configuration parameters to control whether to
           // merge for a map-only job
           // or for a map-reduce job
-          if (currTask.getWork() instanceof MapredWork) {  
+          if (currTask.getWork() instanceof MapredWork) {
             ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
             boolean mergeMapOnly =
               hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
@@ -1788,7 +1808,7 @@ public final class GenMapRedUtils {
     return Collections.emptyList();
   }
 
-  public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) 
+  public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
     throws SemanticException {
     List<Path> inputPaths = new ArrayList<Path>();
     switch (parseInfo.getTableSpec().specType) {
@@ -1825,6 +1845,7 @@ public final class GenMapRedUtils {
   public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) {
     final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>();
     OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() {
+      @Override
       public void apply(Operator<?> argument) {
         if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) {
           operators.add(argument);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Tue Oct 14 19:06:45 2014
@@ -332,18 +332,26 @@ public class GroupByOptimizer implements
               continue;
             }
 
-            ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
-            if (selectColList instanceof ExprNodeColumnDesc) {
+            ExprNodeDesc selectCol = selectDesc.getColList().get(pos);
+            if (selectCol instanceof ExprNodeColumnDesc) {
               String newValue =
-                  tableColsMapping.get(((ExprNodeColumnDesc) selectColList).getColumn());
+                  tableColsMapping.get(((ExprNodeColumnDesc) selectCol).getColumn());
               tableColsMapping.put(outputColumnName, newValue);
             }
             else {
               tableColsMapping.remove(outputColumnName);
-              if ((selectColList instanceof ExprNodeConstantDesc) ||
-                  (selectColList instanceof ExprNodeNullDesc)) {
+              if (selectCol instanceof ExprNodeNullDesc) {
                 newConstantCols.add(outputColumnName);
               }
+              if (selectCol instanceof ExprNodeConstantDesc) {
+                // Lets see if this constant was folded because of optimization.
+                String origCol = ((ExprNodeConstantDesc) selectCol).getFoldedFromCol();
+                if (origCol != null) {
+                  tableColsMapping.put(outputColumnName, origCol);
+                } else {
+                  newConstantCols.add(outputColumnName);
+                }
+              }
             }
           }
 
@@ -351,7 +359,6 @@ public class GroupByOptimizer implements
         }
       }
 
-      boolean sortGroupBy = true;
       // compute groupby columns from groupby keys
       List<String> groupByCols = new ArrayList<String>();
       // If the group by expression is anything other than a list of columns,

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Oct 14 19:06:45 2014
@@ -389,157 +389,8 @@ public class MapJoinProcessor implements
       JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
       throws SemanticException {
 
-    JoinDesc desc = op.getConf();
-    JoinCondDesc[] condns = desc.getConds();
-    Byte[] tagOrder = desc.getTagOrder();
-
-    // outer join cannot be performed on a table which is being cached
-    if (!noCheckOuterJoin) {
-      if (checkMapJoin(mapJoinPos, condns) < 0) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-    }
-
-    // Walk over all the sources (which are guaranteed to be reduce sink
-    // operators).
-    // The join outputs a concatenation of all the inputs.
-    QBJoinTree leftSrc = joinTree.getJoinSrc();
-    List<ReduceSinkOperator> oldReduceSinkParentOps =
-        new ArrayList<ReduceSinkOperator>(op.getNumParent());
-    if (leftSrc != null) {
-      // assert mapJoinPos == 0;
-      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
-      assert parentOp.getParentOperators().size() == 1;
-      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-    }
-
-
-    byte pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
-      if (src != null) {
-        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
-        assert parentOp.getParentOperators().size() == 1;
-        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-      }
-      pos++;
-    }
-
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
-    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
-    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
-      byte tag = entry.getKey();
-      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
-      List<ExprNodeDesc> values = entry.getValue();
-      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
-      newValueExprs.put(tag, newValues);
-      for (int i = 0; i < schema.size(); i++) {
-        ColumnInfo column = schema.get(i);
-        if (column == null) {
-          continue;
-        }
-        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
-        int index = ExprNodeDescUtils.indexOf(expr, values);
-        if (index >= 0) {
-          colExprMap.put(column.getInternalName(), newValues.get(index));
-          schema.set(i, null);
-        }
-      }
-    }
-
-    // rewrite value index for mapjoin
-    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
-    // get the join keys from old parent ReduceSink operators
-    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
-    // construct valueTableDescs and valueFilteredTableDescs
-    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
-    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
-    int[][] filterMap = desc.getFilterMap();
-    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
-      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
-      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
-      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
-      if (pos != mapJoinPos) {
-        // remove values in key exprs for value table schema
-        // value expression for hashsink will be modified in LocalMapJoinProcessor
-        int[] valueIndex = new int[valueCols.size()];
-        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
-        for (int i = 0; i < valueIndex.length; i++) {
-          ExprNodeDesc expr = valueCols.get(i);
-          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
-          if (kindex >= 0) {
-            valueIndex[i] = kindex;
-          } else {
-            valueIndex[i] = -valueColsInValueExpr.size() - 1;
-            valueColsInValueExpr.add(expr);
-          }
-        }
-        if (needValueIndex(valueIndex)) {
-          valueIndices.put(pos, valueIndex);
-        }
-        valueCols = valueColsInValueExpr;
-      }
-      // deep copy expr node desc
-      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
-      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
-        ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
-            .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
-        valueFilteredCols.add(isFilterDesc);
-      }
-
-      TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
-      TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
-
-      valueTableDescs.add(valueTableDesc);
-      valueFilteredTableDescs.add(valueFilteredTableDesc);
-
-      keyExprMap.put(pos, keyCols);
-    }
-
-    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
-    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
-      byte srcTag = entry.getKey();
-      List<ExprNodeDesc> filter = entry.getValue();
-
-      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
-    }
-    desc.setFilters(filters = newFilters);
-
-    // create dumpfile prefix needed to create descriptor
-    String dumpFilePrefix = "";
-    if( joinTree.getMapAliases() != null ) {
-      for(String mapAlias : joinTree.getMapAliases()) {
-        dumpFilePrefix = dumpFilePrefix + mapAlias;
-      }
-      dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix();
-    } else {
-      dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
-
-    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
-        PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-    JoinCondDesc[] joinCondns = op.getConf().getConds();
-    MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
-        valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
-        filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
-    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
-    mapJoinDescriptor.setTagOrder(tagOrder);
-    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
-    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
-    if (!valueIndices.isEmpty()) {
-      mapJoinDescriptor.setValueIndices(valueIndices);
-    }
+    MapJoinDesc mapJoinDescriptor =
+        getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
 
     // reduce sink row resolver used to generate map join op
     RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -551,6 +402,7 @@ public class MapJoinProcessor implements
     opParseCtxMap.put(mapJoinOp, ctx);
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     mapJoinOp.setColumnExprMap(colExprMap);
 
     List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements
     }
 
   }
+
+  public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+    JoinDesc desc = op.getConf();
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
+    // outer join cannot be performed on a table which is being cached
+    if (!noCheckOuterJoin) {
+      if (checkMapJoin(mapJoinPos, condns) < 0) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+
+    // Walk over all the sources (which are guaranteed to be reduce sink
+    // operators).
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+    List<ReduceSinkOperator> oldReduceSinkParentOps =
+        new ArrayList<ReduceSinkOperator>(op.getNumParent());
+    if (leftSrc != null) {
+      // assert mapJoinPos == 0;
+      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+    }
+
+    byte pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+      }
+      pos++;
+    }
+
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+      byte tag = entry.getKey();
+      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+      List<ExprNodeDesc> values = entry.getValue();
+      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+      newValueExprs.put(tag, newValues);
+      for (int i = 0; i < schema.size(); i++) {
+        ColumnInfo column = schema.get(i);
+        if (column == null) {
+          continue;
+        }
+        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+        int index = ExprNodeDescUtils.indexOf(expr, values);
+        if (index >= 0) {
+          colExprMap.put(column.getInternalName(), newValues.get(index));
+          schema.set(i, null);
+        }
+      }
+    }
+
+    // rewrite value index for mapjoin
+    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+    // get the join keys from old parent ReduceSink operators
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+    // construct valueTableDescs and valueFilteredTableDescs
+    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+    int[][] filterMap = desc.getFilterMap();
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+      if (pos != mapJoinPos) {
+        // remove values in key exprs for value table schema
+        // value expression for hashsink will be modified in
+        // LocalMapJoinProcessor
+        int[] valueIndex = new int[valueCols.size()];
+        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+        for (int i = 0; i < valueIndex.length; i++) {
+          ExprNodeDesc expr = valueCols.get(i);
+          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+          if (kindex >= 0) {
+            valueIndex[i] = kindex;
+          } else {
+            valueIndex[i] = -valueColsInValueExpr.size() - 1;
+            valueColsInValueExpr.add(expr);
+          }
+        }
+        if (needValueIndex(valueIndex)) {
+          valueIndices.put(pos, valueIndex);
+        }
+        valueCols = valueColsInValueExpr;
+      }
+      // deep copy expr node desc
+      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+        ExprNodeColumnDesc isFilterDesc =
+            new ExprNodeColumnDesc(
+                TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
+                "filter", false);
+        valueFilteredCols.add(isFilterDesc);
+      }
+
+      TableDesc valueTableDesc =
+          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
+              "mapjoinvalue"));
+      TableDesc valueFilteredTableDesc =
+          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+              valueFilteredCols, "mapjoinvalue"));
+
+      valueTableDescs.add(valueTableDesc);
+      valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+      keyExprMap.put(pos, keyCols);
+    }
+
+    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+      byte srcTag = entry.getKey();
+      List<ExprNodeDesc> filter = entry.getValue();
+
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+    }
+    desc.setFilters(filters = newFilters);
+
+    // create dumpfile prefix needed to create descriptor
+    String dumpFilePrefix = "";
+    if (joinTree.getMapAliases() != null) {
+      for (String mapAlias : joinTree.getMapAliases()) {
+        dumpFilePrefix = dumpFilePrefix + mapAlias;
+      }
+      dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    } else {
+      dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    }
+
+    List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
+
+    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    TableDesc keyTableDesc =
+        PlanUtils.getMapJoinKeyTableDesc(hconf,
+            PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
+    MapJoinDesc mapJoinDescriptor =
+        new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
+            valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
+                .getConf().getNoOuterJoin(), dumpFilePrefix);
+    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+    mapJoinDescriptor.setTagOrder(tagOrder);
+    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+    if (!valueIndices.isEmpty()) {
+      mapJoinDescriptor.setValueIndices(valueIndices);
+    }
+
+    return mapJoinDescriptor;
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Oct 14 19:06:45 2014
@@ -51,7 +51,12 @@ public class Optimizer {
    * @param hiveConf
    */
   public void initialize(HiveConf hiveConf) {
+
+    boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    boolean bucketMapJoinOptimizer = false;
+
     transformations = new ArrayList<Transform>();
+
     // Add the transformation that computes the lineage information.
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -59,19 +64,23 @@ public class Optimizer {
       transformations.add(new SyntheticJoinPredicate());
       transformations.add(new PredicatePushDown());
       transformations.add(new PartitionPruner());
+    }
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+        transformations.add(new ConstantPropagate());
+    }
+
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
       transformations.add(new PartitionConditionRemover());
       if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
         /* Add list bucketing pruner. */
         transformations.add(new ListBucketingPruner());
       }
     }
+
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY) ||
         HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT)) {
       transformations.add(new GroupByOptimizer());
     }
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
-      transformations.add(new ConstantPropagate());
-    }
     transformations.add(new ColumnPruner());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
       transformations.add(new SkewJoinOptimizer());
@@ -81,15 +90,16 @@ public class Optimizer {
     }
     transformations.add(new SamplePruner());
     transformations.add(new MapJoinProcessor());
-    boolean bucketMapJoinOptimizer = false;
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+
+    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
       transformations.add(new BucketMapJoinOptimizer());
       bucketMapJoinOptimizer = true;
     }
 
     // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
     // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
+        && !isTezExecEngine) {
       if (!bucketMapJoinOptimizer) {
         // No need to add BucketMapJoinOptimizer twice
         transformations.add(new BucketMapJoinOptimizer());
@@ -119,7 +129,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
-        !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        !isTezExecEngine) {
       transformations.add(new CorrelationOptimizer());
     }
     if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -128,8 +138,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (pctx.getContext().getExplain()
-        && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+    if (pctx.getContext().getExplain() && !isTezExecEngine) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }

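The Optimizer.initialize() hunks above cache the execution-engine check in a single isTezExecEngine flag, move ConstantPropagate ahead of the remaining predicate-pushdown rules, and skip the bucket-map-join and sort-merge-bucket-map-join optimizers when running on Tez. A minimal standalone sketch of that hoist-the-flag pattern follows; the Transform interface and the property map below are simplified placeholders rather than the Hive classes, and the property keys are shown only for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OptimizerInitSketch {
  interface Transform { String name(); }

  // Read the execution engine once and gate each rule on the cached flag
  // instead of repeating the string comparison per transformation.
  static List<Transform> initialize(Map<String, String> conf) {
    boolean isTezExecEngine = "tez".equals(conf.getOrDefault("hive.execution.engine", "mr"));
    boolean bucketMapJoinOptimizer = false;
    List<Transform> transformations = new ArrayList<>();

    if (Boolean.parseBoolean(conf.getOrDefault("hive.optimize.bucketmapjoin", "false"))
        && !isTezExecEngine) {
      transformations.add(() -> "BucketMapJoinOptimizer");
      bucketMapJoinOptimizer = true;
    }
    if (Boolean.parseBoolean(conf.getOrDefault("hive.optimize.bucketmapjoin.sortedmerge", "false"))
        && !isTezExecEngine) {
      if (!bucketMapJoinOptimizer) {
        // no need to add BucketMapJoinOptimizer twice
        transformations.add(() -> "BucketMapJoinOptimizer");
      }
      transformations.add(() -> "SortedMergeBucketMapJoinOptimizer");
    }
    if (Boolean.parseBoolean(conf.getOrDefault("hive.optimize.correlation", "false"))
        && !isTezExecEngine) {
      transformations.add(() -> "CorrelationOptimizer");
    }
    return transformations;
  }

  public static void main(String[] args) {
    // On Tez none of the gated rules are added.
    System.out.println(initialize(Map.of("hive.execution.engine", "tez")).size()); // prints 0
  }
}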
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Tue Oct 14 19:06:45 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple
         TezWork tezWork = context.currentTask.getWork();
         LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
         tezWork.connect(parentWork, myWork, edgeProp);
-        
+        if (edgeType == EdgeType.CUSTOM_EDGE) {
+          tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
+        }
+
         ReduceSinkOperator r = null;
         if (parentRS.getConf().getOutputName() != null) {
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");

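The added lines mark the downstream work as VertexType.INITIALIZED_EDGES whenever the connecting edge is a CUSTOM_EDGE. A rough standalone sketch of that bookkeeping follows; the enums and the map standing in for TezWork's vertex-type tracking are placeholders, not the Hive types.

import java.util.HashMap;
import java.util.Map;

public class VertexTypeSketch {
  enum EdgeType { SIMPLE_EDGE, BROADCAST_EDGE, CUSTOM_EDGE }
  enum VertexType { AUTO_INITIALIZED_EDGES, INITIALIZED_EDGES }

  static final Map<String, VertexType> vertexTypes = new HashMap<>();

  // Mirror of the pattern in the hunk: connect the two works, then record that
  // the child vertex needs pre-initialized edges when a custom edge is used.
  static void connect(String parentWork, String childWork, EdgeType edgeType) {
    // ... edge registration elided ...
    if (edgeType == EdgeType.CUSTOM_EDGE) {
      vertexTypes.put(childWork, VertexType.INITIALIZED_EDGES);
    } else {
      vertexTypes.putIfAbsent(childWork, VertexType.AUTO_INITIALIZED_EDGES);
    }
  }

  public static void main(String[] args) {
    connect("Map 1", "Map 2", EdgeType.CUSTOM_EDGE);
    System.out.println(vertexTypes.get("Map 2")); // prints INITIALIZED_EDGES
  }
}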
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Tue Oct 14 19:06:45 2014
@@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -55,13 +55,25 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -73,9 +85,11 @@ public class SimpleFetchOptimizer implem
 
   private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
 
+  @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
-    if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
+    if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand()
+        && topOps.size() == 1) {
       // no join, no groupby, no distinct, no lateral view, no subq,
       // no CTAS or insert, not analyze command, and single sourced.
       String alias = (String) pctx.getTopOps().keySet().toArray()[0];
@@ -144,7 +158,7 @@ public class SimpleFetchOptimizer implem
   // for non-aggressive mode (minimal)
   // 1. sampling is not allowed
   // 2. for partitioned table, all filters should be targeted to partition column
-  // 3. SelectOperator should be select star
+  // 3. SelectOperator should use only simple cast/column access
   private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
       TableScanOperator ts) throws HiveException {
     SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
@@ -156,7 +170,7 @@ public class SimpleFetchOptimizer implem
       return null;
     }
 
-    Table table = qb.getMetaData().getAliasToTable().get(alias);
+    Table table = pctx.getTopToTable().get(ts);
     if (table == null) {
       return null;
     }
@@ -181,34 +195,71 @@ public class SimpleFetchOptimizer implem
     return null;
   }
 
-  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive,
+  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
       boolean bypassFilter) {
     if (ts.getChildOperators().size() != 1) {
       return null;
     }
     Operator<?> op = ts.getChildOperators().get(0);
     for (; ; op = op.getChildOperators().get(0)) {
-      if (aggresive) {
-        if (!(op instanceof LimitOperator || op instanceof FilterOperator
-            || op instanceof SelectOperator)) {
+      if (op instanceof SelectOperator) {
+        if (!aggressive) {
+          if (!checkExpressions((SelectOperator) op)) {
+            break;
+          }
+        }
+        continue;
+      }
+
+      if (aggressive) {
+        if (!(op instanceof LimitOperator || op instanceof FilterOperator)) {
           break;
         }
-      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
-          || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
+      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
         break;
       }
+
       if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
         return null;
       }
     }
+
     if (op instanceof FileSinkOperator) {
       fetch.scanOp = ts;
       fetch.fileSink = op;
       return fetch;
     }
+
     return null;
   }
 
+  private boolean checkExpressions(SelectOperator op) {
+    SelectDesc desc = op.getConf();
+    for (ExprNodeDesc expr : desc.getColList()) {
+      if (!checkExpression(expr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean checkExpression(ExprNodeDesc expr) {
+    if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) {
+      return true;
+    }
+
+    if (expr instanceof ExprNodeGenericFuncDesc) {
+      GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF();
+      if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar
+          || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal
+          || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp
+          || udf instanceof GenericUDFToVarchar) {
+        return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0));
+      }
+    }
+    return false;
+  }
+
   private class FetchData {
 
     private final ReadEntity parent;
@@ -240,7 +291,7 @@ public class SimpleFetchOptimizer implem
       this.splitSample = splitSample;
       this.onlyPruningFilter = bypassFilter;
     }
-    
+
     /*
      * all filters were executed during partition pruning
      */
@@ -251,7 +302,7 @@ public class SimpleFetchOptimizer implem
     private FetchWork convertToWork() throws HiveException {
       inputs.clear();
       if (!table.isPartitioned()) {
-        inputs.add(new ReadEntity(table, parent));
+        inputs.add(new ReadEntity(table, parent, parent == null));
         FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table));
         PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
         work.setSplitSample(splitSample);
@@ -261,12 +312,12 @@ public class SimpleFetchOptimizer implem
       List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
 
       for (Partition partition : partsList.getNotDeniedPartns()) {
-        inputs.add(new ReadEntity(partition, parent));
+        inputs.add(new ReadEntity(partition, parent, parent == null));
         listP.add(partition.getDataLocation());
         partP.add(Utilities.getPartitionDesc(partition));
       }
       Table sourceTable = partsList.getSourceTable();
-      inputs.add(new ReadEntity(sourceTable, parent));
+      inputs.add(new ReadEntity(sourceTable, parent, parent == null));
       TableDesc table = Utilities.getTableDesc(sourceTable);
       FetchWork work = new FetchWork(listP, partP, table);
       if (!work.getPartDesc().isEmpty()) {

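The new checkExpressions()/checkExpression() pair relaxes the non-aggressive fetch rule from "select star only" to projections built from columns, constants, and a small whitelist of cast UDFs, checked recursively with a single child per cast. A compact sketch of the same recursive whitelist over a toy expression tree follows; the Expr/Column/Constant/Func types are placeholders, not Hive's ExprNodeDesc hierarchy.

import java.util.List;
import java.util.Set;

public class FetchExprWhitelistSketch {
  interface Expr { List<Expr> children(); }
  record Column(String name) implements Expr { public List<Expr> children() { return List.of(); } }
  record Constant(Object value) implements Expr { public List<Expr> children() { return List.of(); } }
  record Func(String name, List<Expr> args) implements Expr { public List<Expr> children() { return args; } }

  // Only simple casts over a single column/constant pass, mirroring the
  // GenericUDFTo* whitelist in the hunk above.
  static final Set<String> ALLOWED_CASTS =
      Set.of("to_binary", "to_char", "to_date", "to_decimal",
             "to_unix_timestamp", "to_utc_timestamp", "to_varchar");

  static boolean checkExpression(Expr expr) {
    if (expr instanceof Column || expr instanceof Constant) {
      return true;
    }
    if (expr instanceof Func f && ALLOWED_CASTS.contains(f.name())) {
      return f.children().size() == 1 && checkExpression(f.children().get(0));
    }
    return false;
  }

  public static void main(String[] args) {
    Expr ok = new Func("to_date", List.of(new Column("ds")));
    Expr bad = new Func("upper", List.of(new Column("name")));
    System.out.println(checkExpression(ok) + " " + checkExpression(bad)); // prints: true false
  }
}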
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Tue Oct 14 19:06:45 2014
@@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.IntWritable;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -85,6 +84,7 @@ import com.google.common.collect.Maps;
  */
 public class SortedDynPartitionOptimizer implements Transform {
 
+  private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
   @Override
   public ParseContext transform(ParseContext pCtx) throws SemanticException {
 
@@ -216,6 +216,13 @@ public class SortedDynPartitionOptimizer
       ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
           newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
+      if (!bucketColumns.isEmpty()) {
+        String tableAlias = outRR.getColumnInfos().get(0).getTabAlias();
+        ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
+            tableAlias, true, true);
+        outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci);
+      }
+
       // Create ReduceSink operator
       ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
           OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
@@ -380,8 +387,11 @@ public class SortedDynPartitionOptimizer
       // corresponding with bucket number and hence their OIs
       for (Integer idx : keyColsPosInVal) {
         if (idx < 0) {
-          newKeyCols.add(new ExprNodeConstantDesc(TypeInfoFactory
-              .getPrimitiveTypeInfoFromPrimitiveWritable(IntWritable.class), -1));
+          // add bucket number column to both key and value
+          ExprNodeConstantDesc encd = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo,
+              BUCKET_NUMBER_COL_NAME);
+          newKeyCols.add(encd);
+          newValueCols.add(encd);
         } else {
           newKeyCols.add(newValueCols.get(idx).clone());
         }
@@ -395,7 +405,8 @@ public class SortedDynPartitionOptimizer
       // should honor the ordering of records provided by ORDER BY in SELECT statement
       ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent,
           ReduceSinkOperator.class);
-      if (parentRSOp != null) {
+      boolean isOrderBy = parseCtx.getQB().getParseInfo().getDestToOrderBy().size() > 0;
+      if (parentRSOp != null && isOrderBy) {
         String parentRSOpOrder = parentRSOp.getConf().getOrder();
         if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) {
           newKeyCols.addAll(parentRSOp.getConf().getKeyCols());
@@ -417,6 +428,9 @@ public class SortedDynPartitionOptimizer
       List<String> outCols = Utilities.getInternalColumnNamesFromSignature(parent.getSchema()
           .getSignature());
       ArrayList<String> outValColNames = Lists.newArrayList(outCols);
+      if (!bucketColumns.isEmpty()) {
+        outValColNames.add(BUCKET_NUMBER_COL_NAME);
+      }
       List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(newValueCols,
           outValColNames, 0, "");
       TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);

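The SortedDynPartitionOptimizer hunks introduce a synthetic string column, _bucket_number, and append it to the row resolver, the reduce-sink key, and the value columns whenever the destination table is bucketed, replacing the old constant -1 key. A very small sketch of appending such a virtual column to key/value column lists follows; plain string lists stand in for the RowResolver/ReduceSinkDesc plumbing.

import java.util.ArrayList;
import java.util.List;

public class BucketNumberColumnSketch {
  private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";

  // When the table is bucketed, carry the bucket number through the shuffle by
  // adding the virtual column to both the key and the value column lists.
  static void addBucketNumberColumn(List<String> keyCols, List<String> valueCols,
      List<String> bucketCols) {
    if (!bucketCols.isEmpty()) {
      keyCols.add(BUCKET_NUMBER_COL_NAME);
      valueCols.add(BUCKET_NUMBER_COL_NAME);
    }
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<>(List.of("part_col"));
    List<String> values = new ArrayList<>(List.of("c1", "c2"));
    addBucketNumberColumn(keys, values, List.of("c1"));
    System.out.println(keys + " " + values); // [part_col, _bucket_number] [c1, c2, _bucket_number]
  }
}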
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java Tue Oct 14 19:06:45 2014
@@ -193,11 +193,12 @@ public class StatsOptimizer implements T
         }
         SelectOperator selOp = (SelectOperator)tsOp.getChildren().get(0);
         for(ExprNodeDesc desc : selOp.getConf().getColList()) {
-          if (!(desc instanceof ExprNodeColumnDesc)) {
+          if (!((desc instanceof ExprNodeColumnDesc) || (desc instanceof ExprNodeConstantDesc))) {
             // Probably an expression, can't handle that
             return null;
           }
         }
+        Map<String, ExprNodeDesc> exprMap = selOp.getColumnExprMap();
         // Since we have done an exact match on TS-SEL-GBY-RS-GBY-SEL-FS
         // we need not to do any instanceof checks for following.
         GroupByOperator gbyOp = (GroupByOperator)selOp.getChildren().get(0);
@@ -215,6 +216,12 @@ public class StatsOptimizer implements T
           return null;
 
         }
+        for(ExprNodeDesc desc : selOp.getConf().getColList()) {
+          if (!(desc instanceof ExprNodeColumnDesc)) {
+            // Probably an expression, can't handle that
+            return null;
+          }
+        }
         FileSinkOperator fsOp = (FileSinkOperator)(selOp.getChildren().get(0));
         if (fsOp.getChildOperators() != null && fsOp.getChildOperators().size() > 0) {
           // looks like a subq plan.
@@ -236,22 +243,28 @@ public class StatsOptimizer implements T
           GenericUDAFResolver udaf =
               FunctionRegistry.getGenericUDAFResolver(aggr.getGenericUDAFName());
           if (udaf instanceof GenericUDAFSum) {
-            if(!(aggr.getParameters().get(0) instanceof ExprNodeConstantDesc)){
+            ExprNodeDesc desc = aggr.getParameters().get(0);
+            String constant;
+            if (desc instanceof ExprNodeConstantDesc) {
+              constant = ((ExprNodeConstantDesc) desc).getValue().toString();
+            } else if (desc instanceof ExprNodeColumnDesc && exprMap.get(((ExprNodeColumnDesc)desc).getColumn()) instanceof ExprNodeConstantDesc) {
+              constant = ((ExprNodeConstantDesc)exprMap.get(((ExprNodeColumnDesc)desc).getColumn())).getValue().toString();
+            } else {
               return null;
             }
             Long rowCnt = getRowCnt(pctx, tsOp, tbl);
             if(rowCnt == null) {
               return null;
             }
-            oneRow.add(HiveDecimal.create(((ExprNodeConstantDesc) aggr.getParameters().get(0))
-                .getValue().toString()).multiply(HiveDecimal.create(rowCnt)));
+            oneRow.add(HiveDecimal.create(constant).multiply(HiveDecimal.create(rowCnt)));
             ois.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                 PrimitiveCategory.DECIMAL));
           }
           else if (udaf instanceof GenericUDAFCount) {
             Long rowCnt = 0L;
-            if ((aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof
-                ExprNodeConstantDesc)) {
+            if (aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof
+                ExprNodeConstantDesc || ((aggr.getParameters().get(0) instanceof ExprNodeColumnDesc) &&
+                    exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn()) instanceof ExprNodeConstantDesc)) {
               // It's either count (*) or count(1) case
               rowCnt = getRowCnt(pctx, tsOp, tbl);
               if(rowCnt == null) {
@@ -259,12 +272,7 @@ public class StatsOptimizer implements T
               }
             } else {
               // It's count(col) case
-              if (!(aggr.getParameters().get(0) instanceof ExprNodeColumnDesc)) {
-                // this is weird, we got expr or something in there, bail out
-                Log.debug("Unexpected expression : " + aggr.getParameters().get(0));
-                return null;
-              }
-              ExprNodeColumnDesc desc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
+              ExprNodeColumnDesc desc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn());
               String colName = desc.getColumn();
               StatType type = getType(desc.getTypeString());
               if(!tbl.isPartitioned()) {
@@ -330,7 +338,7 @@ public class StatsOptimizer implements T
             ois.add(PrimitiveObjectInspectorFactory.
                 getPrimitiveJavaObjectInspector(PrimitiveCategory.LONG));
           } else if (udaf instanceof GenericUDAFMax) {
-            ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
+            ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn());
             String colName = colDesc.getColumn();
             StatType type = getType(colDesc.getTypeString());
             if(!tbl.isPartitioned()) {
@@ -419,7 +427,7 @@ public class StatsOptimizer implements T
               }
             }
           }  else if (udaf instanceof GenericUDAFMin) {
-            ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
+            ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn());
             String colName = colDesc.getColumn();
             StatType type = getType(colDesc.getTypeString());
             if (!tbl.isPartitioned()) {

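The StatsOptimizer hunks resolve aggregate parameters through the SELECT operator's column-expression map, so a constant that reaches the aggregate under an internal column alias (the count(1) case) is still treated as a constant. A sketch of that alias-resolution step follows, assuming a simple alias-to-expression map in place of getColumnExprMap(); the Expr/Column/Constant types are placeholders.

import java.util.Map;

public class ConstantThroughAliasSketch {
  interface Expr {}
  record Column(String name) implements Expr {}
  record Constant(Object value) implements Expr {}

  // Resolve a column reference through the SELECT's alias map before deciding
  // whether the aggregate argument is really a constant; return null otherwise.
  static Object resolveConstant(Expr aggrParam, Map<String, Expr> exprMap) {
    if (aggrParam instanceof Constant c) {
      return c.value();
    }
    if (aggrParam instanceof Column col && exprMap.get(col.name()) instanceof Constant c) {
      return c.value();
    }
    return null; // not a constant; the optimizer bails out in this case
  }

  public static void main(String[] args) {
    Map<String, Expr> exprMap = Map.of("_col0", new Constant(1));
    System.out.println(resolveConstant(new Column("_col0"), exprMap)); // prints 1
    System.out.println(resolveConstant(new Column("_col1"), Map.of())); // prints null
  }
}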
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Tue Oct 14 19:06:45 2014
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory {
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(bucketCols);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      int numBuckets = -1;
+      OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits();
+      if (parentOpTraits != null) {
+        numBuckets = parentOpTraits.getNumBuckets();
+      }
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
       rs.setOpTraits(opTraits);
       return null;
     }
@@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory {
       } catch (HiveException e) {
         prunedPartList = null;
       }
-      boolean bucketMapJoinConvertible = checkBucketedTable(table, 
+      boolean isBucketed = checkBucketedTable(table,
           opTraitsCtx.getParseContext(), prunedPartList);
-      List<List<String>>bucketCols = new ArrayList<List<String>>();
+      List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      List<List<String>> sortedColsList = new ArrayList<List<String>>();
       int numBuckets = -1;
-      if (bucketMapJoinConvertible) {
-        bucketCols.add(table.getBucketCols());
+      if (isBucketed) {
+        bucketColsList.add(table.getBucketCols());
         numBuckets = table.getNumBuckets();
+        List<String> sortCols = new ArrayList<String>();
+        for (Order colSortOrder : table.getSortCols()) {
+          sortCols.add(colSortOrder.getCol());
+        }
+        sortedColsList.add(sortCols);
       }
-      OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
       ts.setOpTraits(opTraits);
       return null;
     }
@@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory {
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(gbyKeys);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
       gbyOp.setOpTraits(opTraits);
       return null;
     }
@@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory {
 
   public static class SelectRule implements NodeProcessor {
 
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      SelectOperator selOp = (SelectOperator)nd;
-      List<List<String>> parentBucketColNames = 
-          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
-
+    public List<List<String>> getConvertedColNames(List<List<String>> parentColNames,
+        SelectOperator selOp) {
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       if (selOp.getColumnExprMap() != null) {
-        if (parentBucketColNames != null) {
-          for (List<String> colNames : parentBucketColNames) {
+        if (parentColNames != null) {
+          for (List<String> colNames : parentColNames) {
             List<String> bucketColNames = new ArrayList<String>();
             for (String colName : colNames) {
               for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
                 if (entry.getValue() instanceof ExprNodeColumnDesc) {
-                  if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) {
+                  if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) {
                     bucketColNames.add(entry.getKey());
                   }
                 }
@@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory {
         }
       }
 
+      return listBucketCols;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator selOp = (SelectOperator)nd;
+      List<List<String>> parentBucketColNames =
+          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
+
+      List<List<String>> listBucketCols = null;
+      List<List<String>> listSortCols = null;
+      if (selOp.getColumnExprMap() != null) {
+        if (parentBucketColNames != null) {
+          listBucketCols = getConvertedColNames(parentBucketColNames, selOp);
+        }
+        List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits()
+            .getSortCols();
+        if (parentSortColNames != null) {
+          listSortCols = getConvertedColNames(parentSortColNames, selOp);
+        }
+      }
+
       int numBuckets = -1;
       if (selOp.getParentOperators().get(0).getOpTraits() != null) {
         numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets();
       }
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
       selOp.setOpTraits(opTraits);
       return null;
     }
@@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory {
         Object... nodeOutputs) throws SemanticException {
       JoinOperator joinOp = (JoinOperator)nd;
       List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      List<List<String>> sortColsList = new ArrayList<List<String>>();
       byte pos = 0;
       for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
         if (!(parentOp instanceof ReduceSinkOperator)) {
@@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory {
           ReduceSinkRule rsRule = new ReduceSinkRule();
           rsRule.process(rsOp, stack, procCtx, nodeOutputs);
         }
-        bucketColsList.add(getOutputColNames(joinOp, rsOp, pos));
+        bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos));
+        sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos));
         pos++;
       }
 
-      joinOp.setOpTraits(new OpTraits(bucketColsList, -1));
+      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
       return null;
     }
 
-    private List<String> getOutputColNames(JoinOperator joinOp,
-        ReduceSinkOperator rs, byte pos) {
-      List<List<String>> parentBucketColNames =
-          rs.getOpTraits().getBucketColNames();
-
-      if (parentBucketColNames != null) {
+    private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames,
+        byte pos) {
+      if (parentColNames != null) {
         List<String> bucketColNames = new ArrayList<String>();
 
         // guaranteed that there is only 1 list within this list because
         // a reduce sink always brings down the bucketing cols to a single list.
         // may not be true with correlation operators (mux-demux)
-        List<String> colNames = parentBucketColNames.get(0);
+        List<String> colNames = parentColNames.get(0);
         for (String colName : colNames) {
           for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) {
             if (exprNode instanceof ExprNodeColumnDesc) {
@@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory {
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      OpTraits opTraits = new OpTraits(null, -1);
+      OpTraits opTraits = new OpTraits(null, -1, null);
       @SuppressWarnings("unchecked")
       Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd;
       operator.setOpTraits(opTraits);

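Throughout this file OpTraits is now built with a third argument carrying sorted-column names next to the bucket columns and bucket count, and the table-scan rule fills it from the table's sort-order metadata. A sketch of such a traits value object follows; it is a simplified stand-in for org.apache.hadoop.hive.ql.plan.OpTraits, not its actual definition.

import java.util.List;

public class OpTraitsSketch {
  // Bucket columns, bucket count and sort columns travel together so that
  // downstream rules (e.g. SMB join conversion) can check both properties.
  record OpTraits(List<List<String>> bucketColNames, int numBuckets,
                  List<List<String>> sortColNames) {}

  static OpTraits fromTableScan(List<String> bucketCols, int numBuckets, List<String> sortCols) {
    return new OpTraits(List.of(bucketCols), numBuckets, List.of(sortCols));
  }

  public static void main(String[] args) {
    OpTraits traits = fromTableScan(List.of("user_id"), 32, List.of("user_id"));
    System.out.println(traits.numBuckets() + " " + traits.sortColNames()); // prints: 32 [[user_id]]
  }
}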
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Tue Oct 14 19:06:45 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -152,6 +154,11 @@ public class CrossProductCheck implement
 
   private void checkMapJoins(TezWork tzWrk) throws SemanticException {
     for(BaseWork wrk : tzWrk.getAllWork() ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
       List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
       if ( !warnings.isEmpty() ) {
         for(String w : warnings) {
@@ -163,12 +170,17 @@ public class CrossProductCheck implement
 
   private void checkTezReducer(TezWork tzWrk) throws SemanticException {
     for(BaseWork wrk : tzWrk.getAllWork() ) {
-      if ( !(wrk instanceof ReduceWork) ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
+      if ( !(wrk instanceof ReduceWork ) ) {
         continue;
       }
       ReduceWork rWork = (ReduceWork) wrk;
       Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
-      if ( reducer instanceof JoinOperator ) {
+      if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
         Map<Integer, ExtractReduceSinkInfo.Info> rsInfo =
             new HashMap<Integer, ExtractReduceSinkInfo.Info>();
         for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
@@ -185,7 +197,7 @@ public class CrossProductCheck implement
       return;
     }
     Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
-    if ( reducer instanceof JoinOperator ) {
+    if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
       BaseWork prntWork = mrWrk.getMapWork();
       checkForCrossProduct(taskName, reducer,
           new ExtractReduceSinkInfo(null).analyze(prntWork));
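
Both checks above now unwrap a MergeJoinWork to its main work before analysis and treat CommonMergeJoinOperator like JoinOperator on the reduce side. A small sketch of the unwrap-then-check pattern follows; the work types are placeholders, not the Hive plan classes.

public class MergeJoinUnwrapSketch {
  interface BaseWork {}
  static class ReduceWork implements BaseWork {}
  static class MergeJoinWork implements BaseWork {
    final BaseWork mainWork;
    MergeJoinWork(BaseWork mainWork) { this.mainWork = mainWork; }
    BaseWork getMainWork() { return mainWork; }
  }

  // Unwrap the merge-join container first, then run the existing per-work check.
  static boolean isReduceSide(BaseWork wrk) {
    if (wrk instanceof MergeJoinWork mj) {
      wrk = mj.getMainWork();
    }
    return wrk instanceof ReduceWork;
  }

  public static void main(String[] args) {
    System.out.println(isReduceSide(new MergeJoinWork(new ReduceWork()))); // prints true
  }
}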