You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/14 20:48:47 UTC
svn commit: r1639767 -
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
Author: szehon
Date: Fri Nov 14 19:48:46 2014
New Revision: 1639767
URL: http://svn.apache.org/r1639767
Log:
HIVE-8865 : Needs to set hashTableMemoryUsage for MapJoinDesc [Spark Branch] (Chao Sun via Szehon)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java?rev=1639767&r1=1639766&r2=1639767&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java Fri Nov 14 19:48:46 2014
@@ -20,12 +20,15 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -33,9 +36,15 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -54,8 +63,42 @@ import com.google.common.base.Preconditi
public class SparkReduceSinkMapJoinProc implements NodeProcessor {
+ public static class SparkMapJoinFollowedByGroupByProcessor implements NodeProcessor { // processor that hasGroupBy() registers under its "GBY" rule
+ private boolean hasGroupBy = false; // flips to true the first time process() runs; never reset afterwards
+ // process() records that it was invoked and copies one HiveConf float onto the visited operator's descriptor.
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ GenSparkProcContext context = (GenSparkProcContext) procCtx; // procCtx must be a GenSparkProcContext; its conf field is read below
+ hasGroupBy = true; // merely being invoked counts as "a group-by was found"
+ GroupByOperator op = (GroupByOperator) nd; // assumes the dispatcher only routes GroupByOperator nodes here - TODO confirm
+ float groupByMemoryUsage = context.conf.getFloatVar(
+ HiveConf.ConfVars.HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY); // memory setting read from the HiveConf carried by the context
+ op.getConf().setGroupByMemoryUsage(groupByMemoryUsage); // side effect: overwrites the setting on the visited operator's descriptor
+ return null; // no value is handed back through process(); callers read getHasGroupBy() instead
+ }
+ // Returns true once process() has been called at least once on this instance.
+ public boolean getHasGroupBy() {
+ return hasGroupBy;
+ }
+ }
+
protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+ private boolean hasGroupBy(Operator<? extends OperatorDesc> mapjoinOp, // reports whether a walk started at mapjoinOp's children triggers the GroupBy rule
+ GenSparkProcContext context) throws SemanticException { // NOTE: not a pure query - the processor also sets groupByMemoryUsage on every operator it visits
+ List<Operator<? extends OperatorDesc>> childOps = mapjoinOp.getChildOperators(); // the walk starts below the map join, not at the map join itself
+ Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>(); // rule table for the dispatcher; only one entry is added
+ SparkMapJoinFollowedByGroupByProcessor processor = new SparkMapJoinFollowedByGroupByProcessor(); // fresh instance per call, so its flag starts out false
+ rules.put(new RuleRegExp("GBY", GroupByOperator.getOperatorName() + "%"), processor); // rule "GBY": pattern is the GroupBy operator name followed by "%"
+ Dispatcher disp = new DefaultRuleDispatcher(null, rules, context); // NOTE(review): first argument is null - presumably "no default processor"; confirm
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(childOps); // copy the children into the walker's start-node list
+ ogw.startWalking(topNodes, null); // NOTE(review): second argument is null - presumably node outputs are not collected; confirm
+ return processor.getHasGroupBy(); // true iff the processor's process() ran during the walk
+ }
+
/* (non-Javadoc)
* This processor addresses the RS-MJ case that occurs in spark on the small/hash
* table side of things. The work that RS will be a part of must be connected
@@ -66,7 +109,8 @@ public class SparkReduceSinkMapJoinProc
* or reduce work.
*/
@Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext, Object... nodeOutputs)
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procContext, Object... nodeOutputs)
throws SemanticException {
GenSparkProcContext context = (GenSparkProcContext) procContext;
MapJoinOperator mapJoinOp = (MapJoinOperator)nd;
@@ -89,7 +133,7 @@ public class SparkReduceSinkMapJoinProc
context.mapJoinParentMap.put(mapJoinOp, parents);
}
- List<BaseWork> mapJoinWork = null;
+ List<BaseWork> mapJoinWork;
/*
* if there was a pre-existing work generated for the big-table mapjoin side,
@@ -120,8 +164,8 @@ public class SparkReduceSinkMapJoinProc
LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
- int numBuckets = -1;
-/* EdgeType edgeType = EdgeType.BROADCAST_EDGE;
+/* int numBuckets = -1;
+ EdgeType edgeType = EdgeType.BROADCAST_EDGE;
if (mapJoinOp.getConf().isBucketMapJoin()) {
// disable auto parallelism for bucket map joins
@@ -143,7 +187,7 @@ public class SparkReduceSinkMapJoinProc
LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
sparkWork.connect(parentWork, myWork, edgeProp);
- ReduceSinkOperator r = null;
+ ReduceSinkOperator r;
if (parentRS.getConf().getOutputName() != null) {
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
// we've already set this one up. Need to clone for the next work.
@@ -228,21 +272,44 @@ public class SparkReduceSinkMapJoinProc
}
context.linkChildOpWithDummyOp.put(mapJoinOp, dummyOperators);
-
- //replace ReduceSinkOp with HashTableSinkOp for the RSops which are parents of MJop
+ // replace ReduceSinkOp with HashTableSinkOp for the RSops which are parents of MJop
MapJoinDesc mjDesc = mapJoinOp.getConf();
+ HiveConf conf = context.conf;
+
+ float hashtableMemoryUsage;
+ if (hasGroupBy(mapJoinOp, context)) {
+ hashtableMemoryUsage = conf.getFloatVar(
+ HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
+ } else {
+ hashtableMemoryUsage = conf.getFloatVar(
+ HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+ }
+ mjDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
SparkHashTableSinkOperator hashTableSinkOp =
(SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
+ byte tag = (byte) pos;
+ int[] valueIndex = mjDesc.getValueIndex(tag);
+ if (valueIndex != null) {
+ List<ExprNodeDesc> newValues = new ArrayList<ExprNodeDesc>();
+ List<ExprNodeDesc> values = hashTableSinkDesc.getExprs().get(tag);
+ for (int index = 0; index < values.size(); index++) {
+ if (valueIndex[index] < 0) {
+ newValues.add(values.get(index));
+ }
+ }
+ hashTableSinkDesc.getExprs().put(tag, newValues);
+ }
+
//get all parents of reduce sink
List<Operator<? extends OperatorDesc>> RSparentOps = parentRS.getParentOperators();
for (Operator<? extends OperatorDesc> parent : RSparentOps) {
parent.replaceChild(parentRS, hashTableSinkOp);
}
hashTableSinkOp.setParentOperators(RSparentOps);
- hashTableSinkOp.setTag((byte)pos);
+ hashTableSinkOp.setTag(tag);
return true;
}
}