You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2015/01/28 07:25:50 UTC

svn commit: r1655226 [2/30] - in /hive/trunk: contrib/src/test/results/clientpositive/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ ql/src/java...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java Wed Jan 28 06:25:44 2015
@@ -30,13 +30,10 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * This class implements the processor context for Constant Propagate.
@@ -50,25 +47,18 @@ public class ConstantPropagateProcCtx im
       .getLog(ConstantPropagateProcCtx.class);
 
   private final Map<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>> opToConstantExprs;
-  private final Map<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtx;
   private final List<Operator<? extends Serializable>> opToDelete;
 
-  public ConstantPropagateProcCtx(Map<Operator<? extends OperatorDesc>, OpParseContext> opToParseCtx) {
+  public ConstantPropagateProcCtx() {
     opToConstantExprs =
         new HashMap<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>>();
     opToDelete = new ArrayList<Operator<? extends Serializable>>();
-    this.opToParseCtx = opToParseCtx;
   }
 
   public Map<Operator<? extends Serializable>, Map<ColumnInfo, ExprNodeDesc>> getOpToConstantExprs() {
     return opToConstantExprs;
   }
 
-
-  public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpToParseCtxMap() {
-    return opToParseCtx;
-  }
-
   /**
    * Resolve a ColumnInfo based on given RowResolver.
    * 
@@ -78,27 +68,25 @@ public class ConstantPropagateProcCtx im
    * @return
    * @throws SemanticException
    */
-  private ColumnInfo resolve(ColumnInfo ci, RowResolver rr, RowResolver parentRR)
-      throws SemanticException {
+  private ColumnInfo resolve(ColumnInfo ci, RowSchema rs, RowSchema parentRS) {
     // Resolve new ColumnInfo from <tableAlias, alias>
     String alias = ci.getAlias();
     if (alias == null) {
       alias = ci.getInternalName();
     }
     String tblAlias = ci.getTabAlias();
-    ColumnInfo rci = rr.get(tblAlias, alias);
-    if (rci == null && rr.getRslvMap().size() == 1 && parentRR.getRslvMap().size() == 1) {
-      rci = rr.get(null, alias);
+    ColumnInfo rci = rs.getColumnInfo(tblAlias, alias);
+    if (rci == null && rs.getTableNames().size() == 1 &&
+            parentRS.getTableNames().size() == 1) {
+      rci = rs.getColumnInfo(rs.getTableNames().iterator().next(),
+              alias);
     }
     if (rci == null) {
       return null;
     }
-    String[] tmp = rr.reverseLookup(rci.getInternalName());
-    rci.setTabAlias(tmp[0]);
-    rci.setAlias(tmp[1]);
     LOG.debug("Resolved "
         + ci.getTabAlias() + "." + ci.getAlias() + " as "
-        + rci.getTabAlias() + "." + rci.getAlias() + " with rr: " + rr);
+        + rci.getTabAlias() + "." + rci.getAlias() + " with rs: " + rs);
     return rci;
   }
 
@@ -117,90 +105,76 @@ public class ConstantPropagateProcCtx im
   public Map<ColumnInfo, ExprNodeDesc> getPropagatedConstants(
       Operator<? extends Serializable> op) {
     Map<ColumnInfo, ExprNodeDesc> constants = new HashMap<ColumnInfo, ExprNodeDesc>();
-    OpParseContext parseCtx = opToParseCtx.get(op);
-    if (parseCtx == null) {
+    if (op.getSchema() == null) {
       return constants;
     }
-    RowResolver rr = parseCtx.getRowResolver();
-    LOG.debug("Getting constants of op:" + op + " with rr:" + rr);
-    
-    try {
-      if (op.getParentOperators() == null) {
-        return constants;
-      }
+    RowSchema rs = op.getSchema();
+    LOG.debug("Getting constants of op:" + op + " with rs:" + rs);
 
-      if (op instanceof UnionOperator) {
-        String alias = (String) rr.getRslvMap().keySet().toArray()[0];
-        // find intersection
-        Map<ColumnInfo, ExprNodeDesc> intersection = null;
-        for (Operator<?> parent : op.getParentOperators()) {
-          Map<ColumnInfo, ExprNodeDesc> unionConst = opToConstantExprs.get(parent);
-          LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst);
-          if (intersection == null) {
-            intersection = new HashMap<ColumnInfo, ExprNodeDesc>();
-            for (Entry<ColumnInfo, ExprNodeDesc> e : unionConst.entrySet()) {
-              ColumnInfo ci = new ColumnInfo(e.getKey());
-              ci.setTabAlias(alias);
-              intersection.put(ci, e.getValue());
-            }
-          } else {
-            Iterator<Entry<ColumnInfo, ExprNodeDesc>> itr = intersection.entrySet().iterator();
-            while (itr.hasNext()) {
-              Entry<ColumnInfo, ExprNodeDesc> e = itr.next();
-              boolean found = false;
-              for (Entry<ColumnInfo, ExprNodeDesc> f : opToConstantExprs.get(parent).entrySet()) {
-                if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) {
-                  if (e.getValue().isSame(f.getValue())) {
-                    found = true;
-                  }
-                  break;
+    if (op.getParentOperators() == null) {
+      return constants;
+    }
+
+    if (op instanceof UnionOperator) {
+      String alias = rs.getSignature().get(0).getTabAlias();
+      // find intersection
+      Map<ColumnInfo, ExprNodeDesc> intersection = null;
+      for (Operator<?> parent : op.getParentOperators()) {
+        Map<ColumnInfo, ExprNodeDesc> unionConst = opToConstantExprs.get(parent);
+        LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst);
+        if (intersection == null) {
+          intersection = new HashMap<ColumnInfo, ExprNodeDesc>();
+          for (Entry<ColumnInfo, ExprNodeDesc> e : unionConst.entrySet()) {
+            ColumnInfo ci = new ColumnInfo(e.getKey());
+            ci.setTabAlias(alias);
+            intersection.put(ci, e.getValue());
+          }
+        } else {
+          Iterator<Entry<ColumnInfo, ExprNodeDesc>> itr = intersection.entrySet().iterator();
+          while (itr.hasNext()) {
+            Entry<ColumnInfo, ExprNodeDesc> e = itr.next();
+            boolean found = false;
+            for (Entry<ColumnInfo, ExprNodeDesc> f : opToConstantExprs.get(parent).entrySet()) {
+              if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) {
+                if (e.getValue().isSame(f.getValue())) {
+                  found = true;
                 }
-              }
-              if (!found) {
-                itr.remove();
+                break;
               }
             }
+            if (!found) {
+              itr.remove();
+            }
           }
-          if (intersection.isEmpty()) {
-            return intersection;
-          }
         }
-        LOG.debug("Propagated union constants:" + intersection);
-        return intersection;
+        if (intersection.isEmpty()) {
+          return intersection;
+        }
       }
+      LOG.debug("Propagated union constants:" + intersection);
+      return intersection;
+    }
 
-      for (Operator<? extends Serializable> parent : op.getParentOperators()) {
-        Map<ColumnInfo, ExprNodeDesc> c = opToConstantExprs.get(parent);
-        for (Entry<ColumnInfo, ExprNodeDesc> e : c.entrySet()) {
-          ColumnInfo ci = e.getKey();
-          ColumnInfo rci = null;
-          ExprNodeDesc constant = e.getValue();
-          rci = resolve(ci, rr, opToParseCtx.get(parent).getRowResolver());
-          if (rci != null) {
-            constants.put(rci, constant);
-          } else {
-            LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() + " from rr:"
-                + rr);
-          }
-
+    for (Operator<? extends Serializable> parent : op.getParentOperators()) {
+      Map<ColumnInfo, ExprNodeDesc> c = opToConstantExprs.get(parent);
+      for (Entry<ColumnInfo, ExprNodeDesc> e : c.entrySet()) {
+        ColumnInfo ci = e.getKey();
+        ColumnInfo rci = null;
+        ExprNodeDesc constant = e.getValue();
+        rci = resolve(ci, rs, parent.getSchema());
+        if (rci != null) {
+          constants.put(rci, constant);
+        } else {
+          LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() +
+                  "(" + ci.getInternalName() + ") from rs:" + rs);
         }
-
       }
-      LOG.debug("Offerring constants " + constants.keySet()
-          + " to operator " + op.toString());
-      return constants;
-    } catch (SemanticException e) {
-      LOG.error(e.getMessage(), e);
-      throw new RuntimeException(e);
     }
-  }
 
-  public RowResolver getRowResolver(Operator<? extends Serializable> op) {
-    OpParseContext parseCtx = opToParseCtx.get(op);
-    if (parseCtx == null) {
-      return null;
-    }
-    return parseCtx.getRowResolver();
+    LOG.debug("Offerring constants " + constants.keySet()
+        + " to operator " + op.toString());
+
+    return constants;
   }
 
   public void addOpToDelete(Operator<? extends Serializable> op) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Wed Jan 28 06:25:44 2015
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -108,30 +107,16 @@ public final class ConstantPropagateProc
    * @param desc
    * @return
    */
-  public static ColumnInfo resolveColumn(RowResolver rr,
+  public static ColumnInfo resolveColumn(RowSchema rs,
       ExprNodeColumnDesc desc) {
-    try {
-      ColumnInfo ci = rr.get(desc.getTabAlias(), desc.getColumn());
-      if (ci == null) {
-        String[] tmp = rr.reverseLookup(desc.getColumn());
-        if (tmp == null) {
-          return null;
-        }
-        ci = rr.get(tmp[0], tmp[1]);
-        ci.setTabAlias(tmp[0]);
-        ci.setAlias(tmp[1]);
-      } else {
-        String[] tmp = rr.reverseLookup(ci.getInternalName());
-        if (tmp == null) {
-          return null;
-        }
-        ci.setTabAlias(tmp[0]);
-        ci.setAlias(tmp[1]);
-      }
-      return ci;
-    } catch (SemanticException e) {
-      throw new RuntimeException(e);
+    ColumnInfo ci = rs.getColumnInfo(desc.getTabAlias(), desc.getColumn());
+    if (ci == null) {
+      ci = rs.getColumnInfo(desc.getColumn());
     }
+    if (ci == null) {
+      return null;
+    }
+    return ci;
   }
 
   private static final Set<PrimitiveCategory> unSupportedTypes = ImmutableSet
@@ -254,7 +239,7 @@ public final class ConstantPropagateProc
       // expressions are
       // constant, add them to colToConstatns as half-deterministic columns.
       if (propagate) {
-        propagate(udf, newExprs, cppCtx.getRowResolver(op), constants);
+        propagate(udf, newExprs, op.getSchema(), constants);
       }
 
       return desc;
@@ -318,7 +303,7 @@ public final class ConstantPropagateProc
    * @param op
    * @param constants
    */
-  private static void propagate(GenericUDF udf, List<ExprNodeDesc> newExprs, RowResolver rr,
+  private static void propagate(GenericUDF udf, List<ExprNodeDesc> newExprs, RowSchema rs,
       Map<ColumnInfo, ExprNodeDesc> constants) {
     if (udf instanceof GenericUDFOPEqual) {
       ExprNodeDesc lOperand = newExprs.get(0);
@@ -341,7 +326,7 @@ public final class ConstantPropagateProc
         // we need a column expression on other side.
         return;
       }
-      ColumnInfo ci = resolveColumn(rr, c);
+      ColumnInfo ci = resolveColumn(rs, c);
       if (ci != null) {
         LOG.debug("Filter " + udf + " is identified as a value assignment, propagate it.");
         if (!v.getTypeInfo().equals(ci.getType())) {
@@ -356,7 +341,7 @@ public final class ConstantPropagateProc
       if (operand instanceof ExprNodeColumnDesc) {
         LOG.debug("Filter " + udf + " is identified as a value assignment, propagate it.");
         ExprNodeColumnDesc c = (ExprNodeColumnDesc) operand;
-        ColumnInfo ci = resolveColumn(rr, c);
+        ColumnInfo ci = resolveColumn(rs, c);
         if (ci != null) {
           constants.put(ci, new ExprNodeNullDesc());
         }
@@ -435,45 +420,38 @@ public final class ConstantPropagateProc
    * @return
    */
   private static ExprNodeDesc evaluateColumn(ExprNodeColumnDesc desc,
-      ConstantPropagateProcCtx cppCtx, Operator<? extends Serializable> parent) {
-    try {
-      ColumnInfo ci = null;
-      RowResolver rr = cppCtx.getOpToParseCtxMap().get(parent).getRowResolver();
-      String[] tmp = rr.reverseLookup(desc.getColumn());
-      if (tmp == null) {
-        LOG.error("Reverse look up of column " + desc + " error!");
-        return null;
-      }
-      ci = rr.get(tmp[0], tmp[1]);
-      if (ci != null) {
-        ExprNodeDesc constant = null;
-        // Additional work for union operator, see union27.q
-        if (ci.getAlias() == null) {
-          for (Entry<ColumnInfo, ExprNodeDesc> e : cppCtx.getOpToConstantExprs().get(parent).entrySet()) {
-            if (e.getKey().getInternalName().equals(ci.getInternalName())) {
-              constant = e.getValue();
-              break;
-            }
-          }
-        } else {
-          constant = cppCtx.getOpToConstantExprs().get(parent).get(ci);
-        }
-        if (constant != null) {
-          if (constant instanceof ExprNodeConstantDesc
-              && !constant.getTypeInfo().equals(desc.getTypeInfo())) {
-            return typeCast(constant, desc.getTypeInfo());
-          }
-          return constant;
-        } else {
-          return null;
-        }
-      }
+    ConstantPropagateProcCtx cppCtx, Operator<? extends Serializable> parent) {
+    RowSchema rs = parent.getSchema();
+    ColumnInfo ci = rs.getColumnInfo(desc.getColumn());
+    if (ci == null) {
+      LOG.error("Reverse look up of column " + desc + " error!");
+      ci = rs.getColumnInfo(desc.getTabAlias(), desc.getColumn());
+    }
+    if (ci == null) {
       LOG.error("Can't resolve " + desc.getTabAlias() + "." + desc.getColumn());
       throw new RuntimeException("Can't resolve " + desc.getTabAlias() + "." + desc.getColumn());
-    } catch (SemanticException e) {
-      throw new RuntimeException(e);
     }
-
+    ExprNodeDesc constant = null;
+    // Additional work for union operator, see union27.q
+    if (ci.getAlias() == null) {
+      for (Entry<ColumnInfo, ExprNodeDesc> e : cppCtx.getOpToConstantExprs().get(parent).entrySet()) {
+        if (e.getKey().getInternalName().equals(ci.getInternalName())) {
+          constant = e.getValue();
+          break;
+        }
+      }
+    } else {
+      constant = cppCtx.getOpToConstantExprs().get(parent).get(ci);
+    }
+    if (constant != null) {
+      if (constant instanceof ExprNodeConstantDesc
+          && !constant.getTypeInfo().equals(desc.getTypeInfo())) {
+        return typeCast(constant, desc.getTypeInfo());
+      }
+      return constant;
+    } else {
+      return null;
+    }
   }
 
   /**
@@ -793,11 +771,10 @@ public final class ConstantPropagateProc
         // Assume only 1 parent for FS operator
         Operator<? extends Serializable> parent = op.getParentOperators().get(0);
         Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent);
-        RowResolver rr = cppCtx.getOpToParseCtxMap().get(parent).getRowResolver();
+        RowSchema rs = parent.getSchema();
         boolean allConstant = true;
         for (String input : inputs) {
-          String tmp[] = rr.reverseLookup(input);
-          ColumnInfo ci = rr.get(tmp[0], tmp[1]);
+          ColumnInfo ci = rs.getColumnInfo(input);
           if (parentConstants.get(ci) == null) {
             allConstant = false;
             break;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Wed Jan 28 06:25:44 2015
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -70,7 +69,6 @@ public class ConvertJoinMapJoin implemen
 
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
 
-  @SuppressWarnings("unchecked")
   @Override
   /*
    * (non-Javadoc) we should ideally not modify the tree we traverse. However,
@@ -172,6 +170,7 @@ public class ConvertJoinMapJoin implemen
     return null;
   }
 
+  @SuppressWarnings("unchecked")
   private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
       TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
@@ -228,12 +227,11 @@ public class ConvertJoinMapJoin implemen
   private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
       throws SemanticException {
-    ParseContext parseContext = context.parseContext;
     MapJoinDesc mapJoinDesc = null;
     if (adjustParentsChildren) {
-      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
-            joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
-            mapJoinConversionPos, true);
+      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf,
+            joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
+            joinOp.getConf().getMapAliases(), mapJoinConversionPos, true);
     } else {
       JoinDesc joinDesc = joinOp.getConf();
       // retain the original join desc in the map join.
@@ -249,7 +247,6 @@ public class ConvertJoinMapJoin implemen
       mapJoinDesc.resetOrder();
     }
 
-    @SuppressWarnings("unchecked")
     CommonMergeJoinOperator mergeJoinOp =
         (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
             isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
@@ -637,11 +634,10 @@ public class ConvertJoinMapJoin implemen
     }
 
     //can safely convert the join to a map join.
-    ParseContext parseContext = context.parseContext;
     MapJoinOperator mapJoinOp =
-        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
-                joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
-                bigTablePosition, true);
+        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp,
+                joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
+                joinOp.getConf().getMapAliases(), bigTablePosition, true);
 
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Jan 28 06:25:44 2015
@@ -77,11 +77,9 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.QBParseInfo;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
@@ -934,24 +932,6 @@ public final class GenMapRedUtils {
     return mrWork;
   }
 
-  /**
-   * insert in the map for the operator to row resolver.
-   *
-   * @param op
-   *          operator created
-   * @param rr
-   *          row resolver
-   * @param parseCtx
-   *          parse context
-   */
-  @SuppressWarnings("nls")
-  public static Operator<? extends OperatorDesc> putOpInsertMap(
-      Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext parseCtx) {
-    OpParseContext ctx = new OpParseContext(rr);
-    parseCtx.getOpParseCtx().put(op, ctx);
-    return op;
-  }
-
   public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema) {
     TableScanOperator tableScanOp =
         (TableScanOperator) OperatorFactory.get(new TableScanDesc(null), rowSchema);
@@ -996,19 +976,16 @@ public final class GenMapRedUtils {
       desc.setCompressType(parseCtx.getConf().getVar(
           HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
     }
-    Operator<? extends OperatorDesc> fileSinkOp = putOpInsertMap(OperatorFactory
-        .get(desc, parent.getSchema()), null, parseCtx);
+    Operator<? extends OperatorDesc> fileSinkOp = OperatorFactory.get(
+            desc, parent.getSchema());
 
     // Connect parent to fileSinkOp
     parent.replaceChild(child, fileSinkOp);
     fileSinkOp.setParentOperators(Utilities.makeList(parent));
 
     // Create a dummy TableScanOperator for the file generated through fileSinkOp
-    RowResolver parentRowResolver =
-        parseCtx.getOpParseCtx().get(parent).getRowResolver();
-    TableScanOperator tableScanOp = (TableScanOperator) putOpInsertMap(
-        createTemporaryTableScanOperator(parent.getSchema()),
-        parentRowResolver, parseCtx);
+    TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator(
+            parent.getSchema());
 
     // Connect this TableScanOperator to child.
     tableScanOp.setChildOperators(Utilities.makeList(child));

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Jan 28 06:25:44 2015
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -59,9 +57,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -89,7 +85,6 @@ import org.apache.hadoop.hive.serde2.typ
  */
 public class MapJoinProcessor implements Transform {
 
-  private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
   // mapjoin table descriptor contains a key descriptor which needs the field schema
   // (column type + column name). The column name is not really used anywhere, but it
   // needs to be passed. Use the string defined below for that.
@@ -98,15 +93,6 @@ public class MapJoinProcessor implements
   public MapJoinProcessor() {
   }
 
-  @SuppressWarnings("nls")
-  private static Operator<? extends OperatorDesc> putOpInsertMap (
-          ParseContext pGraphContext, Operator<? extends OperatorDesc> op,
-          RowResolver rr) {
-    OpParseContext ctx = new OpParseContext(rr);
-    pGraphContext.getOpParseCtx().put(op, ctx);
-    return op;
-  }
-
   /**
    * Generate the MapRed Local Work for the given map-join operator
    *
@@ -224,12 +210,10 @@ public class MapJoinProcessor implements
   public static void genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork,
     JoinOperator op, int mapJoinPos)
       throws SemanticException {
-    LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
-        newWork.getMapWork().getOpParseCtxMap();
     // generate the map join operator; already checked the map join
-    MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, opParseCtxMap, op,
-        newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(),
-        mapJoinPos, true, false);
+    MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, op,
+        newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(),
+        newWork.getMapWork().getMapAliases(), mapJoinPos, true, false);
     genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
   }
 
@@ -240,11 +224,9 @@ public class MapJoinProcessor implements
       // generate the local work for the big table alias
       MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
       // clean up the mapred work
-      newWork.getMapWork().setOpParseCtxMap(null);
       newWork.getMapWork().setLeftInputJoin(false);
       newWork.getMapWork().setBaseSrc(null);
       newWork.getMapWork().setMapAliases(null);
-
     } catch (Exception e) {
       e.printStackTrace();
       throw new SemanticException("Failed to generate new mapJoin operator " +
@@ -302,7 +284,6 @@ public class MapJoinProcessor implements
    * @param validateMapJoinTree
    */
   public MapJoinOperator convertMapJoin(HiveConf conf,
-    LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
     JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
     int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException {
 
@@ -352,10 +333,9 @@ public class MapJoinProcessor implements
     }
 
     // create the map-join operator
-    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
+    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf,
         op, leftInputJoin, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin);
 
-
     // remove old parents
     for (pos = 0; pos < newParentOps.size(); pos++) {
       newParentOps.get(pos).replaceChild(oldReduceSinkParentOps.get(pos), mapJoinOp);
@@ -376,22 +356,18 @@ public class MapJoinProcessor implements
   }
 
   public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
-      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
       JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
       int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
 
     MapJoinDesc mapJoinDescriptor =
-        getMapJoinDesc(hconf, opParseCtxMap, op, leftInputJoin, baseSrc, mapAliases,
+        getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, mapAliases,
                 mapJoinPos, noCheckOuterJoin);
 
     // reduce sink row resolver used to generate map join op
-    RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
+    RowSchema outputRS = op.getSchema();
 
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
-        mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), op.getParentOperators());
-
-    OpParseContext ctx = new OpParseContext(outputRS);
-    opParseCtxMap.put(mapJoinOp, ctx);
+        mapJoinDescriptor, new RowSchema(outputRS.getSignature()), op.getParentOperators());
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
@@ -434,7 +410,6 @@ public class MapJoinProcessor implements
    * @param noCheckOuterJoin
    */
   public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
-    Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
     SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin)
     throws SemanticException {
     // Create a new map join operator
@@ -451,14 +426,10 @@ public class MapJoinProcessor implements
 
     mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
 
-    RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver();
+    RowSchema joinRS = smbJoinOp.getSchema();
     // The mapjoin has the same schema as the join operator
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
-        mapJoinDesc, joinRS.getRowSchema(),
-        new ArrayList<Operator<? extends OperatorDesc>>());
-
-    OpParseContext ctx = new OpParseContext(joinRS);
-    opParseCtxMap.put(mapJoinOp, ctx);
+        mapJoinDesc, joinRS, new ArrayList<Operator<? extends OperatorDesc>>());
 
     // change the children of the original join operator to point to the map
     // join operator
@@ -488,11 +459,10 @@ public class MapJoinProcessor implements
         HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
         && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
 
-    LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
-        .getOpParseCtx();
-    MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
-        op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(),
-        mapJoinPos, noCheckOuterJoin, true);
+    MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), op,
+        op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(),
+        op.getConf().getMapAliases(), mapJoinPos, noCheckOuterJoin, true);
+
     // create a dummy select to select all columns
     genSelectPlan(pctx, mapJoinOp);
     return mapJoinOp;
@@ -597,32 +567,33 @@ public class MapJoinProcessor implements
 
     // create a dummy select - This select is needed by the walker to split the
     // mapJoin later on
-    RowResolver inputRR = pctx.getOpParseCtx().get(input).getRowResolver();
+    RowSchema inputRS = input.getSchema();
 
     ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
     ArrayList<String> outputs = new ArrayList<String>();
     List<String> outputCols = input.getConf().getOutputColumnNames();
-    RowResolver outputRS = new RowResolver();
+    ArrayList<ColumnInfo> outputRS = new ArrayList<ColumnInfo>();
 
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
 
     for (int i = 0; i < outputCols.size(); i++) {
       String internalName = outputCols.get(i);
-      String[] nm = inputRR.reverseLookup(internalName);
-      ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
+      ColumnInfo valueInfo = inputRS.getColumnInfo(internalName);
       ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(), valueInfo
-          .getInternalName(), nm[0], valueInfo.getIsVirtualCol());
+          .getInternalName(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol());
       exprs.add(colDesc);
       outputs.add(internalName);
-      outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo.getType(), nm[0], valueInfo
-          .getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
+      ColumnInfo newCol = new ColumnInfo(internalName, valueInfo.getType(),
+              valueInfo.getTabAlias(), valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol());
+      newCol.setAlias(valueInfo.getAlias());
+      outputRS.add(newCol);
       colExprMap.put(internalName, colDesc);
     }
 
     SelectDesc select = new SelectDesc(exprs, outputs, false);
 
-    SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select,
-        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+    SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(select,
+            new RowSchema(outputRS), input);
 
     sel.setColumnExprMap(colExprMap);
 
@@ -1055,7 +1026,6 @@ public class MapJoinProcessor implements
   }
 
   public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
-      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
       JoinOperator op, boolean leftInputJoin, String[] baseSrc, List<String> mapAliases,
       int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
     JoinDesc desc = op.getConf();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Wed Jan 28 06:25:44 2015
@@ -134,7 +134,6 @@ public class NonBlockingOpDeDupProc impl
       pSEL.getConf().setSelectStar(cSEL.getConf().isSelectStar());
       // We need to use the OpParseContext of the child SelectOperator to replace the
       // the OpParseContext of the parent SelectOperator.
-      pctx.updateOpParseCtx(pSEL, pctx.removeOpParseCtx(cSEL));
       pSEL.removeChildAndAdoptItsChildren(cSEL);
       cSEL.setParentOperators(null);
       cSEL.setChildOperators(null);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Jan 28 06:25:44 2015
@@ -196,7 +196,9 @@ public class ReduceSinkMapJoinProc imple
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");
           // we've already set this one up. Need to clone for the next work.
           r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
-              (ReduceSinkDesc) parentRS.getConf().clone(), parentRS.getParentOperators());
+              (ReduceSinkDesc) parentRS.getConf().clone(),
+              new RowSchema(parentRS.getSchema()),
+              parentRS.getParentOperators());
           context.clonedReduceSinks.add(r);
         } else {
           r = parentRS;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java Wed Jan 28 06:25:44 2015
@@ -67,6 +67,7 @@ public class SimpleFetchAggregation impl
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("R1", GBY + RS + GBY + SEL + FS), new SingleGBYProcessor(pctx));
+    opRules.put(new RuleRegExp("R2", GBY + RS + GBY + FS), new SingleGBYProcessor(pctx));
 
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -88,12 +89,13 @@ public class SimpleFetchAggregation impl
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       FileSinkOperator FS = (FileSinkOperator) nd;
-      GroupByOperator cGBY = (GroupByOperator) stack.get(stack.size() - 3);
-      ReduceSinkOperator RS = (ReduceSinkOperator) stack.get(stack.size() - 4);
+      int shift = stack.get(stack.size() - 2) instanceof SelectOperator ? 0 : 1;
+      GroupByOperator cGBY = (GroupByOperator) stack.get(stack.size() - 3 + shift);
+      ReduceSinkOperator RS = (ReduceSinkOperator) stack.get(stack.size() - 4 + shift);
       if (RS.getConf().getNumReducers() != 1 || !RS.getConf().getKeyCols().isEmpty()) {
         return null;
       }
-      GroupByOperator pGBY = (GroupByOperator) stack.get(stack.size() - 5);
+      GroupByOperator pGBY = (GroupByOperator) stack.get(stack.size() - 5 + shift);
 
       Path fileName = FS.getConf().getFinalDirName();
       TableDesc tsDesc = createIntermediateFS(pGBY, fileName);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Wed Jan 28 06:25:44 2015
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -223,9 +222,6 @@ public class SkewJoinOptimizer implement
         OperatorFactory.getAndMakeChild(
           new UnionDesc(), new RowSchema(currOp.getSchema().getSignature()), oplist);
 
-      RowResolver unionRR = parseContext.getOpParseCtx().get(currOp).getRowResolver();
-      GenMapRedUtils.putOpInsertMap(unionOp, unionRR, parseContext);
-
       // Introduce a select after the union
       List<Operator<? extends OperatorDesc>> unionList =
         new ArrayList<Operator<? extends OperatorDesc>>();
@@ -235,7 +231,6 @@ public class SkewJoinOptimizer implement
         OperatorFactory.getAndMakeChild(
           new SelectDesc(true),
           new RowSchema(unionOp.getSchema().getSignature()), unionList);
-      GenMapRedUtils.putOpInsertMap(selectUnionOp, unionRR, parseContext);
 
       // add the finalOp after the union
       selectUnionOp.setChildOperators(finalOps);
@@ -472,12 +467,10 @@ public class SkewJoinOptimizer implement
       currChild.setParentOperators(null);
 
       Operator<FilterDesc> filter = OperatorFactory.getAndMakeChild(
-        new FilterDesc(filterExpr, false), tableScanOp);
-      filter.setSchema(new RowSchema(tableScanOp.getSchema().getSignature()));
+        new FilterDesc(filterExpr, false),
+        new RowSchema(tableScanOp.getSchema().getSignature()),
+        tableScanOp);
       OperatorFactory.makeChild(filter, currChild);
-
-      RowResolver filterRR = parseContext.getOpParseCtx().get(tableScanOp).getRowResolver();
-      GenMapRedUtils.putOpInsertMap(filter, filterRR, parseContext);
     }
 
     /**
@@ -604,9 +597,6 @@ public class SkewJoinOptimizer implement
         ctx.getCloneTSOpMap().put((TableScanOperator)opClone, (TableScanOperator)op);
       }
 
-      GenMapRedUtils.putOpInsertMap(
-        opClone, parseContext.getOpParseCtx().get(op).getRowResolver(), parseContext);
-
       List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
       List<Operator<? extends OperatorDesc>> parentClones = opClone.getParentOperators();
       if ((parents != null) && (!parents.isEmpty()) &&

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Wed Jan 28 06:25:44 2015
@@ -18,8 +18,13 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,10 +53,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -66,14 +68,8 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization
@@ -196,8 +192,7 @@ public class SortedDynPartitionOptimizer
       for (int i : sortPositions) LOG.debug("sort position " + i);
       for (int i : sortOrder) LOG.debug("sort order " + i);
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
-      List<ColumnInfo> colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver()
-          .getColumnInfos();
+      List<ColumnInfo> colInfos = fsParent.getSchema().getSignature();
       ArrayList<ExprNodeDesc> bucketColumns = getPositionsToExprNodes(bucketPositions, colInfos);
 
       // update file sink descriptor
@@ -206,9 +201,7 @@ public class SortedDynPartitionOptimizer
       fsOp.getConf().setTotalFiles(1);
 
       // Create ReduceSinkDesc
-      RowResolver inputRR = parseCtx.getOpParseCtx().get(fsParent).getRowResolver();
-      ObjectPair<String, RowResolver> pair = copyRowResolver(inputRR);
-      RowResolver outRR = pair.getSecond();
+      RowSchema outRS = new RowSchema(fsParent.getSchema());
       ArrayList<ColumnInfo> valColInfo = Lists.newArrayList(fsParent.getSchema().getSignature());
       ArrayList<ExprNodeDesc> newValueCols = Lists.newArrayList();
       Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
@@ -220,28 +213,25 @@ public class SortedDynPartitionOptimizer
           newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
 
       if (!bucketColumns.isEmpty()) {
-        String tableAlias = outRR.getColumnInfos().get(0).getTabAlias();
+        String tableAlias = outRS.getSignature().get(0).getTabAlias();
         ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
             tableAlias, true, true);
-        outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci);
+        outRS.getSignature().add(ci);
       }
 
       // Create ReduceSink operator
-      ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
-          OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
-          outRR, parseCtx);
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+              rsConf, new RowSchema(outRS.getSignature()), fsParent);
       rsOp.setColumnExprMap(colExprMap);
 
       // Create ExtractDesc
-      ObjectPair<String, RowResolver> exPair = copyRowResolver(outRR);
-      RowResolver exRR = exPair.getSecond();
+      RowSchema exRR = new RowSchema(outRS);
       ExtractDesc exConf = new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
           Utilities.ReduceField.VALUE.toString(), "", false));
 
       // Create Extract Operator
-      ExtractOperator exOp = (ExtractOperator) putOpInsertMap(
-          OperatorFactory.getAndMakeChild(exConf, new RowSchema(exRR.getColumnInfos()), rsOp),
-          exRR, parseCtx);
+      ExtractOperator exOp = (ExtractOperator) OperatorFactory.getAndMakeChild(
+              exConf, exRR, rsOp);
 
       // link EX to FS
       fsOp.getParentOperators().clear();
@@ -313,8 +303,6 @@ public class SortedDynPartitionOptimizer
             rsParent.getChildOperators().add(rsGrandChild);
             rsGrandChild.getParentOperators().clear();
             rsGrandChild.getParentOperators().add(rsParent);
-            parseCtx.removeOpParseCtx(rsToRemove);
-            parseCtx.removeOpParseCtx(rsChild);
             LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChild.getOperatorId()
                 + " as it was introduced by enforce bucketing/sorting.");
           }
@@ -496,31 +484,6 @@ public class SortedDynPartitionOptimizer
       return cols;
     }
 
-    private Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
-        ParseContext context) {
-      OpParseContext ctx = new OpParseContext(rr);
-      context.getOpParseCtx().put(op, ctx);
-      return op;
-    }
-
-    private ObjectPair<String, RowResolver> copyRowResolver(RowResolver inputRR) {
-      ObjectPair<String, RowResolver> output = new ObjectPair<String, RowResolver>();
-      RowResolver outRR = new RowResolver();
-      int pos = 0;
-      String tabAlias = null;
-
-      for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
-        String[] info = inputRR.reverseLookup(colInfo.getInternalName());
-        tabAlias = info[0];
-        outRR.put(info[0], info[1], new ColumnInfo(SemanticAnalyzer.getColumnInternalName(pos),
-            colInfo.getType(), info[0], colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()));
-        pos++;
-      }
-      output.setFirst(tabAlias);
-      output.setSecond(outRR);
-      return output;
-    }
-
   }
 
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java Wed Jan 28 06:25:44 2015
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.LinkedHashMap;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,7 +25,6 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -49,10 +47,9 @@ public class SparkMapJoinProcessor exten
    */
   @Override
   public MapJoinOperator convertMapJoin(HiveConf conf,
-                                        LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
-                                        JoinOperator op, boolean leftSrc, String[] baseSrc, List<String> mapAliases,
-                                        int bigTablePos, boolean noCheckOuterJoin,
-                                        boolean validateMapJoinTree) throws SemanticException {
+      JoinOperator op, boolean leftSrc, String[] baseSrc, List<String> mapAliases,
+      int bigTablePos, boolean noCheckOuterJoin, boolean validateMapJoinTree)
+          throws SemanticException {
 
     // outer join cannot be performed on a table which is being cached
     JoinCondDesc[] condns = op.getConf().getConds();
@@ -64,7 +61,7 @@ public class SparkMapJoinProcessor exten
     }
 
     // create the map-join operator
-    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
+    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf,
         op, op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(),
         op.getConf().getMapAliases(), bigTablePos, noCheckOuterJoin);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java Wed Jan 28 06:25:44 2015
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -113,6 +114,8 @@ public class StatsOptimizer implements T
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("R1", TS + SEL + GBY + RS + GBY + SEL + FS),
         new MetaDataProcessor(pctx));
