Posted to commits@hive.apache.org by am...@apache.org on 2013/04/17 09:29:46 UTC

svn commit: r1468783 [6/16] - in /hive/branches/HIVE-4115: ./ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/ beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/ beeline/src/test/org/apache/hive/beeline/ beeline/src/test/org/...

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Wed Apr 17 07:29:38 2013
@@ -18,15 +18,17 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.io.Serializable;
+import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -52,32 +54,54 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST;
 
 /**
- * If two reducer sink operators share the same partition/sort columns, we
- * should merge them. This should happen after map join optimization because map
+ * If two reducer sink operators share the same partition/sort columns and order,
+ * they can be merged. This should happen after map join optimization because map
  * join optimization will remove reduce sink operators.
  */
 public class ReduceSinkDeDuplication implements Transform{
 
+  private static final String RS = ReduceSinkOperator.getOperatorName();
+  private static final String GBY = GroupByOperator.getOperatorName();
+  private static final String JOIN = JoinOperator.getOperatorName();
+
   protected ParseContext pGraphContext;
 
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     pGraphContext = pctx;
 
- // generate pruned column list for all relevant operators
+    // generate pruned column list for all relevant operators
     ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
 
+    boolean mergeJoins = !pctx.getConf().getBoolVar(HIVECONVERTJOIN) &&
+        !pctx.getConf().getBoolVar(HIVECONVERTJOINNOCONDITIONALTASK);
+
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1",
-      ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
-      ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+    opRules.put(new RuleRegExp("R1", RS + "%.*%" + RS + "%"),
+        ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+    opRules.put(new RuleRegExp("R2", RS + "%" + GBY + "%.*%" + RS + "%"),
+        ReduceSinkDeduplicateProcFactory.getGroupbyReducerProc());
+    if (mergeJoins) {
+      opRules.put(new RuleRegExp("R3", JOIN + "%.*%" + RS + "%"),
+          ReduceSinkDeduplicateProcFactory.getJoinReducerProc());
+    }
+    // TODO RS+JOIN
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -92,23 +116,27 @@ public class ReduceSinkDeDuplication imp
     return pGraphContext;
   }
 
-  class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
+  class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx {
+
     ParseContext pctx;
-    List<ReduceSinkOperator> rejectedRSList;
+    boolean trustScript;
+    // min reducer num for merged RS (to avoid query contains "order by" executed by one reducer)
+    int minReducer;
+    Set<Operator<?>> removedOps;
 
     public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
-      rejectedRSList = new ArrayList<ReduceSinkOperator>();
+      removedOps = new HashSet<Operator<?>>();
+      trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
+      minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
       this.pctx = pctx;
     }
 
-    public boolean contains (ReduceSinkOperator rsOp) {
-      return rejectedRSList.contains(rsOp);
+    public boolean contains(Operator<?> rsOp) {
+      return removedOps.contains(rsOp);
     }
 
-    public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
-      if (!rejectedRSList.contains(rsOp)) {
-        rejectedRSList.add(rsOp);
-      }
+    public boolean addRemovedOperator(Operator<?> rsOp) {
+      return removedOps.add(rsOp);
     }
 
     public ParseContext getPctx() {
@@ -120,355 +148,598 @@ public class ReduceSinkDeDuplication imp
     }
   }
 
-
   static class ReduceSinkDeduplicateProcFactory {
 
-
     public static NodeProcessor getReducerReducerProc() {
       return new ReducerReducerProc();
     }
 
+    public static NodeProcessor getGroupbyReducerProc() {
+      return new GroupbyReducerProc();
+    }
+
+    public static NodeProcessor getJoinReducerProc() {
+      return new JoinReducerProc();
+    }
+
     public static NodeProcessor getDefaultProc() {
       return new DefaultProc();
     }
+  }
 
-    /*
-     * do nothing.
-     */
-    static class DefaultProc implements NodeProcessor {
-      @Override
-      public Object process(Node nd, Stack<Node> stack,
-          NodeProcessorCtx procCtx, Object... nodeOutputs)
-          throws SemanticException {
-        return null;
-      }
+  /*
+   * do nothing.
+   */
+  static class DefaultProc implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      return null;
     }
+  }
 
-    static class ReducerReducerProc implements NodeProcessor {
-      @Override
-      public Object process(Node nd, Stack<Node> stack,
-          NodeProcessorCtx procCtx, Object... nodeOutputs)
-          throws SemanticException {
-        ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
-        ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;
+  public abstract static class AbsctractReducerReducerProc implements NodeProcessor {
 
-        if(ctx.contains(childReduceSink)) {
-          return null;
-        }
+    ReduceSinkDeduplicateProcCtx dedupCtx;
 
-        List<Operator<? extends OperatorDesc>> childOp =
-          childReduceSink.getChildOperators();
-        if (childOp != null && childOp.size() == 1) {
-          Operator<? extends OperatorDesc> child = childOp.get(0);
-          if (child instanceof GroupByOperator || child instanceof JoinOperator) {
-            ctx.addRejectedReduceSinkOperator(childReduceSink);
-            return null;
-          }
-        }
+    protected boolean trustScript() {
+      return dedupCtx.trustScript;
+    }
 
-        ParseContext pGraphContext = ctx.getPctx();
-        HashMap<String, String> childColumnMapping =
-          getPartitionAndKeyColumnMapping(childReduceSink);
-        ReduceSinkOperator parentRS = null;
-        parentRS = findSingleParentReduceSink(childReduceSink, pGraphContext);
-        if (parentRS == null) {
-          ctx.addRejectedReduceSinkOperator(childReduceSink);
-          return null;
-        }
-        HashMap<String, String> parentColumnMapping = getPartitionAndKeyColumnMapping(parentRS);
-        Operator<? extends OperatorDesc> stopBacktrackFlagOp = null;
-        if (parentRS.getParentOperators() == null
-            || parentRS.getParentOperators().size() == 0) {
-          stopBacktrackFlagOp =  parentRS;
-        } else if (parentRS.getParentOperators().size() != 1) {
-          return null;
-        } else {
-          stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
-        }
+    protected int minReducer() {
+      return dedupCtx.minReducer;
+    }
 
-        boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
-        if (!succeed) {
-          return null;
-        }
-        succeed = backTrackColumnNames(parentColumnMapping, parentRS, stopBacktrackFlagOp, pGraphContext);
-        if (!succeed) {
-          return null;
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx;
+      if (dedupCtx.contains((Operator<?>) nd)) {
+        return false;
+      }
+      ReduceSinkOperator cRS = (ReduceSinkOperator) nd;
+      Operator<?> child = getSingleChild(cRS);
+      if (child instanceof JoinOperator) {
+        return false; // not supported
+      }
+      ParseContext pctx = dedupCtx.getPctx();
+      if (child instanceof GroupByOperator) {
+        GroupByOperator cGBY = (GroupByOperator) child;
+        if (!hasGroupingSet(cRS) && !cGBY.getConf().isGroupingSetsPresent()) {
+          return process(cRS, cGBY, pctx);
         }
+        return false;
+      }
+      if (child instanceof ExtractOperator) {
+        return process(cRS, pctx);
+      }
+      return false;
+    }
 
-        boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
-        if (!same) {
-          return null;
-        }
-        replaceReduceSinkWithSelectOperator(childReduceSink, pGraphContext);
-        return null;
+    private boolean hasGroupingSet(ReduceSinkOperator cRS) {
+      GroupByOperator cGBYm = getSingleParent(cRS, GroupByOperator.class);
+      if (cGBYm != null && cGBYm.getConf().isGroupingSetsPresent()) {
+        return true;
+      }
+      return false;
+    }
+
+    protected Operator<?> getSingleParent(Operator<?> operator) {
+      List<Operator<?>> parents = operator.getParentOperators();
+      if (parents != null && parents.size() == 1) {
+        return parents.get(0);
+      }
+      return null;
+    }
+
+    protected Operator<?> getSingleChild(Operator<?> operator) {
+      List<Operator<?>> children = operator.getChildOperators();
+      if (children != null && children.size() == 1) {
+        return children.get(0);
       }
+      return null;
+    }
 
-      private void replaceReduceSinkWithSelectOperator(
-          ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
-        List<Operator<? extends OperatorDesc>> parentOp =
-          childReduceSink.getParentOperators();
-        List<Operator<? extends OperatorDesc>> childOp =
-          childReduceSink.getChildOperators();
+    protected <T> T getSingleParent(Operator<?> operator, Class<T> type) {
+      Operator<?> parent = getSingleParent(operator);
+      return type.isInstance(parent) ? (T)parent : null;
+    }
 
-        Operator<? extends OperatorDesc> oldParent = childReduceSink;
+    protected abstract Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException;
 
-        if (childOp != null && childOp.size() == 1
-            && ((childOp.get(0)) instanceof ExtractOperator)) {
-          oldParent = childOp.get(0);
-          childOp = childOp.get(0).getChildOperators();
-        }
+    protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+        ParseContext context) throws SemanticException;
 
-        Operator<? extends OperatorDesc> input = parentOp.get(0);
-        input.getChildOperators().clear();
+    protected Operator<?> getStartForGroupBy(ReduceSinkOperator cRS) {
+      Operator<? extends Serializable> parent = getSingleParent(cRS);
+      return parent instanceof GroupByOperator ? parent : cRS;  // skip map-aggr GBY
+    }
 
-        RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
+    // for JOIN-RS case, it's not possible generally to merge if child has
+    // more key/partition columns than parents
+    protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
+        throws SemanticException {
+      List<Operator<?>> parents = pJoin.getParentOperators();
+      ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]);
+      ReduceSinkDesc cRSc = cRS.getConf();
+      ReduceSinkDesc pRS0c = pRSs[0].getConf();
+      if (cRSc.getKeyCols().size() > pRS0c.getKeyCols().size()) {
+        return false;
+      }
+      if (cRSc.getPartitionCols().size() > pRS0c.getPartitionCols().size()) {
+        return false;
+      }
+      Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRS0c.getNumReducers());
+      if (moveReducerNumTo == null ||
+          moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) {
+        return false;
+      }
 
-        ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
-        ArrayList<String> outputs = new ArrayList<String>();
-        List<String> outputCols = childReduceSink.getConf().getOutputValueColumnNames();
-        RowResolver outputRS = new RowResolver();
+      Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRS0c.getOrder());
+      if (moveRSOrderTo == null) {
+        return false;
+      }
 
-        Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+      boolean[] sorted = getSortedTags(pJoin);
 
-        for (int i = 0; i < outputCols.size(); i++) {
-          String internalName = outputCols.get(i);
-          String[] nm = inputRR.reverseLookup(internalName);
-          ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
-          ExprNodeDesc colDesc = childReduceSink.getConf().getValueCols().get(i);
-          exprs.add(colDesc);
-          outputs.add(internalName);
-          outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo
-              .getType(), nm[0], valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
-          colExprMap.put(internalName, colDesc);
+      int cKeySize = cRSc.getKeyCols().size();
+      for (int i = 0; i < cKeySize; i++) {
+        ExprNodeDesc cexpr = cRSc.getKeyCols().get(i);
+        ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+        for (int tag = 0; tag < pRSs.length; tag++) {
+          pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i);
         }
+        int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+        if (found < 0) {
+          return false;
+        }
+      }
+      int cPartSize = cRSc.getPartitionCols().size();
+      for (int i = 0; i < cPartSize; i++) {
+        ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i);
+        ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+        for (int tag = 0; tag < pRSs.length; tag++) {
+          pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i);
+        }
+        int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+        if (found < 0) {
+          return false;
+        }
+      }
 
-        SelectDesc select = new SelectDesc(exprs, outputs, false);
+      if (moveReducerNumTo > 0) {
+        for (ReduceSinkOperator pRS : pRSs) {
+          pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+        }
+      }
+      return true;
+    }
 
-        SelectOperator sel = (SelectOperator) putOpInsertMap(
-            OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
-            .getColumnInfos()), input), inputRR, pGraphContext);
+    private boolean[] getSortedTags(JoinOperator joinOp) {
+      boolean[] result = new boolean[joinOp.getParentOperators().size()];
+      for (int tag = 0; tag < result.length; tag++) {
+        result[tag] = isSortedTag(joinOp, tag);
+      }
+      return result;
+    }
 
-        sel.setColumnExprMap(colExprMap);
+    private boolean isSortedTag(JoinOperator joinOp, int tag) {
+      for (JoinCondDesc cond : joinOp.getConf().getConds()) {
+        switch (cond.getType()) {
+          case JoinDesc.LEFT_OUTER_JOIN:
+            if (cond.getRight() == tag) {
+              return false;
+            }
+            continue;
+          case JoinDesc.RIGHT_OUTER_JOIN:
+            if (cond.getLeft() == tag) {
+              return false;
+            }
+            continue;
+          case JoinDesc.FULL_OUTER_JOIN:
+            if (cond.getLeft() == tag || cond.getRight() == tag) {
+              return false;
+            }
+        }
+      }
+      return true;
+    }
 
-        // Insert the select operator in between.
-        sel.setChildOperators(childOp);
-        for (Operator<? extends OperatorDesc> ch : childOp) {
-          ch.replaceParent(oldParent, sel);
+    private int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator child,
+        Operator[] parents, boolean[] sorted) throws SemanticException {
+      for (int tag = 0; tag < parents.length; tag++) {
+        if (sorted[tag] &&
+            pexprs[tag].isSame(ExprNodeDescUtils.backtrack(cexpr, child, parents[tag]))) {
+          return tag;
         }
+      }
+      return -1;
+    }
 
+    protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+        throws SemanticException {
+      int[] result = checkStatus(cRS, pRS, minReducer);
+      if (result == null) {
+        return false;
+      }
+      if (result[0] > 0) {
+        ArrayList<ExprNodeDesc> childKCs = cRS.getConf().getKeyCols();
+        pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS));
+      }
+      if (result[1] > 0) {
+        ArrayList<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+        pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
+      }
+      if (result[2] > 0) {
+        pRS.getConf().setOrder(cRS.getConf().getOrder());
+      }
+      if (result[3] > 0) {
+        pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
       }
+      return true;
+    }
 
-      private Operator<? extends OperatorDesc> putOpInsertMap(
-        Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext pGraphContext) {
-        OpParseContext ctx = new OpParseContext(rr);
-        pGraphContext.getOpParseCtx().put(op, ctx);
-        return op;
+    // -1 for p to c, 1 for c to p
+    private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+        throws SemanticException {
+      ReduceSinkDesc cConf = cRS.getConf();
+      ReduceSinkDesc pConf = pRS.getConf();
+      Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder());
+      if (moveRSOrderTo == null) {
+        return null;
+      }
+      Integer moveReducerNumTo = checkNumReducer(cConf.getNumReducers(), pConf.getNumReducers());
+      if (moveReducerNumTo == null ||
+          moveReducerNumTo > 0 && cConf.getNumReducers() < minReducer) {
+        return null;
+      }
+      List<ExprNodeDesc> ckeys = cConf.getKeyCols();
+      List<ExprNodeDesc> pkeys = pConf.getKeyCols();
+      Integer moveKeyColTo = checkExprs(ckeys, pkeys, cRS, pRS);
+      if (moveKeyColTo == null) {
+        return null;
       }
+      List<ExprNodeDesc> cpars = cConf.getPartitionCols();
+      List<ExprNodeDesc> ppars = pConf.getPartitionCols();
+      Integer movePartitionColTo = checkExprs(cpars, ppars, cRS, pRS);
+      if (movePartitionColTo == null) {
+        return null;
+      }
+      return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo, moveReducerNumTo};
+    }
 
-      private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
-          ReduceSinkOperator parentRS,
-          HashMap<String, String> childColumnMapping,
-          HashMap<String, String> parentColumnMapping) {
+    private Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
+        ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+      Integer moveKeyColTo = 0;
+      if (ckeys == null || ckeys.isEmpty()) {
+        if (pkeys != null && !pkeys.isEmpty()) {
+          moveKeyColTo = -1;
+        }
+      } else {
+        if (pkeys == null || pkeys.isEmpty()) {
+          for (ExprNodeDesc ckey : ckeys) {
+            if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) {
+              return null;
+            }
+          }
+          moveKeyColTo = 1;
+        } else {
+          moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS);
+        }
+      }
+      return moveKeyColTo;
+    }
 
-        ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
-        ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();
+    protected Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
+        Operator<?> child, Operator<?> parent) throws SemanticException {
+      int common = Math.min(cexprs.size(), pexprs.size());
+      int limit = Math.max(cexprs.size(), pexprs.size());
+      int i = 0;
+      for (; i < common; i++) {
+        ExprNodeDesc pexpr = pexprs.get(i);
+        ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
+        if (!pexpr.isSame(cexpr)) {
+          return null;
+        }
+      }
+      for (;i < limit; i++) {
+        if (cexprs.size() > pexprs.size()) {
+          if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
+            return null;
+          }
+        }
+      }
+      return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
+    }
 
-        boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
-            childPartitionCols, parentPartitionCols);
-        if (!ret) {
-          return false;
+    protected Integer checkOrder(String corder, String porder) {
+      if (corder == null || corder.trim().equals("")) {
+        if (porder == null || porder.trim().equals("")) {
+          return 0;
         }
+        return -1;
+      }
+      if (porder == null || porder.trim().equals("")) {
+        return 1;
+      }
+      corder = corder.trim();
+      porder = porder.trim();
+      int target = Math.min(corder.length(), porder.length());
+      if (!corder.substring(0, target).equals(porder.substring(0, target))) {
+        return null;
+      }
+      return Integer.valueOf(corder.length()).compareTo(porder.length());
+    }
 
-        ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
-        ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
-        ret = compareExprNodes(childColumnMapping, parentColumnMapping,
-            childReduceKeyCols, parentReduceKeyCols);
-        if (!ret) {
-          return false;
+    protected Integer checkNumReducer(int creduce, int preduce) {
+      if (creduce < 0) {
+        if (preduce < 0) {
+          return 0;
         }
+        return -1;
+      }
+      if (preduce < 0) {
+        return 1;
+      }
+      if (creduce != preduce) {
+        return null;
+      }
+      return 0;
+    }
 
-        String childRSOrder = childReduceSink.getConf().getOrder();
-        String parentRSOrder = parentRS.getConf().getOrder();
-        boolean moveChildRSOrderToParent = false;
-        //move child reduce sink's order to the parent reduce sink operator.
-        if (childRSOrder != null && !(childRSOrder.trim().equals(""))) {
-          if (parentRSOrder == null
-              || !childRSOrder.trim().equals(parentRSOrder.trim())) {
-            return false;
-          }
-        } else {
-          if(parentRSOrder == null || parentRSOrder.trim().equals("")) {
-            moveChildRSOrderToParent = true;
-          }
+    protected <T extends Operator<?>> T findPossibleParent(Operator<?> start, Class<T> target,
+        boolean trustScript) throws SemanticException {
+      T[] parents = findPossibleParents(start, target, trustScript);
+      return parents != null && parents.length == 1 ? parents[0] : null;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T extends Operator<?>> T[] findPossibleParents(Operator<?> start, Class<T> target,
+        boolean trustScript) {
+      Operator<?> cursor = getSingleParent(start);
+      for (; cursor != null; cursor = getSingleParent(cursor)) {
+        if (target.isAssignableFrom(cursor.getClass())) {
+          T[] array = (T[]) Array.newInstance(target, 1);
+          array[0] = (T) cursor;
+          return array;
+        }
+        if (cursor instanceof JoinOperator) {
+          return findParents((JoinOperator) cursor, target);
         }
+        if (cursor instanceof ScriptOperator && !trustScript) {
+          return null;
+        }
+        if (!(cursor instanceof SelectOperator
+            || cursor instanceof FilterOperator
+            || cursor instanceof ExtractOperator
+            || cursor instanceof ForwardOperator
+            || cursor instanceof ScriptOperator
+            || cursor instanceof ReduceSinkOperator)) {
+          return null;
+        }
+      }
+      return null;
+    }
 
-        int childNumReducers = childReduceSink.getConf().getNumReducers();
-        int parentNumReducers = parentRS.getConf().getNumReducers();
-        boolean moveChildReducerNumToParent = false;
-        //move child reduce sink's number reducers to the parent reduce sink operator.
-        if (childNumReducers != parentNumReducers) {
-          if (childNumReducers == -1) {
-            //do nothing.
-          } else if (parentNumReducers == -1) {
-            //set childNumReducers in the parent reduce sink operator.
-            moveChildReducerNumToParent = true;
-          } else {
-            return false;
+    @SuppressWarnings("unchecked")
+    private <T extends Operator<?>> T[] findParents(JoinOperator join, Class<T> target) {
+      List<Operator<?>> parents = join.getParentOperators();
+      T[] result = (T[]) Array.newInstance(target, parents.size());
+      for (int tag = 0; tag < result.length; tag++) {
+        Operator<?> cursor = parents.get(tag);
+        for (; cursor != null; cursor = getSingleParent(cursor)) {
+          if (target.isAssignableFrom(cursor.getClass())) {
+            result[tag] = (T) cursor;
+            break;
           }
         }
-
-        if(moveChildRSOrderToParent) {
-          parentRS.getConf().setOrder(childRSOrder);
+        if (result[tag] == null) {
+          throw new IllegalStateException("failed to find " + target.getSimpleName()
+              + " from " + join + " on tag " + tag);
         }
+      }
+      return result;
+    }
 
-        if(moveChildReducerNumToParent) {
-          parentRS.getConf().setNumReducers(childNumReducers);
-        }
+    protected SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOperator childRS,
+        ParseContext context) throws SemanticException {
+      SelectOperator select = replaceOperatorWithSelect(childRS, context);
+      select.getConf().setOutputColumnNames(childRS.getConf().getOutputValueColumnNames());
+      select.getConf().setColList(childRS.getConf().getValueCols());
+      return select;
+    }
 
-        return true;
+    private SelectOperator replaceOperatorWithSelect(Operator<?> operator, ParseContext context)
+        throws SemanticException {
+      RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+      SelectDesc select = new SelectDesc(null, null);
+
+      Operator<?> parent = getSingleParent(operator);
+      Operator<?> child = getSingleChild(operator);
+
+      parent.getChildOperators().clear();
+
+      SelectOperator sel = (SelectOperator) putOpInsertMap(
+          OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
+              .getColumnInfos()), parent), inputRR, context);
+
+      sel.setColumnExprMap(operator.getColumnExprMap());
+
+      sel.setChildOperators(operator.getChildOperators());
+      for (Operator<? extends Serializable> ch : operator.getChildOperators()) {
+        ch.replaceParent(operator, sel);
+      }
+      if (child instanceof ExtractOperator) {
+        removeOperator(child, getSingleChild(child), sel, context);
+        dedupCtx.addRemovedOperator(child);
       }
+      operator.setChildOperators(null);
+      operator.setParentOperators(null);
+      dedupCtx.addRemovedOperator(operator);
+      return sel;
+    }
 
-      private boolean compareExprNodes(HashMap<String, String> childColumnMapping,
-          HashMap<String, String> parentColumnMapping,
-          ArrayList<ExprNodeDesc> childColExprs,
-          ArrayList<ExprNodeDesc> parentColExprs) {
+    protected void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupByOperator cGBYr,
+        ParseContext context) throws SemanticException {
 
-        boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
-        boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
+      Operator<?> parent = getSingleParent(cRS);
 
-        if (childEmpty) { //both empty
-          return true;
-        }
+      if (parent instanceof GroupByOperator) {
+        GroupByOperator cGBYm = (GroupByOperator) parent;
 
-        //child not empty here
-        if (parentEmpty) { // child not empty, but parent empty
-          return false;
+        cGBYr.getConf().setKeys(cGBYm.getConf().getKeys());
+        cGBYr.getConf().setAggregators(cGBYm.getConf().getAggregators());
+        for (AggregationDesc aggr : cGBYm.getConf().getAggregators()) {
+          aggr.setMode(GenericUDAFEvaluator.Mode.COMPLETE);
         }
-
-        if (childColExprs.size() != parentColExprs.size()) {
-          return false;
+        cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
+        cGBYr.setSchema(cGBYm.getSchema());
+        RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
+        context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
+      } else {
+        cGBYr.getConf().setKeys(ExprNodeDescUtils.backtrack(cGBYr.getConf().getKeys(), cGBYr, cRS));
+        for (AggregationDesc aggr : cGBYr.getConf().getAggregators()) {
+          aggr.setParameters(ExprNodeDescUtils.backtrack(aggr.getParameters(), cGBYr, cRS));
         }
-        int i = 0;
-        while (i < childColExprs.size()) {
-          ExprNodeDesc childExpr = childColExprs.get(i);
-          ExprNodeDesc parentExpr = parentColExprs.get(i);
-
-          if ((childExpr instanceof ExprNodeColumnDesc)
-              && (parentExpr instanceof ExprNodeColumnDesc)) {
-            String childCol = childColumnMapping
-                .get(((ExprNodeColumnDesc) childExpr).getColumn());
-            String parentCol = parentColumnMapping
-                .get(((ExprNodeColumnDesc) childExpr).getColumn());
 
-            if (!childCol.equals(parentCol)) {
-              return false;
-            }
-          } else {
-            return false;
+        Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
+        RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+
+        Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
+        RowResolver newRR = new RowResolver();
+
+        List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
+        for (int i = 0; i < outputCols.size(); i++) {
+          String colName = outputCols.get(i);
+          String[] nm = oldRR.reverseLookup(colName);
+          ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
+          newRR.put(nm[0], nm[1], colInfo);
+          ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
+          if (colExpr != null) {
+            newMap.put(colInfo.getInternalName(), colExpr);
           }
-          i++;
         }
-        return true;
+        cGBYr.setColumnExprMap(newMap);
+        cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
+        context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
       }
+      cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
 
-      /*
-       * back track column names to find their corresponding original column
-       * names. Only allow simple operators like 'select column' or filter.
-       */
-      private boolean backTrackColumnNames(
-          HashMap<String, String> columnMapping,
-          ReduceSinkOperator reduceSink,
-          Operator<? extends OperatorDesc> stopBacktrackFlagOp,
-          ParseContext pGraphContext) {
-        Operator<? extends OperatorDesc> startOperator = reduceSink;
-        while (startOperator != null && startOperator != stopBacktrackFlagOp) {
-          startOperator = startOperator.getParentOperators().get(0);
-          Map<String, ExprNodeDesc> colExprMap = startOperator.getColumnExprMap();
-          if(colExprMap == null || colExprMap.size()==0) {
-            continue;
-          }
-          Iterator<String> keyIter = columnMapping.keySet().iterator();
-          while (keyIter.hasNext()) {
-            String key = keyIter.next();
-            String oldCol = columnMapping.get(key);
-            ExprNodeDesc exprNode = colExprMap.get(oldCol);
-            if(exprNode instanceof ExprNodeColumnDesc) {
-              String col = ((ExprNodeColumnDesc)exprNode).getColumn();
-              columnMapping.put(key, col);
-            } else {
-              return false;
-            }
-          }
-        }
+      removeOperator(cRS, cGBYr, parent, context);
+      dedupCtx.addRemovedOperator(cRS);
+
+      if (parent instanceof GroupByOperator) {
+        removeOperator(parent, cGBYr, getSingleParent(parent), context);
+        dedupCtx.addRemovedOperator(cGBYr);
+      }
+    }
+
+    private void removeOperator(Operator<?> target, Operator<?> child, Operator<?> parent,
+        ParseContext context) {
+      for (Operator<?> aparent : target.getParentOperators()) {
+        aparent.replaceChild(target, child);
+      }
+      for (Operator<?> achild : target.getChildOperators()) {
+        achild.replaceParent(target, parent);
+      }
+      target.setChildOperators(null);
+      target.setParentOperators(null);
+      context.getOpParseCtx().remove(target);
+    }
+
+    private Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
+        ParseContext context) {
+      OpParseContext ctx = new OpParseContext(rr);
+      context.getOpParseCtx().put(op, ctx);
+      return op;
+    }
+  }
 
+  static class GroupbyReducerProc extends AbsctractReducerReducerProc {
+
+    // pRS-pGBY-cRS
+    public Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException {
+      GroupByOperator pGBY = findPossibleParent(cRS, GroupByOperator.class, trustScript());
+      if (pGBY == null) {
+        return false;
+      }
+      ReduceSinkOperator pRS = findPossibleParent(pGBY, ReduceSinkOperator.class, trustScript());
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        replaceReduceSinkWithSelectOperator(cRS, context);
         return true;
       }
+      return false;
+    }
 
-      private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
-        HashMap<String, String> columnMapping = new HashMap<String, String> ();
-        ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
-        ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
-        ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
-        if(partitionCols != null) {
-          for (ExprNodeDesc desc : partitionCols) {
-            List<String> cols = desc.getCols();
-            if ( cols != null ) {
-              for(String col : cols) {
-                columnMapping.put(col, col);
-              }
-            }
-          }
-        }
-        if(reduceKeyCols != null) {
-          for (ExprNodeDesc desc : reduceKeyCols) {
-            List<String> cols = desc.getCols();
-            if ( cols != null ) {
-              for(String col : cols) {
-                columnMapping.put(col, col);
-              }
-            }
-          }
-        }
-        return columnMapping;
+    // pRS-pGBY-cRS-cGBY
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+        throws SemanticException {
+      Operator<?> start = getStartForGroupBy(cRS);
+      GroupByOperator pGBY = findPossibleParent(start, GroupByOperator.class, trustScript());
+      if (pGBY == null) {
+        return false;
       }
+      ReduceSinkOperator pRS = getSingleParent(pGBY, ReduceSinkOperator.class);
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        removeReduceSinkForGroupBy(cRS, cGBY, context);
+        return true;
+      }
+      return false;
+    }
+  }
 
-      private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childReduceSink, ParseContext pGraphContext) {
-        Operator<? extends OperatorDesc> start = childReduceSink;
-        while(start != null) {
-          if (start.getParentOperators() == null
-              || start.getParentOperators().size() != 1) {
-            // this potentially is a join operator
-            return null;
-          }
+  static class JoinReducerProc extends AbsctractReducerReducerProc {
 
-          boolean allowed = false;
-          if ((start instanceof SelectOperator)
-              || (start instanceof FilterOperator)
-              || (start instanceof ExtractOperator)
-              || (start instanceof ForwardOperator)
-              || (start instanceof ScriptOperator)
-              || (start instanceof ReduceSinkOperator)) {
-            allowed = true;
-          }
+    // pRS-pJOIN-cRS
+    public Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException {
+      JoinOperator pJoin = findPossibleParent(cRS, JoinOperator.class, trustScript());
+      if (pJoin != null && merge(cRS, pJoin, minReducer())) {
+        pJoin.getConf().setFixedAsSorted(true);
+        replaceReduceSinkWithSelectOperator(cRS, context);
+        return true;
+      }
+      return false;
+    }
 
-          if (!allowed) {
-            return null;
-          }
+    // pRS-pJOIN-cRS-cGBY
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+        throws SemanticException {
+      Operator<?> start = getStartForGroupBy(cRS);
+      JoinOperator pJoin = findPossibleParent(start, JoinOperator.class, trustScript());
+      if (pJoin != null && merge(cRS, pJoin, minReducer())) {
+        pJoin.getConf().setFixedAsSorted(true);
+        removeReduceSinkForGroupBy(cRS, cGBY, context);
+        return true;
+      }
+      return false;
+    }
+  }
 
-          if ((start instanceof ScriptOperator)
-              && !HiveConf.getBoolVar(pGraphContext.getConf(),
-                  HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
-            return null;
-          }
+  static class ReducerReducerProc extends AbsctractReducerReducerProc {
 
-          start = start.getParentOperators().get(0);
-          if(start instanceof ReduceSinkOperator) {
-            return (ReduceSinkOperator)start;
-          }
-        }
-        return null;
+    // pRS-cRS
+    public Object process(ReduceSinkOperator cRS, ParseContext context)
+        throws SemanticException {
+      ReduceSinkOperator pRS = findPossibleParent(cRS, ReduceSinkOperator.class, trustScript());
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        replaceReduceSinkWithSelectOperator(cRS, context);
+        return true;
       }
+      return false;
     }
 
+    // pRS-cRS-cGBY
+    public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+        throws SemanticException {
+      Operator<?> start = getStartForGroupBy(cRS);
+      ReduceSinkOperator pRS = findPossibleParent(start, ReduceSinkOperator.class, trustScript());
+      if (pRS != null && merge(cRS, pRS, minReducer())) {
+        removeReduceSinkForGroupBy(cRS, cGBY, context);
+        return true;
+      }
+      return false;
+    }
   }
 }
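
For reference, a minimal standalone sketch (class and method names are hypothetical, not part of this commit) of the prefix-comparison convention that checkOrder() and checkNumReducer() above share: null means the two ReduceSink operators conflict and cannot be merged, a negative result means the parent RS already carries the longer/defined spec ("p to c" in the commit's wording), a positive result means the child RS spec must be copied onto the parent RS before the child is removed ("c to p"), and 0 means the specs are identical.

    // Hypothetical illustration of the merge-direction convention; mirrors checkOrder() above.
    public class OrderCheckSketch {
      static Integer checkOrder(String corder, String porder) {
        if (corder == null || corder.trim().isEmpty()) {
          // child has no order spec: 0 if parent has none either, otherwise keep parent's (-1)
          return (porder == null || porder.trim().isEmpty()) ? 0 : -1;
        }
        if (porder == null || porder.trim().isEmpty()) {
          return 1;  // child's order spec must be moved onto the parent RS
        }
        corder = corder.trim();
        porder = porder.trim();
        int common = Math.min(corder.length(), porder.length());
        if (!corder.substring(0, common).equals(porder.substring(0, common))) {
          return null;  // incompatible sort orders, e.g. "+-" vs "-+": no merge
        }
        return Integer.valueOf(corder.length()).compareTo(porder.length());
      }

      public static void main(String[] args) {
        System.out.println(checkOrder("++", "++"));  // 0    : identical order specs
        System.out.println(checkOrder("+", "++"));   // -1   : parent spec is the longer one
        System.out.println(checkOrder("++", "+"));   // 1    : copy child spec to parent
        System.out.println(checkOrder("+-", "-+"));  // null : conflict, merge rejected
      }
    }
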

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Wed Apr 17 07:29:38 2013
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
@@ -290,12 +291,13 @@ public class ListBucketingPruner impleme
       List<List<String>> uniqSkewedValues) throws SemanticException {
     // For each entry in dynamic-multi-dimension collection.
     List<String> skewedCols = part.getSkewedColNames(); // Retrieve skewed column.
-    Map<List<String>, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
+    Map<SkewedValueList, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
                                                                                // map.
     assert ListBucketingPrunerUtils.isListBucketingPart(part) : part.getName()
         + " skewed metadata is corrupted. No skewed column and/or location mappings information.";
     List<List<String>> skewedValues = part.getSkewedColValues();
     List<Boolean> nonSkewedValueMatchResult = new ArrayList<Boolean>();
+    SkewedValueList skewedValueList = new SkewedValueList();
     for (List<String> cell : collections) {
       // Walk through the tree to decide value.
       // Example: skewed column: C1, C2 ;
@@ -309,8 +311,9 @@ public class ListBucketingPruner impleme
           /* It's valid case if a partition: */
           /* 1. is defined with skewed columns and skewed values in metadata */
           /* 2. doesn't have all skewed values within its data */
-          if (mappings.get(cell) != null) {
-            selectedPaths.add(new Path(mappings.get(cell)));
+          skewedValueList.setSkewedValueList(cell);
+          if (mappings.get(skewedValueList) != null) {
+            selectedPaths.add(new Path(mappings.get(skewedValueList)));
           }
         }
       } else {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Wed Apr 17 07:29:38 2013
@@ -120,6 +120,8 @@ public class CommonJoinResolver implemen
    */
   class CommonJoinTaskDispatcher implements Dispatcher {
 
+    HashMap<String, Long> aliasToSize = null;
+
     private final PhysicalContext physicalContext;
 
     public CommonJoinTaskDispatcher(PhysicalContext context) {
@@ -145,7 +147,7 @@ public class CommonJoinResolver implemen
      * A task and its child task has been converted from join to mapjoin.
      * See if the two tasks can be merged.
      */
-    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
+    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
       MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
       MapredWork work = task.getWork();
       MapredLocalWork localWork = work.getMapLocalWork();
@@ -194,6 +196,33 @@ public class CommonJoinResolver implemen
       if (childWork.getAliasToWork().size() > 1) {
         return;
       }
+      long mapJoinSize = HiveConf.getLongVar(conf,
+          HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      long localTableTotalSize = 0;
+      for (String alias : localWork.getAliasToWork().keySet()) {
+        Long tabSize = aliasToSize.get(alias);
+        if (tabSize == null) {
+          /* if the size is unavailable, we need to assume a size 1 greater than mapJoinSize
+           * this implies that merge cannot happen so we can return.
+           */
+          return;
+        }
+        localTableTotalSize += tabSize;
+      }
+
+      for (String alias : childLocalWork.getAliasToWork().keySet()) {
+        Long tabSize = aliasToSize.get(alias);
+        if (tabSize == null) {
+          /* if the size is unavailable, we need to assume a size 1 greater than mapJoinSize
+           * this implies that merge cannot happen so we can return.
+           */
+          return;
+        }
+        localTableTotalSize += tabSize;
+        if (localTableTotalSize > mapJoinSize) {
+          return;
+        }
+      }
 
       Operator<? extends Serializable> childAliasOp =
           childWork.getAliasToWork().values().iterator().next();
@@ -234,11 +263,8 @@ public class CommonJoinResolver implemen
     }
 
     // create map join task and set big table as bigTablePosition
-    private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(String xml,
-        int bigTablePosition) throws UnsupportedEncodingException, SemanticException {
-      // deep copy a new mapred work from xml
-      InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-      MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+    private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(MapredWork newWork,
+        int bigTablePosition) throws SemanticException {
       // create a mapred task for this work
       MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
           .getParseContext().getConf());
@@ -256,7 +282,7 @@ public class CommonJoinResolver implemen
 
       // whether it contains common join op; if contains, return this common join op
       JoinOperator joinOp = getJoinOp(currTask);
-      if (joinOp == null) {
+      if (joinOp == null || joinOp.getConf().isFixedAsSorted()) {
         return null;
       }
       currTask.setTaskTag(Task.COMMON_JOIN);
@@ -282,7 +308,10 @@ public class CommonJoinResolver implemen
       int numAliases = order.length;
 
       long aliasTotalKnownInputSize = 0;
-      HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
+
+      if (aliasToSize == null) {
+        aliasToSize = new HashMap<String, Long>();
+      }
       try {
         // go over all the input paths, and calculate a known total size, known
         // size for each input alias.
@@ -369,11 +398,10 @@ public class CommonJoinResolver implemen
         String bigTableAlias = null;
         currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
         currWork.setJoinTree(joinTree);
-        String xml = currWork.toXML();
 
         if (convertJoinMapJoin) {
           // create map join task and set big table as bigTablePosition
-          MapRedTask newTask = convertTaskToMapJoinTask(xml, bigTablePosition).getFirst();
+          MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
 
           newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
           replaceTask(currTask, newTask, physicalContext);
@@ -384,7 +412,7 @@ public class CommonJoinResolver implemen
           // followed by a mapjoin can be performed in a single MR job.
           if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
               && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
-            mergeMapJoinTaskWithChildMapJoinTask(newTask);
+            mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
           }
 
           return newTask;
@@ -392,14 +420,19 @@ public class CommonJoinResolver implemen
 
         long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
             HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
+        String xml = currWork.toXML();
         for (int i = 0; i < numAliases; i++) {
           // this table cannot be big table
           if (!bigTableCandidates.contains(i)) {
             continue;
           }
 
+          // deep copy a new mapred work from xml
+          InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
+          MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+
           // create map join task and set big table as i
-          ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(xml, i);
+          ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);          
           MapRedTask newTask = newTaskAlias.getFirst();
           bigTableAlias = newTaskAlias.getSecond();
 
@@ -502,10 +535,10 @@ public class CommonJoinResolver implemen
       currTask.setParentTasks(null);
       if (parentTasks != null) {
         for (Task<? extends Serializable> tsk : parentTasks) {
-          // make new generated task depends on all the parent tasks of current task.
-          tsk.addDependentTask(newTask);
           // remove the current task from its original parent task's dependent task
           tsk.removeDependentTask(currTask);
+          // make new generated task depends on all the parent tasks of current task.
+          tsk.addDependentTask(newTask);
         }
       } else {
         // remove from current root task and add conditional task to root tasks
@@ -518,10 +551,10 @@ public class CommonJoinResolver implemen
       currTask.setChildTasks(null);
       if (oldChildTasks != null) {
         for (Task<? extends Serializable> tsk : oldChildTasks) {
-          // make new generated task depends on all the parent tasks of current task.
-          newTask.addDependentTask(tsk);
           // remove the current task from its original parent task's dependent task
           tsk.getParentTasks().remove(currTask);
+          // make new generated task depends on all the parent tasks of current task.
+          newTask.addDependentTask(tsk);
         }
       }
     }
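
A minimal, hypothetical sketch (names here are illustrative, not from the commit) of the size guard added to mergeMapJoinTaskWithChildMapJoinTask() above: the merge of the two converted map-join tasks is abandoned when any small-table size is unavailable in aliasToSize, or when the combined size of the small tables from both tasks exceeds the HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD limit.

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of the threshold check; an unknown size is treated as "too big".
    class MapJoinMergeGuardSketch {
      static boolean smallTablesFit(Map<String, Long> aliasToSize,
          Collection<String> parentAliases, Collection<String> childAliases, long threshold) {
        long total = 0;
        for (String alias : parentAliases) {
          Long size = aliasToSize.get(alias);
          if (size == null) {
            return false;  // size unknown: assume it would exceed the threshold, do not merge
          }
          total += size;
        }
        for (String alias : childAliases) {
          Long size = aliasToSize.get(alias);
          if (size == null) {
            return false;
          }
          total += size;
          if (total > threshold) {
            return false;  // merged task's local tables would not fit within the limit
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<String, Long>();
        sizes.put("a", 10L * 1024 * 1024);
        sizes.put("b", 5L * 1024 * 1024);
        long threshold = 25L * 1024 * 1024;
        System.out.println(smallTablesFit(sizes,
            Arrays.asList("a"), Arrays.asList("b"), threshold));        // true
        System.out.println(smallTablesFit(sizes,
            Arrays.asList("a"), Arrays.asList("missing"), threshold));  // false: unknown size
      }
    }
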

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java Wed Apr 17 07:29:38 2013
@@ -52,6 +52,9 @@ public final class SkewJoinProcFactory {
         Object... nodeOutputs) throws SemanticException {
       SkewJoinProcCtx context = (SkewJoinProcCtx) ctx;
       JoinOperator op = (JoinOperator) nd;
+      if (op.getConf().isFixedAsSorted()) {
+        return null;
+      }
       ParseContext parseContext = context.getParseCtx();
       Task<? extends Serializable> currentTsk = context.getCurrentTask();
       GenMRSkewJoinProcessor.processSkewJoin(op, currentTsk, parseContext);

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Wed Apr 17 07:29:38 2013
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
@@ -965,7 +966,7 @@ public abstract class BaseSemanticAnalyz
    * @return
    */
   protected ListBucketingCtx constructListBucketingCtx(List<String> skewedColNames,
-      List<List<String>> skewedValues, Map<List<String>, String> skewedColValueLocationMaps,
+      List<List<String>> skewedValues, Map<SkewedValueList, String> skewedColValueLocationMaps,
       boolean isStoredAsSubDirectories, HiveConf conf) {
     ListBucketingCtx lbCtx = new ListBucketingCtx();
     lbCtx.setSkewedColNames(skewedColNames);

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Wed Apr 17 07:29:38 2013
@@ -1828,6 +1828,7 @@ body
    :
    insertClause
    selectClause
+   lateralView?
    whereClause?
    groupByClause?
    havingClause?
@@ -1836,11 +1837,12 @@ body
    distributeByClause?
    sortByClause?
    window_clause?
-   limitClause? -> ^(TOK_INSERT insertClause?
-                     selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+   limitClause? -> ^(TOK_INSERT insertClause
+                     selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
                      distributeByClause? sortByClause? window_clause? limitClause?)
    |
    selectClause
+   lateralView?
    whereClause?
    groupByClause?
    havingClause?
@@ -1850,7 +1852,7 @@ body
    sortByClause?
    window_clause?
    limitClause? -> ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
-                     selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+                     selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
                      distributeByClause? sortByClause? window_clause? limitClause?)
    ;
 

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Wed Apr 17 07:29:38 2013
@@ -909,7 +909,7 @@ public class PTFTranslator {
   }
 
   @SuppressWarnings({"unchecked"})
-  private static void addOIPropertiestoSerDePropsMap(StructObjectInspector OI,
+  public static void addOIPropertiestoSerDePropsMap(StructObjectInspector OI,
       Map<String,String> serdePropsMap) {
 
     if ( serdePropsMap == null ) {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Wed Apr 17 07:29:38 2013
@@ -90,6 +90,8 @@ public class QBParseInfo {
    */
   private final HashMap<String, ArrayList<ASTNode>> aliasToLateralViews;
 
+  private final HashMap<String, ASTNode> destToLateralView;
+
   /* Order by clause */
   private final HashMap<String, ASTNode> destToOrderby;
   private final HashMap<String, Integer> destToLimit;
@@ -111,6 +113,7 @@ public class QBParseInfo {
     nameToDest = new HashMap<String, ASTNode>();
     nameToSample = new HashMap<String, TableSample>();
     exprToColumnAlias = new HashMap<ASTNode, String>();
+    destToLateralView = new HashMap<String, ASTNode>();
     destToSelExpr = new LinkedHashMap<String, ASTNode>();
     destToWhereExpr = new HashMap<String, ASTNode>();
     destToGroupby = new HashMap<String, ASTNode>();
@@ -552,6 +555,9 @@ public class QBParseInfo {
     return nameToSample;
   }
 
+  public HashMap<String, ASTNode> getDestToLateralView() {
+    return destToLateralView;
+  }
 
   protected static enum ClauseType {
     CLUSTER_BY_CLAUSE,

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Apr 17 07:29:38 2013
@@ -1023,7 +1023,13 @@ public class SemanticAnalyzer extends Ba
                 .getMsg(partition.toString()));
           }
         }
-
+        skipRecursion = false;
+        break;
+      case HiveParser.TOK_LATERAL_VIEW:
+        // todo: nested LV
+        assert ast.getChildCount() == 1;
+        qb.getParseInfo().getDestToLateralView().put(ctx_1.dest, ast);
+        break;
       default:
         skipRecursion = false;
         break;
@@ -3989,7 +3995,7 @@ public class SemanticAnalyzer extends Ba
   }
 
   @SuppressWarnings({"nls"})
-  private Operator genGroupByPlan1MRMultiReduceGB(List<String> dests, QB qb, Operator input)
+  private Operator genGroupByPlan1ReduceMultiGBY(List<String> dests, QB qb, Operator input)
       throws SemanticException {
 
     QBParseInfo parseInfo = qb.getParseInfo();
@@ -6811,9 +6817,14 @@ public class SemanticAnalyzer extends Ba
   // Return the common distinct expression
   // There should be more than 1 destination, with group bys in all of them.
   private List<ASTNode> getCommonDistinctExprs(QB qb, Operator input) {
-    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBParseInfo qbp = qb.getParseInfo();
+    // If a grouping set aggregation or a lateral view is present, common processing is not possible
+    if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()
+        || !qbp.getDestToLateralView().isEmpty()) {
+      return null;
+    }
 
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     TreeSet<String> ks = new TreeSet<String>();
     ks.addAll(qbp.getClauseNames());
 
@@ -6822,15 +6833,10 @@ public class SemanticAnalyzer extends Ba
       return null;
     }
 
-    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> oldList = null;
+    List<ExprNodeDesc> oldList = null;
     List<ASTNode> oldASTList = null;
 
     for (String dest : ks) {
-      // If a grouping set aggregation is present, common processing is not possible
-      if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()) {
-        return null;
-      }
-
       // If a filter is present, common processing is not possible
       if (qbp.getWhrForClause(dest) != null) {
         return null;
@@ -6847,7 +6853,7 @@ public class SemanticAnalyzer extends Ba
         return null;
       }
 
-      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> currDestList;
+      List<ExprNodeDesc> currDestList;
       try {
         currDestList = getDistinctExprs(qbp, dest, inputRR);
       } catch (SemanticException e) {
@@ -6968,10 +6974,9 @@ public class SemanticAnalyzer extends Ba
   // Groups the clause names into lists so that any two clauses in the same list have the same
   // group by and distinct keys and no clause appears in more than one list. Returns a list of the
   // lists of clauses.
-  private List<List<String>> getCommonGroupByDestGroups(QB qb, Operator input)
-      throws SemanticException {
+  private List<List<String>> getCommonGroupByDestGroups(QB qb,
+      Map<String, Operator<? extends OperatorDesc>> inputs) throws SemanticException {
 
-    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBParseInfo qbp = qb.getParseInfo();
 
     TreeSet<String> ks = new TreeSet<String>();
@@ -6989,29 +6994,31 @@ public class SemanticAnalyzer extends Ba
       return commonGroupByDestGroups;
     }
 
-    List<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>> sprayKeyLists =
-        new ArrayList<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>>(ks.size());
+    List<Operator<? extends OperatorDesc>> inputOperators =
+        new ArrayList<Operator<? extends OperatorDesc>>(ks.size());
+    List<List<ExprNodeDesc>> sprayKeyLists = new ArrayList<List<ExprNodeDesc>>(ks.size());
 
     // Iterate over each clause
     for (String dest : ks) {
-
-      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> sprayKeys =
-          getDistinctExprs(qbp, dest, inputRR);
+      Operator input = inputs.get(dest);
+      RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+      List<ExprNodeDesc> sprayKeys = getDistinctExprs(qbp, dest, inputRR);
 
       // Add the group by expressions
       List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
       for (ASTNode grpByExpr : grpByExprs) {
-        ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper =
-            new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR));
-        if (!sprayKeys.contains(grpByExprWrapper)) {
-          sprayKeys.add(grpByExprWrapper);
+        ExprNodeDesc exprDesc = genExprNodeDesc(grpByExpr, inputRR);
+        if (ExprNodeDescUtils.indexOf(exprDesc, sprayKeys) < 0) {
+          sprayKeys.add(exprDesc);
         }
       }
 
       // Loop through each of the lists of exprs, looking for a match
       boolean found = false;
       for (int i = 0; i < sprayKeyLists.size(); i++) {
-
+        if (!input.equals(inputOperators.get(i))) {
+          continue;
+        }
         if (!matchExprLists(sprayKeyLists.get(i), sprayKeys)) {
           continue;
         }
@@ -7024,6 +7031,7 @@ public class SemanticAnalyzer extends Ba
 
       // No match was found, so create new entries
       if (!found) {
+        inputOperators.add(input);
         sprayKeyLists.add(sprayKeys);
         List<String> destGroup = new ArrayList<String>();
         destGroup.add(dest);
@@ -7035,15 +7043,13 @@ public class SemanticAnalyzer extends Ba
   }
 
   // Returns whether or not two lists contain the same elements independent of order
-  private boolean matchExprLists(List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list1,
-      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list2) {
+  private boolean matchExprLists(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
 
     if (list1.size() != list2.size()) {
       return false;
     }
-
-    for (ExprNodeDesc.ExprNodeDescEqualityWrapper exprNodeDesc : list1) {
-      if (!list2.contains(exprNodeDesc)) {
+    for (ExprNodeDesc exprNodeDesc : list1) {
+      if (ExprNodeDescUtils.indexOf(exprNodeDesc, list2) < 0) {
         return false;
       }
     }
@@ -7051,23 +7057,20 @@ public class SemanticAnalyzer extends Ba
     return true;
   }
 
-  // Returns a list of the distinct exprs for a given clause name as
-  // ExprNodeDesc.ExprNodeDescEqualityWrapper without duplicates
-  private List<ExprNodeDesc.ExprNodeDescEqualityWrapper>
-      getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR) throws SemanticException {
+  // Returns a list of the distinct exprs without duplicates for a given clause name
+  private List<ExprNodeDesc> getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR)
+      throws SemanticException {
 
     List<ASTNode> distinctAggExprs = qbp.getDistinctFuncExprsForClause(dest);
-    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> distinctExprs =
-        new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+    List<ExprNodeDesc> distinctExprs = new ArrayList<ExprNodeDesc>();
 
     for (ASTNode distinctAggExpr : distinctAggExprs) {
       // 0 is function name
       for (int i = 1; i < distinctAggExpr.getChildCount(); i++) {
         ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i);
-        ExprNodeDesc.ExprNodeDescEqualityWrapper distinctExpr =
-            new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(parameter, inputRR));
-        if (!distinctExprs.contains(distinctExpr)) {
-          distinctExprs.add(distinctExpr);
+        ExprNodeDesc expr = genExprNodeDesc(parameter, inputRR);
+        if (ExprNodeDescUtils.indexOf(expr, distinctExprs) < 0) {
+          distinctExprs.add(expr);
         }
       }
     }
@@ -7096,6 +7099,7 @@ public class SemanticAnalyzer extends Ba
     QBParseInfo qbp = qb.getParseInfo();
 
     TreeSet<String> ks = new TreeSet<String>(qbp.getClauseNames());
+    Map<String, Operator<? extends OperatorDesc>> inputs = createInputForDests(qb, input, ks);
     // For multi-group by with the same distinct, we ignore all user hints
     // currently. It doesn't matter whether the user has asked to do
     // map-side aggregation or not. Map side aggregation is turned off
@@ -7148,7 +7152,7 @@ public class SemanticAnalyzer extends Ba
       // expressions, otherwise treat all the expressions as a single group
       if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
         try {
-          commonGroupByDestGroups = getCommonGroupByDestGroups(qb, curr);
+          commonGroupByDestGroups = getCommonGroupByDestGroups(qb, inputs);
         } catch (SemanticException e) {
           LOG.error("Failed to group clauses by common spray keys.", e);
         }
@@ -7168,6 +7172,8 @@ public class SemanticAnalyzer extends Ba
           }
 
           String firstDest = commonGroupByDestGroup.get(0);
+          input = inputs.get(firstDest);
+
           // Constructs a standard group by plan if:
           // There is no other subquery with the same group by/distinct keys or
           // (There are no aggregations in a representative query for the group and
@@ -7182,7 +7188,7 @@ public class SemanticAnalyzer extends Ba
 
             // Go over all the destination tables
             for (String dest : commonGroupByDestGroup) {
-              curr = input;
+              curr = inputs.get(dest);
 
               if (qbp.getWhrForClause(dest) != null) {
                 curr = genFilterPlan(dest, qb, curr);
@@ -7215,7 +7221,7 @@ public class SemanticAnalyzer extends Ba
               curr = genPostGroupByBodyPlan(curr, dest, qb);
             }
           } else {
-            curr = genGroupByPlan1MRMultiReduceGB(commonGroupByDestGroup, qb, input);
+            curr = genGroupByPlan1ReduceMultiGBY(commonGroupByDestGroup, qb, input);
           }
         }
       }
@@ -7228,6 +7234,16 @@ public class SemanticAnalyzer extends Ba
     return curr;
   }
 
+  private Map<String, Operator<? extends OperatorDesc>> createInputForDests(QB qb,
+      Operator<? extends OperatorDesc> input, Set<String> dests) throws SemanticException {
+    Map<String, Operator<? extends OperatorDesc>> inputs =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+    for (String dest : dests) {
+      inputs.put(dest, genLateralViewPlanForDest(dest, qb, input));
+    }
+    return inputs;
+  }
+
   private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb)
       throws SemanticException {
 
@@ -7308,7 +7324,9 @@ public class SemanticAnalyzer extends Ba
             extraMRStep);
         qb.getParseInfo().setOuterQueryLimit(limit.intValue());
       }
-      curr = genFileSinkPlan(dest, qb, curr);
+      if (!SessionState.get().getHiveOperation().equals(HiveOperation.CREATEVIEW)) {
+        curr = genFileSinkPlan(dest, qb, curr);
+      }
     }
 
     // change curr ops row resolver's tab aliases to query alias if it
@@ -8035,71 +8053,7 @@ public class SemanticAnalyzer extends Ba
           // -> LateralViewJoinOperator
           //
 
-          RowResolver lvForwardRR = new RowResolver();
-          RowResolver source = opParseCtx.get(op).getRowResolver();
-          for (ColumnInfo col : source.getColumnInfos()) {
-            if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
-              continue;
-            }
-            String[] tabCol = source.reverseLookup(col.getInternalName());
-            lvForwardRR.put(tabCol[0], tabCol[1], col);
-          }
-
-          Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
-              new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
-              op), lvForwardRR);
-
-          // The order in which the two paths are added is important. The
-          // lateral view join operator depends on having the select operator
-          // give it the row first.
-
-          // Get the all path by making a select(*).
-          RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
-          // Operator allPath = op;
-          Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
-              new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
-              lvForward), allPathRR);
-          // Get the UDTF Path
-          QB blankQb = new QB(null, null, false);
-          Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
-              .getChild(0), blankQb, lvForward);
-          // add udtf aliases to QB
-          for (String udtfAlias : blankQb.getAliases()) {
-            qb.addAlias(udtfAlias);
-          }
-          RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
-
-          // Merge the two into the lateral view join
-          // The cols of the merged result will be the combination of both the
-          // cols of the UDTF path and the cols of the all path. The internal
-          // names have to be changed to avoid conflicts
-
-          RowResolver lateralViewRR = new RowResolver();
-          ArrayList<String> outputInternalColNames = new ArrayList<String>();
-
-          LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
-          LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
-
-          // For PPD, we need a column to expression map so that during the walk,
-          // the processor knows how to transform the internal col names.
-          // Following steps are dependant on the fact that we called
-          // LVmerge.. in the above order
-          Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-
-          int i = 0;
-          for (ColumnInfo c : allPathRR.getColumnInfos()) {
-            String internalName = getColumnInternalName(i);
-            i++;
-            colExprMap.put(internalName,
-                new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
-                    c.getTabAlias(), c.getIsVirtualCol()));
-          }
-
-          Operator lateralViewJoin = putOpInsertMap(OperatorFactory
-              .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
-                  new RowSchema(lateralViewRR.getColumnInfos()), allPath,
-                  udtfPath), lateralViewRR);
-          lateralViewJoin.setColumnExprMap(colExprMap);
+          Operator lateralViewJoin = genLateralViewPlan(qb, op, lateralViewTree);
           op = lateralViewJoin;
         }
         e.setValue(op);
@@ -8107,6 +8061,85 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
+  private Operator genLateralViewPlanForDest(String dest, QB qb, Operator op)
+      throws SemanticException {
+    ASTNode lateralViewTree = qb.getParseInfo().getDestToLateralView().get(dest);
+    if (lateralViewTree != null) {
+      return genLateralViewPlan(qb, op, lateralViewTree);
+    }
+    return op;
+  }
+
+  private Operator genLateralViewPlan(QB qb, Operator op, ASTNode lateralViewTree)
+      throws SemanticException {
+    RowResolver lvForwardRR = new RowResolver();
+    RowResolver source = opParseCtx.get(op).getRowResolver();
+    for (ColumnInfo col : source.getColumnInfos()) {
+      if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
+        continue;
+      }
+      String[] tabCol = source.reverseLookup(col.getInternalName());
+      lvForwardRR.put(tabCol[0], tabCol[1], col);
+    }
+
+    Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
+        op), lvForwardRR);
+
+    // The order in which the two paths are added is important. The
+    // lateral view join operator depends on having the select operator
+    // give it the row first.
+
+    // Get the all path by making a select(*).
+    RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
+    // Operator allPath = op;
+    Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
+        lvForward), allPathRR);
+    // Get the UDTF Path
+    QB blankQb = new QB(null, null, false);
+    Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
+        .getChild(0), blankQb, lvForward);
+    // add udtf aliases to QB
+    for (String udtfAlias : blankQb.getAliases()) {
+      qb.addAlias(udtfAlias);
+    }
+    RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
+
+    // Merge the two into the lateral view join
+    // The cols of the merged result will be the combination of both the
+    // cols of the UDTF path and the cols of the all path. The internal
+    // names have to be changed to avoid conflicts
+
+    RowResolver lateralViewRR = new RowResolver();
+    ArrayList<String> outputInternalColNames = new ArrayList<String>();
+
+    LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
+    LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
+
+    // For PPD, we need a column to expression map so that during the walk,
+    // the processor knows how to transform the internal col names.
+    // Following steps are dependent on the fact that we called
+    // LVmerge.. in the above order
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+    int i = 0;
+    for (ColumnInfo c : allPathRR.getColumnInfos()) {
+      String internalName = getColumnInternalName(i);
+      i++;
+      colExprMap.put(internalName,
+          new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
+              c.getTabAlias(), c.getIsVirtualCol()));
+    }
+
+    Operator lateralViewJoin = putOpInsertMap(OperatorFactory
+        .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
+            new RowSchema(lateralViewRR.getColumnInfos()), allPath,
+            udtfPath), lateralViewRR);
+    lateralViewJoin.setColumnExprMap(colExprMap);
+    return lateralViewJoin;
+  }
+
   /**
    * A helper function that gets all the columns and respective aliases in the
    * source and puts them into dest. It renames the internal names of the
@@ -8702,7 +8735,7 @@ public class SemanticAnalyzer extends Ba
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+      LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
     }
 
     Optimizer optm = new Optimizer();
@@ -8711,7 +8744,7 @@ public class SemanticAnalyzer extends Ba
     pCtx = optm.optimize();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+      LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
     }
 
     // Generate column access stats if required - wait until column pruning takes place
@@ -10486,6 +10519,7 @@ public class SemanticAnalyzer extends Ba
       Map<String, ExprNodeDesc> colExprMap,
       List<String> outputColumnNames,
       StringBuilder orderString,
+      RowResolver rsOpRR,
       RowResolver extractRR) throws SemanticException {
 
     ArrayList<PTFExpressionDef> partColList = tabDef.getPartition().getExpressions();
@@ -10515,16 +10549,12 @@ public class SemanticAnalyzer extends Ba
       orderCols.add(colDef.getExprNode());
     }
 
+    ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
     /*
-     * We add the column to value columns or output column names
-     * only if it is not a virtual column
+     * construct the ReduceSinkRR
      */
-    ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
-    LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
-        new LinkedHashMap<String[], ColumnInfo>();
     int pos = 0;
     for (ColumnInfo colInfo : colInfoList) {
-      if (!colInfo.isHiddenVirtualCol()) {
         ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
             .getInternalName(), colInfo.getTabAlias(), colInfo
             .getIsVirtualCol());
@@ -10534,32 +10564,45 @@ public class SemanticAnalyzer extends Ba
         outputColumnNames.add(outColName);
 
         String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
-        /*
-         * if we have already encountered this colInfo internalName.
-         * We encounter it again because it must be put for the Having clause.
-         * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
-         */
-        if ( colsAddedByHaving.containsKey(alias)) {
-          continue;
-        }
-        ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
-        ColumnInfo eColInfo = new ColumnInfo(
+        ColumnInfo newColInfo = new ColumnInfo(
             outColName, colInfo.getType(), alias[0],
             colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+        rsOpRR.put(alias[0], alias[1], newColInfo);
+    }
 
-        if ( astNode == null ) {
-          extractRR.put(alias[0], alias[1], eColInfo);
-        }
-        else {
-          /*
-           * in case having clause refers to this column may have been added twice;
-           * once with the ASTNode.toStringTree as the alias
-           * and then with the real alias.
-           */
-          extractRR.putExpression(astNode, eColInfo);
-          if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
-            colsAddedByHaving.put(alias, eColInfo);
-          }
+    /*
+     * construct the ExtractRR
+     */
+    LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+        new LinkedHashMap<String[], ColumnInfo>();
+    pos = 0;
+    for (ColumnInfo colInfo : colInfoList) {
+      String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+      /*
+       * If we have already encountered this colInfo's internalName, we encounter it
+       * again because it must be put in for the Having clause.
+       * We add these entries at the end, in a loop over colsAddedByHaving. See below.
+       */
+      if ( colsAddedByHaving.containsKey(alias)) {
+        continue;
+      }
+      ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+      ColumnInfo eColInfo = new ColumnInfo(
+          SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
+          colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+      if ( astNode == null ) {
+        extractRR.put(alias[0], alias[1], eColInfo);
+      }
+      else {
+        /*
+         * In case the having clause refers to this column, it may have been added twice:
+         * once with the ASTNode.toStringTree as the alias
+         * and then with the real alias.
+         */
+        extractRR.putExpression(astNode, eColInfo);
+        if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+          colsAddedByHaving.put(alias, eColInfo);
         }
       }
     }
@@ -10578,6 +10621,8 @@ public class SemanticAnalyzer extends Ba
      */
     RowResolver rr = opParseCtx.get(input).getRowResolver();
     PTFDesc ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+
+    RowResolver rsOpRR = new RowResolver();
     /*
      * Build an RR for the Extract Op from the ReduceSink Op's RR.
      * Why?
@@ -10647,13 +10692,14 @@ public class SemanticAnalyzer extends Ba
           colExprMap,
           outputColumnNames,
           orderString,
+          rsOpRR,
           extractOpRR);
 
       input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
           .getReduceSinkDesc(orderCols,
               valueCols, outputColumnNames, false,
               -1, partCols, orderString.toString(), -1),
-          new RowSchema(rr.getColumnInfos()), input), rr);
+          new RowSchema(rsOpRR.getColumnInfos()), input), rsOpRR);
       input.setColumnExprMap(colExprMap);
     }
 
@@ -10775,7 +10821,7 @@ public class SemanticAnalyzer extends Ba
         .getReduceSinkDesc(orderCols,
             valueCols, outputColumnNames, false,
             -1, partCols, orderString.toString(), -1),
-        new RowSchema(inputRR.getColumnInfos()), input), rsNewRR);
+        new RowSchema(rsNewRR.getColumnInfos()), input), rsNewRR);
     input.setColumnExprMap(colExprMap);
 
 

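For context on the getCommonGroupByDestGroups and lateral-view changes above: spray keys (distinct plus group-by expressions) are now computed against a per-destination input operator, so clauses are merged into one reduce phase only when they share both the keys and the input. A hedged sketch of the case the single-reducer path targets, using the conventional src(key, value) table:

    FROM src
    INSERT OVERWRITE TABLE d1 SELECT key, count(DISTINCT value) GROUP BY key
    INSERT OVERWRITE TABLE d2 SELECT key, sum(DISTINCT value)   GROUP BY key;

Both destinations spray on (value, key), so they can share a single reduce phase; a destination that carries its own lateral view gets its own input operator and is kept in a separate group.
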
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Wed Apr 17 07:29:38 2013
@@ -237,10 +237,24 @@ public class WindowingSpec {
     OrderSpec orderSpec = wdwSpec.getOrder();
     if ( wFrame == null ) {
       if (!supportsWindowing ) {
-        wFrame = new WindowFrameSpec(
-            new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
-            new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT)
-            );
+
+        if ( wFn.getName().toLowerCase().equals(FunctionRegistry.LAST_VALUE_FUNC_NAME)
+            && orderSpec != null ) {
+          /*
+           * last_value: when a Sort Key is specified, last_value should return the
+           * last value among rows with the same Sort Key value.
+           */
+          wFrame = new WindowFrameSpec(
+              new CurrentRowSpec(),
+              new RangeBoundarySpec(Direction.FOLLOWING, 0)
+              );
+        }
+        else {
+          wFrame = new WindowFrameSpec(
+              new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
+              new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT)
+              );
+        }
       }
       else if ( orderSpec == null ) {
         wFrame = new WindowFrameSpec(

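A hedged illustration of the last_value change above, assuming a hypothetical table sales(dept, day, amount): when no window frame is given and a sort key is present, the default frame becomes CURRENT ROW to RANGE 0 FOLLOWING, so the function reports the last value among rows tied on the sort key rather than the final row of the partition:

    SELECT dept, day, amount,
           last_value(amount) OVER (PARTITION BY dept ORDER BY day) AS lv
    FROM sales;
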
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Wed Apr 17 07:29:38 2013
@@ -187,9 +187,10 @@ public class ExprNodeDescUtils {
     return result;
   }
 
-  private static ExprNodeDesc backtrack(ExprNodeDesc source, Operator<?> current,
+  public static ExprNodeDesc backtrack(ExprNodeDesc source, Operator<?> current,
       Operator<?> terminal) throws SemanticException {
-    if (current == null || current == terminal) {
+    Operator<?> parent = getSingleParent(current, terminal);
+    if (parent == null) {
       return source;
     }
     if (source instanceof ExprNodeGenericFuncDesc) {
@@ -200,7 +201,7 @@ public class ExprNodeDescUtils {
     }
     if (source instanceof ExprNodeColumnDesc) {
       ExprNodeColumnDesc column = (ExprNodeColumnDesc) source;
-      return backtrack(column, current, terminal);
+      return backtrack(column, parent, terminal);
     }
     if (source instanceof ExprNodeFieldDesc) {
       // field expression should be resolved
@@ -215,20 +216,19 @@ public class ExprNodeDescUtils {
   // Resolve column expression to input expression by using expression mapping in current operator
   private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator<?> current,
       Operator<?> terminal) throws SemanticException {
-    if (current == null || current == terminal) {
-      return column;
-    }
-    Operator<?> parent = getSingleParent(current, terminal);
     Map<String, ExprNodeDesc> mapping = current.getColumnExprMap();
     if (mapping == null || !mapping.containsKey(column.getColumn())) {
-      return backtrack(column, parent, terminal);  // forward
+      return backtrack((ExprNodeDesc)column, current, terminal);
     }
     ExprNodeDesc mapped = mapping.get(column.getColumn());
-    return backtrack(mapped, parent, terminal);    // forward with resolved expr
+    return backtrack(mapped, current, terminal);
   }
 
   public static Operator<?> getSingleParent(Operator<?> current, Operator<?> terminal)
       throws SemanticException {
+    if (current == terminal) {
+      return null;
+    }
     List<Operator<?>> parents = current.getParentOperators();
     if (parents == null || parents.isEmpty()) {
       if (terminal != null) {
@@ -236,9 +236,12 @@ public class ExprNodeDescUtils {
       }
       return null;
     }
-    if (current.getParentOperators().size() > 1) {
-      throw new SemanticException("Met multiple parent operators");
+    if (parents.size() == 1) {
+      return parents.get(0);
+    }
+    if (terminal != null && parents.contains(terminal)) {
+      return terminal;
     }
-    return parents.get(0);
+    throw new SemanticException("Met multiple parent operators");
   }
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Wed Apr 17 07:29:38 2013
@@ -81,6 +81,10 @@ public class JoinDesc extends AbstractOp
   protected Byte[] tagOrder;
   private TableDesc keyTableDesc;
 
+  // this operator cannot be converted to a mapjoin because its output is expected to be sorted on the join key
+  // it results from the RS-dedup optimization, which removes the following RS under some conditions
+  private boolean fixedAsSorted;
+
   public JoinDesc() {
   }
 
@@ -525,4 +529,12 @@ public class JoinDesc extends AbstractOp
     }
     return result;
   }
+
+  public boolean isFixedAsSorted() {
+    return fixedAsSorted;
+  }
+
+  public void setFixedAsSorted(boolean fixedAsSorted) {
+    this.fixedAsSorted = fixedAsSorted;
+  }
 }
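A hedged sketch of when the new fixedAsSorted flag matters (table names assumed): if the ReduceSinkDeDuplication optimization removes the reduce sink between a join and a group-by on the same key, the group-by relies on the join's reduce-side output being partitioned and sorted on that key, so the join is marked fixedAsSorted and must not later be converted to a map join:

    SELECT a.key, count(1)
    FROM a JOIN b ON (a.key = b.key)
    GROUP BY a.key;
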