Posted to commits@hive.apache.org by st...@apache.org on 2018/04/13 20:51:19 UTC
hive git commit: HIVE-18525: Add explain plan to Hive on Spark Web UI (Sahil Takiar, reviewed by Aihua Xu)
Repository: hive
Updated Branches:
refs/heads/master 3915980fe -> b14113be4
HIVE-18525: Add explain plan to Hive on Spark Web UI (Sahil Takiar, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b14113be
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b14113be
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b14113be
Branch: refs/heads/master
Commit: b14113be4c67217e030ae5158ce584362a721483
Parents: 3915980
Author: Sahil Takiar <ta...@gmail.com>
Authored: Fri Apr 13 13:50:57 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Apr 13 13:50:57 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../apache/hadoop/hive/ql/log/PerfLogger.java | 1 +
.../apache/hadoop/hive/ql/exec/ExplainTask.java | 22 ++--
.../hadoop/hive/ql/exec/spark/CacheTran.java | 10 +-
.../hadoop/hive/ql/exec/spark/MapInput.java | 13 +-
.../hadoop/hive/ql/exec/spark/MapTran.java | 7 +-
.../hadoop/hive/ql/exec/spark/ReduceTran.java | 7 +-
.../hadoop/hive/ql/exec/spark/ShuffleTran.java | 12 +-
.../hadoop/hive/ql/exec/spark/SparkPlan.java | 57 ++++++++-
.../hive/ql/exec/spark/SparkPlanGenerator.java | 14 +--
.../hadoop/hive/ql/exec/spark/SparkTran.java | 3 +
.../hive/ql/exec/spark/TestSparkPlan.java | 122 +++++++++++++++++++
12 files changed, 243 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e540d02..e533ee6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2830,8 +2830,12 @@ public class HiveConf extends Configuration {
HIVE_SPARK_EXPLAIN_USER("hive.spark.explain.user", false,
"Whether to show explain result at user level.\n" +
"When enabled, will log EXPLAIN output for the query at user level. Spark only."),
+ HIVE_SPARK_LOG_EXPLAIN_WEBUI("hive.spark.log.explain.webui", true, "Whether to show the " +
+ "explain plan in the Spark Web UI. Only shows the regular EXPLAIN plan, and ignores " +
+ "any extra EXPLAIN configuration (e.g. hive.spark.explain.user, etc.). The explain " +
+ "plan for each stage is truncated at 100,000 characters."),
// prefix used to auto generated column aliases (this should be started with '_')
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c",
"String used as a prefix when auto generating column alias.\n" +
"By default the prefix label will be appended with a column position number to form the column alias. \n" +
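For reference, the new flag is read through the JobConf rather than a HiveConf getter, as seen in SparkPlan#getLongFormCallSite further down. A minimal sketch of that lookup pattern (the helper class name is illustrative; the JobConf is assumed to have been built from the session's HiveConf):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class ExplainWebUiFlagSketch {
      // True when the explain plan should be attached to the Spark Web UI;
      // mirrors the jobConf.getBoolean(...) call in SparkPlan#getLongFormCallSite.
      static boolean logExplainToWebUi(JobConf jobConf) {
        return jobConf.getBoolean(
            HiveConf.ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.varname,
            HiveConf.ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.defaultBoolVal);
      }
    }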
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 764a832..c1e1b7f 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -76,6 +76,7 @@ public class PerfLogger {
public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
+ public static final String SPARK_CREATE_EXPLAIN_PLAN = "SparkCreateExplainPlan.";
public static final String SPARK_SUBMIT_JOB = "SparkSubmitJob";
public static final String SPARK_RUN_JOB = "SparkRunJob";
public static final String SPARK_CREATE_TRAN = "SparkCreateTran.";
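The trailing dot in the new constant is intentional: callers append a per-transformation suffix so each tran gets its own timer, e.g. "SparkCreateExplainPlan.Reducer 2". A minimal sketch of the begin/end pairing, matching how SparkPlan uses it below (class and method names are illustrative):

    import org.apache.hadoop.hive.ql.log.PerfLogger;

    public class ExplainTimingSketch {
      private static final String CLASS_NAME = ExplainTimingSketch.class.getName();

      static void timedExplain(PerfLogger perfLogger, String tranName) {
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + tranName);
        // ... build the explain plan for this tran here ...
        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + tranName);
      }
    }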
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 14c6398..0b30721 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -610,7 +610,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
}
private JSONArray outputList(List<?> l, PrintStream out, boolean hasHeader,
- boolean extended, boolean jsonOutput, int indent) throws Exception {
+ boolean extended, boolean jsonOutput, int indent, boolean inTest) throws Exception {
boolean first_el = true;
boolean nl = false;
@@ -634,7 +634,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
out.println();
}
JSONObject jsonOut = outputPlan(o, out, extended,
- jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent));
+ jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent), "", inTest);
if (jsonOutput) {
outputArray.put(jsonOut);
}
@@ -672,10 +672,13 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
@VisibleForTesting
JSONObject outputPlan(Object work, PrintStream out,
boolean extended, boolean jsonOutput, int indent, String appendToHeader) throws Exception {
+ return outputPlan(work, out, extended, jsonOutput, indent, appendToHeader,
+ queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST));
+ }
- // Are we running tests?
- final boolean inTest = queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST);
-
+ public JSONObject outputPlan(Object work, PrintStream out,
+ boolean extended, boolean jsonOutput, int indent,
+ String appendToHeader, boolean inTest) throws Exception {
// Check if work has an explain annotation
Annotation note = AnnotationUtils.getAnnotation(work.getClass(), Explain.class);
@@ -773,7 +776,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
if (operator.getConf() != null) {
String appender = isLogical ? " (" + operator.getOperatorId() + ")" : "";
JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
- jsonOutput, jsonOutput ? 0 : indent, appender);
+ jsonOutput, jsonOutput ? 0 : indent, appender, inTest);
if (this.work != null && (this.work.isUserLevelExplain() || this.work.isFormatted())) {
if (jsonOut != null && jsonOut.length() > 0) {
((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
@@ -795,7 +798,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
if (operator.getChildOperators() != null) {
int cindent = jsonOutput ? 0 : indent + 2;
for (Operator<? extends OperatorDesc> op : operator.getChildOperators()) {
- JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent);
+ JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent, "", inTest);
if (jsonOutput) {
((JSONObject)json.get(JSONObject.getNames(json)[0])).accumulate("children", jsonOut);
}
@@ -971,7 +974,8 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
out.print(header);
}
- JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, jsonOutput, ind);
+ JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended,
+ jsonOutput, ind, inTest);
if (jsonOutput && !l.isEmpty()) {
json.put(header, jsonOut);
@@ -985,7 +989,7 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
if (!skipHeader && out != null) {
out.println(header);
}
- JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, ind);
+ JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, ind, "", inTest);
if (jsonOutput && jsonOut != null && jsonOut.length() != 0) {
if (!skipHeader) {
json.put(header, jsonOut);
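The net effect of these changes is a public outputPlan overload that takes inTest explicitly instead of reading ConfVars.HIVE_IN_TEST from the session config, so callers without a QueryState (such as SparkPlan) can render a plan. A minimal sketch of rendering a work object to a String with it, following the same wiring SparkPlan#getLongFormCallSite uses below (the helper class name is illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    import org.apache.hadoop.hive.ql.exec.ExplainTask;
    import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.ExplainWork;

    public class ExplainRenderSketch {
      // Renders the regular (non-extended, non-JSON) explain plan for one work object.
      static String renderPlan(BaseWork work, boolean inTest) throws Exception {
        ExplainWork explainWork = new ExplainWork();
        explainWork.setConfig(new ExplainConfiguration());
        ExplainTask explainTask = new ExplainTask();
        explainTask.setWork(explainWork);

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        explainTask.outputPlan(work, new PrintStream(outputStream),
            false /* extended */, false /* jsonOutput */, 0 /* indent */,
            null /* appendToHeader */, inTest);
        return outputStream.toString();
      }
    }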
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
index 4b77ac9..770a100 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.io.WritableComparable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
@@ -28,10 +29,12 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr
private boolean caching = false;
private JavaPairRDD<KO, VO> cachedRDD;
protected final String name;
+ private final BaseWork baseWork;
- protected CacheTran(boolean cache, String name) {
+ protected CacheTran(boolean cache, String name, BaseWork baseWork) {
this.caching = cache;
this.name = name;
+ this.baseWork = baseWork;
}
@Override
@@ -59,4 +62,9 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr
public String getName() {
return name;
}
+
+ @Override
+ public BaseWork getBaseWork() {
+ return baseWork;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index b1a0d55..b242f57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
@@ -37,17 +38,20 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
private boolean toCache;
private final SparkPlan sparkPlan;
private final String name;
+ private final BaseWork baseWork;
public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
- this(sparkPlan, hadoopRDD, false, "MapInput");
+ this(sparkPlan, hadoopRDD, false, "MapInput", null);
}
public MapInput(SparkPlan sparkPlan,
- JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String name) {
+ JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String
+ name, BaseWork baseWork) {
this.hadoopRDD = hadoopRDD;
this.toCache = toCache;
this.sparkPlan = sparkPlan;
this.name = name;
+ this.baseWork = baseWork;
}
public void setToCache(boolean toCache) {
@@ -98,4 +102,9 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
public Boolean isCacheEnable() {
return new Boolean(toCache);
}
+
+ @Override
+ public BaseWork getBaseWork() {
+ return baseWork;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index b102f51..7e95b12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
@@ -26,11 +27,11 @@ public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, By
private HiveMapFunction mapFunc;
public MapTran() {
- this(false, "MapTran");
+ this(false, "MapTran", null);
}
- public MapTran(boolean cache, String name) {
- super(cache, name);
+ public MapTran(boolean cache, String name, BaseWork baseWork) {
+ super(cache, name, baseWork);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3b34c78..4bafcb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
@@ -26,11 +27,11 @@ public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable>
private HiveReduceFunction<V> reduceFunc;
public ReduceTran() {
- this(false, "Reduce");
+ this(false, "Reduce", null);
}
- public ReduceTran(boolean caching, String name) {
- super(caching, name);
+ public ReduceTran(boolean caching, String name, BaseWork baseWork) {
+ super(caching, name, baseWork);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 40ff01a..f698079 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
@@ -31,19 +32,21 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B
private final SparkPlan sparkPlan;
private final String name;
private final SparkEdgeProperty edge;
+ private final BaseWork baseWork;
public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
- this(sparkPlan, sf, n, false, "Shuffle", null);
+ this(sparkPlan, sf, n, false, "Shuffle", null, null);
}
public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name,
- SparkEdgeProperty edge) {
+ SparkEdgeProperty edge, BaseWork baseWork) {
shuffler = sf;
numOfPartitions = n;
this.toCache = toCache;
this.sparkPlan = sparkPlan;
this.name = name;
this.edge = edge;
+ this.baseWork = baseWork;
}
@Override
@@ -71,6 +74,11 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B
return new Boolean(toCache);
}
+ @Override
+ public BaseWork getBaseWork() {
+ return baseWork;
+ }
+
public SparkShuffler getShuffler() {
return shuffler;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b21e386..8244dcb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -26,6 +28,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkContext;
import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
@@ -50,9 +59,11 @@ public class SparkPlan {
private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
+ private final JobConf jobConf;
private final SparkContext sc;
- SparkPlan(SparkContext sc) {
+ SparkPlan(JobConf jobConf, SparkContext sc) {
+ this.jobConf = jobConf;
this.sc = sc;
}
@@ -68,7 +79,7 @@ public class SparkPlan {
// Root tran, it must be MapInput
Preconditions.checkArgument(tran instanceof MapInput,
"AssertionError: tran must be an instance of MapInput");
- sc.setCallSite(CallSite.apply(tran.getName(), ""));
+ sc.setCallSite(CallSite.apply(tran.getName(), getLongFormCallSite(tran)));
rdd = tran.transform(null);
} else {
for (SparkTran parent : parents) {
@@ -82,7 +93,7 @@ public class SparkPlan {
rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")");
}
}
- sc.setCallSite(CallSite.apply(tran.getName(), ""));
+ sc.setCallSite(CallSite.apply(tran.getName(), getLongFormCallSite(tran)));
rdd = tran.transform(rdd);
}
@@ -109,6 +120,46 @@ public class SparkPlan {
return finalRDD;
}
+ /**
+ * Takes a {@link SparkTran} object and creates the longForm for the RDD's {@link CallSite}.
+ * It does this by creating an {@link ExplainTask} and running it over the
+ * {@link SparkTran#getBaseWork()} object. The explain output is serialized to a string,
+ * which is logged and returned. If any errors are encountered while creating the explain plan,
+ * an error message is simply logged, but no {@link Exception} is thrown.
+ *
+ * @param tran the {@link SparkTran} to create the long call site for
+ *
+ * @return a {@link String} containing the explain plan for the given {@link SparkTran}
+ */
+ private String getLongFormCallSite(SparkTran tran) {
+ if (this.jobConf.getBoolean(HiveConf.ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.varname, HiveConf
+ .ConfVars.HIVE_SPARK_LOG_EXPLAIN_WEBUI.defaultBoolVal)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + tran.getName());
+
+ ExplainWork explainWork = new ExplainWork();
+ explainWork.setConfig(new ExplainConfiguration());
+ ExplainTask explainTask = new ExplainTask();
+ explainTask.setWork(explainWork);
+
+ String explainOutput = "";
+ try {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ explainTask.outputPlan(tran.getBaseWork(), new PrintStream(outputStream), false, false, 0,
+ null, this.jobConf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname,
+ HiveConf.ConfVars.HIVE_IN_TEST.defaultBoolVal));
+ explainOutput = StringUtils.abbreviate(tran.getName() + " Explain Plan:\n\n" + outputStream
+ .toString(), 100000);
+ LOG.debug(explainOutput);
+ } catch (Exception e) {
+ LOG.error("Error while generating explain plan for " + tran.getName(), e);
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_EXPLAIN_PLAN + tran.getName());
+ return explainOutput;
+ }
+ return "";
+ }
+
public void addTran(SparkTran tran) {
rootTrans.add(tran);
leafTrans.add(tran);
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index c9a3196..d71d705 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -102,7 +102,7 @@ public class SparkPlanGenerator {
public SparkPlan generate(SparkWork sparkWork) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
- SparkPlan sparkPlan = new SparkPlan(this.sc.sc());
+ SparkPlan sparkPlan = new SparkPlan(this.jobConf, this.sc.sc());
cloneToWork = sparkWork.getCloneToWork();
workToTranMap.clear();
workToParentWorkTranMap.clear();
@@ -145,7 +145,7 @@ public class SparkPlanGenerator {
boolean toCache = cloneToWork.containsKey(work);
List<BaseWork> parentWorks = sparkWork.getParents(work);
SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work);
- result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName());
+ result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName(), work);
sparkPlan.addTran(result);
for (BaseWork parentWork : parentWorks) {
sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -216,12 +216,12 @@ public class SparkPlanGenerator {
(toCache ? ", cached)" : ")");
// Caching is disabled for MapInput due to HIVE-8920
- MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName);
+ MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName, mapWork);
return result;
}
private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache,
- String name) {
+ String name, BaseWork work) {
Preconditions.checkArgument(!edge.isShuffleNone(),
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
@@ -233,7 +233,7 @@ public class SparkPlanGenerator {
} else {
shuffler = new GroupByShuffler();
}
- return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge);
+ return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge, work);
}
private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
@@ -257,12 +257,12 @@ public class SparkPlanGenerator {
"Can't make path " + outputPath + " : " + e.getMessage());
}
}
- MapTran mapTran = new MapTran(caching, work.getName());
+ MapTran mapTran = new MapTran(caching, work.getName(), work);
HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
mapTran.setMapFunction(mapFunc);
return mapTran;
} else if (work instanceof ReduceWork) {
- ReduceTran reduceTran = new ReduceTran(caching, work.getName());
+ ReduceTran reduceTran = new ReduceTran(caching, work.getName(), work);
HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
reduceTran.setReduceFunction(reduceFunc);
return reduceTran;
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index f9057b9..29f8b3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.io.WritableComparable;
import org.apache.spark.api.java.JavaPairRDD;
@@ -28,5 +29,7 @@ public interface SparkTran<KI extends WritableComparable, VI, KO extends Writabl
public String getName();
+ public BaseWork getBaseWork();
+
public Boolean isCacheEnable();
}
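With getBaseWork() on the interface, every tran can hand SparkPlan the BaseWork it was generated from. A minimal sketch of what an implementation now looks like (a hypothetical pass-through tran, shown only to illustrate the contract):

    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    public class IdentityTranSketch
        implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
      private final String name;
      private final BaseWork baseWork; // the work this tran was generated from

      public IdentityTranSketch(String name, BaseWork baseWork) {
        this.name = name;
        this.baseWork = baseWork;
      }

      @Override
      public JavaPairRDD<HiveKey, BytesWritable> transform(
          JavaPairRDD<HiveKey, BytesWritable> input) {
        return input; // no-op transformation
      }

      @Override
      public String getName() {
        return name;
      }

      @Override
      public Boolean isCacheEnable() {
        return Boolean.FALSE;
      }

      @Override
      public BaseWork getBaseWork() {
        return baseWork; // consumed by SparkPlan#getLongFormCallSite for the long call site
      }
    }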
http://git-wip-us.apache.org/repos/asf/hive/blob/b14113be/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
new file mode 100644
index 0000000..3fe32a0
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
@@ -0,0 +1,122 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.spark.Dependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.spark.rdd.HadoopRDD;
+import org.apache.spark.rdd.MapPartitionsRDD;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.rdd.ShuffledRDD;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
+import java.util.List;
+
+
+public class TestSparkPlan {
+
+ @Test
+ public void testSetRDDCallSite() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ SQLStdHiveAuthorizerFactory.class.getName());
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path tmpDir = new Path("TestSparkPlan-tmp");
+
+ SessionState.start(conf);
+
+ IDriver driver = null;
+ JavaSparkContext sc = null;
+
+ try {
+ driver = DriverFactory.newDriver(conf);
+ Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+
+ driver.compile("select * from test order by col");
+ List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+ Assert.assertEquals(1, sparkTasks.size());
+
+ SparkTask sparkTask = sparkTasks.get(0);
+
+ JobConf jobConf = new JobConf(conf);
+
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.setMaster("local");
+ sparkConf.setAppName("TestSparkPlan-app");
+ sc = new JavaSparkContext(sparkConf);
+
+ SparkPlanGenerator sparkPlanGenerator = new SparkPlanGenerator(sc, null, jobConf, tmpDir,
+ null);
+ SparkPlan sparkPlan = sparkPlanGenerator.generate(sparkTask.getWork());
+ RDD<Tuple2<HiveKey, BytesWritable>> reducerRdd = sparkPlan.generateGraph().rdd();
+
+ Assert.assertTrue(reducerRdd.name().contains("Reducer 2"));
+ Assert.assertTrue(reducerRdd instanceof MapPartitionsRDD);
+ Assert.assertTrue(reducerRdd.creationSite().shortForm().contains("Reducer 2"));
+ Assert.assertTrue(reducerRdd.creationSite().longForm().contains("Explain Plan"));
+ Assert.assertTrue(reducerRdd.creationSite().longForm().contains("Reducer 2"));
+
+ List<Dependency<?>> rdds = JavaConversions.seqAsJavaList(reducerRdd.dependencies());
+ Assert.assertEquals(1, rdds.size());
+ RDD shuffledRdd = rdds.get(0).rdd();
+
+ Assert.assertTrue(shuffledRdd.name().contains("Reducer 2"));
+ Assert.assertTrue(shuffledRdd.name().contains("SORT"));
+ Assert.assertTrue(shuffledRdd instanceof ShuffledRDD);
+ Assert.assertTrue(shuffledRdd.creationSite().shortForm().contains("Reducer 2"));
+ Assert.assertTrue(shuffledRdd.creationSite().longForm().contains("Explain Plan"));
+ Assert.assertTrue(shuffledRdd.creationSite().longForm().contains("Reducer 2"));
+
+ rdds = JavaConversions.seqAsJavaList(shuffledRdd.dependencies());
+ Assert.assertEquals(1, rdds.size());
+ RDD mapRdd = rdds.get(0).rdd();
+
+ Assert.assertTrue(mapRdd.name().contains("Map 1"));
+ Assert.assertTrue(mapRdd instanceof MapPartitionsRDD);
+ Assert.assertTrue(mapRdd.creationSite().shortForm().contains("Map 1"));
+ Assert.assertTrue(mapRdd.creationSite().longForm().contains("Explain Plan"));
+ Assert.assertTrue(mapRdd.creationSite().longForm().contains("Map 1"));
+
+ rdds = JavaConversions.seqAsJavaList(mapRdd.dependencies());
+ Assert.assertEquals(1, rdds.size());
+ RDD hadoopRdd = rdds.get(0).rdd();
+
+ Assert.assertTrue(hadoopRdd.name().contains("Map 1"));
+ Assert.assertTrue(hadoopRdd.name().contains("test"));
+ Assert.assertTrue(hadoopRdd instanceof HadoopRDD);
+ Assert.assertTrue(hadoopRdd.creationSite().shortForm().contains("Map 1"));
+ } finally {
+ if (driver != null) {
+ Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+ driver.destroy();
+ }
+ if (sc != null) {
+ sc.close();
+ }
+ if (fs.exists(tmpDir)) {
+ fs.delete(tmpDir, true);
+ }
+ }
+ }
+}