+    opRules.put(new RuleRegExp("R2", TS + SEL + GBY + RS + GBY + FS),
+            new MetaDataProcessor(pctx));
 
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -208,21 +211,24 @@ public class StatsOptimizer implements T
           return null;
         }
 
-        selOp = (SelectOperator)rsOp.getChildOperators().get(0).getChildOperators().get(0);
-        List<AggregationDesc> aggrs = gbyOp.getConf().getAggregators();
-
-        if (!(selOp.getConf().getColList().size() == aggrs.size())) {
-          // all select columns must be aggregations
-          return null;
-
-        }
-        for(ExprNodeDesc desc : selOp.getConf().getColList()) {
-          if (!(desc instanceof ExprNodeColumnDesc)) {
-            // Probably an expression, cant handle that
+        Operator<?> last = rsOp.getChildOperators().get(0);
+        if (last.getChildOperators().get(0) instanceof SelectOperator) {
+          selOp = (SelectOperator)rsOp.getChildOperators().get(0).getChildOperators().get(0);
+          last = selOp;
+          if (!(selOp.getConf().getColList().size() ==
+                  gbyOp.getConf().getAggregators().size())) {
+            // all select columns must be aggregations
             return null;
+  
+          }
+          for(ExprNodeDesc desc : selOp.getConf().getColList()) {
+            if (!(desc instanceof ExprNodeColumnDesc)) {
+              // Probably an expression, cant handle that
+              return null;
+            }
           }
         }
-        FileSinkOperator fsOp = (FileSinkOperator)(selOp.getChildren().get(0));
+        FileSinkOperator fsOp = (FileSinkOperator)(last.getChildren().get(0));
         if (fsOp.getChildOperators() != null && fsOp.getChildOperators().size() > 0) {
           // looks like a subq plan.
           return null;
@@ -234,7 +240,7 @@ public class StatsOptimizer implements T
 
         Hive hive = Hive.get(pctx.getConf());
 
-        for (AggregationDesc aggr : aggrs) {
+        for (AggregationDesc aggr : gbyOp.getConf().getAggregators()) {
           if (aggr.getDistinct()) {
             // our stats for NDV is approx, not accurate.
             return null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Wed Jan 28 06:25:44 2015
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.ContentSumma
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,9 +61,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
 import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -387,17 +387,16 @@ public class CorrelationOptimizer implem
         List<ExprNodeDesc> backtrackedPartitionCols =
             ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
 
-        OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
-        RowResolver rowResolver = opCtx.getRowResolver();
+        RowSchema rowSchema = current.getSchema();
         Set<String> tableNeedToCheck = new HashSet<String>();
         for (ExprNodeDesc expr: childKeyCols) {
           if (!(expr instanceof ExprNodeColumnDesc)) {
             return correlatedReduceSinkOperators;
           }
           String colName = ((ExprNodeColumnDesc)expr).getColumn();
-          String[] nm = rowResolver.reverseLookup(colName);
-          if (nm != null) {
-            tableNeedToCheck.add(nm[0]);
+          ColumnInfo columnInfo = rowSchema.getColumnInfo(colName);
+          if (columnInfo != null) {
+            tableNeedToCheck.add(columnInfo.getTabAlias());
           }
         }
         if (current instanceof JoinOperator) {
@@ -405,8 +404,7 @@ public class CorrelationOptimizer implem
           int expectedNumCorrelatedRsops = current.getParentOperators().size();
           LinkedHashSet<ReduceSinkOperator> correlatedRsops = null;
           for (Operator<? extends OperatorDesc> parent : current.getParentOperators()) {
-            Set<String> tableNames =
-                pCtx.getOpParseCtx().get(parent).getRowResolver().getTableNames();
+            Set<String> tableNames = parent.getSchema().getTableNames();
             for (String tbl : tableNames) {
               if (tableNeedToCheck.contains(tbl)) {
                 correlatedRsops = findCorrelatedReduceSinkOperators(current,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java Wed Jan 28 06:25:44 2015
@@ -44,9 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Se
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -347,7 +345,7 @@ public final class CorrelationUtilities
   protected static SelectOperator replaceOperatorWithSelect(Operator<?> operator,
       ParseContext context, AbstractCorrelationProcCtx procCtx)
       throws SemanticException {
-    RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+    RowSchema inputRS = operator.getSchema();
     SelectDesc select = new SelectDesc(null, null);
 
     Operator<?> parent = getSingleParent(operator);
@@ -355,9 +353,8 @@ public final class CorrelationUtilities
 
     parent.getChildOperators().clear();
 
-    SelectOperator sel = (SelectOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
-            .getColumnInfos()), parent), inputRR, context);
+    SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(
+            select, new RowSchema(inputRS.getSignature()), parent);
 
     sel.setColumnExprMap(operator.getColumnExprMap());
 
@@ -393,8 +390,6 @@ public final class CorrelationUtilities
       }
       cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
       cGBYr.setSchema(cGBYm.getSchema());
-      RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
-      context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
     } else {
       // pRS-cRS-cGBYr (no map aggregation) --> pRS-cGBYr(COMPLETE)
       // revert expressions of cGBYr to that of cRS
@@ -404,25 +399,23 @@ public final class CorrelationUtilities
       }
 
       Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
-      RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+      RowSchema oldRS = cGBYr.getSchema();
 
       Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
-      RowResolver newRR = new RowResolver();
+      ArrayList<ColumnInfo> newRS = new ArrayList<ColumnInfo>();
 
       List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
       for (int i = 0; i < outputCols.size(); i++) {
         String colName = outputCols.get(i);
-        String[] nm = oldRR.reverseLookup(colName);
-        ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
-        newRR.put(nm[0], nm[1], colInfo);
+        ColumnInfo colInfo = oldRS.getColumnInfo(colName);
+        newRS.add(colInfo);
         ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
         if (colExpr != null) {
           newMap.put(colInfo.getInternalName(), colExpr);
         }
       }
       cGBYr.setColumnExprMap(newMap);
-      cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
-      context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
+      cGBYr.setSchema(new RowSchema(newRS));
     }
     cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
 
@@ -494,13 +487,5 @@ public final class CorrelationUtilities
     }
     target.setChildOperators(null);
     target.setParentOperators(null);
-    context.getOpParseCtx().remove(target);
-  }
-
-  protected static Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
-      ParseContext context) {
-    OpParseContext ctx = new OpParseContext(rr);
-    context.getOpParseCtx().put(op, ctx);
-    return op;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java Wed Jan 28 06:25:44 2015
@@ -254,7 +254,6 @@ public class QueryPlanTreeTransformation
     for (ReduceSinkOperator rsop: handledRSs) {
       rsop.setChildOperators(null);
       rsop.setParentOperators(null);
-      pCtx.getOpParseCtx().remove(rsop);
     }
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Wed Jan 28 06:25:44 2015
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -332,8 +331,6 @@ public class RewriteGBUsingIndex impleme
           RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, canApplyCtx);
       rewriteQueryCtx.invokeRewriteQueryProc();
       parseContext = rewriteQueryCtx.getParseContext();
-      parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends OperatorDesc>,
-          OpParseContext>) rewriteQueryCtx.getOpc());
     }
     LOG.info("Finished Rewriting query");
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Wed Jan 28 06:25:44 2015
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.optimi
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,19 +30,17 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -76,7 +73,6 @@ public final class RewriteQueryUsingAggr
     this.indexTableName = canApplyCtx.getIndexTableName();
     this.alias = canApplyCtx.getAlias();
     this.aggregateFunction = canApplyCtx.getAggFunction();
-    this.opc = parseContext.getOpParseCtx();
     this.indexKey = canApplyCtx.getIndexKey();
   }
 
@@ -86,8 +82,6 @@ public final class RewriteQueryUsingAggr
         parseContext, hiveDb, canApplyCtx);
   }
 
-  private Map<Operator<? extends OperatorDesc>, OpParseContext> opc =
-    new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
   private final Hive hiveDb;
   private final ParseContext parseContext;
   private RewriteCanApplyCtx canApplyCtx;
@@ -99,10 +93,6 @@ public final class RewriteQueryUsingAggr
   private ExprNodeColumnDesc aggrExprNode = null;
   private String indexKey;
 
-  public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpc() {
-    return opc;
-  }
-
   public  ParseContext getParseContext() {
     return parseContext;
   }
@@ -172,15 +162,9 @@ public final class RewriteQueryUsingAggr
     // and add new ones
     Map<String, Operator<? extends OperatorDesc>> topOps = rewriteQueryCtx.getParseContext()
         .getTopOps();
-    Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContext = rewriteQueryCtx
-        .getParseContext().getOpParseCtx();
-
-    // need this to set rowResolver for new scanOperator
-    OpParseContext operatorContext = opParseContext.get(scanOperator);
 
     // remove original TableScanOperator
     topOps.remove(alias);
-    opParseContext.remove(scanOperator);
 
     String indexTableName = rewriteQueryCtx.getIndexName();
     Table indexTableHandle = null;
@@ -201,23 +185,21 @@ public final class RewriteQueryUsingAggr
     scanOperator.setConf(indexTableScanDesc);
 
     // Construct the new RowResolver for the new TableScanOperator
-    RowResolver rr = new RowResolver();
+    ArrayList<ColumnInfo> sigRS = new ArrayList<ColumnInfo>();
     try {
       StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle
           .getDeserializer().getObjectInspector();
       StructField field = rowObjectInspector.getStructFieldRef(rewriteQueryCtx.getIndexKey());
-      rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(),
-          TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()),
-          indexTableName, false));
+      sigRS.add(new ColumnInfo(field.getFieldName(), TypeInfoUtils.getTypeInfoFromObjectInspector(
+              field.getFieldObjectInspector()), indexTableName, false));
     } catch (SerDeException e) {
       LOG.error("Error while creating the RowResolver for new TableScanOperator.");
       LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
       throw new SemanticException(e.getMessage(), e);
     }
+    RowSchema rs = new RowSchema(sigRS);
 
     // Set row resolver for new table
-    operatorContext.setRowResolver(rr);
-
     String newAlias = indexTableName;
     int index = alias.lastIndexOf(":");
     if (index >= 0) {
@@ -228,13 +210,10 @@ public final class RewriteQueryUsingAggr
     scanOperator.getConf().setAlias(newAlias);
     scanOperator.setAlias(indexTableName);
     topOps.put(newAlias, scanOperator);
-    opParseContext.put(scanOperator, operatorContext);
     rewriteQueryCtx.getParseContext().setTopOps(
         (HashMap<String, Operator<? extends OperatorDesc>>) topOps);
-    rewriteQueryCtx.getParseContext().setOpParseCtx(
-        (LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>) opParseContext);
 
-    ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr,
+    ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rs,
         Arrays.asList(rewriteQueryCtx.getIndexKey()));
   }
 
@@ -318,11 +297,6 @@ public final class RewriteQueryUsingAggr
       // Now the GroupByOperator has the new AggregationList;
       // sum(`_count_of_indexed_key`)
       // instead of count(indexed_key)
-      OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator);
-      RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver();
-      gbyOPC.setRowResolver(gbyRR);
-      rewriteQueryCtx.getOpc().put(operator, gbyOPC);
-
       oldConf.setAggregators((ArrayList<AggregationDesc>) newAggrList);
       operator.setConf(oldConf);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java Wed Jan 28 06:25:44 2015
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hive.ql.optimizer.lineage;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
@@ -71,7 +71,7 @@ public class ExprProcCtx implements Node
     return inpOp;
   }
 
-  public RowResolver getResolver() {
-    return lctx.getParseCtx().getOpParseCtx().get(inpOp).getRowResolver();
+  public RowSchema getSchema() {
+    return inpOp.getSchema();
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java Wed Jan 28 06:25:44 2015
@@ -29,6 +29,7 @@ import java.util.Stack;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -76,12 +76,11 @@ public class ExprProcFactory {
       Operator<? extends OperatorDesc> operator = epc.getInputOperator();
       assert (operator != null);
 
-      RowResolver resolver = epc.getResolver();
-      String[] nm = resolver.reverseLookup(cd.getColumn());
-      if (nm == null && operator instanceof ReduceSinkOperator) {
-        nm = resolver.reverseLookup(Utilities.removeValueTag(cd.getColumn()));
+      RowSchema schema = epc.getSchema();
+      ColumnInfo ci = schema.getColumnInfo(cd.getColumn());
+      if (ci == null && operator instanceof ReduceSinkOperator) {
+        ci = schema.getColumnInfo(Utilities.removeValueTag(cd.getColumn()));
       }
-      ColumnInfo ci = nm != null ? resolver.get(nm[0], nm[1]): null;
 
       // Insert the dependencies of inp_ci to that of the current operator, ci
       LineageCtx lc = epc.getLineageCtx();
@@ -143,6 +142,7 @@ public class ExprProcFactory {
       Dependency dep = new Dependency();
       dep.setType(LineageInfo.DependencyType.SIMPLE);
       dep.setBaseCols(new ArrayList<BaseColumnInfo>());
+
       return dep;
     }
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java Wed Jan 28 06:25:44 2015
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.Utils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -468,17 +467,16 @@ public class OpProcFactory {
               ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
         }
       } else {
-        RowResolver resolver = lCtx.getParseCtx().getOpParseCtx().get(rop).getRowResolver();
+        RowSchema schema = rop.getSchema();
         ReduceSinkDesc desc = rop.getConf();
         List<ExprNodeDesc> keyCols = desc.getKeyCols();
         ArrayList<String> keyColNames = desc.getOutputKeyColumnNames();
         for (int i = 0; i < keyCols.size(); i++) {
           // order-bys, joins
-          String[] nm = resolver.reverseLookup(Utilities.ReduceField.KEY + "." + keyColNames.get(i));
-          if (nm == null) {
+          ColumnInfo column = schema.getColumnInfo(Utilities.ReduceField.KEY + "." + keyColNames.get(i));
+          if (column == null) {
             continue;   // key in values
           }
-          ColumnInfo column = resolver.get(nm[0], nm[1]);
           lCtx.getIndex().putDependency(rop, column,
               ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i)));
         }
@@ -486,12 +484,11 @@ public class OpProcFactory {
         ArrayList<String> valColNames = desc.getOutputValueColumnNames();
         for (int i = 0; i < valCols.size(); i++) {
           // todo: currently, bucketing,etc. makes RS differently with those for order-bys or joins
-          String[] nm = resolver.reverseLookup(valColNames.get(i));
-          if (nm == null) {
+          ColumnInfo column = schema.getColumnInfo(valColNames.get(i));
+          if (column == null) {
             // order-bys, joins
-            nm = resolver.reverseLookup(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
+            column = schema.getColumnInfo(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
           }
-          ColumnInfo column = resolver.get(nm[0], nm[1]);
           lCtx.getIndex().putDependency(rop, column,
               ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i)));
         }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Wed Jan 28 06:25:44 2015
@@ -455,7 +455,6 @@ public class CommonJoinTaskDispatcher ex
         }
       }
 
-      currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
       currWork.setLeftInputJoin(joinOp.getConf().isLeftInputJoin());
       currWork.setBaseSrc(joinOp.getConf().getBaseSrc());
       currWork.setMapAliases(joinOp.getConf().getMapAliases());
@@ -521,7 +520,6 @@ public class CommonJoinTaskDispatcher ex
     listWorks.add(currTask.getWork());
     listTasks.add(currTask);
     // clear JoinTree and OP Parse Context
-    currWork.setOpParseCtxMap(null);
     currWork.setLeftInputJoin(false);
     currWork.setBaseSrc(null);
     currWork.setMapAliases(null);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Wed Jan 28 06:25:44 2015
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
-import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
@@ -151,10 +150,6 @@ public class SortMergeJoinTaskDispatcher
       MapredWork currJoinWork = Utilities.clonePlan(currWork);
       SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
 
-      // Add the row resolver for the new operator
-      Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContextMap =
-          physicalContext.getParseContext().getOpParseCtx();
-      opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
       // change the newly created map-red plan as if it was a join operator
       genSMBJoinWork(currJoinWork.getMapWork(), newSMBJoinOp);
       return currJoinWork;
@@ -253,11 +248,9 @@ public class SortMergeJoinTaskDispatcher
     MapredWork currJoinWork = convertSMBWorkToJoinWork(currWork, originalSMBJoinOp);
     SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
 
-    currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
     currWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin());
     currWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
     currWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
-    currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
     currJoinWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin());
     currJoinWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc());
     currJoinWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases());
