You are viewing a plain-text version of this content; the link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by st...@apache.org on 2018/02/08 20:46:31 UTC
hive git commit: HIVE-18368: Improve Spark Debug RDD Graph (Sahil Takiar, reviewed by Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 43e713746 -> 1e74aca8d
HIVE-18368: Improve Spark Debug RDD Graph (Sahil Takiar, reviewed by Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e74aca8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e74aca8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e74aca8
Branch: refs/heads/master
Commit: 1e74aca8d09ea2ef636311d2168b4d34198f7194
Parents: 43e7137
Author: Sahil Takiar <ta...@gmail.com>
Authored: Thu Feb 8 12:45:58 2018 -0800
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Thu Feb 8 12:45:58 2018 -0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/CacheTran.java | 14 +-
.../ql/exec/spark/LocalHiveSparkClient.java | 6 +
.../hadoop/hive/ql/exec/spark/MapInput.java | 13 +-
.../hadoop/hive/ql/exec/spark/MapTran.java | 17 +-
.../hadoop/hive/ql/exec/spark/ReduceTran.java | 17 +-
.../hadoop/hive/ql/exec/spark/ShuffleTran.java | 19 ++-
.../hadoop/hive/ql/exec/spark/SparkPlan.java | 164 ++-----------------
.../hive/ql/exec/spark/SparkPlanGenerator.java | 35 +++-
.../hadoop/hive/ql/exec/spark/SparkTran.java | 2 -
.../hive/ql/exec/spark/SparkUtilities.java | 36 +---
.../hive/ql/io/CombineHiveInputFormat.java | 2 +-
11 files changed, 85 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
index c5fec7d..4b77ac9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -27,9 +27,11 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr
// whether to cache current RDD.
private boolean caching = false;
private JavaPairRDD<KO, VO> cachedRDD;
+ protected final String name;
- protected CacheTran(boolean cache) {
+ protected CacheTran(boolean cache, String name) {
this.caching = cache;
+ this.name = name;
}
@Override
@@ -40,9 +42,10 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr
cachedRDD = doTransform(input);
cachedRDD.persist(StorageLevel.MEMORY_AND_DISK());
}
- return cachedRDD;
+ return cachedRDD.setName(this.name + " (" + cachedRDD.getNumPartitions() + ", cached)");
} else {
- return doTransform(input);
+ JavaPairRDD<KO, VO> rdd = doTransform(input);
+ return rdd.setName(this.name + " (" + rdd.getNumPartitions() + ")");
}
}
@@ -51,4 +54,9 @@ public abstract class CacheTran<KI extends WritableComparable, VI, KO extends Wr
}
protected abstract JavaPairRDD<KO, VO> doTransform(JavaPairRDD<KI, VI> input);
+
+ @Override
+ public String getName() {
+ return name;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index cab97a0..f43b449 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -25,7 +25,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -160,8 +162,12 @@ public class LocalHiveSparkClient implements HiveSparkClient {
// Execute generated plan.
JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+
+ sc.setJobGroup("queryId = " + sparkWork.getQueryId(), DagUtils.getQueryName(jobConf));
+
// We use Spark RDD async action to submit job as it's the only way to get jobId now.
JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+
// As we always use foreach action to submit RDD graph, it would only trigger one job.
int jobId = future.jobIds().get(0);
LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index d240d18..b1a0d55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -36,17 +36,18 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
private boolean toCache;
private final SparkPlan sparkPlan;
- private String name = "MapInput";
+ private final String name;
public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
- this(sparkPlan, hadoopRDD, false);
+ this(sparkPlan, hadoopRDD, false, "MapInput");
}
public MapInput(SparkPlan sparkPlan,
- JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
+ JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache, String name) {
this.hadoopRDD = hadoopRDD;
this.toCache = toCache;
this.sparkPlan = sparkPlan;
+ this.name = name;
}
public void setToCache(boolean toCache) {
@@ -66,6 +67,7 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
} else {
result = hadoopRDD;
}
+ result.setName(this.name);
return result;
}
@@ -96,9 +98,4 @@ public class MapInput implements SparkTran<WritableComparable, Writable,
public Boolean isCacheEnable() {
return new Boolean(toCache);
}
-
- @Override
- public void setName(String name) {
- this.name = name;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 2cc6845..b102f51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -24,14 +24,13 @@ import org.apache.spark.api.java.JavaPairRDD;
public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
private HiveMapFunction mapFunc;
- private String name = "MapTran";
public MapTran() {
- this(false);
+ this(false, "MapTran");
}
- public MapTran(boolean cache) {
- super(cache);
+ public MapTran(boolean cache, String name) {
+ super(cache, name);
}
@Override
@@ -43,14 +42,4 @@ public class MapTran extends CacheTran<BytesWritable, BytesWritable, HiveKey, By
public void setMapFunction(HiveMapFunction mapFunc) {
this.mapFunc = mapFunc;
}
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public void setName(String name) {
- this.name = name;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 9045e05..3b34c78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -24,14 +24,13 @@ import org.apache.spark.api.java.JavaPairRDD;
public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable> {
private HiveReduceFunction<V> reduceFunc;
- private String name = "Reduce";
public ReduceTran() {
- this(false);
+ this(false, "Reduce");
}
- public ReduceTran(boolean caching) {
- super(caching);
+ public ReduceTran(boolean caching, String name) {
+ super(caching, name);
}
@Override
@@ -43,14 +42,4 @@ public class ReduceTran<V> extends CacheTran<HiveKey, V, HiveKey, BytesWritable>
public void setReduceFunction(HiveReduceFunction<V> redFunc) {
this.reduceFunc = redFunc;
}
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public void setName(String name) {
- this.name = name;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index aec96bc..40ff01a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
@@ -28,17 +29,21 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B
private final int numOfPartitions;
private final boolean toCache;
private final SparkPlan sparkPlan;
- private String name = "Shuffle";
+ private final String name;
+ private final SparkEdgeProperty edge;
public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
- this(sparkPlan, sf, n, false);
+ this(sparkPlan, sf, n, false, "Shuffle", null);
}
- public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache) {
+ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name,
+ SparkEdgeProperty edge) {
shuffler = sf;
numOfPartitions = n;
this.toCache = toCache;
this.sparkPlan = sparkPlan;
+ this.name = name;
+ this.edge = edge;
}
@Override
@@ -48,7 +53,8 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B
sparkPlan.addCachedRDDId(result.id());
result = result.persist(StorageLevel.MEMORY_AND_DISK());
}
- return result;
+ return result.setName(this.name + " (" + edge.getShuffleType() + ", " + numOfPartitions +
+ (toCache ? ", cached)" : ")"));
}
public int getNoOfPartitions() {
@@ -65,11 +71,6 @@ public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, B
return new Boolean(toCache);
}
- @Override
- public void setName(String name) {
- this.name = name;
- }
-
public SparkShuffler getShuffler() {
return shuffler;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index 5d27692..b21e386 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -48,6 +50,12 @@ public class SparkPlan {
private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
+ private final SparkContext sc;
+
+ SparkPlan(SparkContext sc) {
+ this.sc = sc;
+ }
+
@SuppressWarnings("unchecked")
public JavaPairRDD<HiveKey, BytesWritable> generateGraph() {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
@@ -60,6 +68,7 @@ public class SparkPlan {
// Root tran, it must be MapInput
Preconditions.checkArgument(tran instanceof MapInput,
"AssertionError: tran must be an instance of MapInput");
+ sc.setCallSite(CallSite.apply(tran.getName(), ""));
rdd = tran.transform(null);
} else {
for (SparkTran parent : parents) {
@@ -67,174 +76,37 @@ public class SparkPlan {
if (rdd == null) {
rdd = prevRDD;
} else {
+ sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " +
+ prevRDD.name() + ")", ""));
rdd = rdd.union(prevRDD);
+ rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")");
}
}
+ sc.setCallSite(CallSite.apply(tran.getName(), ""));
rdd = tran.transform(rdd);
}
tranToOutputRDDMap.put(tran, rdd);
}
- logSparkPlan();
-
JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
for (SparkTran leafTran : leafTrans) {
JavaPairRDD<HiveKey, BytesWritable> rdd = tranToOutputRDDMap.get(leafTran);
if (finalRDD == null) {
finalRDD = rdd;
} else {
+ sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " + finalRDD.name() + ")",
+ ""));
finalRDD = finalRDD.union(rdd);
+ finalRDD.setName("UnionRDD (" + finalRDD.getNumPartitions() + ")");
}
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
- if (LOG.isDebugEnabled()) {
- LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD));
- }
- return finalRDD;
- }
-
- private void addNumberToTrans() {
- int i = 1;
- String name = null;
-
- // Traverse leafTran & transGraph add numbers to trans
- for (SparkTran leaf : leafTrans) {
- name = leaf.getName() + " " + i++;
- leaf.setName(name);
- }
- Set<SparkTran> sparkTrans = transGraph.keySet();
- for (SparkTran tran : sparkTrans) {
- name = tran.getName() + " " + i++;
- tran.setName(name);
- }
- }
-
- private void logSparkPlan() {
- addNumberToTrans();
- ArrayList<SparkTran> leafTran = new ArrayList<SparkTran>();
- leafTran.addAll(leafTrans);
-
- for (SparkTran leaf : leafTrans) {
- collectLeafTrans(leaf, leafTran);
- }
-
- // Start Traverse from the leafTrans and get parents of each leafTrans till
- // the end
- StringBuilder sparkPlan = new StringBuilder(
- "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n");
- for (SparkTran leaf : leafTran) {
- sparkPlan.append(leaf.getName());
- getSparkPlan(leaf, sparkPlan);
- sparkPlan.append("\n");
- }
- sparkPlan
- .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ");
- LOG.info(sparkPlan.toString());
- }
-
- private void collectLeafTrans(SparkTran leaf, List<SparkTran> reduceTrans) {
- List<SparkTran> parents = getParents(leaf);
- if (parents.size() > 0) {
- SparkTran nextLeaf = null;
- for (SparkTran leafTran : parents) {
- if (leafTran instanceof ReduceTran) {
- reduceTrans.add(leafTran);
- } else {
- if (getParents(leafTran).size() > 0)
- nextLeaf = leafTran;
- }
- }
- if (nextLeaf != null)
- collectLeafTrans(nextLeaf, reduceTrans);
- }
- }
-
- private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) {
- List<SparkTran> parents = getParents(tran);
- List<SparkTran> nextLeaf = new ArrayList<SparkTran>();
- if (parents.size() > 0) {
- sparkPlan.append(" <-- ");
- boolean isFirst = true;
- for (SparkTran leaf : parents) {
- if (isFirst) {
- sparkPlan.append("( " + leaf.getName());
- if (leaf instanceof ShuffleTran) {
- logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
- } else {
- logCacheStatus(leaf, sparkPlan);
- }
- isFirst = false;
- } else {
- sparkPlan.append("," + leaf.getName());
- if (leaf instanceof ShuffleTran) {
- logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
- } else {
- logCacheStatus(leaf, sparkPlan);
- }
- }
- // Leave reduceTran it will be expanded in the next line
- if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) {
- nextLeaf.add(leaf);
- }
- }
- sparkPlan.append(" ) ");
- if (nextLeaf.size() > 1) {
- logLeafTran(nextLeaf, sparkPlan);
- } else {
- if (nextLeaf.size() != 0)
- getSparkPlan(nextLeaf.get(0), sparkPlan);
- }
- }
- }
-
- private void logLeafTran(List<SparkTran> parent, StringBuilder sparkPlan) {
- sparkPlan.append(" <-- ");
- boolean isFirst = true;
- for (SparkTran sparkTran : parent) {
- List<SparkTran> parents = getParents(sparkTran);
- SparkTran leaf = parents.get(0);
- if (isFirst) {
- sparkPlan.append("( " + leaf.getName());
- if (leaf instanceof ShuffleTran) {
- logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
- } else {
- logCacheStatus(leaf, sparkPlan);
- }
- isFirst = false;
- } else {
- sparkPlan.append("," + leaf.getName());
- if (leaf instanceof ShuffleTran) {
- logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
- } else {
- logCacheStatus(leaf, sparkPlan);
- }
- }
- }
- sparkPlan.append(" ) ");
- }
- private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) {
- int noOfPartitions = leaf.getNoOfPartitions();
- sparkPlan.append(" ( Partitions " + noOfPartitions);
- SparkShuffler shuffler = leaf.getShuffler();
- sparkPlan.append(", " + shuffler.getName());
- if (leaf.isCacheEnable()) {
- sparkPlan.append(", Cache on");
- } else {
- sparkPlan.append(", Cache off");
- }
- }
+ LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n");
- private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) {
- if (sparkTran.isCacheEnable() != null) {
- if (sparkTran.isCacheEnable().booleanValue()) {
- sparkPlan.append(" (cache on) ");
- } else {
- sparkPlan.append(" (cache off) ");
- }
- }
+ return finalRDD;
}
public void addTran(SparkTran tran) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index c52692d..c9a3196 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -51,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -98,7 +102,7 @@ public class SparkPlanGenerator {
public SparkPlan generate(SparkWork sparkWork) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
- SparkPlan sparkPlan = new SparkPlan();
+ SparkPlan sparkPlan = new SparkPlan(this.sc.sc());
cloneToWork = sparkWork.getCloneToWork();
workToTranMap.clear();
workToParentWorkTranMap.clear();
@@ -138,9 +142,10 @@ public class SparkPlanGenerator {
result = generateMapInput(sparkPlan, (MapWork)work);
sparkPlan.addTran(result);
} else if (work instanceof ReduceWork) {
+ boolean toCache = cloneToWork.containsKey(work);
List<BaseWork> parentWorks = sparkWork.getParents(work);
- result = generate(sparkPlan,
- sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+ SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work);
+ result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName());
sparkPlan.addTran(result);
for (BaseWork parentWork : parentWorks) {
sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -189,6 +194,8 @@ public class SparkPlanGenerator {
JobConf jobConf = cloneJobConf(mapWork);
Class ifClass = getInputFormat(jobConf, mapWork);
+ sc.sc().setCallSite(CallSite.apply(mapWork.getName(), ""));
+
JavaPairRDD<WritableComparable, Writable> hadoopRDD;
if (mapWork.getNumMapTasks() != null) {
jobConf.setNumMapTasks(mapWork.getNumMapTasks());
@@ -198,12 +205,24 @@ public class SparkPlanGenerator {
hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
}
+ boolean toCache = false/*cloneToWork.containsKey(mapWork)*/;
+
+ String tables = mapWork.getAllRootOperators().stream()
+ .filter(op -> op instanceof TableScanOperator)
+ .map(ts -> ((TableScanDesc) ts.getConf()).getAlias())
+ .collect(Collectors.joining(", "));
+
+ String rddName = mapWork.getName() + " (" + tables + ", " + hadoopRDD.getNumPartitions() +
+ (toCache ? ", cached)" : ")");
+
// Caching is disabled for MapInput due to HIVE-8920
- MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/);
+ MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName);
return result;
}
- private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) {
+ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache,
+ String name) {
+
Preconditions.checkArgument(!edge.isShuffleNone(),
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
SparkShuffler shuffler;
@@ -214,7 +233,7 @@ public class SparkPlanGenerator {
} else {
shuffler = new GroupByShuffler();
}
- return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
+ return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge);
}
private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
@@ -238,12 +257,12 @@ public class SparkPlanGenerator {
"Can't make path " + outputPath + " : " + e.getMessage());
}
}
- MapTran mapTran = new MapTran(caching);
+ MapTran mapTran = new MapTran(caching, work.getName());
HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
mapTran.setMapFunction(mapFunc);
return mapTran;
} else if (work instanceof ReduceWork) {
- ReduceTran reduceTran = new ReduceTran(caching);
+ ReduceTran reduceTran = new ReduceTran(caching, work.getName());
HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
reduceTran.setReduceFunction(reduceFunc);
return reduceTran;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 037efe1..f9057b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -28,7 +28,5 @@ public interface SparkTran<KI extends WritableComparable, VI, KO extends Writabl
public String getName();
- public void setName(String name);
-
public Boolean isCacheEnable();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index f332790..943a4ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -50,12 +50,8 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.spark.client.SparkClientUtilities;
-import org.apache.spark.Dependency;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.rdd.UnionRDD;
-import scala.collection.JavaConversions;
+
/**
* Contains utilities methods used as part of Spark tasks.
@@ -138,36 +134,6 @@ public class SparkUtilities {
return sparkSession;
}
-
- public static String rddGraphToString(JavaPairRDD rdd) {
- StringBuilder sb = new StringBuilder();
- rddToString(rdd.rdd(), sb, "");
- return sb.toString();
- }
-
- private static void rddToString(RDD rdd, StringBuilder sb, String offset) {
- sb.append(offset).append(rdd.getClass().getCanonicalName()).append("[").append(rdd.hashCode()).append("]");
- if (rdd.getStorageLevel().useMemory()) {
- sb.append("(cached)");
- }
- sb.append("\n");
- Collection<Dependency> dependencies = JavaConversions.asJavaCollection(rdd.dependencies());
- if (dependencies != null) {
- offset += "\t";
- for (Dependency dependency : dependencies) {
- RDD parentRdd = dependency.rdd();
- rddToString(parentRdd, sb, offset);
- }
- } else if (rdd instanceof UnionRDD) {
- UnionRDD unionRDD = (UnionRDD) rdd;
- offset += "\t";
- Collection<RDD> parentRdds = JavaConversions.asJavaCollection(unionRDD.rdds());
- for (RDD parentRdd : parentRdds) {
- rddToString(parentRdd, sb, offset);
- }
- }
- }
-
/**
* Generate a temporary path for dynamic partition pruning in Spark branch
* TODO: no longer need this if we use accumulator!
http://git-wip-us.apache.org/repos/asf/hive/blob/1e74aca8/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index b698987..1622ae2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -415,7 +415,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
combine.createPool(job, f);
poolMap.put(combinePathInputFormat, f);
} else {
- LOG.info("CombineHiveInputSplit: pool is already created for " + path +
+ LOG.debug("CombineHiveInputSplit: pool is already created for " + path +
"; using filter path " + filterPath);
f.addPath(filterPath);
}