Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/08 03:00:12 UTC
svn commit: r1650201 [2/3] - in /hive/branches/spark:
itests/hive-unit/src/test/java/org/apache/hive/jdbc/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/se...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java Thu Jan 8 02:00:11 2015
@@ -58,6 +58,7 @@ public class SparkCrossProductCheck impl
@Override
public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
throws SemanticException {
+ @SuppressWarnings("unchecked")
Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
if (currTask instanceof SparkTask) {
SparkWork sparkWork = ((SparkTask) currTask).getWork();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Thu Jan 8 02:00:11 2015
@@ -240,8 +240,6 @@ public class SparkMapJoinResolver implem
}
}
}
-
- // TODO: enable non-staged mapjoin
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Thu Jan 8 02:00:11 2015
@@ -96,8 +96,8 @@ public class SetSparkReducerParallelism
long numberOfBytes = 0;
// we need to add up all the estimates from the siblings of this reduce sink
- for (Operator<? extends OperatorDesc> sibling:
- sink.getChildOperators().get(0).getParentOperators()) {
+ for (Operator<? extends OperatorDesc> sibling
+ : sink.getChildOperators().get(0).getParentOperators()) {
if (sibling.getStatistics() != null) {
numberOfBytes += sibling.getStatistics().getDataSize();
if (LOG.isDebugEnabled()) {
@@ -139,8 +139,8 @@ public class SetSparkReducerParallelism
if (numReducers < cores) {
numReducers = cores;
}
- LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = " + numReducers +
- ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = " + numberOfBytes);
+ LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = " + numReducers
+ + ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = " + numberOfBytes);
LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers + " (calculated)");
desc.setNumReducers(numReducers);
}
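
For context, the parallelism logic touched by the hunks above works roughly as follows: the estimated data sizes of all siblings of the reduce sink are summed into numberOfBytes, a reducer count is derived from that total and bytesPerReducer, and the result is floored at the number of available cores (the "if (numReducers < cores)" branch). A minimal, hypothetical sketch of that arithmetic follows; estimateReducers is an illustrative name, not the Hive method, and the exact rounding Hive applies is assumed.

// Minimal sketch, assuming reducer count = ceil(numberOfBytes / bytesPerReducer),
// floored at the available cores as the hunk above shows. Not Hive's actual code.
public final class ReducerParallelismSketch {

  public static int estimateReducers(long numberOfBytes, long bytesPerReducer, int cores) {
    // at least one reducer, then ceil-divide the total estimate by the per-reducer target
    int numReducers = (int) Math.max(1L, (numberOfBytes + bytesPerReducer - 1) / bytesPerReducer);
    if (numReducers < cores) {
      numReducers = cores;  // mirrors the "numReducers = cores" branch in the diff
    }
    return numReducers;
  }

  public static void main(String[] args) {
    // e.g. 10 GiB of sibling data, 256 MiB per reducer, 48 cores: ceil gives 40, floored at 48
    System.out.println(estimateReducers(10L << 30, 256L << 20, 48));
  }
}
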
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java Thu Jan 8 02:00:11 2015
@@ -18,8 +18,9 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.util.Stack;
+
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -30,15 +31,13 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
-import java.util.Stack;
-
/**
* This processes joins in which user specified a hint to identify the small-table.
- * Currently it takes a mapjoin already converted from hints, and converts it further to BucketMapJoin or SMBMapJoin
- * using same small-table identification.
+ * Currently it takes a mapjoin already converted from hints, and converts it further
+ * to BucketMapJoin or SMBMapJoin using same small-table identification.
*
- * The idea is eventually to process even hinted Mapjoin hints here, but due to code complexity in refactoring, that is still
- * in Optimizer.
+ * The idea is eventually to process even hinted Mapjoin hints here,
+ * but due to code complexity in refactoring, that is still in Optimizer.
*/
public class SparkJoinHintOptimizer implements NodeProcessor {
@@ -51,13 +50,14 @@ public class SparkJoinHintOptimizer impl
}
@Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
- MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
OptimizeSparkProcContext context = (OptimizeSparkProcContext) procCtx;
HiveConf hiveConf = context.getParseContext().getConf();
// Convert from mapjoin to bucket map join if enabled.
- if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN) || hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)
+ || hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
BucketJoinProcCtx bjProcCtx = new BucketJoinProcCtx(hiveConf);
bucketMapJoinOptimizer.process(nd, stack, bjProcCtx, nodeOutputs);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java Thu Jan 8 02:00:11 2015
@@ -29,7 +29,8 @@ import org.apache.hadoop.hive.ql.parse.s
import java.util.Stack;
/**
- * Converts a join to a more optimized join for the Spark path. Delegates to a more specialized join processor.
+ * Converts a join to a more optimized join for the Spark path.
+ * Delegates to a more specialized join processor.
*/
public class SparkJoinOptimizer implements NodeProcessor {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Thu Jan 8 02:00:11 2015
@@ -60,8 +60,8 @@ public class SparkMapJoinOptimizer imple
private static final Log LOG = LogFactory.getLog(SparkMapJoinOptimizer.class.getName());
@Override
- /*
- * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+ /**
+ * We should ideally not modify the tree we traverse. However,
* since we need to walk the tree at any time when we modify the operator, we
* might as well do it here.
*/
@@ -74,66 +74,14 @@ public class SparkMapJoinOptimizer imple
JoinOperator joinOp = (JoinOperator) nd;
if (!conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
- // && !(conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- // int pos = 0; // it doesn't matter which position we use in this case.
- // convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
return null;
}
LOG.info("Check if it can be converted to map join");
long[] mapJoinInfo = getMapJoinConversionInfo(joinOp, context);
- int mapJoinConversionPos = (int)mapJoinInfo[0];
+ int mapJoinConversionPos = (int) mapJoinInfo[0];
if (mapJoinConversionPos < 0) {
- /* TODO: handle this later
- // we cannot convert to bucket map join, we cannot convert to
- // map join either based on the size. Check if we can convert to SMB join.
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
- convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
- return null;
- }
- Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
- try {
- bigTableMatcherClass =
- (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
- parseContext.getConf(),
- HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
- } catch (ClassNotFoundException e) {
- throw new SemanticException(e.getMessage());
- }
-
- BigTableSelectorForAutoSMJ bigTableMatcher =
- ReflectionUtils.newInstance(bigTableMatcherClass, null);
- JoinDesc joinDesc = joinOp.getConf();
- JoinCondDesc[] joinCondns = joinDesc.getConds();
- Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
- if (joinCandidates.isEmpty()) {
- // This is a full outer join. This can never be a map-join
- // of any type. So return false.
- return false;
- }
- mapJoinConversionPos =
- bigTableMatcher.getBigTablePosition(parseContext, joinOp, joinCandidates);
- if (mapJoinConversionPos < 0) {
- // contains aliases from sub-query
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
- return null;
- }
-
- if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
- convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
- tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
- } else {
- // we are just converting to a common merge join operator. The shuffle
- // join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
- convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
- } */
return null;
}
@@ -166,107 +114,9 @@ public class SparkMapJoinOptimizer imple
return mapJoinOp;
}
- // replaces the join operator with a new CommonJoinOperator, removes the
- // parent reduce sinks
- /*
- private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeSparkProcContext context,
- int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
- throws SemanticException {
- ParseContext parseContext = context.parseContext;
- MapJoinDesc mapJoinDesc = null;
- if (adjustParentsChildren) {
- mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
- joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
- } else {
- JoinDesc joinDesc = joinOp.getConf();
- // retain the original join desc in the map join.
- mapJoinDesc =
- new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
- joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
- joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
- }
-
- @SuppressWarnings("unchecked")
- CommonMergeJoinOperator mergeJoinOp =
- (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
- isSubQuery, mapJoinConversionPos, mapJoinDesc));
- OpTraits opTraits =
- new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
- .getSortCols());
- mergeJoinOp.setOpTraits(opTraits);
- mergeJoinOp.setStatistics(joinOp.getStatistics());
-
- for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
- int pos = parentOp.getChildOperators().indexOf(joinOp);
- parentOp.getChildOperators().remove(pos);
- parentOp.getChildOperators().add(pos, mergeJoinOp);
- }
-
- for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
- int pos = childOp.getParentOperators().indexOf(joinOp);
- childOp.getParentOperators().remove(pos);
- childOp.getParentOperators().add(pos, mergeJoinOp);
- }
-
- List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
- if (childOperators == null) {
- childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
- mergeJoinOp.setChildOperators(childOperators);
- }
-
- List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
- if (parentOperators == null) {
- parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
- mergeJoinOp.setParentOperators(parentOperators);
- }
-
- childOperators.clear();
- parentOperators.clear();
- childOperators.addAll(joinOp.getChildOperators());
- parentOperators.addAll(joinOp.getParentOperators());
- mergeJoinOp.getConf().setGenJoinKeys(false);
-
- if (adjustParentsChildren) {
- mergeJoinOp.getConf().setGenJoinKeys(true);
- List<Operator<? extends OperatorDesc>> newParentOpList =
- new ArrayList<Operator<? extends OperatorDesc>>();
- for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
- for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
- grandParentOp.getChildOperators().remove(parentOp);
- grandParentOp.getChildOperators().add(mergeJoinOp);
- newParentOpList.add(grandParentOp);
- }
- }
- mergeJoinOp.getParentOperators().clear();
- mergeJoinOp.getParentOperators().addAll(newParentOpList);
- List<Operator<? extends OperatorDesc>> parentOps =
- new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
- for (Operator<? extends OperatorDesc> parentOp : parentOps) {
- int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
- if (parentIndex == mapJoinConversionPos) {
- continue;
- }
-
- // insert the dummy store operator here
- DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
- dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
- dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
- dummyStoreOp.getChildOperators().add(mergeJoinOp);
- int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
- parentOp.getChildOperators().remove(index);
- parentOp.getChildOperators().add(index, dummyStoreOp);
- dummyStoreOp.getParentOperators().add(parentOp);
- mergeJoinOp.getParentOperators().remove(parentIndex);
- mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
- }
- }
- mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
- }
- */
-
private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
int numBuckets = currentOp.getOpTraits().getNumBuckets();
- for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
+ for (Operator<? extends OperatorDesc> op : currentOp.getChildOperators()) {
if (!(op instanceof ReduceSinkOperator) && !(op instanceof GroupByOperator)) {
op.getOpTraits().setNumBuckets(numBuckets);
if (numBuckets < 0) {
@@ -298,8 +148,8 @@ public class SparkMapJoinOptimizer imple
BucketMapjoinProc.checkAndConvertBucketMapJoin(
parseContext, mapJoinOp, joinTree, baseBigAlias, joinAliases);
MapJoinDesc joinDesc = mapJoinOp.getConf();
- return joinDesc.isBucketMapJoin() ?
- joinDesc.getBigTableBucketNumMapping().size() : -1;
+ return joinDesc.isBucketMapJoin()
+ ? joinDesc.getBigTableBucketNumMapping().size() : -1;
}
/**
@@ -337,7 +187,7 @@ public class SparkMapJoinOptimizer imple
Statistics currInputStat = parentOp.getStatistics();
if (currInputStat == null) {
- LOG.warn("Couldn't get statistics from: "+parentOp);
+ LOG.warn("Couldn't get statistics from: " + parentOp);
return new long[]{-1, 0, 0};
}
@@ -359,15 +209,14 @@ public class SparkMapJoinOptimizer imple
// Otherwise, we could try to break the op tree at the UNION, and create two MapWorks
// for the branches above. Then, MJ will be in the following ReduceWork.
// But, this is tricky to implement, and we'll leave it as a future work for now.
- // TODO: handle this as a MJ case
if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
return new long[]{-1, 0, 0};
}
long inputSize = currInputStat.getDataSize();
- if ((bigInputStat == null) ||
- ((bigInputStat != null) &&
- (inputSize > bigInputStat.getDataSize()))) {
+ if ((bigInputStat == null)
+ || ((bigInputStat != null)
+ && (inputSize > bigInputStat.getDataSize()))) {
if (bigTableFound) {
// cannot convert to map join; we've already chosen a big table
@@ -416,9 +265,10 @@ public class SparkMapJoinOptimizer imple
return new long[]{-1, 0, 0};
}
- //Final check, find size of already-calculated Mapjoin Operators in same work (spark-stage). We need to factor
- //this in to prevent overwhelming Spark executor-memory.
- long connectedMapJoinSize = getConnectedMapJoinSize(joinOp.getParentOperators().get(bigTablePosition), joinOp, context);
+ //Final check, find size of already-calculated Mapjoin Operators in same work (spark-stage).
+ //We need to factor this in to prevent overwhelming Spark executor-memory.
+ long connectedMapJoinSize = getConnectedMapJoinSize(joinOp.getParentOperators().
+ get(bigTablePosition), joinOp, context);
if ((connectedMapJoinSize + totalSize) > maxSize) {
return new long[]{-1, 0, 0};
}
@@ -434,7 +284,8 @@ public class SparkMapJoinOptimizer imple
* @return total size of parent mapjoins in same work as this operator.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
- private long getConnectedMapJoinSize(Operator<? extends OperatorDesc> parentOp, Operator joinOp, OptimizeSparkProcContext ctx) {
+ private long getConnectedMapJoinSize(Operator<? extends OperatorDesc> parentOp, Operator joinOp,
+ OptimizeSparkProcContext ctx) {
long result = 0;
for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
result += getConnectedParentMapJoinSize(grandParentOp, ctx);
@@ -482,7 +333,8 @@ public class SparkMapJoinOptimizer imple
}
if (op instanceof MapJoinOperator) {
- //found child mapjoin operator. Its size should already reflect any mapjoins connected to it, so stop processing.
+ //Found child mapjoin operator.
+ //Its size should already reflect any mapjoins connected to it, so stop processing.
long mjSize = ctx.getMjOpSizes().get(op);
return mjSize;
}
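
The last SparkMapJoinOptimizer hunk above is the executor-memory guard: the sizes of map-join operators already planned into the same Spark work (spark-stage) are added to the small-table total and compared against the configured limit before the join is converted. A minimal sketch of that check, with hypothetical names (checkMapJoinFits is illustrative, not a Hive method):

// Minimal sketch of the memory guard shown in getMapJoinConversionInfo above.
// Hypothetical helper, not Hive's actual code.
public final class MapJoinMemoryGuardSketch {

  /**
   * @param totalSmallTableSize  combined estimated size of the small-table inputs
   * @param connectedMapJoinSize size of map joins already placed in the same Spark work
   * @param maxSize              configured small-table threshold
   * @return true if the map-join conversion may proceed
   */
  public static boolean checkMapJoinFits(long totalSmallTableSize,
      long connectedMapJoinSize, long maxSize) {
    // mirrors: if ((connectedMapJoinSize + totalSize) > maxSize) { return new long[]{-1, 0, 0}; }
    return connectedMapJoinSize + totalSmallTableSize <= maxSize;
  }
}
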
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java Thu Jan 8 02:00:11 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
import com.google.common.base.Preconditions;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -61,6 +62,8 @@ import java.util.Stack;
public class SparkReduceSinkMapJoinProc implements NodeProcessor {
+ public static final Log LOG = LogFactory.getLog(SparkReduceSinkMapJoinProc.class.getName());
+
public static class SparkMapJoinFollowedByGroupByProcessor implements NodeProcessor {
private boolean hasGroupBy = false;
@@ -81,8 +84,6 @@ public class SparkReduceSinkMapJoinProc
}
}
- protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
-
private boolean hasGroupBy(Operator<? extends OperatorDesc> mapjoinOp,
GenSparkProcContext context) throws SemanticException {
List<Operator<? extends OperatorDesc>> childOps = mapjoinOp.getChildOperators();
@@ -106,6 +107,7 @@ public class SparkReduceSinkMapJoinProc
* on the basis of the big table side because it may be a mapwork (no need for shuffle)
* or reduce work.
*/
+ @SuppressWarnings("unchecked")
@Override
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -132,14 +134,14 @@ public class SparkReduceSinkMapJoinProc
parentRS.setSkipTag(true);
// remember the original parent list before we start modifying it.
if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
- List<Operator<?>> parents = new ArrayList(mapJoinOp.getParentOperators());
+ List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
context.mapJoinParentMap.put(mapJoinOp, parents);
}
List<BaseWork> mapJoinWork;
/*
- * if there was a pre-existing work generated for the big-table mapjoin side,
+ * If there was a pre-existing work generated for the big-table mapjoin side,
* we need to hook the work generated for the RS (associated with the RS-MJ pattern)
* with the pre-existing work.
*
@@ -161,20 +163,6 @@ public class SparkReduceSinkMapJoinProc
LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
-/* int numBuckets = -1;
- EdgeType edgeType = EdgeType.BROADCAST_EDGE;
- if (mapJoinOp.getConf().isBucketMapJoin()) {
-
- // disable auto parallelism for bucket map joins
- parentRS.getConf().setAutoParallel(false);
-
- numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0];
- if (mapJoinOp.getConf().getCustomBucketMapJoin()) {
- edgeType = EdgeType.CUSTOM_EDGE;
- } else {
- edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
- }
- }*/
SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
if (mapJoinWork != null) {
@@ -209,7 +197,6 @@ public class SparkReduceSinkMapJoinProc
// create an new operator: HashTableDummyOperator, which share the table desc
HashTableDummyDesc desc = new HashTableDummyDesc();
- @SuppressWarnings("unchecked")
HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
TableDesc tbl;
@@ -221,7 +208,7 @@ public class SparkReduceSinkMapJoinProc
Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys();
List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
StringBuffer keyOrder = new StringBuffer();
- for (ExprNodeDesc k: keyCols) {
+ for (int i = 0; i < keyCols.size(); i++) {
keyOrder.append("+");
}
TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils
@@ -291,11 +278,11 @@ public class SparkReduceSinkMapJoinProc
}
//get all parents of reduce sink
- List<Operator<? extends OperatorDesc>> RSparentOps = parentRS.getParentOperators();
- for (Operator<? extends OperatorDesc> parent : RSparentOps) {
+ List<Operator<? extends OperatorDesc>> rsParentOps = parentRS.getParentOperators();
+ for (Operator<? extends OperatorDesc> parent : rsParentOps) {
parent.replaceChild(parentRS, hashTableSinkOp);
}
- hashTableSinkOp.setParentOperators(RSparentOps);
+ hashTableSinkOp.setParentOperators(rsParentOps);
hashTableSinkOp.setTag(tag);
return true;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java Thu Jan 8 02:00:11 2015
@@ -1,12 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.optimizer.spark;
-import com.clearspring.analytics.util.Preconditions;
+import java.util.List;
+import java.util.Stack;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -18,8 +37,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
-import java.util.List;
-import java.util.Stack;
+import com.clearspring.analytics.util.Preconditions;
/**
* Converts from a bucket-mapjoin created from hints to SMB mapjoin.
@@ -44,8 +62,8 @@ public class SparkSMBJoinHintOptimizer e
// Throw an error if the user asked for sort merge bucketed mapjoin to be enforced
// and sort merge bucketed mapjoin cannot be performed
- if (!convert &&
- pGraphContext.getConf().getBoolVar(
+ if (!convert
+ && pGraphContext.getConf().getBoolVar(
HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) {
throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg());
}
@@ -62,6 +80,7 @@ public class SparkSMBJoinHintOptimizer e
* In SMB join these are not expected for any parents, either from small or big tables.
* @param mapJoinOp
*/
+ @SuppressWarnings("unchecked")
private void removeSmallTableReduceSink(MapJoinOperator mapJoinOp) {
SMBJoinDesc smbJoinDesc = new SMBJoinDesc(mapJoinOp.getConf());
List<Operator<? extends OperatorDesc>> parentOperators = mapJoinOp.getParentOperators();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java Thu Jan 8 02:00:11 2015
@@ -56,7 +56,7 @@ import java.util.Set;
import java.util.Stack;
/**
- * Spark-version of SkewJoinProcFactory
+ * Spark-version of SkewJoinProcFactory.
*/
public class SparkSkewJoinProcFactory {
private SparkSkewJoinProcFactory() {
@@ -82,10 +82,10 @@ public class SparkSkewJoinProcFactory {
JoinOperator op = (JoinOperator) nd;
ReduceWork reduceWork = context.getReducerToReduceWork().get(op);
ParseContext parseContext = context.getParseCtx();
- if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask &&
- reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) &&
- GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
- op, currentTsk, parseContext.getConf())) {
+ if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask
+ && reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork)
+ && GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
+ op, currentTsk, parseContext.getConf())) {
// first we try to split the task
splitTask((SparkTask) currentTsk, reduceWork, parseContext);
GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext);
@@ -102,8 +102,8 @@ public class SparkSkewJoinProcFactory {
SparkWork currentWork = currentTask.getWork();
Set<Operator<? extends OperatorDesc>> reduceSinkSet =
SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class);
- if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork) &&
- reduceSinkSet.size() == 1) {
+ if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork)
+ && reduceSinkSet.size() == 1) {
ReduceSinkOperator reduceSink = (ReduceSinkOperator) reduceSinkSet.iterator().next();
BaseWork childWork = currentWork.getChildren(reduceWork).get(0);
SparkEdgeProperty originEdge = currentWork.getEdgeProperty(reduceWork, childWork);
@@ -118,7 +118,6 @@ public class SparkSkewJoinProcFactory {
// remove them from current spark work
for (BaseWork baseWork : newWork.getAllWorkUnsorted()) {
currentWork.remove(baseWork);
- // TODO: take care of cloneToWork?
currentWork.getCloneToWork().remove(baseWork);
}
// create TS to read intermediate data
@@ -152,7 +151,6 @@ public class SparkSkewJoinProcFactory {
} else {
streamDesc = "$INTNAME";
}
- // TODO: remove this?
String origStreamDesc = streamDesc;
int pos = 0;
while (mapWork.getAliasToWork().get(streamDesc) != null) {
@@ -162,6 +160,7 @@ public class SparkSkewJoinProcFactory {
GenMapRedUtils.setTaskPlan(taskTmpDir.toUri().toString(), streamDesc,
tableScanOp, mapWork, false, tableDesc);
// insert the new task between current task and its child
+ @SuppressWarnings("unchecked")
Task<? extends Serializable> newTask = TaskFactory.get(newWork, parseContext.getConf());
List<Task<? extends Serializable>> childTasks = currentTask.getChildTasks();
// must have at most one child
@@ -190,7 +189,7 @@ public class SparkSkewJoinProcFactory {
}
/**
- * Copy a sub-graph from originWork to newWork
+ * Copy a sub-graph from originWork to newWork.
*/
private static void copyWorkGraph(SparkWork originWork, SparkWork newWork,
BaseWork baseWork, boolean upWards) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java Thu Jan 8 02:00:11 2015
@@ -18,8 +18,16 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
@@ -39,17 +47,8 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
/**
- * Spark version of SkewJoinResolver
+ * Spark version of SkewJoinResolver.
*/
public class SparkSkewJoinResolver implements PhysicalPlanResolver {
@Override
@@ -62,7 +61,7 @@ public class SparkSkewJoinResolver imple
return pctx;
}
- class SparkSkewJoinTaskDispatcher implements Dispatcher{
+ class SparkSkewJoinTaskDispatcher implements Dispatcher {
private PhysicalContext physicalContext;
public SparkSkewJoinTaskDispatcher(PhysicalContext context) {
@@ -74,6 +73,7 @@ public class SparkSkewJoinResolver imple
public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
throws SemanticException {
+ @SuppressWarnings("unchecked")
Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
if (task instanceof SparkTask) {
SparkWork sparkWork = ((SparkTask) task).getWork();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Thu Jan 8 02:00:11 2015
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -32,10 +35,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
/**
* Operator factory for Spark SMBJoin processing.
*/
@@ -53,6 +52,7 @@ public final class SparkSortMergeJoinFac
Stack<Node> stack) {
int size = stack.size();
assert size >= 2 && stack.get(size - 1) == op;
+ @SuppressWarnings("unchecked")
Operator<? extends OperatorDesc> parent =
(Operator<? extends OperatorDesc>) stack.get(size - 2);
List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
@@ -154,8 +154,6 @@ public final class SparkSortMergeJoinFac
SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
- SparkTask currTask = ctx.currentTask;
-
// find the branch on which this processor was invoked
int pos = getPositionParent(mapJoin, stack);
boolean local = pos != mapJoin.getConf().getPosBigTable();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java Thu Jan 8 02:00:11 2015
@@ -70,26 +70,29 @@ public class SparkSortMergeJoinOptimizer
return null;
}
- protected boolean canConvertJoinToSMBJoin(JoinOperator joinOperator, SortBucketJoinProcCtx smbJoinContext,
- ParseContext pGraphContext, Stack<Node> stack) throws SemanticException {
+ protected boolean canConvertJoinToSMBJoin(JoinOperator joinOperator,
+ SortBucketJoinProcCtx smbJoinContext, ParseContext pGraphContext,
+ Stack<Node> stack) throws SemanticException {
if (!supportBucketMapJoin(stack)) {
return false;
}
return canConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
}
- //Preliminary checks. In the MR version of the code, these used to be done via another walk, refactoring to be inline.
+ //Preliminary checks. In the MR version of the code, these used to be done via another walk,
+ //here it is done inline.
private boolean supportBucketMapJoin(Stack<Node> stack) {
int size = stack.size();
- if (!(stack.get(size-1) instanceof JoinOperator) ||
- !(stack.get(size-2) instanceof ReduceSinkOperator)) {
+ if (!(stack.get(size - 1) instanceof JoinOperator)
+ || !(stack.get(size - 2) instanceof ReduceSinkOperator)) {
return false;
}
// If any operator in the stack does not support a auto-conversion, this join should
// not be converted.
- for (int pos = size -3; pos >= 0; pos--) {
- Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)stack.get(pos);
+ for (int pos = size - 3; pos >= 0; pos--) {
+ @SuppressWarnings("unchecked")
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) stack.get(pos);
if (!op.supportAutomaticSortMergeJoin()) {
return false;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java Thu Jan 8 02:00:11 2015
@@ -18,7 +18,16 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
-import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -35,8 +44,8 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import java.io.Serializable;
-import java.util.*;
+import com.google.common.base.Preconditions;
+
/**
* Do a BFS on the sparkWork graph, and look for any work that has more than one child.
@@ -154,13 +163,12 @@ public class SplitSparkWorkResolver impl
}
// we lost statistics & opTraits through cloning, try to get them back
- // TODO: make sure this method is sufficient to solve the problem
private void setStatistics(BaseWork origin, BaseWork clone) {
if (origin instanceof MapWork && clone instanceof MapWork) {
MapWork originMW = (MapWork) origin;
MapWork cloneMW = (MapWork) clone;
- for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
- originMW.getAliasToWork().entrySet()) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry
+ : originMW.getAliasToWork().entrySet()) {
String alias = entry.getKey();
Operator<? extends OperatorDesc> cloneOP = cloneMW.getAliasToWork().get(alias);
if (cloneOP != null) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Thu Jan 8 02:00:11 2015
@@ -58,7 +58,7 @@ import java.util.Set;
* Cloned from GenTezProcContext.
*
*/
-public class GenSparkProcContext implements NodeProcessorCtx{
+public class GenSparkProcContext implements NodeProcessorCtx {
public final ParseContext parseContext;
public final HiveConf conf;
public final List<Task<MoveWork>> moveTask;
@@ -89,11 +89,12 @@ public class GenSparkProcContext impleme
// map that keeps track of the last operator of a task to the following work
// of this operator. This is used for connecting them later.
- public final Map<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>> leafOpToFollowingWorkInfo;
+ public final Map<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>
+ leafOpToFollowingWorkInfo;
// a map that keeps track of work that need to be linked while
// traversing an operator tree
- public final Map<Operator<?>, Map<BaseWork,SparkEdgeProperty>> linkOpWithWorkMap;
+ public final Map<Operator<?>, Map<BaseWork, SparkEdgeProperty>> linkOpWithWorkMap;
// a map to keep track of what reduce sinks have to be hooked up to
// map join work
@@ -138,9 +139,13 @@ public class GenSparkProcContext impleme
public final Map<String, Operator<? extends OperatorDesc>> topOps;
@SuppressWarnings("unchecked")
- public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
- List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs, Map<String, Operator<? extends OperatorDesc>> topOps) {
+ public GenSparkProcContext(HiveConf conf,
+ ParseContext parseContext,
+ List<Task<MoveWork>> moveTask,
+ List<Task<? extends Serializable>> rootTasks,
+ Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs,
+ Map<String, Operator<? extends OperatorDesc>> topOps) {
this.conf = conf;
this.parseContext = parseContext;
this.moveTask = moveTask;
@@ -163,9 +168,9 @@ public class GenSparkProcContext impleme
this.currentMapJoinOperators = new LinkedHashSet<MapJoinOperator>();
this.linkChildOpWithDummyOp = new LinkedHashMap<Operator<?>, List<Operator<?>>>();
this.dependencyTask = conf.getBoolVar(
- HiveConf.ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES) ?
- (DependencyCollectionTask) TaskFactory.get(new DependencyCollectionWork(), conf) :
- null;
+ HiveConf.ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)
+ ? (DependencyCollectionTask) TaskFactory.get(new DependencyCollectionWork(), conf)
+ : null;
this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
this.currentUnionOperators = new LinkedList<UnionOperator>();
this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Thu Jan 8 02:00:11 2015
@@ -18,14 +18,21 @@
package org.apache.hadoop.hive.ql.parse.spark;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -48,26 +55,16 @@ import org.apache.hadoop.hive.ql.plan.Op
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
/**
* GenSparkUtils is a collection of shared helper methods to produce SparkWork
* Cloned from GenTezUtils.
- * TODO: need to make it fit to Spark
*/
public class GenSparkUtils {
- private static final Log logger = LogFactory.getLog(GenSparkUtils.class.getName());
+ private static final Log LOG = LogFactory.getLog(GenSparkUtils.class.getName());
// sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
private int sequenceNumber = 0;
@@ -89,12 +86,13 @@ public class GenSparkUtils {
sequenceNumber = 0;
}
- public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) throws SemanticException {
+ public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root,
+ SparkWork sparkWork) throws SemanticException {
Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
"AssertionError: expected root.getParentOperators() to be non-empty");
- ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
- logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
+ ReduceWork reduceWork = new ReduceWork("Reducer " + (++sequenceNumber));
+ LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
reduceWork.setReducer(root);
reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
@@ -103,8 +101,8 @@ public class GenSparkUtils {
// all be -1. In sort/order case where it matters there will be only
// one parent.
Preconditions.checkArgument(context.parentOfRoot instanceof ReduceSinkOperator,
- "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was " +
- context.parentOfRoot.getClass().getName());
+ "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was "
+ + context.parentOfRoot.getClass().getName());
ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
@@ -123,7 +121,7 @@ public class GenSparkUtils {
protected void setupReduceSink(GenSparkProcContext context, ReduceWork reduceWork,
ReduceSinkOperator reduceSink) {
- logger.debug("Setting up reduce sink: " + reduceSink
+ LOG.debug("Setting up reduce sink: " + reduceSink
+ " with following reduce work: " + reduceWork.getName());
// need to fill in information about the key and value in the reducer
@@ -146,14 +144,14 @@ public class GenSparkUtils {
SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException {
Preconditions.checkArgument(root.getParentOperators().isEmpty(),
"AssertionError: expected root.getParentOperators() to be empty");
- MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
- logger.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+ MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
+ LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
// map work starts with table scan operators
Preconditions.checkArgument(root instanceof TableScanOperator,
- "AssertionError: expected root to be an instance of TableScanOperator, but was " +
- root.getClass().getName());
- String alias = ((TableScanOperator)root).getConf().getAlias();
+ "AssertionError: expected root to be an instance of TableScanOperator, but was "
+ + root.getClass().getName());
+ String alias = ((TableScanOperator) root).getConf().getAlias();
if (!deferSetup) {
setupMapWork(mapWork, context, partitions, root, alias);
@@ -174,11 +172,11 @@ public class GenSparkUtils {
context.inputs, partitions, root, alias, context.conf, false);
}
- private void collectOperators (Operator<?> op, List<Operator<?>> opList) {
+ private void collectOperators(Operator<?> op, List<Operator<?>> opList) {
opList.add(op);
for (Object child : op.getChildOperators()) {
if (child != null) {
- collectOperators((Operator<?>)child, opList);
+ collectOperators((Operator<?>) child, opList);
}
}
}
@@ -199,23 +197,23 @@ public class GenSparkUtils {
// Build a map to map the original FileSinkOperator and the cloned FileSinkOperators
// This map is used for set the stats flag for the cloned FileSinkOperators in later process
- Iterator<Operator<?>> newRoots_it = newRoots.iterator();
+ Iterator<Operator<?>> newRootsIt = newRoots.iterator();
for (Operator<?> root : roots) {
- Operator<?> newRoot = newRoots_it.next();
+ Operator<?> newRoot = newRootsIt.next();
List<Operator<?>> newOpQueue = new LinkedList<Operator<?>>();
- collectOperators (newRoot, newOpQueue);
+ collectOperators(newRoot, newOpQueue);
List<Operator<?>> opQueue = new LinkedList<Operator<?>>();
- collectOperators (root, opQueue);
- Iterator<Operator<?>> newOpQueue_it = newOpQueue.iterator();
+ collectOperators(root, opQueue);
+ Iterator<Operator<?>> newOpQueueIt = newOpQueue.iterator();
for (Operator<?> op : opQueue) {
- Operator<?> newOp = newOpQueue_it.next();
+ Operator<?> newOp = newOpQueueIt.next();
if (op instanceof FileSinkOperator) {
List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(op);
if (fileSinkList == null) {
fileSinkList = new LinkedList<FileSinkOperator>();
}
- fileSinkList.add((FileSinkOperator)newOp);
- context.fileSinkMap.put((FileSinkOperator)op, fileSinkList);
+ fileSinkList.add((FileSinkOperator) newOp);
+ context.fileSinkMap.put((FileSinkOperator) op, fileSinkList);
}
}
}
@@ -234,10 +232,10 @@ public class GenSparkUtils {
for (Operator<?> orig: roots) {
Operator<?> newRoot = it.next();
if (newRoot instanceof HashTableDummyOperator) {
- dummyOps.add((HashTableDummyOperator)newRoot);
+ dummyOps.add((HashTableDummyOperator) newRoot);
it.remove();
} else {
- replacementMap.put(orig,newRoot);
+ replacementMap.put(orig, newRoot);
}
}
@@ -249,7 +247,7 @@ public class GenSparkUtils {
Set<Operator<?>> seen = new HashSet<Operator<?>>();
- while(!operators.isEmpty()) {
+ while (!operators.isEmpty()) {
Operator<?> current = operators.pop();
seen.add(current);
@@ -314,7 +312,7 @@ public class GenSparkUtils {
if (chDir) {
// Merge the files in the destination table/partitions by creating Map-only merge job
// If underlying data is RCFile a RCFileBlockMerge task would be created.
- logger.info("using CombineHiveInputformat for the merge job");
+ LOG.info("using CombineHiveInputformat for the merge job");
GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
context.dependencyTask, context.moveTask,
hconf, context.currentTask);
@@ -335,8 +333,8 @@ public class GenSparkUtils {
String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
// test if we need group-by shuffle
- if (reduceSink.getChildOperators().size() == 1 &&
- reduceSink.getChildOperators().get(0) instanceof GroupByOperator) {
+ if (reduceSink.getChildOperators().size() == 1
+ && reduceSink.getChildOperators().get(0) instanceof GroupByOperator) {
edgeProperty.setShuffleGroup();
// test if the group by needs partition level sort, if so, use the MR style shuffle
// SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542
@@ -364,9 +362,9 @@ public class GenSparkUtils {
// test if we need total order, if so, we can either use MR shuffle + set #reducer to 1,
// or we can use SHUFFLE_SORT
if (edgeProperty.isShuffleNone() && !sortOrder.isEmpty()) {
- if (reduceSink.getConf().getPartitionCols() == null ||
- reduceSink.getConf().getPartitionCols().isEmpty() ||
- isSame(reduceSink.getConf().getPartitionCols(),
+ if (reduceSink.getConf().getPartitionCols() == null
+ || reduceSink.getConf().getPartitionCols().isEmpty()
+ || isSame(reduceSink.getConf().getPartitionCols(),
reduceSink.getConf().getKeyCols())) {
edgeProperty.setShuffleSort();
} else {
@@ -397,12 +395,12 @@ public class GenSparkUtils {
return true;
}
List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();
- if (children != null && children.size() == 1 &&
- children.get(0) instanceof GroupByOperator) {
+ if (children != null && children.size() == 1
+ && children.get(0) instanceof GroupByOperator) {
GroupByOperator child = (GroupByOperator) children.get(0);
if (isSame(reduceSinkOperator.getConf().getKeyCols(),
- reduceSinkOperator.getConf().getPartitionCols()) &&
- reduceSinkOperator.getConf().getKeyCols().size() == child.getConf().getKeys().size()) {
+ reduceSinkOperator.getConf().getPartitionCols())
+ && reduceSinkOperator.getConf().getKeyCols().size() == child.getConf().getKeys().size()) {
return false;
}
}
@@ -410,7 +408,7 @@ public class GenSparkUtils {
}
/**
- * Test if two lists of ExprNodeDesc are semantically same
+ * Test if two lists of ExprNodeDesc are semantically same.
*/
private static boolean isSame(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
if (list1 != list2) {
@@ -430,6 +428,7 @@ public class GenSparkUtils {
return true;
}
+ @SuppressWarnings("unchecked")
public static <T> T getChildOperator(Operator<?> op, Class<T> klazz) throws SemanticException {
if (klazz.isInstance(op)) {
return (T) op;
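
The getEdgeProperty hunks above determine the shuffle type for an edge: a reduce sink feeding a single GroupByOperator gets a group shuffle (unless partition-level sort is needed, see the HIVE-8542 comment), and a non-empty sort order with empty partition columns, or partition columns semantically equal to the key columns, gets a sort shuffle; everything else falls back to the MR-style shuffle. A simplified, hypothetical sketch of that decision (ShuffleKind and chooseShuffle are illustrative names, and the partition-level-sort refinement is omitted):

import java.util.List;

// Simplified sketch of the shuffle-type decision suggested by the hunks above.
// Hypothetical names; not Hive's actual API.
public final class EdgeShuffleSketch {

  enum ShuffleKind { GROUP, SORT, MR }

  // stand-in for the semantic ExprNodeDesc list comparison (isSame) in the diff
  static <T> boolean isSame(List<T> a, List<T> b) {
    return a == null ? b == null : a.equals(b);
  }

  static <T> ShuffleKind chooseShuffle(boolean childIsSingleGroupBy, String sortOrder,
      List<T> partitionCols, List<T> keyCols) {
    if (childIsSingleGroupBy) {
      return ShuffleKind.GROUP;
    }
    if (sortOrder != null && !sortOrder.trim().isEmpty()
        && (partitionCols == null || partitionCols.isEmpty()
            || isSame(partitionCols, keyCols))) {
      return ShuffleKind.SORT;
    }
    return ShuffleKind.MR;
  }
}
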
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Thu Jan 8 02:00:11 2015
@@ -18,11 +18,16 @@
package org.apache.hadoop.hive.ql.parse.spark;
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -42,12 +47,7 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
+import com.google.common.base.Preconditions;
/**
* GenSparkWork separates the operator tree into spark tasks.
@@ -55,8 +55,6 @@ import java.util.Stack;
* and break the operators into work and tasks along the way.
*
* Cloned from GenTezWork.
- *
- * TODO: need to go thru this to make it fit completely to Spark.
*/
public class GenSparkWork implements NodeProcessor {
static final private Log LOG = LogFactory.getLog(GenSparkWork.class.getName());
@@ -84,6 +82,7 @@ public class GenSparkWork implements Nod
"AssertionError: expected context.currentRootOperator to be not null");
// Operator is a file sink or reduce sink. Something that forces a new vertex.
+ @SuppressWarnings("unchecked")
Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd;
// root is the start of the operator pipeline we're currently
@@ -122,7 +121,8 @@ public class GenSparkWork implements Nod
if (smbOp != null) {
// This logic is for SortMergeBucket MapJoin case.
// This MapWork (of big-table, see above..) is later initialized by SparkMapJoinFactory
- // processor, so don't initialize it here. Just keep track of it in the context, for later processing.
+ // processor, so don't initialize it here. Just keep track of it in the context,
+ // for later processing.
work = utils.createMapWork(context, root, sparkWork, null, true);
if (context.smbJoinWorkMap.get(smbOp) != null) {
throw new SemanticException("Each SMBMapJoin should be associated only with one Mapwork");
@@ -182,9 +182,9 @@ public class GenSparkWork implements Nod
work.addDummyOp((HashTableDummyOperator) dummy);
}
}
- for (Entry<BaseWork,SparkEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
+ for (Entry<BaseWork, SparkEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
BaseWork parentWork = parentWorkMap.getKey();
- LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
+ LOG.debug("connecting " + parentWork.getName() + " with " + work.getName());
SparkEdgeProperty edgeProp = parentWorkMap.getValue();
sparkWork.connect(parentWork, work, edgeProp);
@@ -218,7 +218,8 @@ public class GenSparkWork implements Nod
ReduceWork reduceWork = (ReduceWork) work;
for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) {
Preconditions.checkArgument(parent instanceof ReduceSinkOperator,
- "AssertionError: expected operator to be a ReduceSinkOperator, but was " + parent.getClass().getName());
+ "AssertionError: expected operator to be a ReduceSinkOperator, but was "
+ + parent.getClass().getName());
ReduceSinkOperator rsOp = (ReduceSinkOperator) parent;
SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(rsOp, reduceWork);
@@ -251,7 +252,8 @@ public class GenSparkWork implements Nod
// Also note: the concept of leaf and root is reversed in hive for historical
// reasons. Roots are data sources, leaves are data sinks. I know.
if (context.leafOpToFollowingWorkInfo.containsKey(operator)) {
- ObjectPair<SparkEdgeProperty, ReduceWork> childWorkInfo = context.leafOpToFollowingWorkInfo.get(operator);
+ ObjectPair<SparkEdgeProperty, ReduceWork> childWorkInfo =
+ context.leafOpToFollowingWorkInfo.get(operator);
SparkEdgeProperty edgeProp = childWorkInfo.getFirst();
ReduceWork childWork = childWorkInfo.getSecond();
@@ -286,8 +288,8 @@ public class GenSparkWork implements Nod
// the next item will be a new root.
if (!operator.getChildOperators().isEmpty()) {
Preconditions.checkArgument(operator.getChildOperators().size() == 1,
- "AssertionError: expected operator.getChildOperators().size() to be 1, but was " +
- operator.getChildOperators().size());
+ "AssertionError: expected operator.getChildOperators().size() to be 1, but was "
+ + operator.getChildOperators().size());
context.parentOfRoot = operator;
context.currentRootOperator = operator.getChildOperators().get(0);
context.preceedingWork = work;
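
[Editor's note] The GenSparkWork hunks above all serve the class's stated job: walk the operator tree, cut it into BaseWork vertices whenever a vertex-forcing operator (a reduce sink or file sink) is reached, and connect the resulting works with edge properties. The snippet below is a toy version of that cutting step under simplified assumptions (a linear operator chain, a plain boolean boundary test); Op, WorkSplitter and splitIntoWorks are illustrative names, not Hive APIs.

    import java.util.ArrayList;
    import java.util.List;

    // Toy operator: a name plus a flag marking vertex boundaries
    // (the role ReduceSinkOperator/FileSinkOperator play in the real code).
    final class Op {
      final String name;
      final boolean forcesNewVertex;

      Op(String name, boolean forcesNewVertex) {
        this.name = name;
        this.forcesNewVertex = forcesNewVertex;
      }
    }

    final class WorkSplitter {
      // Cuts a linear operator chain into "work" segments: each segment ends at
      // (and includes) a boundary operator, and the next segment starts after it.
      static List<List<Op>> splitIntoWorks(List<Op> chain) {
        List<List<Op>> works = new ArrayList<List<Op>>();
        List<Op> current = new ArrayList<Op>();
        for (Op op : chain) {
          current.add(op);
          if (op.forcesNewVertex) {
            works.add(current);
            current = new ArrayList<Op>();
          }
        }
        if (!current.isEmpty()) {
          works.add(current);  // trailing segment, e.g. one ending in a file sink
        }
        return works;
      }
    }

In the real processor the segments become MapWork/ReduceWork instances, and sparkWork.connect(parentWork, work, edgeProp) records the shuffle edge between them, as the linkWorkMap loop in the diff shows.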
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java Thu Jan 8 02:00:11 2015
@@ -50,6 +50,7 @@ public class GenSparkWorkWalker extends
this.ctx = ctx;
}
+ @SuppressWarnings("unchecked")
private void setRoot(Node nd) {
ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
ctx.preceedingWork = null;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Thu Jan 8 02:00:11 2015
@@ -87,14 +87,12 @@ import org.apache.hadoop.hive.ql.session
/**
* SparkCompiler translates the operator plan into SparkTasks.
*
- * Pretty much cloned from TezCompiler.
- *
- * TODO: need to complete and make it fit to Spark.
+ * Cloned from TezCompiler.
*/
public class SparkCompiler extends TaskCompiler {
private static final String CLASS_NAME = SparkCompiler.class.getName();
- private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
- private static final Log logger = LogFactory.getLog(SparkCompiler.class);
+ private static final PerfLogger PERF_LOGGER = PerfLogger.getPerfLogger();
+ private static final Log LOGGER = LogFactory.getLog(SparkCompiler.class);
public SparkCompiler() {
}
@@ -102,16 +100,12 @@ public class SparkCompiler extends TaskC
@Override
public void init(HiveConf conf, LogHelper console, Hive db) {
super.init(conf, console, db);
-
-// TODO: Need to check if we require the use of recursive input dirs for union processing
-// conf.setBoolean("mapred.input.dir.recursive", true);
-// HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
}
@Override
protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
Set<WriteEntity> outputs) throws SemanticException {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+ PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
// Sequence of TableScan operators to be walked
Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
deque.addAll(pCtx.getTopOps().values());
@@ -137,7 +131,7 @@ public class SparkCompiler extends TaskC
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pCtx.getTopOps().values());
ogw.startWalking(topNodes, null);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+ PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
}
/**
@@ -147,7 +141,7 @@ public class SparkCompiler extends TaskC
protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
throws SemanticException {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+ PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
GenSparkUtils.getUtils().resetSequenceNumber();
ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
@@ -244,7 +238,7 @@ public class SparkCompiler extends TaskC
GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
}
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+ PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
}
@Override
@@ -301,7 +295,7 @@ public class SparkCompiler extends TaskC
@Override
protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
Context ctx) throws SemanticException {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
+ PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
PhysicalContext physicalCtx = new PhysicalContext(conf, pCtx, pCtx.getContext(), rootTasks,
pCtx.getFetchTask());
@@ -345,7 +339,7 @@ public class SparkCompiler extends TaskC
LOG.debug("Skipping stage id rearranger");
}
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
+ PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
return;
}
}
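
[Editor's note] Beyond the constant renames (perfLogger to PERF_LOGGER, the usual naming convention for static finals), the SparkCompiler hunks show each compilation phase bracketed by PerfLogBegin/PerfLogEnd calls keyed by a class name and a phase name. The tiny timer below illustrates that begin/end shape only; PhaseTimer is an assumption for illustration, not the Hive PerfLogger API.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative begin/end phase timer, mirroring the bracketing used around
    // SPARK_OPTIMIZE_OPERATOR_TREE, SPARK_GENERATE_TASK_TREE, etc.
    final class PhaseTimer {
      private final Map<String, Long> startTimes = new HashMap<String, Long>();

      void begin(String caller, String phase) {
        startTimes.put(caller + ":" + phase, System.currentTimeMillis());
      }

      long end(String caller, String phase) {
        Long start = startTimes.remove(caller + ":" + phase);
        long elapsed = (start == null) ? -1L : System.currentTimeMillis() - start;
        System.out.println(caller + " " + phase + " took " + elapsed + " ms");
        return elapsed;
      }
    }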
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java Thu Jan 8 02:00:11 2015
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
- * Cloned from tez's FileSinkProcessor
+ * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks.
+ * Cloned from tez's FileSinkProcessor.
*/
public class SparkFileSinkProcessor implements NodeProcessor {
- private static final Log logger = LogFactory.getLog(SparkFileSinkProcessor.class.getName());
+ private static final Log LOGGER = LogFactory.getLog(SparkFileSinkProcessor.class.getName());
/*
* (non-Javadoc)
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Thu Jan 8 02:00:11 2015
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse.spark;
-import java.lang.StringBuffer;
import java.util.List;
import java.util.Set;
import java.util.Stack;
@@ -57,16 +56,16 @@ import com.google.common.base.Preconditi
* (normal, no scan, partial scan.) The plan at this point will be a single
* table scan operator.
*
- * TODO: cloned from tez ProcessAnalyzeTable. Need to make sure it fits to Spark.
+ * Cloned from Tez ProcessAnalyzeTable.
*/
public class SparkProcessAnalyzeTable implements NodeProcessor {
- private static final Log logger = LogFactory.getLog(SparkProcessAnalyzeTable.class.getName());
+ private static final Log LOGGER = LogFactory.getLog(SparkProcessAnalyzeTable.class.getName());
// shared plan utils for spark
private GenSparkUtils utils = null;
/**
- * Injecting the utils in the constructor facilitates testing
+ * Injecting the utils in the constructor facilitates testing.
*/
public SparkProcessAnalyzeTable(GenSparkUtils utils) {
this.utils = utils;
@@ -81,16 +80,18 @@ public class SparkProcessAnalyzeTable im
TableScanOperator tableScan = (TableScanOperator) nd;
ParseContext parseContext = context.parseContext;
+
+ @SuppressWarnings("rawtypes")
Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
.getInputFormatClass();
QB queryBlock = parseContext.getQB();
QBParseInfo parseInfo = parseContext.getQB().getParseInfo();
if (parseInfo.isAnalyzeCommand()) {
- Preconditions.checkArgument(tableScan.getChildOperators() == null ||
- tableScan.getChildOperators().size() == 0,
- "AssertionError: expected tableScan.getChildOperators() to be null, " +
- "or tableScan.getChildOperators().size() to be 0");
+ Preconditions.checkArgument(tableScan.getChildOperators() == null
+ || tableScan.getChildOperators().size() == 0,
+ "AssertionError: expected tableScan.getChildOperators() to be null, "
+ + "or tableScan.getChildOperators().size() to be 0");
String alias = null;
for (String a: parseContext.getTopOps().keySet()) {
@@ -185,6 +186,8 @@ public class SparkProcessAnalyzeTable im
// partial scan task
DriverContext driverCxt = new DriverContext();
+
+ @SuppressWarnings("unchecked")
Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf());
partialScanTask.initialize(parseContext.getConf(), null, driverCxt);
partialScanTask.setWork(scanWork);
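
[Editor's note] The @SuppressWarnings("unchecked") added before the partialScanTask assignment covers a different case than the isInstance-guarded casts earlier in this commit: here the caller narrows a generically typed factory result to the concrete work type it expects. A minimal sketch of that shape follows, using an illustrative SimpleTask/SimpleTaskFactory pair rather than Hive's TaskFactory.

    import java.io.Serializable;

    // Illustrative factory (not Hive's TaskFactory): it hands back a task through a
    // wildcard type, so a caller wanting a concrete work type must perform an
    // unchecked cast and suppress the warning at that declaration, as the hunk does.
    class SimpleTask<T extends Serializable> {
      T work;
    }

    final class SimpleTaskFactory {
      static SimpleTask<? extends Serializable> create() {
        return new SimpleTask<Serializable>();
      }

      @SuppressWarnings("unchecked")
      static SimpleTask<String> createStringTask() {
        // Narrowing the wildcard to the concrete work type is an unchecked cast;
        // the suppression stays scoped to this one method.
        return (SimpleTask<String>) create();
      }
    }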
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Thu Jan 8 02:00:11 2015
@@ -21,10 +21,10 @@ package org.apache.hadoop.hive.ql.plan;
@Explain(displayName = "Edge Property")
public class SparkEdgeProperty {
- public static long SHUFFLE_NONE = 0; // No shuffle is needed. For union only.
- public static long SHUFFLE_GROUP = 1; // HashPartition shuffle, keys are not sorted in any way.
- public static long SHUFFLE_SORT = 2; // RangePartition shuffle, keys are total sorted.
- public static long MR_SHUFFLE_SORT = 4; // HashPartition shuffle, keys are sorted by partition.
+ public static final long SHUFFLE_NONE = 0; // No shuffle is needed. For union only.
+ public static final long SHUFFLE_GROUP = 1; // HashPartition shuffle, keys are not sorted in any way.
+ public static final long SHUFFLE_SORT = 2; // RangePartition shuffle, keys are total sorted.
+ public static final long MR_SHUFFLE_SORT = 4; // HashPartition shuffle, keys are sorted by partition.
private long edgeType;
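
[Editor's note] The SparkEdgeProperty hunk only adds final to the shuffle-type constants, but their values (0, 1, 2, 4) hint at the design: edgeType is a long treated as a bit field, so an edge can carry and be tested against individual shuffle kinds. The snippet below sketches that bit-flag pattern under the assumption that edgeType is manipulated with bitwise operations; the helper names are illustrative, not the class's real accessors.

    // Minimal sketch of a bit-flag edge type, mirroring the final constants above.
    final class EdgeFlags {
      static final long SHUFFLE_NONE = 0;
      static final long SHUFFLE_GROUP = 1;    // hash partition, keys unsorted
      static final long SHUFFLE_SORT = 2;     // range partition, keys totally sorted
      static final long MR_SHUFFLE_SORT = 4;  // hash partition, keys sorted per partition

      private long edgeType = SHUFFLE_NONE;

      void addFlag(long flag) {
        edgeType |= flag;               // powers of two, so flags compose
      }

      boolean hasFlag(long flag) {
        return (edgeType & flag) != 0;  // test one kind without clearing the rest
      }
    }

Making the constants final (and keeping them powers of two) is what makes this composition safe; a mutable public static long could be reassigned and silently break every existing edge.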
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Thu Jan 8 02:00:11 2015
@@ -73,7 +73,7 @@ public class SparkWork extends AbstractO
}
/**
- * getWorkMap returns a map of "vertex name" to BaseWork
+ * @return a map of "vertex name" to BaseWork
*/
@Explain(displayName = "Vertices")
public Map<String, BaseWork> getWorkMap() {
@@ -85,7 +85,7 @@ public class SparkWork extends AbstractO
}
/**
- * getAllWork returns a topologically sorted list of BaseWork
+ * @return a topologically sorted list of BaseWork
*/
public List<BaseWork> getAllWork() {
@@ -122,7 +122,7 @@ public class SparkWork extends AbstractO
}
/**
- * add all nodes in the collection without any connections
+ * Add all nodes in the collection without any connections.
*/
public void addAll(Collection<BaseWork> c) {
for (BaseWork w: c) {
@@ -131,7 +131,7 @@ public class SparkWork extends AbstractO
}
/**
- * add all nodes in the collection without any connections
+ * Add all nodes in the collection without any connections.
*/
public void addAll(BaseWork[] bws) {
for (BaseWork w: bws) {
@@ -260,7 +260,7 @@ public class SparkWork extends AbstractO
* returns the edge type connecting work a and b
*/
public SparkEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
- return edgeProperties.get(new ImmutablePair<BaseWork, BaseWork>(a,b));
+ return edgeProperties.get(new ImmutablePair<BaseWork, BaseWork>(a, b));
}
/**
@@ -330,7 +330,9 @@ public class SparkWork extends AbstractO
*/
boolean dependsOn(String n1, String n2) {
for (String p = dependencies.get(n1); p != null; p = dependencies.get(p)) {
- if (p.equals(n2)) return true;
+ if (p.equals(n2)) {
+ return true;
+ }
}
return false;
}
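
[Editor's note] Two behaviours of SparkWork appear in the hunks above: getEdgeProperty looks edges up in a map keyed by an (a, b) pair of works, and dependsOn follows the dependencies map link by link until it either reaches n2 or runs out of ancestors (the braces added around the early return do not change that). A small self-contained sketch of both, using plain strings in place of BaseWork and a single-parent chain for simplicity, is below; ToyWorkGraph is an illustrative name.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.commons.lang3.tuple.ImmutablePair;
    import org.apache.commons.lang3.tuple.Pair;

    // Toy work graph keyed by vertex names instead of BaseWork instances.
    final class ToyWorkGraph {
      // Edge metadata keyed by (parent, child); ImmutablePair supplies the
      // equals/hashCode that make the map lookup work.
      private final Map<Pair<String, String>, String> edgeProperties =
          new HashMap<Pair<String, String>, String>();
      // child -> parent chain, the shape dependsOn walks.
      private final Map<String, String> dependencies = new HashMap<String, String>();

      void connect(String parent, String child, String edgeProp) {
        edgeProperties.put(new ImmutablePair<String, String>(parent, child), edgeProp);
        dependencies.put(child, parent);
      }

      String getEdgeProperty(String a, String b) {
        return edgeProperties.get(new ImmutablePair<String, String>(a, b));
      }

      // Same loop as the diff: follow the parent chain until n2 is found or it ends.
      boolean dependsOn(String n1, String n2) {
        for (String p = dependencies.get(n1); p != null; p = dependencies.get(p)) {
          if (p.equals(n2)) {
            return true;
          }
        }
        return false;
      }
    }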
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java Thu Jan 8 02:00:11 2015
@@ -31,9 +31,10 @@ public class CounterStatsAggregatorSpark
private SparkCounters sparkCounters;
+ @SuppressWarnings("rawtypes")
@Override
public boolean connect(Configuration hconf, Task sourceTask) {
- SparkTask task = (SparkTask)sourceTask;
+ SparkTask task = (SparkTask) sourceTask;
sparkCounters = task.getSparkCounters();
if (sparkCounters == null) {
return false;
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Thu Jan 8 02:00:11 2015
@@ -19,12 +19,12 @@ package org.apache.hive.spark.client;
import java.io.Serializable;
-import com.google.common.base.Throwables;
-
import org.apache.hive.spark.client.metrics.Metrics;
import org.apache.hive.spark.client.rpc.RpcDispatcher;
import org.apache.hive.spark.counter.SparkCounters;
+import com.google.common.base.Throwables;
+
abstract class BaseProtocol extends RpcDispatcher {
protected static class CancelJob implements Serializable {
@@ -118,7 +118,7 @@ abstract class BaseProtocol extends RpcD
}
/**
- * Inform the client that a new spark job has been submitted for the client job
+ * Inform the client that a new spark job has been submitted for the client job.
*/
protected static class JobSubmitted implements Serializable {
final String clientJobId;
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java Thu Jan 8 02:00:11 2015
@@ -17,10 +17,10 @@
package org.apache.hive.spark.client;
-import java.io.Serializable;
-
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import java.io.Serializable;
+
/**
* Interface for a Spark remote job.
*/
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Thu Jan 8 02:00:11 2015
@@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hive.spark.counter.SparkCounters;
+
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
@@ -48,7 +49,7 @@ public interface JobContext {
JavaFutureAction<T> job, SparkCounters sparkCounters, Set<Integer> cachedRDDIds);
/**
- * Return a map from client job Id to corresponding JavaFutureActions
+ * Return a map from client job Id to corresponding JavaFutureActions.
*/
Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Thu Jan 8 02:00:11 2015
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hive.spark.counter.SparkCounters;
+
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Thu Jan 8 02:00:11 2015
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+
import org.apache.hive.spark.counter.SparkCounters;
/**
@@ -45,12 +46,12 @@ public interface JobHandle<T extends Ser
MetricsCollection getMetrics();
/**
- * Get corresponding spark job IDs for this job
+ * Get corresponding spark job IDs for this job.
*/
List<Integer> getSparkJobIds();
/**
- * Get the SparkCounters for this job
+ * Get the SparkCounters for this job.
*/
SparkCounters getSparkCounters();
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Thu Jan 8 02:00:11 2015
@@ -17,16 +17,16 @@
package org.apache.hive.spark.client;
+import io.netty.util.concurrent.Promise;
+
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import io.netty.util.concurrent.Promise;
-
import org.apache.hive.spark.counter.SparkCounters;
/**
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java Thu Jan 8 02:00:11 2015
@@ -24,13 +24,6 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hive.spark.client.metrics.DataReadMethod;
import org.apache.hive.spark.client.metrics.InputMetrics;
@@ -38,6 +31,13 @@ import org.apache.hive.spark.client.metr
import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
/**
* Provides metrics collected for a submitted job.
*
@@ -162,7 +162,6 @@ public class MetricsCollection {
long remoteBytesRead = 0L;
// Shuffle write metrics.
- boolean hasShuffleWriteMetrics = false;
long shuffleBytesWritten = 0L;
long shuffleWriteTime = 0L;
@@ -195,7 +194,6 @@ public class MetricsCollection {
}
if (m.shuffleWriteMetrics != null) {
- hasShuffleWriteMetrics = true;
shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
}
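
[Editor's note] The MetricsCollection hunks drop the hasShuffleWriteMetrics flag because it was set during aggregation but apparently never consumed afterwards; the aggregation itself (guard on null, then accumulate) is unchanged. Below is a minimal sketch of that accumulate-if-present pattern, with illustrative TaskMetricsLite/ShuffleWriteLite types rather than the real spark-client metrics classes.

    import java.util.Arrays;
    import java.util.List;

    // Illustrative, stripped-down metrics holders (not the spark-client classes).
    final class ShuffleWriteLite {
      final long bytesWritten;
      final long writeTime;

      ShuffleWriteLite(long bytesWritten, long writeTime) {
        this.bytesWritten = bytesWritten;
        this.writeTime = writeTime;
      }
    }

    final class TaskMetricsLite {
      final ShuffleWriteLite shuffleWrite;  // null when the task did no shuffle write

      TaskMetricsLite(ShuffleWriteLite shuffleWrite) {
        this.shuffleWrite = shuffleWrite;
      }
    }

    final class Aggregate {
      // Sum optional per-task shuffle-write metrics; tasks without them are skipped.
      static long[] totalShuffleWrite(List<TaskMetricsLite> tasks) {
        long bytes = 0L;
        long time = 0L;
        for (TaskMetricsLite m : tasks) {
          if (m.shuffleWrite != null) {
            bytes += m.shuffleWrite.bytesWritten;
            time += m.shuffleWrite.writeTime;
          }
        }
        return new long[] {bytes, time};
      }

      public static void main(String[] args) {
        List<TaskMetricsLite> tasks = Arrays.asList(
            new TaskMetricsLite(new ShuffleWriteLite(100L, 5L)),
            new TaskMetricsLite(null));
        long[] totals = totalShuffleWrite(tasks);
        System.out.println("bytes=" + totals[0] + ", time=" + totals[1]);
      }
    }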
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java Thu Jan 8 02:00:11 2015
@@ -20,6 +20,7 @@ package org.apache.hive.spark.client;
import java.util.Set;
import org.apache.hive.spark.counter.SparkCounters;
+
import org.apache.spark.api.java.JavaFutureAction;
interface MonitorCallback {