@@ -334,7 +327,6 @@ public class SortMergeJoinTaskDispatcher
     listWorks.add(currTask.getWork());
     listTasks.add(currTask);
     // clear JoinTree and OP Parse Context
-    currWork.getMapWork().setOpParseCtxMap(null);
     currWork.getMapWork().setLeftInputJoin(false);
     currWork.getMapWork().setBaseSrc(null);
     currWork.getMapWork().setMapAliases(null);
@@ -432,13 +424,8 @@ public class SortMergeJoinTaskDispatcher
       int mapJoinPos) throws SemanticException {
     SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(task.getWork());
 
-    // Add the row resolver for the new operator
-    Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContextMap =
-        physicalContext.getParseContext().getOpParseCtx();
-    opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
-
     // generate the map join operator
-    return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
-        opParseContextMap, newSMBJoinOp, mapJoinPos, true);
+    return MapJoinProcessor.convertSMBJoinToMapJoin(
+        physicalContext.getConf(), newSMBJoinOp, mapJoinPos, true);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Wed Jan 28 06:25:44 2015
@@ -372,11 +372,10 @@ public class SparkMapJoinOptimizer imple
     }
 
     //can safely convert the join to a map join.
-    ParseContext parseContext = context.getParseContext();
     MapJoinOperator mapJoinOp =
-        MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), parseContext.getOpParseCtx(), joinOp,
-            joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(),
-            bigTablePosition, true);
+        MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), joinOp,
+            joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
+            joinOp.getConf().getMapAliases(), bigTablePosition, true);
 
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1655226&r1=1655225&r2=1655226&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Wed Jan 28 06:25:44 2015
@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -43,13 +51,6 @@ import org.apache.hadoop.hive.ql.plan.Te
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
-
 /**
  * GenTezWork separates the operator tree into tez tasks.
  * It is called once per leaf operator (operator that forces
@@ -245,7 +246,9 @@ public class GenTezWork implements NodeP
                   LOG.debug("Cloning reduce sink for multi-child broadcast edge");
                   // we've already set this one up. Need to clone for the next work.
                   r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
-                      (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
+                      (ReduceSinkDesc)r.getConf().clone(),
+                      new RowSchema(r.getSchema()),
+                      r.getParentOperators());
                   context.clonedReduceSinks.add(r);
                 }
                 r.getConf().setOutputName(work.getName());