Posted to commits@hive.apache.org by ha...@apache.org on 2014/06/05 20:28:15 UTC

svn commit: r1600719 [2/40] - in /hive/trunk: contrib/src/test/results/clientpositive/ hbase-handler/src/test/results/positive/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/h...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Jun  5 18:28:07 2014
@@ -75,7 +75,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -363,7 +362,7 @@ public class MapJoinProcessor implements
 
     // create the map-join operator
     MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
-        op, joinTree, mapJoinPos, noCheckOuterJoin);
+        op, joinTree, mapJoinPos, noCheckOuterJoin, false);
 
 
     // remove old parents
@@ -387,8 +386,8 @@ public class MapJoinProcessor implements
 
   public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
-      throws SemanticException {
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
+      boolean tezJoin) throws SemanticException {
 
     JoinDesc desc = op.getConf();
     JoinCondDesc[] condns = desc.getConds();
@@ -401,19 +400,17 @@ public class MapJoinProcessor implements
       }
     }
 
-    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
     // Walk over all the sources (which are guaranteed to be reduce sink
     // operators).
     // The join outputs a concatenation of all the inputs.
     QBJoinTree leftSrc = joinTree.getJoinSrc();
-    List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
-        new ArrayList<Operator<? extends OperatorDesc>>();
+    List<ReduceSinkOperator> oldReduceSinkParentOps =
+        new ArrayList<ReduceSinkOperator>(op.getNumParent());
     if (leftSrc != null) {
       // assert mapJoinPos == 0;
       Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
       assert parentOp.getParentOperators().size() == 1;
-      oldReduceSinkParentOps.add(parentOp);
+      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
     }
 
 
@@ -422,25 +419,11 @@ public class MapJoinProcessor implements
       if (src != null) {
         Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
         assert parentOp.getParentOperators().size() == 1;
-        oldReduceSinkParentOps.add(parentOp);
+        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
       }
       pos++;
     }
 
-    // get the join keys from old parent ReduceSink operators
-    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
-      ReduceSinkOperator parent = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
-      ReduceSinkDesc rsconf = parent.getConf();
-      List<ExprNodeDesc> keys = rsconf.getKeyCols();
-      keyExprMap.put(pos, keys);
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
-    StringBuilder keyOrder = new StringBuilder();
-    for (int i = 0; i < keyCols.size(); i++) {
-      keyOrder.append("+");
-    }
-
     Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
     Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
@@ -466,37 +449,57 @@ public class MapJoinProcessor implements
       }
     }
 
+    // rewrite value index for mapjoin
+    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+    // get the join keys from old parent ReduceSink operators
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
     // construct valueTableDescs and valueFilteredTableDescs
     List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
-    List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
+    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
     int[][] filterMap = desc.getFilterMap();
     for (pos = 0; pos < op.getParentOperators().size(); pos++) {
-      List<ExprNodeDesc> valueCols = newValueExprs.get(Byte.valueOf((byte) pos));
-      int length = valueCols.size();
-      List<ExprNodeDesc> valueFilteredCols = new ArrayList<ExprNodeDesc>(length);
-      // deep copy expr node desc
-      for (int i = 0; i < length; i++) {
-        valueFilteredCols.add(valueCols.get(i).clone());
+      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+      // remove values referencing key
+      // todo: currently, mr-mapjoin stores whole value exprs of join operator, which may contain key
+      if (tezJoin && pos != mapJoinPos) {
+        int[] valueIndex = new int[valueCols.size()];
+        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+        for (int i = 0; i < valueIndex.length; i++) {
+          ExprNodeDesc expr = valueCols.get(i);
+          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+          if (kindex >= 0) {
+            valueIndex[i] = kindex;
+          } else {
+            valueIndex[i] = -valueColsInValueExpr.size() - 1;
+            valueColsInValueExpr.add(expr);
+          }
+        }
+        if (needValueIndex(valueIndex)) {
+          valueIndices.put(pos, valueIndex);
+        }
+        valueCols = valueColsInValueExpr;
       }
+      // deep copy expr node desc
+      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
       if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
         ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
             .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
         valueFilteredCols.add(isFilterDesc);
       }
 
-
-      keyOrder = new StringBuilder();
-      for (int i = 0; i < valueCols.size(); i++) {
-        keyOrder.append("+");
-      }
-
       TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
           .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
       TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
           .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
 
       valueTableDescs.add(valueTableDesc);
-      valueFiltedTableDescs.add(valueFilteredTableDesc);
+      valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+      keyExprMap.put(pos, keyCols);
     }
 
     Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
@@ -505,7 +508,7 @@ public class MapJoinProcessor implements
       byte srcTag = entry.getKey();
       List<ExprNodeDesc> filter = entry.getValue();
 
-      Operator<?> terminal = op.getParentOperators().get(srcTag);
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
       newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
     }
     desc.setFilters(filters = newFilters);
@@ -521,17 +524,22 @@ public class MapJoinProcessor implements
       dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
     }
 
+    List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
+
     List<String> outputColumnNames = op.getConf().getOutputColumnNames();
     TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
         PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
     JoinCondDesc[] joinCondns = op.getConf().getConds();
     MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
-        valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
+        valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
         filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
     mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
     mapJoinDescriptor.setTagOrder(tagOrder);
     mapJoinDescriptor.setNullSafes(desc.getNullSafes());
     mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+    if (!valueIndices.isEmpty()) {
+      mapJoinDescriptor.setValueIndices(valueIndices);
+    }
 
     // reduce sink row resolver used to generate map join op
     RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -558,6 +566,15 @@ public class MapJoinProcessor implements
 
   }
 
+  private static boolean needValueIndex(int[] valueIndex) {
+    for (int i = 0; i < valueIndex.length; i++) {
+      if (valueIndex[i] != -i - 1) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * convert a sortmerge join to a a map-side join.
    *

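The tez path added above stops storing join keys twice: for each small-table input it walks the value expressions, and any expression that already appears among the ReduceSink key columns is recorded by its key position instead of being kept in the value row. The resulting valueIndex array uses non-negative entries for key positions and -k-1 for positions in the trimmed value row, and needValueIndex() drops the metadata when the mapping is just the identity. Below is a minimal, self-contained sketch of that convention; plain strings stand in for ExprNodeDesc and buildValueIndex is an illustrative helper, not part of Hive.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    /** Illustrative only: strings stand in for ExprNodeDesc. */
    public class ValueIndexSketch {

      /**
       * For each value expression, record where it can be read at join time:
       * a non-negative entry i means "key column i", a negative entry -k-1
       * means "k-th column of the trimmed value row".
       */
      static int[] buildValueIndex(List<String> valueCols, List<String> keyCols,
          List<String> trimmedValues) {
        int[] valueIndex = new int[valueCols.size()];
        for (int i = 0; i < valueIndex.length; i++) {
          String expr = valueCols.get(i);
          int kindex = keyCols.indexOf(expr);          // ExprNodeDescUtils.indexOf in the patch
          if (kindex >= 0) {
            valueIndex[i] = kindex;                    // value duplicates a join key
          } else {
            valueIndex[i] = -trimmedValues.size() - 1; // position in the de-duplicated value row
            trimmedValues.add(expr);
          }
        }
        return valueIndex;
      }

      /** Mirrors needValueIndex(): the identity mapping -i-1 needs no extra metadata. */
      static boolean needValueIndex(int[] valueIndex) {
        for (int i = 0; i < valueIndex.length; i++) {
          if (valueIndex[i] != -i - 1) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        List<String> keys = Arrays.asList("col_a");
        List<String> values = Arrays.asList("col_a", "col_b", "col_c");
        List<String> trimmed = new ArrayList<String>();
        int[] index = buildValueIndex(values, keys, trimmed);
        // index = [0, -1, -2]: col_a is read from the key, col_b/col_c from the value row
        System.out.println(Arrays.toString(index) + " needed=" + needValueIndex(index));
        System.out.println("trimmed values: " + trimmed);
      }
    }
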
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Thu Jun  5 18:28:07 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.ContentSumma
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -341,7 +341,6 @@ public class CorrelationOptimizer implem
         IntraQueryCorrelation correlation) throws SemanticException {
 
       LOG.info("now detecting operator " + current.getIdentifier() + " " + current.getName());
-
       LinkedHashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
           new LinkedHashSet<ReduceSinkOperator>();
       if (skipedJoinOperators.contains(current)) {
@@ -387,18 +386,18 @@ public class CorrelationOptimizer implem
             ExprNodeDescUtils.backtrack(childKeyCols, child, current);
         List<ExprNodeDesc> backtrackedPartitionCols =
             ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
+
+        OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
+        RowResolver rowResolver = opCtx.getRowResolver();
         Set<String> tableNeedToCheck = new HashSet<String>();
         for (ExprNodeDesc expr: childKeyCols) {
           if (!(expr instanceof ExprNodeColumnDesc)) {
             return correlatedReduceSinkOperators;
-          } else {
-            String colName = ((ExprNodeColumnDesc)expr).getColumn();
-            OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
-            for (ColumnInfo cinfo : opCtx.getRowResolver().getColumnInfos()) {
-              if (colName.equals(cinfo.getInternalName())) {
-                tableNeedToCheck.add(cinfo.getTabAlias());
-              }
-            }
+          }
+          String colName = ((ExprNodeColumnDesc)expr).getColumn();
+          String[] nm = rowResolver.reverseLookup(colName);
+          if (nm != null) {
+            tableNeedToCheck.add(nm[0]);
           }
         }
         if (current instanceof JoinOperator) {
@@ -576,7 +575,6 @@ public class CorrelationOptimizer implem
         Object... nodeOutputs) throws SemanticException {
       CorrelationNodeProcCtx corrCtx = (CorrelationNodeProcCtx) ctx;
       ReduceSinkOperator op = (ReduceSinkOperator) nd;
-
       // Check if we have visited this operator
       if (corrCtx.isWalked(op)) {
         return null;

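The CorrelationOptimizer hunk swaps a linear scan over the operator's ColumnInfos for RowResolver.reverseLookup, which resolves an internal column name directly to its {table alias, column alias} pair, so the table to check can be read from nm[0]. A rough stand-in for that inverse mapping, assuming a simple map keyed by internal name rather than the real RowResolver:

    import java.util.HashMap;
    import java.util.Map;

    /** Illustrative stand-in for RowResolver's inverse mapping (not Hive code). */
    public class ReverseLookupSketch {

      // internal column name -> {table alias, column alias}
      private final Map<String, String[]> invMap = new HashMap<String, String[]>();

      void put(String tabAlias, String colAlias, String internalName) {
        invMap.put(internalName, new String[] {tabAlias, colAlias});
      }

      /** One hash probe replaces the per-column scan over getColumnInfos(). */
      String[] reverseLookup(String internalName) {
        return invMap.get(internalName);
      }

      public static void main(String[] args) {
        ReverseLookupSketch rr = new ReverseLookupSketch();
        rr.put("t1", "key", "_col0");
        rr.put("t2", "value", "_col1");
        String[] nm = rr.reverseLookup("_col0");
        // the optimizer only needs the table alias: nm[0] == "t1"
        System.out.println(nm == null ? "unknown column" : nm[0]);
      }
    }
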
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java Thu Jun  5 18:28:07 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Gr
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -163,7 +164,7 @@ public class ReduceSinkDeDuplication imp
         }
         return false;
       }
-      if (child instanceof ExtractOperator) {
+      if (child instanceof ExtractOperator || child instanceof SelectOperator) {
         return process(cRS, dedupCtx);
       }
       return false;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java Thu Jun  5 18:28:07 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimi
 
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
@@ -69,4 +70,8 @@ public class ExprProcCtx implements Node
   public Operator<? extends OperatorDesc> getInputOperator() {
     return inpOp;
   }
+
+  public RowResolver getResolver() {
+    return lctx.getParseCtx().getOpParseCtx().get(inpOp).getRowResolver();
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java Thu Jun  5 18:28:07 2014
@@ -28,6 +28,8 @@ import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -70,20 +73,19 @@ public class ExprProcFactory {
 
       // assert that the input operator is not null as there are no
       // exprs associated with table scans.
-      assert (epc.getInputOperator() != null);
+      Operator<? extends OperatorDesc> operator = epc.getInputOperator();
+      assert (operator != null);
 
-      ColumnInfo inp_ci = null;
-      for (ColumnInfo tmp_ci : epc.getInputOperator().getSchema()
-          .getSignature()) {
-        if (tmp_ci.getInternalName().equals(cd.getColumn())) {
-          inp_ci = tmp_ci;
-          break;
-        }
+      RowResolver resolver = epc.getResolver();
+      String[] nm = resolver.reverseLookup(cd.getColumn());
+      if (nm == null && operator instanceof ReduceSinkOperator) {
+        nm = resolver.reverseLookup(Utilities.removeValueTag(cd.getColumn()));
       }
+      ColumnInfo ci = nm != null ? resolver.get(nm[0], nm[1]): null;
 
       // Insert the dependencies of inp_ci to that of the current operator, ci
       LineageCtx lc = epc.getLineageCtx();
-      Dependency dep = lc.getIndex().getDependency(epc.getInputOperator(), inp_ci);
+      Dependency dep = lc.getIndex().getDependency(operator, ci);
 
       return dep;
     }

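For lineage, the column processor above now resolves columns through the RowResolver and, when the input is a ReduceSinkOperator, retries after Utilities.removeValueTag strips the reduce-side tag from the internal name. A small sketch of that two-step lookup, assuming removeValueTag simply drops a leading "VALUE." prefix and using a plain map in place of the resolver:

    import java.util.HashMap;
    import java.util.Map;

    /** Illustrative two-step lookup; the resolver is a plain map stand-in, not Hive code. */
    public class LineageLookupSketch {

      static final String VALUE_PREFIX = "VALUE.";

      static String removeValueTag(String column) {
        return column.startsWith(VALUE_PREFIX) ? column.substring(VALUE_PREFIX.length()) : column;
      }

      static String[] resolve(Map<String, String[]> resolver, String column, boolean isReduceSink) {
        String[] nm = resolver.get(column);
        if (nm == null && isReduceSink) {
          nm = resolver.get(removeValueTag(column));   // RS value columns are registered untagged
        }
        return nm;
      }

      public static void main(String[] args) {
        Map<String, String[]> resolver = new HashMap<String, String[]>();
        resolver.put("_col0", new String[] {"t1", "id"});
        // "VALUE._col0" misses directly, hits once the tag is removed
        String[] nm = resolve(resolver, "VALUE._col0", true);
        System.out.println(nm[0] + "." + nm[1]);       // t1.id
      }
    }
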
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java Thu Jun  5 18:28:07 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.ForwardOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.Utils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -213,8 +216,8 @@ public class OpProcFactory {
 
         // Otherwise look up the expression corresponding to this ci
         ExprNodeDesc expr = exprs.get(cnt++);
-        lCtx.getIndex().mergeDependency(op, ci,
-            ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+        Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr);
+        lCtx.getIndex().mergeDependency(op, ci, dependency);
       }
 
       return null;
@@ -438,7 +441,6 @@ public class OpProcFactory {
       LineageCtx lCtx = (LineageCtx) procCtx;
       ReduceSinkOperator rop = (ReduceSinkOperator)nd;
 
-      ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
       int cnt = 0;
 
@@ -450,15 +452,49 @@ public class OpProcFactory {
       }
 
       if (op instanceof GroupByOperator) {
+        ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
         for(ExprNodeDesc expr : rop.getConf().getKeyCols()) {
           lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
               ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
         }
-      }
-
-      for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
-        lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
-            ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+        for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
+          lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
+              ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+        }
+      } else if (op instanceof ExtractOperator) {
+        ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
+        for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
+          lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
+              ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+        }
+      } else {
+        RowResolver resolver = lCtx.getParseCtx().getOpParseCtx().get(rop).getRowResolver();
+        ReduceSinkDesc desc = rop.getConf();
+        List<ExprNodeDesc> keyCols = desc.getKeyCols();
+        ArrayList<String> keyColNames = desc.getOutputKeyColumnNames();
+        for (int i = 0; i < keyCols.size(); i++) {
+          // order-bys, joins
+          String[] nm = resolver.reverseLookup(Utilities.ReduceField.KEY + "." + keyColNames.get(i));
+          if (nm == null) {
+            continue;   // key in values
+          }
+          ColumnInfo column = resolver.get(nm[0], nm[1]);
+          lCtx.getIndex().putDependency(rop, column,
+              ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i)));
+        }
+        List<ExprNodeDesc> valCols = desc.getValueCols();
+        ArrayList<String> valColNames = desc.getOutputValueColumnNames();
+        for (int i = 0; i < valCols.size(); i++) {
+          // todo: currently, bucketing,etc. makes RS differently with those for order-bys or joins
+          String[] nm = resolver.reverseLookup(valColNames.get(i));
+          if (nm == null) {
+            // order-bys, joins
+            nm = resolver.reverseLookup(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
+          }
+          ColumnInfo column = resolver.get(nm[0], nm[1]);
+          lCtx.getIndex().putDependency(rop, column,
+              ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i)));
+        }
       }
 
       return null;

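The rewritten ReduceSink rule above keeps the old positional wiring for group-by and extract children, and otherwise resolves each output column by name: key outputs are looked up as KEY.<output key name>, value outputs first by their bare name and then as VALUE.<output value name>. A sketch of that naming convention with plain maps standing in for RowResolver; the helper names are illustrative only.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReduceSinkLineageSketch {

      enum ReduceField { KEY, VALUE }

      static String[] lookupKey(Map<String, String[]> resolver, String keyColName) {
        // order-bys and joins: key columns are always tagged with KEY.
        return resolver.get(ReduceField.KEY + "." + keyColName);   // null => key not in output row
      }

      static String[] lookupValue(Map<String, String[]> resolver, String valColName) {
        String[] nm = resolver.get(valColName);                    // e.g. bucketing-style RS
        if (nm == null) {
          nm = resolver.get(ReduceField.VALUE + "." + valColName); // order-bys, joins
        }
        return nm;
      }

      public static void main(String[] args) {
        Map<String, String[]> resolver = new HashMap<String, String[]>();
        resolver.put("KEY.reducesinkkey0", new String[] {"t1", "id"});
        resolver.put("VALUE._col0", new String[] {"t1", "name"});

        List<String> keyNames = Arrays.asList("reducesinkkey0");
        List<String> valNames = Arrays.asList("_col0");
        for (String k : keyNames) {
          System.out.println("key " + k + " -> " + Arrays.toString(lookupKey(resolver, k)));
        }
        for (String v : valNames) {
          System.out.println("value " + v + " -> " + Arrays.toString(lookupValue(resolver, v)));
        }
      }
    }
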
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java Thu Jun  5 18:28:07 2014
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -42,8 +44,11 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
 
 /**
  * Operator factory for the rule processors for inferring bucketing/sorting columns.
@@ -129,8 +134,11 @@ public class BucketingSortingOpProcFacto
 
       BucketingSortingCtx bctx = (BucketingSortingCtx)procCtx;
       JoinOperator jop = (JoinOperator)nd;
-      List<ColumnInfo> colInfos = jop.getSchema().getSignature();
-      Byte[] order = jop.getConf().getTagOrder();
+      JoinDesc joinDesc = jop.getConf();
+
+      Byte[] order = joinDesc.getTagOrder();
+      Map<Byte, List<ExprNodeDesc>> expressions = joinDesc.getExprs();
+      List<String> outputValNames = joinDesc.getOutputColumnNames();
 
       BucketCol[] newBucketCols = null;
       SortCol[] newSortCols = null;
@@ -143,63 +151,55 @@ public class BucketingSortingOpProcFacto
         assert(parent instanceof ReduceSinkOperator);
 
         ReduceSinkOperator rop = (ReduceSinkOperator)jop.getParentOperators().get(i);
+        ReduceSinkDesc rsDesc = rop.getConf();
 
-        String sortOrder = rop.getConf().getOrder();
-        List<BucketCol> bucketCols = new ArrayList<BucketCol>();
-        List<SortCol> sortCols = new ArrayList<SortCol>();
-        // Go through the Reduce keys and find the matching column(s) in the reduce values
-        for (int keyIndex = 0; keyIndex < rop.getConf().getKeyCols().size(); keyIndex++) {
-          for (int valueIndex = 0; valueIndex < rop.getConf().getValueCols().size();
-              valueIndex++) {
-
-            if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(valueIndex)).
-                equals(new ExprNodeDescEqualityWrapper(rop.getConf().getKeyCols().get(
-                    keyIndex)))) {
-
-              String colName = rop.getSchema().getSignature().get(valueIndex).getInternalName();
-              bucketCols.add(new BucketCol(colName, keyIndex));
-              sortCols.add(new SortCol(colName, keyIndex, sortOrder.charAt(keyIndex)));
-              break;
-            }
+        byte tag = (byte) rsDesc.getTag();
+        List<ExprNodeDesc> joinValues = expressions.get(tag);
+
+        // Columns are output from the join from the different reduce sinks in the order of their
+        // offsets
+        int offset = 0;
+        for (byte orderIndex = 0; orderIndex < order.length; orderIndex++) {
+          if (order[orderIndex] < order[tag]) {
+            offset += expressions.get(orderIndex).size();
           }
         }
 
-        if (bucketCols.isEmpty()) {
-          assert(sortCols.isEmpty());
-          continue;
-        }
+        String sortOrder = rsDesc.getOrder();
+        List<ExprNodeDesc> keyCols = rsDesc.getKeyCols();
+        List<ExprNodeDesc> valCols = ExprNodeDescUtils.backtrack(joinValues, jop, parent);
 
         if (newBucketCols == null) {
-          assert(newSortCols == null);
-          // The number of join keys is equal to the number of keys in every reducer, although
-          // not every key may map to a value in the reducer
-          newBucketCols = new BucketCol[rop.getConf().getKeyCols().size()];
-          newSortCols = new SortCol[rop.getConf().getKeyCols().size()];
-        } else {
-          assert(newSortCols != null);
+          newBucketCols = new BucketCol[keyCols.size()];
+          newSortCols = new SortCol[keyCols.size()];
         }
 
-        byte tag = (byte)rop.getConf().getTag();
-        List<ExprNodeDesc> exprs = jop.getConf().getExprs().get(tag);
-
-        int colInfosOffset = 0;
-        int orderValue = order[tag];
-        // Columns are output from the join from the different reduce sinks in the order of their
-        // offsets
-        for (byte orderIndex = 0; orderIndex < order.length; orderIndex++) {
-          if (order[orderIndex] < orderValue) {
-            colInfosOffset += jop.getConf().getExprs().get(orderIndex).size();
+        // Go through the Reduce keys and find the matching column(s) in the reduce values
+        for (int keyIndex = 0 ; keyIndex < keyCols.size(); keyIndex++) {
+          ExprNodeDesc key = keyCols.get(keyIndex);
+          int index = ExprNodeDescUtils.indexOf(key, valCols);
+          if (index >= 0) {
+            int vindex = offset + index;
+            String vname = outputValNames.get(vindex);
+            if (newBucketCols[keyIndex] != null) {
+              newBucketCols[keyIndex].addAlias(vname, vindex);
+              newSortCols[keyIndex].addAlias(vname, vindex);
+            } else {
+              newBucketCols[keyIndex] = new BucketCol(vname, vindex);
+              newSortCols[keyIndex] = new SortCol(vname, vindex, sortOrder.charAt(keyIndex));
+            }
           }
         }
-
-        findBucketingSortingColumns(exprs, colInfos, bucketCols, sortCols, newBucketCols,
-            newSortCols, colInfosOffset);
-
       }
 
-      setBucketingColsIfComplete(bctx, jop, newBucketCols);
-
-      setSortingColsIfComplete(bctx, jop, newSortCols);
+      List<BucketCol> bucketCols = Arrays.asList(newBucketCols);
+      if (!bucketCols.contains(null)) {
+        bctx.setBucketedCols(jop, bucketCols);
+      }
+      List<SortCol> sortCols = Arrays.asList(newSortCols);
+      if (!sortCols.contains(null)) {
+        bctx.setSortedCols(jop, sortCols);
+      }
 
       return null;
     }
@@ -331,6 +331,12 @@ public class BucketingSortingOpProcFacto
       BucketingSortingCtx bctx = (BucketingSortingCtx)procCtx;
       SelectOperator sop = (SelectOperator)nd;
 
+      if (sop.getNumParent() == 1 &&
+          sop.getParentOperators().get(0) instanceof ReduceSinkOperator) {
+        ReduceSinkOperator rs = (ReduceSinkOperator) sop.getParentOperators().get(0);
+        extractTraits(bctx, rs, sop);
+        return null;
+      }
       Operator<? extends OperatorDesc> parent = getParent(stack);
 
       // if this is a selStarNoCompute then this select operator
@@ -506,71 +512,83 @@ public class BucketingSortingOpProcFacto
       Operator<? extends OperatorDesc> parent = exop.getParentOperators().get(0);
 
       // The caller of this method should guarantee this
-      assert(parent instanceof ReduceSinkOperator);
+      if (parent instanceof ReduceSinkOperator) {
+        extractTraits(bctx, (ReduceSinkOperator)parent, exop);
+      }
 
-      ReduceSinkOperator rop = (ReduceSinkOperator)parent;
+      return null;
+    }
+  }
 
-      // Go through the set of partition columns, and find their representatives in the values
-      // These represent the bucketed columns
-      List<BucketCol> bucketCols = new ArrayList<BucketCol>();
-      for (int i = 0; i < rop.getConf().getPartitionCols().size(); i++) {
-        boolean valueColFound = false;
-        for (int j = 0; j < rop.getConf().getValueCols().size(); j++) {
-          if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(j)).equals(
-              new ExprNodeDescEqualityWrapper(rop.getConf().getPartitionCols().get(i)))) {
-
-            bucketCols.add(new BucketCol(
-                rop.getSchema().getSignature().get(j).getInternalName(), j));
-            valueColFound = true;
-            break;
-          }
-        }
+  static void extractTraits(BucketingSortingCtx bctx, ReduceSinkOperator rop, Operator<?> exop)
+      throws SemanticException {
 
-        // If the partition columns can't all be found in the values then the data is not bucketed
-        if (!valueColFound) {
-          bucketCols.clear();
-          break;
-        }
-      }
+    List<ExprNodeDesc> outputValues = Collections.emptyList();
+    if (exop instanceof ExtractOperator) {
+      outputValues = rop.getConf().getValueCols();
+    } else if (exop instanceof SelectOperator) {
+      SelectDesc select = ((SelectOperator)exop).getConf();
+      outputValues = ExprNodeDescUtils.backtrack(select.getColList(), exop, rop);
+    }
+    if (outputValues.isEmpty()) {
+      return;
+    }
 
-      // Go through the set of key columns, and find their representatives in the values
-      // These represent the sorted columns
-      String sortOrder = rop.getConf().getOrder();
-      List<SortCol> sortCols = new ArrayList<SortCol>();
-      for (int i = 0; i < rop.getConf().getKeyCols().size(); i++) {
-        boolean valueColFound = false;
-        for (int j = 0; j < rop.getConf().getValueCols().size(); j++) {
-          if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(j)).equals(
-              new ExprNodeDescEqualityWrapper(rop.getConf().getKeyCols().get(i)))) {
-
-            sortCols.add(new SortCol(
-                rop.getSchema().getSignature().get(j).getInternalName(), j, sortOrder.charAt(i)));
-            valueColFound = true;
-            break;
-          }
-        }
+    // Go through the set of partition columns, and find their representatives in the values
+    // These represent the bucketed columns
+    List<BucketCol> bucketCols = extractBucketCols(rop, outputValues);
 
-        // If the sorted columns can't all be found in the values then the data is only sorted on
-        // the columns seen up until now
-        if (!valueColFound) {
-          break;
-        }
-      }
+    // Go through the set of key columns, and find their representatives in the values
+    // These represent the sorted columns
+    List<SortCol> sortCols = extractSortCols(rop, outputValues);
 
-      List<ColumnInfo> colInfos = exop.getSchema().getSignature();
+    List<ColumnInfo> colInfos = exop.getSchema().getSignature();
 
-      if (!bucketCols.isEmpty()) {
-        List<BucketCol> newBucketCols = getNewBucketCols(bucketCols, colInfos);
-        bctx.setBucketedCols(exop, newBucketCols);
-      }
+    if (!bucketCols.isEmpty()) {
+      List<BucketCol> newBucketCols = getNewBucketCols(bucketCols, colInfos);
+      bctx.setBucketedCols(exop, newBucketCols);
+    }
 
-      if (!sortCols.isEmpty()) {
-        List<SortCol> newSortCols = getNewSortCols(sortCols, colInfos);
-        bctx.setSortedCols(exop, newSortCols);
+    if (!sortCols.isEmpty()) {
+      List<SortCol> newSortCols = getNewSortCols(sortCols, colInfos);
+      bctx.setSortedCols(exop, newSortCols);
+    }
+  }
+
+  static List<BucketCol> extractBucketCols(ReduceSinkOperator rop, List<ExprNodeDesc> outputValues) {
+    List<BucketCol> bucketCols = new ArrayList<BucketCol>();
+    for (ExprNodeDesc partitionCol : rop.getConf().getPartitionCols()) {
+      if (!(partitionCol instanceof ExprNodeColumnDesc)) {
+        return Collections.emptyList();
       }
+      int index = ExprNodeDescUtils.indexOf(partitionCol, outputValues);
+      if (index < 0) {
+        return Collections.emptyList();
+      }
+      bucketCols.add(new BucketCol(((ExprNodeColumnDesc) partitionCol).getColumn(), index));
+    }
+    // If the partition columns can't all be found in the values then the data is not bucketed
+    return bucketCols;
+  }
 
-      return null;
+  static List<SortCol> extractSortCols(ReduceSinkOperator rop, List<ExprNodeDesc> outputValues) {
+    String sortOrder = rop.getConf().getOrder();
+    List<SortCol> sortCols = new ArrayList<SortCol>();
+    ArrayList<ExprNodeDesc> keyCols = rop.getConf().getKeyCols();
+    for (int i = 0; i < keyCols.size(); i++) {
+      ExprNodeDesc keyCol = keyCols.get(i);
+      if (!(keyCol instanceof ExprNodeColumnDesc)) {
+        break;
+      }
+      int index = ExprNodeDescUtils.indexOf(keyCol, outputValues);
+      if (index < 0) {
+        break;
+      }
+      sortCols.add(new SortCol(((ExprNodeColumnDesc) keyCol).getColumn(), index, sortOrder.charAt(i)));
     }
+    // If the sorted columns can't all be found in the values then the data is only sorted on
+    // the columns seen up until now
+    return sortCols;
   }
 
   /**

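extractBucketCols and extractSortCols above share one rule: each ReduceSink partition or key column must be a plain column reference that can be located in the operator's output values. Bucketing is all-or-nothing, while sorting keeps only the prefix of key columns found so far. A string-based sketch of that rule, where indexOf stands in for ExprNodeDescUtils.indexOf:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    /** Illustrative only: plain strings stand in for ExprNodeDesc. */
    public class TraitExtractionSketch {

      static List<String> bucketCols(List<String> partitionCols, List<String> outputValues) {
        List<String> result = new ArrayList<String>();
        for (String col : partitionCols) {
          int index = outputValues.indexOf(col);
          if (index < 0) {
            return Collections.emptyList();   // one miss => data is not bucketed on these columns
          }
          result.add(col + "@" + index);
        }
        return result;
      }

      static List<String> sortCols(List<String> keyCols, String order, List<String> outputValues) {
        List<String> result = new ArrayList<String>();
        for (int i = 0; i < keyCols.size(); i++) {
          int index = outputValues.indexOf(keyCols.get(i));
          if (index < 0) {
            break;                            // sorted only on the columns seen so far
          }
          result.add(keyCols.get(i) + "@" + index + order.charAt(i));
        }
        return result;
      }

      public static void main(String[] args) {
        List<String> output = Arrays.asList("a", "b", "d");
        System.out.println(bucketCols(Arrays.asList("a", "c"), output));          // [] - "c" missing
        System.out.println(sortCols(Arrays.asList("a", "c", "d"), "+++", output)); // [a@0+]
      }
    }
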
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Thu Jun  5 18:28:07 2014
@@ -97,24 +97,23 @@ public class RowResolver implements Seri
   }
 
   public void put(String tab_alias, String col_alias, ColumnInfo colInfo) {
+    if (!addMappingOnly(tab_alias, col_alias, colInfo)) {
+    	rowSchema.getSignature().add(colInfo);
+    }
+  }
+
+  public boolean addMappingOnly(String tab_alias, String col_alias, ColumnInfo colInfo) {
     if (tab_alias != null) {
       tab_alias = tab_alias.toLowerCase();
     }
     col_alias = col_alias.toLowerCase();
-    if (rowSchema.getSignature() == null) {
-      rowSchema.setSignature(new ArrayList<ColumnInfo>());
-    }
-    
+
     /*
      * allow multiple mappings to the same ColumnInfo.
-     * When a ColumnInfo is mapped multiple times, only the 
+     * When a ColumnInfo is mapped multiple times, only the
      * first inverse mapping is captured.
      */
     boolean colPresent = invRslvMap.containsKey(colInfo.getInternalName());
-    
-    if ( !colPresent ) {
-    	rowSchema.getSignature().add(colInfo);
-    }
 
     LinkedHashMap<String, ColumnInfo> f_map = rslvMap.get(tab_alias);
     if (f_map == null) {
@@ -127,10 +126,12 @@ public class RowResolver implements Seri
     qualifiedAlias[0] = tab_alias;
     qualifiedAlias[1] = col_alias;
     if ( !colPresent ) {
-	    invRslvMap.put(colInfo.getInternalName(), qualifiedAlias);
+      invRslvMap.put(colInfo.getInternalName(), qualifiedAlias);
     } else {
       altInvRslvMap.put(colInfo.getInternalName(), qualifiedAlias);
     }
+
+    return colPresent;
   }
 
   public boolean hasTableAlias(String tab_alias) {
@@ -350,18 +351,4 @@ public class RowResolver implements Seri
     this.expressionMap = expressionMap;
   }
 
-  public String[] toColumnDesc() {
-    StringBuilder cols = new StringBuilder();
-    StringBuilder colTypes = new StringBuilder();
-
-    for (ColumnInfo colInfo : getColumnInfos()) {
-      if (cols.length() > 0) {
-        cols.append(',');
-        colTypes.append(':');
-      }
-      cols.append(colInfo.getInternalName());
-      colTypes.append(colInfo.getType().getTypeName());
-    }
-    return new String[] {cols.toString(), colTypes.toString()};
-  }
 }

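The RowResolver change splits put() into addMappingOnly(), which records an alias mapping and reports whether the column was already known, plus a put() that appends to the row schema only for genuinely new columns; this is what lets the new ReduceSink/Select plans register KEY./VALUE. aliases without duplicating schema entries. A compact sketch of those semantics, with a map of internal names standing in for the resolver's internals:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustrative only: not the real RowResolver. */
    public class RowResolverSketch {

      private final List<String> rowSchema = new ArrayList<String>();
      private final Map<String, String[]> invMap = new HashMap<String, String[]>();

      boolean addMappingOnly(String tab, String col, String internalName) {
        boolean colPresent = invMap.containsKey(internalName);
        if (!colPresent) {
          invMap.put(internalName, new String[] {tab, col});  // first inverse mapping wins
        }
        return colPresent;
      }

      void put(String tab, String col, String internalName) {
        if (!addMappingOnly(tab, col, internalName)) {
          rowSchema.add(internalName);                        // schema grows only for new columns
        }
      }

      public static void main(String[] args) {
        RowResolverSketch rr = new RowResolverSketch();
        rr.put("t1", "id", "_col0");
        rr.addMappingOnly("t1", "id_alias", "_col0");          // extra alias, no new schema entry
        rr.put("t1", "name", "_col1");
        System.out.println(rr.rowSchema);                      // [_col0, _col1]
      }
    }
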
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jun  5 18:28:07 2014
@@ -2708,7 +2708,7 @@ public class SemanticAnalyzer extends Ba
             fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())),
             inInfo, inRecordWriter, outInfo, outRecordReader, errRecordReader, errInfo),
         new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
-    output.setColumnExprMap(Collections.emptyMap());  // disable backtracking
+    output.setColumnExprMap(new HashMap<String, ExprNodeDesc>());  // disable backtracking
 
     return output;
   }
@@ -3117,15 +3117,18 @@ public class SemanticAnalyzer extends Ba
       startPosn = 0;
     }
 
+    Set<String> colAliases = new HashSet<String>();
+    ASTNode[] exprs = new ASTNode[exprList.getChildCount()];
+    String[][] aliases = new String[exprList.getChildCount()][];
+    boolean[] hasAsClauses = new boolean[exprList.getChildCount()];
     // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM)
     for (int i = startPosn; i < exprList.getChildCount(); ++i) {
 
       // child can be EXPR AS ALIAS, or EXPR.
       ASTNode child = (ASTNode) exprList.getChild(i);
       boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2);
-      boolean isWindowSpec = child.getChildCount() == 3 ?
-          (child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC) :
-            false;
+      boolean isWindowSpec = child.getChildCount() == 3 &&
+          child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC;
 
       // EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's
       // This check is not needed and invalid when there is a transform b/c the
@@ -3156,8 +3159,20 @@ public class SemanticAnalyzer extends Ba
           unparseTranslator.addIdentifierTranslation((ASTNode) child
               .getChild(1));
         }
-
       }
+      exprs[i] = expr;
+      aliases[i] = new String[] {tabAlias, colAlias};
+      hasAsClauses[i] = hasAsClause;
+      colAliases.add(colAlias);
+    }
+
+    // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM)
+    for (int i = startPosn; i < exprList.getChildCount(); ++i) {
+      // The real expression
+      ASTNode expr = exprs[i];
+      String tabAlias = aliases[i][0];
+      String colAlias = aliases[i][1];
+      boolean hasAsClause = hasAsClauses[i];
 
       if (expr.getType() == HiveParser.TOK_ALLCOLREF) {
         pos = genColListRegex(".*", expr.getChildCount() == 0 ? null
@@ -3193,7 +3208,8 @@ public class SemanticAnalyzer extends Ba
         tcCtx.setAllowDistinctFunctions(false);
         ExprNodeDesc exp = genExprNodeDesc(expr, inputRR, tcCtx);
         String recommended = recommendName(exp, colAlias);
-        if (recommended != null && out_rwsch.get(null, recommended) == null) {
+        if (recommended != null && !colAliases.contains(recommended) &&
+            out_rwsch.get(null, recommended) == null) {
           colAlias = recommended;
         }
         col_list.add(exp);
@@ -4726,7 +4742,7 @@ public class SemanticAnalyzer extends Ba
    *           distinct key in hope of getting a uniform distribution, and
    *           compute partial aggregates by the grouping key. Evaluate partial
    *           aggregates first, and spray by the grouping key to compute actual
-   *           aggregates in the second phase. The agggregation evaluation
+   *           aggregates in the second phase. The aggregation evaluation
    *           functions are as follows: Partitioning Key: distinct key
    *
    *           Sorting Key: distinct key
@@ -4794,7 +4810,7 @@ public class SemanticAnalyzer extends Ba
    *           compute partial aggregates grouped by the reduction key (grouping
    *           key + distinct key). Evaluate partial aggregates first, and spray
    *           by the grouping key to compute actual aggregates in the second
-   *           phase. The agggregation evaluation functions are as follows:
+   *           phase. The aggregation evaluation functions are as follows:
    *           Partitioning Key: random() if no DISTINCT grouping + distinct key
    *           if DISTINCT
    *
@@ -4968,7 +4984,7 @@ public class SemanticAnalyzer extends Ba
    * spray by the group by key, and sort by the distinct key (if any), and
    * compute aggregates based on actual aggregates
    *
-   * The agggregation evaluation functions are as follows:
+   * The aggregation evaluation functions are as follows:
    *
    * No grouping sets:
    * Group By Operator:
@@ -5135,7 +5151,7 @@ public class SemanticAnalyzer extends Ba
    * key). Evaluate partial aggregates first, and spray by the grouping key to
    * compute actual aggregates in the second phase.
    *
-   * The agggregation evaluation functions are as follows:
+   * The aggregation evaluation functions are as follows:
    *
    * No grouping sets:
    * STAGE 1
@@ -5157,7 +5173,7 @@ public class SemanticAnalyzer extends Ba
    * Sorting Key: grouping key
    * Reducer: merge/terminate (mode = FINAL)
    *
-   * In the presence of grouping sets, the agggregation evaluation functions are as follows:
+   * In the presence of grouping sets, the aggregation evaluation functions are as follows:
    * STAGE 1
    * Group by Operator:
    * grouping keys: group by expressions + grouping id. if no DISTINCT
@@ -5406,8 +5422,8 @@ public class SemanticAnalyzer extends Ba
     if ((dest_tab.getNumBuckets() > 0) &&
         (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
       enforceBucketing = true;
-      partnCols = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
-      partnColsNoConvert = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
+      partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
+      partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
           false);
     }
 
@@ -6205,7 +6221,7 @@ public class SemanticAnalyzer extends Ba
     return genLimitPlan(dest, qb, curr, limit);
   }
 
-  private ArrayList<ExprNodeDesc> getParitionColsFromBucketCols(String dest, QB qb, Table tab,
+  private ArrayList<ExprNodeDesc> getPartitionColsFromBucketCols(String dest, QB qb, Table tab,
       TableDesc table_desc, Operator input, boolean convert)
       throws SemanticException {
     List<String> tabBucketCols = tab.getBucketCols();
@@ -6384,7 +6400,7 @@ public class SemanticAnalyzer extends Ba
   }
 
   @SuppressWarnings("nls")
-  private Operator genReduceSinkPlan(String dest, QB qb, Operator input,
+  private Operator genReduceSinkPlan(String dest, QB qb, Operator<?> input,
       int numReducers) throws SemanticException {
 
     RowResolver inputRR = opParseCtx.get(input).getRowResolver();
@@ -6424,8 +6440,11 @@ public class SemanticAnalyzer extends Ba
         }
       }
     }
+    Operator dummy = Operator.createDummy();
+    dummy.setParentOperators(Arrays.asList(input));
 
     ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> sortColsBack = new ArrayList<ExprNodeDesc>();
     StringBuilder order = new StringBuilder();
     if (sortExprs != null) {
       int ccount = sortExprs.getChildCount();
@@ -6446,58 +6465,121 @@ public class SemanticAnalyzer extends Ba
         }
         ExprNodeDesc exprNode = genExprNodeDesc(cl, inputRR);
         sortCols.add(exprNode);
+        sortColsBack.add(ExprNodeDescUtils.backtrack(exprNode, dummy, input));
       }
     }
-
     // For the generation of the values expression just get the inputs
     // signature and generate field expressions for those
+    RowResolver rsRR = new RowResolver();
     ArrayList<String> outputColumns = new ArrayList<String>();
-    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
     ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
-    int i = 0;
-    for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
-      String internalName = getColumnInternalName(i++);
-      outputColumns.add(internalName);
-      valueCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo
-          .getInternalName(), colInfo.getTabAlias(), colInfo
-          .getIsVirtualCol()));
-      colExprMap.put(internalName, valueCols.get(valueCols.size() - 1));
-    }
+    ArrayList<ExprNodeDesc> valueColsBack = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
 
-    Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
-        .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1,
-            partitionCols, order.toString(), numReducers),
-        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
-    interim.setColumnExprMap(colExprMap);
+    ArrayList<ColumnInfo> columnInfos = inputRR.getColumnInfos();
 
-    // Add the extract operator to get the value fields
-    RowResolver out_rwsch = new RowResolver();
-    RowResolver interim_rwsch = inputRR;
-    Integer pos = Integer.valueOf(0);
-    for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
-      String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
-      out_rwsch.put(info[0], info[1], new ColumnInfo(
-          getColumnInternalName(pos), colInfo.getType(), info[0],
-          colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()));
-      pos = Integer.valueOf(pos.intValue() + 1);
+    int[] index = new int[columnInfos.size()];
+    for (int i = 0; i < index.length; i++) {
+      ColumnInfo colInfo = columnInfos.get(i);
+      String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
+      String[] nm2 = inputRR.getAlternateMappings(colInfo.getInternalName());
+      ExprNodeColumnDesc value = new ExprNodeColumnDesc(colInfo.getType(),
+          colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+
+      // backtrack can be null when input is script operator
+      ExprNodeDesc valueBack = ExprNodeDescUtils.backtrack(value, dummy, input);
+      int kindex = valueBack == null ? -1 : ExprNodeDescUtils.indexOf(valueBack, sortColsBack);
+      if (kindex >= 0) {
+        index[i] = kindex;
+        ColumnInfo newColInfo = new ColumnInfo(colInfo);
+        newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
+        newColInfo.setTabAlias(nm[0]);
+        rsRR.addMappingOnly(nm[0], nm[1], newColInfo);
+        if (nm2 != null) {
+          rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
+        }
+        continue;
+      }
+      int vindex = valueBack == null ? -1 : ExprNodeDescUtils.indexOf(valueBack, valueColsBack);
+      if (vindex >= 0) {
+        index[i] = -vindex - 1;
+        continue;
+      }
+      index[i] = -valueCols.size() - 1;
+      String outputColName = getColumnInternalName(valueCols.size());
+
+      valueCols.add(value);
+      valueColsBack.add(valueBack);
+
+      ColumnInfo newColInfo = new ColumnInfo(colInfo);
+      newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName);
+      newColInfo.setTabAlias(nm[0]);
+
+      rsRR.put(nm[0], nm[1], newColInfo);
+      if (nm2 != null) {
+        rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
+      }
+      outputColumns.add(outputColName);
     }
 
-    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
-            Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(
-            out_rwsch.getColumnInfos()), interim), out_rwsch);
+    dummy.setParentOperators(null);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: "
-          + out_rwsch.toString());
+    ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns,
+        false, -1, partitionCols, order.toString(), numReducers);
+    Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
+        new RowSchema(rsRR.getColumnInfos()), input), rsRR);
+
+    List<String> keyColNames = rsdesc.getOutputKeyColumnNames();
+    for (int i = 0 ; i < keyColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), sortCols.get(i));
+    }
+    List<String> valueColNames = rsdesc.getOutputValueColumnNames();
+    for (int i = 0 ; i < valueColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.VALUE + "." + valueColNames.get(i), valueCols.get(i));
     }
+    interim.setColumnExprMap(colExprMap);
+
+    RowResolver selectRR = new RowResolver();
+    ArrayList<ExprNodeDesc> selCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<String> selOutputCols = new ArrayList<String>();
+    Map<String, ExprNodeDesc> selColExprMap = new HashMap<String, ExprNodeDesc>();
+
+    for (int i = 0; i < index.length; i++) {
+      ColumnInfo prev = columnInfos.get(i);
+      String[] nm = inputRR.reverseLookup(prev.getInternalName());
+      String[] nm2 = inputRR.getAlternateMappings(prev.getInternalName());
+      ColumnInfo info = new ColumnInfo(prev);
+
+      String field;
+      if (index[i] >= 0) {
+        field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
+      } else {
+        field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1);
+      }
+      String internalName = getColumnInternalName(i);
+      ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(),
+          field, info.getTabAlias(), info.getIsVirtualCol());
+      selCols.add(desc);
+
+      info.setInternalName(internalName);
+      selectRR.put(nm[0], nm[1], info);
+      if (nm2 != null) {
+        selectRR.addMappingOnly(nm2[0], nm2[1], info);
+      }
+      selOutputCols.add(internalName);
+      selColExprMap.put(internalName, desc);
+    }
+    SelectDesc select = new SelectDesc(selCols, selOutputCols);
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+        new RowSchema(selectRR.getColumnInfos()), interim), selectRR);
+    output.setColumnExprMap(selColExprMap);
     return output;
   }
 
   private Operator genJoinOperatorChildren(QBJoinTree join, Operator left,
       Operator[] right, HashSet<Integer> omitOpts) throws SemanticException {
 
-    RowResolver outputRS = new RowResolver();
+    RowResolver outputRR = new RowResolver();
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     // all children are base classes
     Operator<?>[] rightOps = new Operator[right.length];
@@ -6511,56 +6593,73 @@ public class SemanticAnalyzer extends Ba
         new HashMap<Byte, List<ExprNodeDesc>>();
 
     for (int pos = 0; pos < right.length; ++pos) {
-
-      Operator input = right[pos];
+      Operator<?> input = right[pos] == null ? left : right[pos];
       if (input == null) {
         input = left;
       }
+      ReduceSinkOperator rs = (ReduceSinkOperator) input;
+      if (rs.getNumParent() != 1) {
+        throw new SemanticException("RS should have single parent");
+      }
+      Operator<?> parent = rs.getParentOperators().get(0);
+      ReduceSinkDesc rsDesc = (ReduceSinkDesc) (input.getConf());
 
-      ArrayList<ExprNodeDesc> keyDesc = new ArrayList<ExprNodeDesc>();
+      int[] index = rs.getValueIndex();
+
+      ArrayList<ExprNodeDesc> valueDesc = new ArrayList<ExprNodeDesc>();
       ArrayList<ExprNodeDesc> filterDesc = new ArrayList<ExprNodeDesc>();
-      Byte tag = Byte.valueOf((byte) (((ReduceSinkDesc) (input.getConf()))
-          .getTag()));
+      Byte tag = (byte) rsDesc.getTag();
 
       // check whether this input operator produces output
-      if (omitOpts == null || !omitOpts.contains(pos)) {
-        // prepare output descriptors for the input opt
-        RowResolver inputRS = opParseCtx.get(input).getRowResolver();
-        Iterator<String> keysIter = inputRS.getTableNames().iterator();
-        Set<String> aliases = posToAliasMap.get(pos);
-        if (aliases == null) {
-          aliases = new HashSet<String>();
-          posToAliasMap.put(pos, aliases);
-        }
-        while (keysIter.hasNext()) {
-          String key = keysIter.next();
-          aliases.add(key);
-          HashMap<String, ColumnInfo> map = inputRS.getFieldMap(key);
-          Iterator<String> fNamesIter = map.keySet().iterator();
-          while (fNamesIter.hasNext()) {
-            String field = fNamesIter.next();
-            ColumnInfo valueInfo = inputRS.get(key, field);
-            keyDesc.add(new ExprNodeColumnDesc(valueInfo.getType(), valueInfo
-                .getInternalName(), valueInfo.getTabAlias(), valueInfo
-                .getIsVirtualCol()));
+      if (omitOpts != null && omitOpts.contains(pos)) {
+        exprMap.put(tag, valueDesc);
+        filterMap.put(tag, filterDesc);
+        rightOps[pos] = input;
+        continue;
+      }
 
-            if (outputRS.get(key, field) == null) {
-              String colName = getColumnInternalName(outputPos);
-              outputPos++;
-              outputColumnNames.add(colName);
-              colExprMap.put(colName, keyDesc.get(keyDesc.size() - 1));
-              outputRS.put(key, field, new ColumnInfo(colName, valueInfo
-                  .getType(), key, valueInfo.getIsVirtualCol(), valueInfo
-                  .isHiddenVirtualCol()));
-              reversedExprs.put(colName, tag);
-            }
-          }
+      List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
+      List<String> valColNames = rsDesc.getOutputValueColumnNames();
+
+      // prepare output descriptors for the input operator
+      RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+      RowResolver parentRR = opParseCtx.get(parent).getRowResolver();
+      posToAliasMap.put(pos, new HashSet<String>(inputRR.getTableNames()));
+
+      List<ColumnInfo> columns = parentRR.getColumnInfos();
+      for (int i = 0; i < index.length; i++) {
+        ColumnInfo prev = columns.get(i);
+        String[] nm = parentRR.reverseLookup(prev.getInternalName());
+        String[] nm2 = parentRR.getAlternateMappings(prev.getInternalName());
+        if (outputRR.get(nm[0], nm[1]) != null) {
+          continue;
+        }
+        ColumnInfo info = new ColumnInfo(prev);
+        String field;
+        if (index[i] >= 0) {
+          field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
+        } else {
+          field = Utilities.ReduceField.VALUE + "." + valColNames.get(-index[i] - 1);
         }
-        for (ASTNode cond : join.getFilters().get(tag)) {
-          filterDesc.add(genExprNodeDesc(cond, inputRS));
+        String internalName = getColumnInternalName(outputColumnNames.size());
+        ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(),
+            field, info.getTabAlias(), info.getIsVirtualCol());
+
+        info.setInternalName(internalName);
+        colExprMap.put(internalName, desc);
+        outputRR.put(nm[0], nm[1], info);
+        if (nm2 != null) {
+          outputRR.addMappingOnly(nm2[0], nm2[1], info);
         }
+
+        valueDesc.add(desc);
+        outputColumnNames.add(internalName);
+        reversedExprs.put(internalName, tag);
       }
-      exprMap.put(tag, keyDesc);
+      for (ASTNode cond : join.getFilters().get(tag)) {
+        filterDesc.add(genExprNodeDesc(cond, inputRR));
+      }
+      exprMap.put(tag, valueDesc);
       filterMap.put(tag, filterDesc);
       rightOps[pos] = input;
     }
@@ -6577,7 +6676,7 @@ public class SemanticAnalyzer extends Ba
     desc.setFilterMap(join.getFilterMap());
 
     JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc,
-        new RowSchema(outputRS.getColumnInfos()), rightOps);
+        new RowSchema(outputRR.getColumnInfos()), rightOps);
     joinOp.setColumnExprMap(colExprMap);
     joinOp.setPosToAliasMap(posToAliasMap);
 
@@ -6588,51 +6687,93 @@ public class SemanticAnalyzer extends Ba
       }
       desc.setNullSafes(nullsafes);
     }
-    return putOpInsertMap(joinOp, outputRS);
+    return putOpInsertMap(joinOp, outputRR);
+  }
+
+  private ExprNodeDesc[][] genJoinKeys(QBJoinTree joinTree, Operator[] inputs)
+      throws SemanticException {
+    ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.length][];
+    for (int i = 0; i < inputs.length; i++) {
+      RowResolver inputRR = opParseCtx.get(inputs[i]).getRowResolver();
+      List<ASTNode> expressions = joinTree.getExpressions().get(i);
+      joinKeys[i] = new ExprNodeDesc[expressions.size()];
+      for (int j = 0; j < joinKeys[i].length; j++) {
+        joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRR);
+      }
+    }
+    // Type checking and implicit type conversion for join keys
+    return genJoinOperatorTypeCheck(joinKeys);
   }
 
   @SuppressWarnings("nls")
-  private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
-      Operator child, String[] srcs, int pos) throws SemanticException {
-    RowResolver inputRS = opParseCtx.get(child).getRowResolver();
-    RowResolver outputRS = new RowResolver();
+  private Operator genJoinReduceSinkChild(QB qb, ExprNodeDesc[] joinKeys,
+      Operator<?> child, String[] srcs, int tag) throws SemanticException {
+
+    Operator dummy = Operator.createDummy();  // dummy for backtracking
+    dummy.setParentOperators(Arrays.asList(child));
+
+    RowResolver inputRR = opParseCtx.get(child).getRowResolver();
+    RowResolver outputRR = new RowResolver();
     ArrayList<String> outputColumns = new ArrayList<String>();
     ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceKeysBack = new ArrayList<ExprNodeDesc>();
 
     // Compute join keys and store in reduceKeys
-    ArrayList<ASTNode> exprs = joinTree.getExpressions().get(pos);
-    for (int i = 0; i < exprs.size(); i++) {
-      ASTNode expr = exprs.get(i);
-      reduceKeys.add(genExprNodeDesc(expr, inputRS));
+    for (ExprNodeDesc joinKey : joinKeys) {
+      reduceKeys.add(joinKey);
+      reduceKeysBack.add(ExprNodeDescUtils.backtrack(joinKey, dummy, child));
     }
 
     // Walk over the input row resolver and copy in the output
     ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
-    Iterator<String> tblNamesIter = inputRS.getTableNames().iterator();
+    ArrayList<ExprNodeDesc> reduceValuesBack = new ArrayList<ExprNodeDesc>();
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    while (tblNamesIter.hasNext()) {
-      String src = tblNamesIter.next();
-      HashMap<String, ColumnInfo> fMap = inputRS.getFieldMap(src);
-      for (Map.Entry<String, ColumnInfo> entry : fMap.entrySet()) {
-        String field = entry.getKey();
-        ColumnInfo valueInfo = entry.getValue();
-        ExprNodeColumnDesc inputExpr = new ExprNodeColumnDesc(valueInfo
-            .getType(), valueInfo.getInternalName(), valueInfo.getTabAlias(),
-            valueInfo.getIsVirtualCol());
-        reduceValues.add(inputExpr);
-        if (outputRS.get(src, field) == null) {
-          String col = getColumnInternalName(reduceValues.size() - 1);
-          outputColumns.add(col);
-          ColumnInfo newColInfo = new ColumnInfo(Utilities.ReduceField.VALUE
-              .toString()
-              + "." + col, valueInfo.getType(), src, valueInfo
-              .getIsVirtualCol(), valueInfo.isHiddenVirtualCol());
 
-          colExprMap.put(newColInfo.getInternalName(), inputExpr);
-          outputRS.put(src, field, newColInfo);
+    List<ColumnInfo> columns = inputRR.getColumnInfos();
+    int[] index = new int[columns.size()];
+    for (int i = 0; i < columns.size(); i++) {
+      ColumnInfo colInfo = columns.get(i);
+      String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
+      String[] nm2 = inputRR.getAlternateMappings(colInfo.getInternalName());
+      ExprNodeDesc expr = new ExprNodeColumnDesc(colInfo.getType(),
+          colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+
+      // backtrack can be null when input is script operator
+      ExprNodeDesc exprBack = ExprNodeDescUtils.backtrack(expr, dummy, child);
+      int kindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceKeysBack);
+      if (kindex >= 0) {
+        ColumnInfo newColInfo = new ColumnInfo(colInfo);
+        newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
+        newColInfo.setTabAlias(nm[0]);
+        outputRR.addMappingOnly(nm[0], nm[1], newColInfo);
+        if (nm2 != null) {
+          outputRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
         }
+        index[i] = kindex;
+        continue;
       }
+      int vindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceValuesBack);
+      if (vindex >= 0) {
+        index[i] = -vindex - 1;
+        continue;
+      }
+      index[i] = -reduceValues.size() - 1;
+      String outputColName = getColumnInternalName(reduceValues.size());
+
+      reduceValues.add(expr);
+      reduceValuesBack.add(exprBack);
+
+      ColumnInfo newColInfo = new ColumnInfo(colInfo);
+      newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName);
+      newColInfo.setTabAlias(nm[0]);
+
+      outputRR.put(nm[0], nm[1], newColInfo);
+      if (nm2 != null) {
+        outputRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
+      }
+      outputColumns.add(outputColName);
     }
+    dummy.setParentOperators(null);
 
     int numReds = -1;
 
@@ -6647,11 +6788,23 @@ public class SemanticAnalyzer extends Ba
       }
     }
 
+    ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys,
+        reduceValues, outputColumns, false, tag,
+        reduceKeys.size(), numReds);
+
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
-            reduceValues, outputColumns, false, joinTree.getNextTag(),
-            reduceKeys.size(), numReds), new RowSchema(outputRS
-            .getColumnInfos()), child), outputRS);
+        OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRR
+            .getColumnInfos()), child), outputRR);
+    List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
+    for (int i = 0 ; i < keyColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i));
+    }
+    List<String> valColNames = rsDesc.getOutputValueColumnNames();
+    for (int i = 0 ; i < valColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.VALUE + "." + valColNames.get(i), reduceValues.get(i));
+    }
+
+    rsOp.setValueIndex(index);
     rsOp.setColumnExprMap(colExprMap);
     rsOp.setInputAliases(srcs);
     return rsOp;
@@ -6672,16 +6825,15 @@ public class SemanticAnalyzer extends Ba
       for (ASTNode cond : filter) {
         joinSrcOp = genFilterPlan(qb, cond, joinSrcOp);
       }
-      String[] leftAliases = joinTree.getLeftAliases();
-      joinSrcOp = genJoinReduceSinkChild(qb, joinTree, joinSrcOp, leftAliases, 0);
     }
 
-    Operator[] srcOps = new Operator[joinTree.getBaseSrc().length];
+    String[] baseSrc = joinTree.getBaseSrc();
+    Operator[] srcOps = new Operator[baseSrc.length];
 
     HashSet<Integer> omitOpts = null; // set of input to the join that should be
     // omitted by the output
     int pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
+    for (String src : baseSrc) {
       if (src != null) {
         Operator srcOp = map.get(src.toLowerCase());
 
@@ -6700,21 +6852,24 @@ public class SemanticAnalyzer extends Ba
 
           // generate a groupby operator (HASH mode) for a map-side partial
           // aggregation for semijoin
-          srcOp = genMapGroupByForSemijoin(qb, fields, srcOp,
+          srcOps[pos++] = genMapGroupByForSemijoin(qb, fields, srcOp,
               GroupByDesc.Mode.HASH);
+        } else {
+          srcOps[pos++] = srcOp;
         }
-
-        // generate a ReduceSink operator for the join
-        srcOps[pos] = genJoinReduceSinkChild(qb, joinTree, srcOp, new String[]{src}, pos);
-        pos++;
       } else {
         assert pos == 0;
-        srcOps[pos++] = null;
+        srcOps[pos++] = joinSrcOp;
       }
     }
 
-    // Type checking and implicit type conversion for join keys
-    genJoinOperatorTypeCheck(joinSrcOp, srcOps);
+    ExprNodeDesc[][] joinKeys = genJoinKeys(joinTree, srcOps);
+
+    for (int i = 0; i < srcOps.length; i++) {
+      // generate a ReduceSink operator for the join
+      String[] srcs = baseSrc[i] != null ? new String[] {baseSrc[i]} : joinTree.getLeftAliases();
+      srcOps[i] = genJoinReduceSinkChild(qb, joinKeys[i], srcOps[i], srcs, joinTree.getNextTag());
+    }
 
     JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
       joinSrcOp, srcOps, omitOpts);
@@ -6745,12 +6900,14 @@ public class SemanticAnalyzer extends Ba
     ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
     ArrayList<String> columnNames = new ArrayList<String>();
 
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
     // construct the list of columns that need to be projected
     for (ASTNode field : fields) {
       ExprNodeColumnDesc exprNode = (ExprNodeColumnDesc) genExprNodeDesc(field,
           inputRR);
       colList.add(exprNode);
       columnNames.add(exprNode.getColumn());
+      colExprMap.put(exprNode.getColumn(), exprNode);
     }
 
     // create selection operator
@@ -6758,7 +6915,7 @@ public class SemanticAnalyzer extends Ba
         new SelectDesc(colList, columnNames, false), new RowSchema(inputRR
             .getColumnInfos()), input), inputRR);
 
-    output.setColumnExprMap(input.getColumnExprMap());
+    output.setColumnExprMap(colExprMap);
     return output;
   }
 
@@ -6817,28 +6974,24 @@ public class SemanticAnalyzer extends Ba
     return op;
   }
 
-  private void genJoinOperatorTypeCheck(Operator left, Operator[] right)
+  private ExprNodeDesc[][] genJoinOperatorTypeCheck(ExprNodeDesc[][] keys)
       throws SemanticException {
     // keys[i] -> ArrayList<exprNodeDesc> for the i-th join operator key list
-    ArrayList<ArrayList<ExprNodeDesc>> keys = new ArrayList<ArrayList<ExprNodeDesc>>();
     int keyLength = 0;
-    for (int i = 0; i < right.length; i++) {
-      Operator oi = (i == 0 && right[i] == null ? left : right[i]);
-      ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf();
+    for (int i = 0; i < keys.length; i++) {
       if (i == 0) {
-        keyLength = now.getKeyCols().size();
+        keyLength = keys[i].length;
       } else {
-        assert (keyLength == now.getKeyCols().size());
+        assert keyLength == keys[i].length;
       }
-      keys.add(now.getKeyCols());
     }
     // implicit type conversion hierarchy
     for (int k = 0; k < keyLength; k++) {
       // Find the common class for type conversion
-      TypeInfo commonType = keys.get(0).get(k).getTypeInfo();
-      for (int i = 1; i < right.length; i++) {
+      TypeInfo commonType = keys[0][k].getTypeInfo();
+      for (int i = 1; i < keys.length; i++) {
         TypeInfo a = commonType;
-        TypeInfo b = keys.get(i).get(k).getTypeInfo();
+        TypeInfo b = keys[i][k].getTypeInfo();
         commonType = FunctionRegistry.getCommonClassForComparison(a, b);
         if (commonType == null) {
           throw new SemanticException(
@@ -6847,27 +7000,15 @@ public class SemanticAnalyzer extends Ba
         }
       }
       // Add implicit type conversion if necessary
-      for (int i = 0; i < right.length; i++) {
+      for (int i = 0; i < keys.length; i++) {
         if (TypeInfoUtils.isConversionRequiredForComparison(
-                keys.get(i).get(k).getTypeInfo(),
-                commonType)) {
-          keys.get(i).set(
-              k,
-              ParseUtils.createConversionCast(
-                  keys.get(i).get(k), (PrimitiveTypeInfo)commonType));
+                keys[i][k].getTypeInfo(), commonType)) {
+          keys[i][k] = ParseUtils.createConversionCast(
+                  keys[i][k], (PrimitiveTypeInfo)commonType);
         }
       }
     }
-    // regenerate keySerializationInfo because the ReduceSinkOperator's
-    // output key types might have changed.
-    for (int i = 0; i < right.length; i++) {
-      Operator oi = (i == 0 && right[i] == null ? left : right[i]);
-      ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf();
-
-      now.setKeySerializeInfo(PlanUtils.getReduceKeyTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"), now
-          .getOrder()));
-    }
+    return keys;
   }
 
   private Operator genJoinPlan(QB qb, Map<String, Operator> map)
@@ -9272,7 +9413,7 @@ public class SemanticAnalyzer extends Ba
     optm.setPctx(pCtx);
     optm.initialize(conf);
     pCtx = optm.optimize();
-    
+
     FetchTask origFetchTask = pCtx.getFetchTask();
 
     if (LOG.isDebugEnabled()) {
@@ -9301,7 +9442,7 @@ public class SemanticAnalyzer extends Ba
       // if desired check we're not going over partition scan limits
       enforceScanLimits(pCtx, origFetchTask);
     }
-    
+
     return;
   }
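
A quick illustration of the value-index convention introduced above: genJoinReduceSinkChild now records, for each input column, whether it travels in the reduce key or the reduce value. A non-negative entry in the index array is a position into the key columns; a negative entry encodes a value position as -(pos + 1). The standalone sketch below (illustrative names only, not part of the patch) mirrors that encoding:

    import java.util.Arrays;
    import java.util.List;

    // Mirrors the convention behind rsOp.setValueIndex(index) above: a column with
    // index[i] >= 0 is read from KEY column index[i]; otherwise it is read from
    // VALUE column (-index[i] - 1).
    public class ValueIndexSketch {
      static String fieldFor(int[] index, List<String> keyCols, List<String> valCols, int i) {
        return index[i] >= 0
            ? "KEY." + keyCols.get(index[i])
            : "VALUE." + valCols.get(-index[i] - 1);
      }

      public static void main(String[] args) {
        List<String> keyCols = Arrays.asList("reducesinkkey0");
        List<String> valCols = Arrays.asList("_col0", "_col1");
        int[] index = {0, -1, -2};   // hypothetical: col 0 became the key, cols 1-2 stayed values
        for (int i = 0; i < index.length; i++) {
          System.out.println("input column " + i + " -> " + fieldFor(index, keyCols, valCols, i));
        }
      }
    }

This is why the query plan diffs below now show {KEY.reducesinkkey0} in place of {VALUE._col0} and no longer duplicate the join key in the value expressions.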
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Thu Jun  5 18:28:07 2014
@@ -74,7 +74,7 @@ public class ExprNodeDescUtils {
         }
         children.add(child);
       }
-      // duplicate function with possibily replaced children
+      // duplicate function with possibly replaced children
       ExprNodeGenericFuncDesc clone = (ExprNodeGenericFuncDesc) func.clone();
       clone.setChildren(children);
       return clone;
@@ -164,7 +164,7 @@ public class ExprNodeDescUtils {
   }
 
   /**
-   * Return false if the expression has any non determinitic function
+   * Return false if the expression has any non deterministic function
    */
   public static boolean isDeterministic(ExprNodeDesc desc) {
     if (desc instanceof ExprNodeGenericFuncDesc) {
@@ -182,6 +182,14 @@ public class ExprNodeDescUtils {
     return true;
   }
 
+  public static ArrayList<ExprNodeDesc> clone(List<ExprNodeDesc> sources) {
+    ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>();
+    for (ExprNodeDesc expr : sources) {
+      result.add(expr.clone());
+    }
+    return result;
+  }
+
   /**
    * Convert expressions in current operator to those in terminal operator, which
    * is an ancestor of current or null (back to top operator).
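
The backtrack helper used throughout the new reduce-sink code walks an expression from a child operator back to an ancestor through each operator's column-to-expression map (the dummy operator created in genJoinReduceSinkChild exists only to give that walk a starting point). A simplified, self-contained model of the lookup, using plain maps instead of the Hive operator tree, might look like:

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of "backtracking" an output column to the parent expression that
    // produced it; the real logic lives in ExprNodeDescUtils.backtrack and operates
    // on ExprNodeDesc trees and operators, not strings.
    public class BacktrackSketch {
      static String backtrack(String column, Map<String, String> colExprMap) {
        return colExprMap.get(column);   // null when the column cannot be traced
      }

      public static void main(String[] args) {
        // Hypothetical child column-expression map: output _col0 came from parent column "key".
        Map<String, String> childColExprMap = new HashMap<String, String>();
        childColExprMap.put("_col0", "key");
        childColExprMap.put("_col1", "value");

        System.out.println(backtrack("_col0", childColExprMap)); // key
        // Untraceable columns resolve to null, cf. the "backtrack can be null when
        // input is script operator" comment in genJoinReduceSinkChild above.
        System.out.println(backtrack("_colX", childColExprMap)); // null
      }
    }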

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Thu Jun  5 18:28:07 2014
@@ -43,6 +43,7 @@ public class MapJoinDesc extends JoinDes
 
   private int posBigTable;
 
+  private Map<Byte, int[]> valueIndices;
   private Map<Byte, List<Integer>> retainList;
 
   private transient String bigTableAlias;
@@ -77,6 +78,7 @@ public class MapJoinDesc extends JoinDes
     this.keyTblDesc = clone.keyTblDesc;
     this.valueTblDescs = clone.valueTblDescs;
     this.posBigTable = clone.posBigTable;
+    this.valueIndices = clone.valueIndices;
     this.retainList = clone.retainList;
     this.bigTableAlias = clone.bigTableAlias;
     this.aliasBucketFileNameMapping = clone.aliasBucketFileNameMapping;
@@ -125,6 +127,18 @@ public class MapJoinDesc extends JoinDes
     this.parentToInput = parentToInput;
   }
 
+  public Map<Byte, int[]> getValueIndices() {
+    return valueIndices;
+  }
+
+  public void setValueIndices(Map<Byte, int[]> valueIndices) {
+    this.valueIndices = valueIndices;
+  }
+
+  public int[] getValueIndex(byte alias) {
+    return valueIndices == null ? null : valueIndices.get(alias);
+  }
+
   public Map<Byte, List<Integer>> getRetainList() {
     return retainList;
   }
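
The new valueIndices map carries a per-alias key/value index (set up when a join is converted to a map join) so that values no longer need to duplicate columns already present in the key. A hypothetical decoder applying such an index to separate key and value rows (standalone sketch, not MapJoinOperator code) could be:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Standalone illustration of applying an index array like the one returned by
    // MapJoinDesc.getValueIndex(alias): rebuild the original column order from
    // separate key and value object lists.
    public class ValueIndexApplySketch {
      static List<Object> restore(int[] index, List<Object> keys, List<Object> values) {
        List<Object> row = new ArrayList<Object>(index.length);
        for (int pos : index) {
          row.add(pos >= 0 ? keys.get(pos) : values.get(-pos - 1));
        }
        return row;
      }

      public static void main(String[] args) {
        int[] index = {0, -1, -2};                    // col 0 from the key, cols 1-2 from the value
        List<Object> keys = Arrays.<Object>asList(42);
        List<Object> values = Arrays.<Object>asList("a", "b");
        System.out.println(restore(index, keys, values));   // [42, a, b]
      }
    }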

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java Thu Jun  5 18:28:07 2014
@@ -87,6 +87,10 @@ public final class ExprWalkerProcFactory
           if (exp instanceof ExprNodeGenericFuncDesc) {
             isCandidate = false;
           }
+          if (exp instanceof ExprNodeColumnDesc && colAlias == null) {
+            ExprNodeColumnDesc column = (ExprNodeColumnDesc)exp;
+            colAlias = new String[]{column.getTabAlias(), column.getColumn()};
+          }
         }
         ctx.addConvertedNode(colref, exp);
         ctx.setIsCandidate(exp, isCandidate);

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q Thu Jun  5 18:28:07 2014
@@ -1,6 +1,7 @@
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
 -- SORT_QUERY_RESULTS
 
-set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 create table tmp1 (a string, b string) clustered by (a) sorted by (a) into 10 buckets;
 
 create table tmp2 (a string, b string) clustered by (a) sorted by (a) into 10 buckets;

Modified: hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out (original)
+++ hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out Thu Jun  5 18:28:07 2014
@@ -117,14 +117,14 @@ STAGE PLANS:
               sort order: +
               Map-reduce partition columns: key (type: int)
               Statistics: Num rows: 40 Data size: 4200 Basic stats: COMPLETE Column stats: NONE
-              value expressions: key (type: int), value (type: string)
+              value expressions: value (type: string)
       Reduce Operator Tree:
         Join Operator
           condition map:
                Inner Join 0 to 1
           condition expressions:
-            0 {VALUE._col0} {VALUE._col1}
-            1 {VALUE._col1}
+            0 {KEY.reducesinkkey0} {VALUE._col0}
+            1 {VALUE._col0}
           outputColumnNames: _col0, _col1, _col6
           Statistics: Num rows: 44 Data size: 4620 Basic stats: COMPLETE Column stats: NONE
           Select Operator

Modified: hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out Thu Jun  5 18:28:07 2014
@@ -24,7 +24,7 @@ STAGE PLANS:
                 sort order: +
                 Map-reduce partition columns: _col0 (type: string)
                 Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
-                value expressions: _col0 (type: string), _col1 (type: string)
+                value expressions: _col1 (type: string)
           TableScan
             alias: src1
             Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE
@@ -42,7 +42,7 @@ STAGE PLANS:
           condition map:
                Inner Join 0 to 1
           condition expressions:
-            0 {VALUE._col0} {VALUE._col1}
+            0 {KEY.reducesinkkey0} {VALUE._col0}
             1 
           outputColumnNames: _col0, _col1
           Statistics: Num rows: 31 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
@@ -90,7 +90,6 @@ STAGE PLANS:
                 sort order: +
                 Map-reduce partition columns: _col0 (type: string)
                 Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
-                value expressions: _col0 (type: string)
           TableScan
             alias: src1
             Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE
@@ -108,7 +107,7 @@ STAGE PLANS:
           condition map:
                Inner Join 0 to 1
           condition expressions:
-            0 {VALUE._col0}
+            0 {KEY.reducesinkkey0}
             1 
           outputColumnNames: _col0
           Statistics: Num rows: 63 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
@@ -156,7 +155,6 @@ STAGE PLANS:
                 sort order: +
                 Map-reduce partition columns: _col0 (type: string)
                 Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
-                value expressions: _col0 (type: string)
           TableScan
             alias: src1
             Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE
@@ -174,7 +172,7 @@ STAGE PLANS:
           condition map:
                Inner Join 0 to 1
           condition expressions:
-            0 {VALUE._col0}
+            0 {KEY.reducesinkkey0}
             1 
           outputColumnNames: _col0
           Statistics: Num rows: 63 Data size: 6393 Basic stats: COMPLETE Column stats: NONE

Modified: hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out Thu Jun  5 18:28:07 2014 differ

Modified: hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out Thu Jun  5 18:28:07 2014 differ

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out Thu Jun  5 18:28:07 2014
@@ -86,27 +86,24 @@ STAGE PLANS:
                       key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
                       sort order: ++++
                       Statistics: Num rows: 9 Data size: 1983 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
       Local Work:
         Map Reduce Local Work
       Reduce Operator Tree:
-        Extract
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3
           Statistics: Num rows: 9 Data size: 1983 Basic stats: COMPLETE Column stats: NONE
-          Select Operator
-            expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
-            outputColumnNames: _col0, _col1, _col2, _col3
-            Statistics: Num rows: 9 Data size: 1983 Basic stats: COMPLETE Column stats: NONE
-            Group By Operator
-              aggregations: sum(hash(_col0,_col1,_col2,_col3))
-              mode: hash
-              outputColumnNames: _col0
-              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-              File Output Operator
-                compressed: false
-                table:
-                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+          Group By Operator
+            aggregations: sum(hash(_col0,_col1,_col2,_col3))
+            mode: hash
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
   Stage: Stage-3
     Map Reduce

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out Thu Jun  5 18:28:07 2014
@@ -67,7 +67,7 @@ STAGE PLANS:
                 Statistics: Num rows: 19 Data size: 1903 Basic stats: COMPLETE Column stats: NONE
                 HashTable Sink Operator
                   condition expressions:
-                    0 {_col3} {_col0}
+                    0 {_col0} {_col3}
                     1 
                   keys:
                     0 (_col0 + _col2) (type: double)
@@ -101,19 +101,19 @@ STAGE PLANS:
                     condition map:
                          Inner Join 0 to 1
                     condition expressions:
-                      0 {_col3} {_col0}
+                      0 {_col0} {_col3}
                       1 
                     keys:
                       0 (_col0 + _col2) (type: double)
                       1 UDFToDouble(_col0) (type: double)
-                    outputColumnNames: _col1, _col2
+                    outputColumnNames: _col0, _col3
                     Statistics: Num rows: 22 Data size: 2302 Basic stats: COMPLETE Column stats: NONE
                     Select Operator
-                      expressions: _col2 (type: string), _col1 (type: string)
-                      outputColumnNames: _col2, _col1
+                      expressions: _col0 (type: string), _col3 (type: string)
+                      outputColumnNames: _col0, _col3
                       Statistics: Num rows: 22 Data size: 2302 Basic stats: COMPLETE Column stats: NONE
                       Group By Operator
-                        aggregations: sum(hash(_col2,_col1))
+                        aggregations: sum(hash(_col0,_col3))
                         mode: hash
                         outputColumnNames: _col0
                         Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE