Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/14 20:48:47 UTC

svn commit: r1639767 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java

Author: szehon
Date: Fri Nov 14 19:48:46 2014
New Revision: 1639767

URL: http://svn.apache.org/r1639767
Log:
HIVE-8865 : Needs to set hashTableMemoryUsage for MapJoinDesc [Spark Branch] (Chao Sun via Szehon)
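
For readers skimming the diff: the core of the fix is that the small-table hash
table's memory budget is now read from HiveConf and depends on whether the map
join feeds a group-by. A minimal, self-contained sketch of the two settings
involved (this demo class is illustrative only, not part of the patch, and
assumes hive-common with its default configuration on the classpath):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class HashTableMemoryUsageDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Budget applied when a GroupByOperator consumes the map-join output;
        // the hash table then shares local memory with the map-side
        // aggregation, so a separate (typically more conservative) limit applies.
        float followedByGby = conf.getFloatVar(
            HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
        // Budget applied when the map join is not followed by a group-by.
        float standalone = conf.getFloatVar(
            HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
        System.out.println(
            HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE.varname
                + " = " + followedByGby);
        System.out.println(
            HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE.varname
                + " = " + standalone);
      }
    }

The diff below wires exactly these two ConfVars into MapJoinDesc via
setHashTableMemoryUsage() before the SparkHashTableSinkOperator is created.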

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java?rev=1639767&r1=1639766&r2=1639767&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java Fri Nov 14 19:48:46 2014
@@ -20,12 +20,15 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -33,9 +36,15 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -54,8 +63,42 @@ import com.google.common.base.Preconditi
 
 public class SparkReduceSinkMapJoinProc implements NodeProcessor {
 
+  public static class SparkMapJoinFollowedByGroupByProcessor implements NodeProcessor {
+    private boolean hasGroupBy = false;
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      GenSparkProcContext context = (GenSparkProcContext) procCtx;
+      hasGroupBy = true;
+      GroupByOperator op = (GroupByOperator) nd;
+      float groupByMemoryUsage = context.conf.getFloatVar(
+          HiveConf.ConfVars.HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY);
+      op.getConf().setGroupByMemoryUsage(groupByMemoryUsage);
+      return null;
+    }
+
+    public boolean getHasGroupBy() {
+      return hasGroupBy;
+    }
+  }
+
   protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
 
+  private boolean hasGroupBy(Operator<? extends OperatorDesc> mapjoinOp,
+                             GenSparkProcContext context) throws SemanticException {
+    List<Operator<? extends OperatorDesc>> childOps = mapjoinOp.getChildOperators();
+    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
+    SparkMapJoinFollowedByGroupByProcessor processor = new SparkMapJoinFollowedByGroupByProcessor();
+    rules.put(new RuleRegExp("GBY", GroupByOperator.getOperatorName() + "%"), processor);
+    Dispatcher disp = new DefaultRuleDispatcher(null, rules, context);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(childOps);
+    ogw.startWalking(topNodes, null);
+    return processor.getHasGroupBy();
+  }
+
   /* (non-Javadoc)
    * This processor addresses the RS-MJ case that occurs in spark on the small/hash
    * table side of things. The work that RS will be a part of must be connected
@@ -66,7 +109,8 @@ public class SparkReduceSinkMapJoinProc 
    * or reduce work.
    */
   @Override
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext, Object... nodeOutputs)
+  public Object process(Node nd, Stack<Node> stack,
+                        NodeProcessorCtx procContext, Object... nodeOutputs)
       throws SemanticException {
     GenSparkProcContext context = (GenSparkProcContext) procContext;
     MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
@@ -89,7 +133,7 @@ public class SparkReduceSinkMapJoinProc 
       context.mapJoinParentMap.put(mapJoinOp, parents);
     }
 
-    List<BaseWork> mapJoinWork = null;
+    List<BaseWork> mapJoinWork;
 
     /*
      *  if there was a pre-existing work generated for the big-table mapjoin side,
@@ -120,8 +164,8 @@ public class SparkReduceSinkMapJoinProc 
     LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
     mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
 
-    int numBuckets = -1;
-/*    EdgeType edgeType = EdgeType.BROADCAST_EDGE;
+/*  int numBuckets = -1;
+    EdgeType edgeType = EdgeType.BROADCAST_EDGE;
     if (mapJoinOp.getConf().isBucketMapJoin()) {
 
       // disable auto parallelism for bucket map joins
@@ -143,7 +187,7 @@ public class SparkReduceSinkMapJoinProc 
         LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
         sparkWork.connect(parentWork, myWork, edgeProp);
 
-        ReduceSinkOperator r = null;
+        ReduceSinkOperator r;
         if (parentRS.getConf().getOutputName() != null) {
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");
           // we've already set this one up. Need to clone for the next work.
@@ -228,21 +272,44 @@ public class SparkReduceSinkMapJoinProc 
     }
     context.linkChildOpWithDummyOp.put(mapJoinOp, dummyOperators);
 
-
-    //replace ReduceSinkOp with HashTableSinkOp for the RSops which are parents of MJop
+    // replace ReduceSinkOp with HashTableSinkOp for the RSops which are parents of MJop
     MapJoinDesc mjDesc = mapJoinOp.getConf();
+    HiveConf conf = context.conf;
+
+    float hashtableMemoryUsage;
+    if (hasGroupBy(mapJoinOp, context)) {
+      hashtableMemoryUsage = conf.getFloatVar(
+          HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
+    } else {
+      hashtableMemoryUsage = conf.getFloatVar(
+          HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+    }
+    mjDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
 
     SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
     SparkHashTableSinkOperator hashTableSinkOp =
       (SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
 
+    byte tag = (byte) pos;
+    int[] valueIndex = mjDesc.getValueIndex(tag);
+    if (valueIndex != null) {
+      List<ExprNodeDesc> newValues = new ArrayList<ExprNodeDesc>();
+      List<ExprNodeDesc> values = hashTableSinkDesc.getExprs().get(tag);
+      for (int index = 0; index < values.size(); index++) {
+        if (valueIndex[index] < 0) {
+          newValues.add(values.get(index));
+        }
+      }
+      hashTableSinkDesc.getExprs().put(tag, newValues);
+    }
+
     //get all parents of reduce sink
     List<Operator<? extends OperatorDesc>> RSparentOps = parentRS.getParentOperators();
     for (Operator<? extends OperatorDesc> parent : RSparentOps) {
       parent.replaceChild(parentRS, hashTableSinkOp);
     }
     hashTableSinkOp.setParentOperators(RSparentOps);
-    hashTableSinkOp.setTag((byte)pos);
+    hashTableSinkOp.setTag(tag);
     return true;
   }
 }
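
A note on the detection side: the new hasGroupBy() helper reuses the stock
ql.lib rule/dispatcher/walker machinery rather than hand-rolling a tree scan.
Roughly, and using the same classes as the diff (mapJoinOp and procCtx stand in
for the locals of SparkReduceSinkMapJoinProc, so this fragment is a sketch, not
a drop-in method body):

    // Match any GroupByOperator reachable from the map join's children and hand
    // it to SparkMapJoinFollowedByGroupByProcessor, which records that a
    // group-by follows and sets its memory usage from
    // HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY.
    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
    SparkMapJoinFollowedByGroupByProcessor proc =
        new SparkMapJoinFollowedByGroupByProcessor();
    rules.put(new RuleRegExp("GBY", GroupByOperator.getOperatorName() + "%"), proc);
    Dispatcher disp = new DefaultRuleDispatcher(null /* no default processor */, rules, procCtx);
    GraphWalker walker = new DefaultGraphWalker(disp);
    walker.startWalking(new ArrayList<Node>(mapJoinOp.getChildOperators()), null);
    boolean followedByGroupBy = proc.getHasGroupBy();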