You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2017/05/02 17:29:03 UTC
[3/3] hive git commit: HIVE-11133: Support hive.explain.user for
Spark (Sahil via Xuefu)
HIVE-11133: Support hive.explain.user for Spark (Sahil via Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/00b64448
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/00b64448
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/00b64448
Branch: refs/heads/master
Commit: 00b644482656da9fb40788744e692f4e677b4c0d
Parents: 812fa39
Author: Xuefu Zhang <xu...@uber.com>
Authored: Tue May 2 10:28:37 2017 -0700
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Tue May 2 10:28:37 2017 -0700
----------------------------------------------------------------------
.../hive/common/jsonexplain/Connection.java | 35 +
.../hive/common/jsonexplain/DagJsonParser.java | 167 +
.../common/jsonexplain/DagJsonParserUtils.java | 53 +
.../common/jsonexplain/JsonParserFactory.java | 4 +
.../hadoop/hive/common/jsonexplain/Op.java | 358 ++
.../hadoop/hive/common/jsonexplain/Printer.java | 41 +
.../hadoop/hive/common/jsonexplain/Stage.java | 262 +
.../hadoop/hive/common/jsonexplain/Vertex.java | 323 +
.../jsonexplain/spark/SparkJsonParser.java | 35 +
.../hive/common/jsonexplain/tez/Connection.java | 35 -
.../hadoop/hive/common/jsonexplain/tez/Op.java | 356 --
.../hive/common/jsonexplain/tez/Printer.java | 41 -
.../hive/common/jsonexplain/tez/Stage.java | 262 -
.../common/jsonexplain/tez/TezJsonParser.java | 153 +-
.../jsonexplain/tez/TezJsonParserUtils.java | 53 -
.../hive/common/jsonexplain/tez/Vertex.java | 334 -
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../test/resources/testconfiguration.properties | 1 +
.../hadoop/hive/ql/optimizer/Optimizer.java | 2 +-
.../hive/ql/parse/ExplainSemanticAnalyzer.java | 16 +-
.../apache/hadoop/hive/ql/plan/SparkWork.java | 10 +-
.../clientpositive/spark_explainuser_1.q | 671 ++
.../spark/spark_explainuser_1.q.out | 5921 ++++++++++++++++++
23 files changed, 7915 insertions(+), 1223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java
new file mode 100644
index 0000000..0df6f4c
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+/**
+ * An edge of the explain DAG as seen from the consuming vertex: the upstream
+ * vertex the data comes {@code from}, plus the edge {@code type} label.
+ *
+ * <p>Ordering is delegated entirely to the source vertex, so two connections
+ * from the same vertex compare as equal regardless of their type.
+ */
+public final class Connection implements Comparable<Connection> {
+  /** Edge type label; may be null (merge-join dummy vertices are inlined with a null type). */
+  public final String type;
+  /** The upstream vertex this connection originates from. */
+  public final Vertex from;
+
+  public Connection(String type, Vertex from) {
+    this.type = type;
+    this.from = from;
+  }
+
+  /** Orders connections by comparing their source vertices. */
+  @Override
+  public int compareTo(Connection other) {
+    final Vertex otherSource = other.from;
+    return this.from.compareTo(otherSource);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java
new file mode 100644
index 0000000..1f01685
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.jsonexplain.JsonParser;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class DagJsonParser implements JsonParser {
+ public final Map<String, Stage> stages = new LinkedHashMap<>();
+ protected final Logger LOG;
+ // the objects that have been printed.
+ public final Set<Object> printSet = new LinkedHashSet<>();
+ // the vertex that should be inlined. <Operator, list of Vertex that is
+ // inlined>
+ public final Map<Op, List<Connection>> inlineMap = new LinkedHashMap<>();
+
+ public DagJsonParser() {
+ super();
+ LOG = LoggerFactory.getLogger(this.getClass().getName());
+ }
+
+ public void extractStagesAndPlans(JSONObject inputObject) throws Exception {
+ // extract stages
+ JSONObject dependency = inputObject.getJSONObject("STAGE DEPENDENCIES");
+ if (dependency != null && dependency.length() > 0) {
+ // iterate for the first time to get all the names of stages.
+ for (String stageName : JSONObject.getNames(dependency)) {
+ this.stages.put(stageName, new Stage(stageName, this));
+ }
+ // iterate for the second time to get all the dependency.
+ for (String stageName : JSONObject.getNames(dependency)) {
+ JSONObject dependentStageNames = dependency.getJSONObject(stageName);
+ this.stages.get(stageName).addDependency(dependentStageNames, this.stages);
+ }
+ }
+ // extract stage plans
+ JSONObject stagePlans = inputObject.getJSONObject("STAGE PLANS");
+ if (stagePlans != null && stagePlans.length() > 0) {
+ for (String stageName : JSONObject.getNames(stagePlans)) {
+ JSONObject stagePlan = stagePlans.getJSONObject(stageName);
+ this.stages.get(stageName).extractVertex(stagePlan);
+ }
+ }
+ }
+
+ /**
+ * @param indentFlag
+ * help to generate correct indent
+ * @return
+ */
+ public static String prefixString(int indentFlag) {
+ StringBuilder sb = new StringBuilder();
+ for (int index = 0; index < indentFlag; index++) {
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * @param indentFlag
+ * @param tail
+ * help to generate correct indent with a specific tail
+ * @return
+ */
+ public static String prefixString(int indentFlag, String tail) {
+ StringBuilder sb = new StringBuilder();
+ for (int index = 0; index < indentFlag; index++) {
+ sb.append(" ");
+ }
+ int len = sb.length();
+ return sb.replace(len - tail.length(), len, tail).toString();
+ }
+
+ @Override
+ public void print(JSONObject inputObject, PrintStream outputStream) throws Exception {
+ LOG.info("JsonParser is parsing:" + inputObject.toString());
+ this.extractStagesAndPlans(inputObject);
+ Printer printer = new Printer();
+ // print out the cbo info
+ if (inputObject.has("cboInfo")) {
+ printer.println(inputObject.getString("cboInfo"));
+ printer.println();
+ }
+ // print out the vertex dependency in root stage
+ for (Stage candidate : this.stages.values()) {
+ if (candidate.tezStageDependency != null && candidate.tezStageDependency.size() > 0) {
+ printer.println("Vertex dependency in root stage");
+ for (Entry<Vertex, List<Connection>> entry : candidate.tezStageDependency.entrySet()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(entry.getKey().name);
+ sb.append(" <- ");
+ boolean printcomma = false;
+ for (Connection connection : entry.getValue()) {
+ if (printcomma) {
+ sb.append(", ");
+ } else {
+ printcomma = true;
+ }
+ sb.append(connection.from.name + " (" + connection.type + ")");
+ }
+ printer.println(sb.toString());
+ }
+ printer.println();
+ }
+ }
+ // print out all the stages that have no childStages.
+ for (Stage candidate : this.stages.values()) {
+ if (candidate.childStages.isEmpty()) {
+ candidate.print(printer, 0);
+ }
+ }
+ outputStream.println(printer.toString());
+ }
+
+ public void addInline(Op op, Connection connection) {
+ List<Connection> list = inlineMap.get(op);
+ if (list == null) {
+ list = new ArrayList<>();
+ list.add(connection);
+ inlineMap.put(op, list);
+ } else {
+ list.add(connection);
+ }
+ }
+
+ public boolean isInline(Vertex v) {
+ for (List<Connection> list : inlineMap.values()) {
+ for (Connection connection : list) {
+ if (connection.from.equals(v)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public abstract String mapEdgeType(String edgeName);
+
+ public abstract String getFrameworkName();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java
new file mode 100644
index 0000000..a518ac1
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+/** Formatting helpers shared by the DAG explain-plan printers. */
+public class DagJsonParserUtils {
+
+  /** Operators whose "Statistics:" attribute is not appended to their printed name. */
+  public static List<String> OperatorNoStats = Arrays.asList("File Output Operator",
+      "Reduce Output Operator");
+
+  /**
+   * Returns the display name of an operator. A "Reduce Output Operator" is shown as the
+   * edge type of its vertex when one is set; any other name is returned unchanged.
+   *
+   * @param operatorName operator name as it appears in the plan
+   * @param vertex the vertex owning the operator; may be null (operators without a vertex
+   *     keep their name)
+   */
+  public static String renameReduceOutputOperator(String operatorName, Vertex vertex) {
+    if ("Reduce Output Operator".equals(operatorName) && vertex != null
+        && vertex.edgeType != null) {
+      return vertex.edgeType;
+    }
+    return operatorName;
+  }
+
+  /**
+   * Joins attributes as {@code key + value} pairs separated by commas. Keys are expected to
+   * carry their own separator (e.g. "Statistics:"), so none is inserted.
+   */
+  public static String attrsToString(Map<String, String> attrs) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (Entry<String, String> entry : attrs.entrySet()) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(",");
+      }
+      sb.append(entry.getKey()).append(entry.getValue());
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
index db118bf..2a5d47a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.common.jsonexplain;
+import org.apache.hadoop.hive.common.jsonexplain.spark.SparkJsonParser;
import org.apache.hadoop.hive.common.jsonexplain.tez.TezJsonParser;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,6 +36,9 @@ public class JsonParserFactory {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
return new TezJsonParser();
}
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ return new SparkJsonParser();
+ }
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java
new file mode 100644
index 0000000..03c5981
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.jsonexplain.Vertex.VertexType;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * One operator node of a DAG explain plan, linked to its parent operator and owning
+ * vertex, with its display attributes. Join operators rewrite their "keys:" and
+ * "condition map:" attributes into a single "Conds:" attribute that names source
+ * operators.
+ */
+public final class Op {
+  public final String name;
+  // the parser whose print state (printSet, inlineMap) this operator uses
+  public final DagJsonParser parser;
+  public final String operatorId;
+  public Op parent;
+  public final List<Op> children;
+  public final Map<String, String> attrs;
+  // the jsonObject for this operator
+  public final JSONObject opObject;
+  // the vertex that this operator belongs to; may be null
+  public final Vertex vertex;
+  // the vertex that this operator output to
+  public final String outputVertexName;
+  // the Operator type
+  public final OpType type;
+
+  public enum OpType {
+    MAPJOIN, MERGEJOIN, RS, OTHERS
+  }
+
+  public Op(String name, String id, String outputVertexName, List<Op> children,
+      Map<String, String> attrs, JSONObject opObject, Vertex vertex, DagJsonParser dagJsonParser)
+      throws JSONException {
+    super();
+    this.name = name;
+    this.operatorId = id;
+    this.type = deriveOpType(operatorId);
+    this.outputVertexName = outputVertexName;
+    this.children = children;
+    this.attrs = attrs;
+    this.opObject = opObject;
+    this.vertex = vertex;
+    this.parser = dagJsonParser;
+  }
+
+  /** Classifies an operator by the prefix of its id (e.g. "MAPJOIN_12"); null ids are OTHERS. */
+  private OpType deriveOpType(String operatorId) {
+    if (operatorId == null) {
+      return OpType.OTHERS;
+    }
+    if (operatorId.startsWith(OpType.MAPJOIN.toString())) {
+      return OpType.MAPJOIN;
+    }
+    if (operatorId.startsWith(OpType.MERGEJOIN.toString())) {
+      return OpType.MERGEJOIN;
+    }
+    if (operatorId.startsWith(OpType.RS.toString())) {
+      return OpType.RS;
+    }
+    return OpType.OTHERS;
+  }
+
+  /** Rewrites this join operator's attributes; any non-map-join is treated as a merge join. */
+  private void inlineJoinOp() throws Exception {
+    if (this.type == OpType.MAPJOIN) {
+      inlineMapJoinOp();
+    } else {
+      inlineMergeJoinOp();
+    }
+  }
+
+  /**
+   * Map join: registers the "input vertices:" as inline connections, maps each join key
+   * position to a source operator id, and replaces "keys:"/"condition map:" by "Conds:".
+   */
+  private void inlineMapJoinOp() throws Exception {
+    JSONObject joinObj = opObject.getJSONObject(this.name);
+    // get the map for posToVertex
+    Map<String, Vertex> posToVertex = new LinkedHashMap<>();
+    if (joinObj.has("input vertices:")) {
+      JSONObject verticeObj = joinObj.getJSONObject("input vertices:");
+      for (String pos : JSONObject.getNames(verticeObj)) {
+        // update the connection: the input vertex is printed inline under this operator
+        Connection c = findParentConnection(verticeObj.getString(pos));
+        if (c != null) {
+          posToVertex.put(pos, c.from);
+          parser.addInline(this, c);
+        }
+      }
+      // update the attrs
+      this.attrs.remove("input vertices:");
+    }
+    // update the keys to use operator name
+    JSONObject keys = joinObj.getJSONObject("keys:");
+    // find out the vertex for the big table: the parent vertices that are not map-join inputs
+    Set<Vertex> parentVertexes = new HashSet<>();
+    for (Connection connection : vertex.parentConnections) {
+      parentVertexes.add(connection.from);
+    }
+    parentVertexes.removeAll(posToVertex.values());
+    Map<String, String> posToOpId = new LinkedHashMap<>();
+    if (keys.length() != 0) {
+      for (String key : JSONObject.getNames(keys)) {
+        if (posToVertex.containsKey(key)) {
+          // first search from the posToVertex
+          posToOpId.put(key, resolveSourceOpId(posToVertex.get(key)));
+        } else if (parent != null) {
+          // then search from parent
+          posToOpId.put(key, parent.operatorId);
+        } else if (parentVertexes.size() == 1) {
+          // then assume it is from its own vertex; that vertex can be claimed only once
+          Vertex v = parentVertexes.iterator().next();
+          parentVertexes.clear();
+          posToOpId.put(key, resolveSourceOpId(v));
+        } else {
+          // finally throw an exception
+          throw new Exception(
+              "Can not find the source operator on one of the branches of map join.");
+        }
+      }
+    }
+    this.attrs.remove("keys:");
+    putConditions(joinObj, keys, posToOpId);
+  }
+
+  /**
+   * Merge join: maps each join tag to a source operator id (from tagToInput, or from the
+   * parent operator plus the merge-join dummy vertices), inlines the dummy vertices, and
+   * replaces "keys:"/"condition map:" by "Conds:".
+   */
+  private void inlineMergeJoinOp() throws Exception {
+    Map<String, String> posToOpId = new LinkedHashMap<>();
+    if (vertex.mergeJoinDummyVertexs.size() == 0) {
+      if (vertex.tagToInput.size() != vertex.parentConnections.size()) {
+        throw new Exception("tagToInput size " + vertex.tagToInput.size()
+            + " is different from parentConnections size " + vertex.parentConnections.size());
+      }
+      for (Entry<String, String> entry : vertex.tagToInput.entrySet()) {
+        Connection c = findParentConnection(entry.getValue());
+        if (c == null) {
+          throw new Exception("Can not find " + entry.getValue()
+              + " while parsing keys of merge join operator");
+        }
+        posToOpId.put(entry.getKey(), resolveSourceOpId(c.from));
+      }
+    } else {
+      posToOpId.put(vertex.tag, this.parent.operatorId);
+      for (Vertex v : vertex.mergeJoinDummyVertexs) {
+        if (v.rootOps.size() != 1) {
+          throw new Exception("Can not find a single root operators in a single vertex " + v.name
+              + " when hive explain user is trying to identify the operator id.");
+        }
+        posToOpId.put(v.tag, v.rootOps.get(0).operatorId);
+      }
+    }
+    JSONObject joinObj = opObject.getJSONObject(this.name);
+    // update the keys to use operator name
+    JSONObject keys = joinObj.getJSONObject("keys:");
+    if (keys.length() != 0) {
+      for (String key : JSONObject.getNames(keys)) {
+        if (!posToOpId.containsKey(key)) {
+          throw new Exception(
+              "Can not find the source operator on one of the branches of merge join.");
+        }
+      }
+      // inline merge join operator in a self-join (vertex is non-null: it was used above)
+      for (Vertex v : this.vertex.mergeJoinDummyVertexs) {
+        parser.addInline(this, new Connection(null, v));
+      }
+    }
+    // update the attrs
+    this.attrs.remove("keys:");
+    putConditions(joinObj, keys, posToOpId);
+  }
+
+  /** Returns the first parent connection of this operator's vertex coming from {@code vertexName}, or null. */
+  private Connection findParentConnection(String vertexName) {
+    for (Connection connection : vertex.parentConnections) {
+      if (connection.from.name.equals(vertexName)) {
+        return connection;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the id used to label the branch of a join that comes from {@code v}: its single
+   * root operator, the vertex name for an operator-less union vertex, or otherwise the
+   * reduce sink of {@code v} that feeds this operator's vertex.
+   *
+   * @throws Exception if none of these can be identified
+   */
+  private String resolveSourceOpId(Vertex v) throws Exception {
+    if (v.rootOps.size() == 1) {
+      return v.rootOps.get(0).operatorId;
+    }
+    if (v.rootOps.size() == 0 && v.vertexType == VertexType.UNION) {
+      return v.name;
+    }
+    Op joinRSOp = v.getJoinRSOp(vertex);
+    if (joinRSOp == null) {
+      throw new Exception(
+          "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+              + " when hive explain user is trying to identify the operator id.");
+    }
+    return joinRSOp.operatorId;
+  }
+
+  /**
+   * Replaces "condition map:" with a comma-separated "Conds:" attribute. Each entry is
+   * {@code leftOp.leftKey=rightOp.rightKey(type)}, or just {@code (type)} when there are
+   * no keys (probably a cross product). An empty condition map yields an empty string.
+   */
+  private void putConditions(JSONObject joinObj, JSONObject keys, Map<String, String> posToOpId)
+      throws JSONException {
+    StringBuilder sb = new StringBuilder();
+    JSONArray conditionMap = joinObj.getJSONArray("condition map:");
+    for (int index = 0; index < conditionMap.length(); index++) {
+      JSONObject cond = conditionMap.getJSONObject(index);
+      // each entry holds a single key whose value is itself a JSON string
+      String k = (String) cond.keys().next();
+      JSONObject condObject = new JSONObject((String) cond.get(k));
+      String joinType = condObject.getString("type");
+      String left = condObject.getString("left");
+      String right = condObject.getString("right");
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      if (keys.length() != 0) {
+        sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
+            + keys.get(right) + "(" + joinType + ")");
+      } else {
+        // probably a cross product
+        sb.append("(" + joinType + ")");
+      }
+    }
+    this.attrs.remove("condition map:");
+    this.attrs.put("Conds:", sb.toString());
+  }
+
+  /**
+   * Returns the display name, the operator id in brackets, and the statistics in parentheses
+   * (except for operators in OperatorNoStats). Always removes "Statistics:" from attrs.
+   */
+  private String getNameWithOpIdStats() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(DagJsonParserUtils.renameReduceOutputOperator(name, vertex));
+    if (operatorId != null) {
+      sb.append(" [" + operatorId + "]");
+    }
+    if (!DagJsonParserUtils.OperatorNoStats.contains(name) && attrs.containsKey("Statistics:")) {
+      sb.append(" (" + attrs.get("Statistics:") + ")");
+    }
+    attrs.remove("Statistics:");
+    return sb.toString();
+  }
+
+  /**
+   * @param printer
+   * @param indentFlag
+   * @param branchOfJoinOp
+   *          This parameter is used to show if it is a branch of a Join
+   *          operator so that we can decide the corresponding indent.
+   * @throws Exception
+   */
+  public void print(Printer printer, int indentFlag, boolean branchOfJoinOp) throws Exception {
+    // print name; an operator already printed is only referenced
+    if (parser.printSet.contains(this)) {
+      printer.println(DagJsonParser.prefixString(indentFlag) + " Please refer to the previous "
+          + this.getNameWithOpIdStats());
+      return;
+    }
+    parser.printSet.add(this);
+    if (!branchOfJoinOp) {
+      printer.println(DagJsonParser.prefixString(indentFlag) + this.getNameWithOpIdStats());
+    } else {
+      printer.println(DagJsonParser.prefixString(indentFlag, "<-") + this.getNameWithOpIdStats());
+    }
+    // only the direct parent of a join is printed as a join branch
+    branchOfJoinOp = false;
+    // if this operator is a Map Join Operator or a Merge Join Operator
+    if (this.type == OpType.MAPJOIN || this.type == OpType.MERGEJOIN) {
+      inlineJoinOp();
+      branchOfJoinOp = true;
+    }
+    // if this operator is the last operator, we summarize the non-inlined
+    // vertex
+    List<Connection> noninlined = new ArrayList<>();
+    if (this.parent == null) {
+      if (this.vertex != null) {
+        for (Connection connection : this.vertex.parentConnections) {
+          if (!parser.isInline(connection.from)) {
+            noninlined.add(connection);
+          }
+        }
+      }
+    }
+    // print attr
+    indentFlag++;
+    if (!attrs.isEmpty()) {
+      printer.println(DagJsonParser.prefixString(indentFlag)
+          + DagJsonParserUtils.attrsToString(attrs));
+    }
+    // print inline vertex (sorts the registered list in place)
+    if (parser.inlineMap.containsKey(this)) {
+      List<Connection> connections = parser.inlineMap.get(this);
+      Collections.sort(connections);
+      for (Connection connection : connections) {
+        connection.from.print(printer, indentFlag, connection.type, this.vertex);
+      }
+    }
+    if (this.parent != null) {
+      // print parent op, i.e., where data comes from
+      this.parent.print(printer, indentFlag, branchOfJoinOp);
+    } else {
+      // print next vertex
+      Collections.sort(noninlined);
+      for (Connection connection : noninlined) {
+        connection.from.print(printer, indentFlag, connection.type, this.vertex);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java
new file mode 100644
index 0000000..6f040f6
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+/** Accumulates explain output in memory; {@link #toString()} returns everything printed so far. */
+public final class Printer {
+  /** Line separator read from the "line.separator" system property when the class loads. */
+  public static final String lineSeparator = System.getProperty("line.separator");
+  private final StringBuilder builder = new StringBuilder();
+
+  /** Appends {@code string} without a line break. */
+  public void print(String string) {
+    builder.append(string);
+  }
+
+  /** Appends {@code string} followed by {@link #lineSeparator}. */
+  public void println(String string) {
+    builder.append(string);
+    builder.append(lineSeparator);
+  }
+
+  /** Appends an empty line. */
+  public void println() {
+    builder.append(lineSeparator);
+  }
+
+  @Override
+  public String toString() {
+    return builder.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java
new file mode 100644
index 0000000..d21a565
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.jsonexplain.Vertex.VertexType;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public final class Stage {
+ //external name is used to show at the console
+ String externalName;
+ //internal name is used to track the stages
+ public final String internalName;
+ //tezJsonParser
+ public final DagJsonParser parser;
+ // upstream stages, e.g., root stage
+ public final List<Stage> parentStages = new ArrayList<>();
+ // downstream stages.
+ public final List<Stage> childStages = new ArrayList<>();
+ public final Map<String, Vertex> vertexs =new LinkedHashMap<>();
+ public final Map<String, String> attrs = new TreeMap<>();
+ Map<Vertex, List<Connection>> tezStageDependency;
+ // some stage may contain only a single operator, e.g., create table operator,
+ // fetch operator.
+ Op op;
+
+ public Stage(String name, DagJsonParser tezJsonParser) {
+ super();
+ internalName = name;
+ externalName = name;
+ parser = tezJsonParser;
+ }
+
+ public void addDependency(JSONObject object, Map<String, Stage> stages) throws JSONException {
+ if (object.has("DEPENDENT STAGES")) {
+ String names = object.getString("DEPENDENT STAGES");
+ for (String name : names.split(",")) {
+ Stage parent = stages.get(name.trim());
+ this.parentStages.add(parent);
+ parent.childStages.add(this);
+ }
+ }
+ if (object.has("CONDITIONAL CHILD TASKS")) {
+ String names = object.getString("CONDITIONAL CHILD TASKS");
+ this.externalName = this.internalName + "(CONDITIONAL CHILD TASKS: " + names + ")";
+ for (String name : names.split(",")) {
+ Stage child = stages.get(name.trim());
+ child.externalName = child.internalName + "(CONDITIONAL)";
+ child.parentStages.add(this);
+ this.childStages.add(child);
+ }
+ }
+ }
+
+ /**
+ * @param object
+ * @throws Exception
+ * If the object of stage contains "Tez", we need to extract the
+ * vertices and edges Else we need to directly extract operators
+ * and/or attributes.
+ */
+ public void extractVertex(JSONObject object) throws Exception {
+ if (object.has(this.parser.getFrameworkName())) {
+ this.tezStageDependency = new TreeMap<>();
+ JSONObject tez = (JSONObject) object.get(this.parser.getFrameworkName());
+ JSONObject vertices = tez.getJSONObject("Vertices:");
+ if (tez.has("Edges:")) {
+ JSONObject edges = tez.getJSONObject("Edges:");
+ // iterate for the first time to get all the vertices
+ for (String to : JSONObject.getNames(edges)) {
+ vertexs.put(to, new Vertex(to, vertices.getJSONObject(to), parser));
+ }
+ // iterate for the second time to get all the vertex dependency
+ for (String to : JSONObject.getNames(edges)) {
+ Object o = edges.get(to);
+ Vertex v = vertexs.get(to);
+ // 1 to 1 mapping
+ if (o instanceof JSONObject) {
+ JSONObject obj = (JSONObject) o;
+ String parent = obj.getString("parent");
+ Vertex parentVertex = vertexs.get(parent);
+ if (parentVertex == null) {
+ parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
+ vertexs.put(parent, parentVertex);
+ }
+ String type = obj.getString("type");
+ // for union vertex, we reverse the dependency relationship
+ if (!"CONTAINS".equals(type)) {
+ v.addDependency(new Connection(type, parentVertex));
+ parentVertex.setType(type);
+ parentVertex.children.add(v);
+ } else {
+ parentVertex.addDependency(new Connection(type, v));
+ v.children.add(parentVertex);
+ }
+ this.tezStageDependency.put(v, Arrays.asList(new Connection(type, parentVertex)));
+ } else {
+ // 1 to many mapping
+ JSONArray from = (JSONArray) o;
+ List<Connection> list = new ArrayList<>();
+ for (int index = 0; index < from.length(); index++) {
+ JSONObject obj = from.getJSONObject(index);
+ String parent = obj.getString("parent");
+ Vertex parentVertex = vertexs.get(parent);
+ if (parentVertex == null) {
+ parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
+ vertexs.put(parent, parentVertex);
+ }
+ String type = obj.getString("type");
+ if (!"CONTAINS".equals(type)) {
+ v.addDependency(new Connection(type, parentVertex));
+ parentVertex.setType(type);
+ parentVertex.children.add(v);
+ } else {
+ parentVertex.addDependency(new Connection(type, v));
+ v.children.add(parentVertex);
+ }
+ list.add(new Connection(type, parentVertex));
+ }
+ this.tezStageDependency.put(v, list);
+ }
+ }
+ } else {
+ for (String vertexName : JSONObject.getNames(vertices)) {
+ vertexs.put(vertexName, new Vertex(vertexName, vertices.getJSONObject(vertexName), parser));
+ }
+ }
+ // The opTree in vertex is extracted
+ for (Vertex v : vertexs.values()) {
+ if (v.vertexType == VertexType.MAP || v.vertexType == VertexType.REDUCE) {
+ v.extractOpTree();
+ v.checkMultiReduceOperator();
+ }
+ }
+ } else {
+ String[] names = JSONObject.getNames(object);
+ if (names != null) {
+ for (String name : names) {
+ if (name.contains("Operator")) {
+ this.op = extractOp(name, object.getJSONObject(name));
+ } else {
+ if (!object.get(name).toString().isEmpty()) {
+ attrs.put(name, object.get(name).toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @param opName
+ * @param opObj
+ * @return
+ * @throws Exception
+ * This method address the create table operator, fetch operator,
+ * etc
+ */
+ Op extractOp(String opName, JSONObject opObj) throws Exception {
+ Map<String, String> attrs = new TreeMap<>();
+ Vertex v = null;
+ if (opObj.length() > 0) {
+ String[] names = JSONObject.getNames(opObj);
+ for (String name : names) {
+ Object o = opObj.get(name);
+ if (isPrintable(o) && !o.toString().isEmpty()) {
+ attrs.put(name, o.toString());
+ } else if (o instanceof JSONObject) {
+ JSONObject attrObj = (JSONObject) o;
+ if (attrObj.length() > 0) {
+ if (name.equals("Processor Tree:")) {
+ JSONObject object = new JSONObject(new LinkedHashMap<>());
+ object.put(name, attrObj);
+ v = new Vertex(null, object, parser);
+ v.extractOpTree();
+ } else {
+ for (String attrName : JSONObject.getNames(attrObj)) {
+ if (!attrObj.get(attrName).toString().isEmpty()) {
+ attrs.put(attrName, attrObj.get(attrName).toString());
+ }
+ }
+ }
+ }
+ } else {
+ throw new Exception("Unsupported object in " + this.internalName);
+ }
+ }
+ }
+ Op op = new Op(opName, null, null, null, attrs, null, v, parser);
+ if (v != null) {
+ parser.addInline(op, new Connection(null, v));
+ }
+ return op;
+ }
+
+ private boolean isPrintable(Object val) {
+ if (val instanceof Boolean || val instanceof String || val instanceof Integer
+ || val instanceof Long || val instanceof Byte || val instanceof Float
+ || val instanceof Double || val instanceof Path) {
+ return true;
+ }
+ if (val != null && val.getClass().isPrimitive()) {
+ return true;
+ }
+ return false;
+ }
+
+ public void print(Printer printer, int indentFlag) throws Exception {
+ // print stagename
+ if (parser.printSet.contains(this)) {
+ printer.println(DagJsonParser.prefixString(indentFlag) + " Please refer to the previous "
+ + externalName);
+ return;
+ }
+ parser.printSet.add(this);
+ printer.println(DagJsonParser.prefixString(indentFlag) + externalName);
+ // print vertexes
+ indentFlag++;
+ for (Vertex candidate : this.vertexs.values()) {
+ if (!parser.isInline(candidate) && candidate.children.isEmpty()) {
+ candidate.print(printer, indentFlag, null, null);
+ }
+ }
+ if (!attrs.isEmpty()) {
+ printer.println(DagJsonParser.prefixString(indentFlag)
+ + DagJsonParserUtils.attrsToString(attrs));
+ }
+ if (op != null) {
+ op.print(printer, indentFlag, false);
+ }
+ indentFlag++;
+ // print dependent stages
+ for (Stage stage : this.parentStages) {
+ stage.print(printer, indentFlag);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
new file mode 100644
index 0000000..c93059d
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.common.jsonexplain.Op.OpType;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public final class Vertex implements Comparable<Vertex>{
+ public final String name;
+ //tezJsonParser
+ public final DagJsonParser parser;
+ // vertex's parent connections.
+ public final List<Connection> parentConnections = new ArrayList<>();
+ // vertex's children vertex.
+ public final List<Vertex> children = new ArrayList<>();
+ // the jsonObject for this vertex
+ public final JSONObject vertexObject;
+ // whether this vertex is dummy (which does not really exists but is created),
+ // e.g., a dummy vertex for a mergejoin branch
+ public boolean dummy;
+ // the rootOps in this vertex
+ public final List<Op> rootOps = new ArrayList<>();
+ // we create a dummy vertex for a mergejoin branch for a self join if this
+ // vertex is a mergejoin
+ public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>();
+ // this vertex has multiple reduce operators
+ public int numReduceOp = 0;
+ // execution mode
+ public String executionMode = "";
+ // tagToInput for reduce work
+ public Map<String, String> tagToInput = new LinkedHashMap<>();
+ // tag
+ public String tag;
+
+ public static enum VertexType {
+ MAP, REDUCE, UNION, UNKNOWN
+ };
+ public VertexType vertexType;
+
+ public static enum EdgeType {
+ BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN
+ };
+ public String edgeType;
+
+ public Vertex(String name, JSONObject vertexObject, DagJsonParser dagJsonParser) {
+ super();
+ this.name = name;
+ if (this.name != null) {
+ if (this.name.contains("Map")) {
+ this.vertexType = VertexType.MAP;
+ } else if (this.name.contains("Reduce")) {
+ this.vertexType = VertexType.REDUCE;
+ } else if (this.name.contains("Union")) {
+ this.vertexType = VertexType.UNION;
+ } else {
+ this.vertexType = VertexType.UNKNOWN;
+ }
+ } else {
+ this.vertexType = VertexType.UNKNOWN;
+ }
+ this.dummy = false;
+ this.vertexObject = vertexObject;
+ parser = dagJsonParser;
+ }
+
+ public void addDependency(Connection connection) throws JSONException {
+ this.parentConnections.add(connection);
+ }
+
+ /**
+ * @throws JSONException
+ * @throws JsonParseException
+ * @throws JsonMappingException
+ * @throws IOException
+ * @throws Exception
+ * We assume that there is a single top-level Map Operator Tree or a
+ * Reduce Operator Tree in a vertex
+ */
+ public void extractOpTree() throws JSONException, JsonParseException, JsonMappingException,
+ IOException, Exception {
+ if (vertexObject.length() != 0) {
+ for (String key : JSONObject.getNames(vertexObject)) {
+ if (key.equals("Map Operator Tree:")) {
+ extractOp(vertexObject.getJSONArray(key).getJSONObject(0));
+ } else if (key.equals("Reduce Operator Tree:") || key.equals("Processor Tree:")) {
+ extractOp(vertexObject.getJSONObject(key));
+ } else if (key.equals("Join:")) {
+ // this is the case when we have a map-side SMB join
+ // one input of the join is treated as a dummy vertex
+ JSONArray array = vertexObject.getJSONArray(key);
+ for (int index = 0; index < array.length(); index++) {
+ JSONObject mpOpTree = array.getJSONObject(index);
+ Vertex v = new Vertex(null, mpOpTree, parser);
+ v.extractOpTree();
+ v.dummy = true;
+ mergeJoinDummyVertexs.add(v);
+ }
+ } else if (key.equals("Merge File Operator")) {
+ JSONObject opTree = vertexObject.getJSONObject(key);
+ if (opTree.has("Map Operator Tree:")) {
+ extractOp(opTree.getJSONArray("Map Operator Tree:").getJSONObject(0));
+ } else {
+ throw new Exception("Merge File Operator does not have a Map Operator Tree");
+ }
+ } else if (key.equals("Execution mode:")) {
+ executionMode = " " + vertexObject.getString(key);
+ } else if (key.equals("tagToInput:")) {
+ JSONObject tagToInput = vertexObject.getJSONObject(key);
+ for (String tag : JSONObject.getNames(tagToInput)) {
+ this.tagToInput.put(tag, (String) tagToInput.get(tag));
+ }
+ } else if (key.equals("tag:")) {
+ this.tag = vertexObject.getString(key);
+ } else if (key.equals("Local Work:")) {
+ extractOp(vertexObject.getJSONObject(key));
+ } else {
+ throw new Exception("Unsupported operator tree in vertex " + this.name);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param operator
+ * @param parent
+ * @return
+ * @throws JSONException
+ * @throws JsonParseException
+ * @throws JsonMappingException
+ * @throws IOException
+ * @throws Exception
+ * assumption: each operator only has one parent but may have many
+ * children
+ */
+ Op extractOp(JSONObject operator) throws JSONException, JsonParseException, JsonMappingException,
+ IOException, Exception {
+ String[] names = JSONObject.getNames(operator);
+ if (names.length != 1) {
+ throw new Exception("Expect only one operator in " + operator.toString());
+ } else {
+ String opName = names[0];
+ JSONObject attrObj = (JSONObject) operator.get(opName);
+ Map<String, String> attrs = new TreeMap<>();
+ List<Op> children = new ArrayList<>();
+ String id = null;
+ String outputVertexName = null;
+ if (JSONObject.getNames(attrObj) != null) {
+ for (String attrName : JSONObject.getNames(attrObj)) {
+ if (attrName.equals("children")) {
+ Object childrenObj = attrObj.get(attrName);
+ if (childrenObj instanceof JSONObject) {
+ if (((JSONObject) childrenObj).length() != 0) {
+ children.add(extractOp((JSONObject) childrenObj));
+ }
+ } else if (childrenObj instanceof JSONArray) {
+ if (((JSONArray) childrenObj).length() != 0) {
+ JSONArray array = ((JSONArray) childrenObj);
+ for (int index = 0; index < array.length(); index++) {
+ children.add(extractOp(array.getJSONObject(index)));
+ }
+ }
+ } else {
+ throw new Exception("Unsupported operator " + this.name
+ + "'s children operator is neither a jsonobject nor a jsonarray");
+ }
+ } else {
+ if (attrName.equals("OperatorId:")) {
+ id = attrObj.get(attrName).toString();
+ } else if (attrName.equals("outputname:")) {
+ outputVertexName = attrObj.get(attrName).toString();
+ } else {
+ if (!attrObj.get(attrName).toString().isEmpty()) {
+ attrs.put(attrName, attrObj.get(attrName).toString());
+ }
+ }
+ }
+ }
+ }
+ Op op = new Op(opName, id, outputVertexName, children, attrs, operator, this, parser);
+ if (!children.isEmpty()) {
+ for (Op child : children) {
+ child.parent = op;
+ }
+ } else {
+ this.rootOps.add(op);
+ }
+ return op;
+ }
+ }
+
+ public void print(Printer printer, int indentFlag, String type, Vertex callingVertex)
+ throws JSONException, Exception {
+ // print vertexname
+ if (parser.printSet.contains(this) && numReduceOp <= 1) {
+ if (type != null) {
+ printer.println(DagJsonParser.prefixString(indentFlag, "<-")
+ + " Please refer to the previous " + this.name + " [" + type + "]");
+ } else {
+ printer.println(DagJsonParser.prefixString(indentFlag, "<-")
+ + " Please refer to the previous " + this.name);
+ }
+ return;
+ }
+ parser.printSet.add(this);
+ if (type != null) {
+ printer.println(DagJsonParser.prefixString(indentFlag, "<-") + this.name + " [" + type + "]"
+ + this.executionMode);
+ } else if (this.name != null) {
+ printer.println(DagJsonParser.prefixString(indentFlag) + this.name + this.executionMode);
+ }
+ // print operators
+ if (numReduceOp > 1 && !(callingVertex.vertexType == VertexType.UNION)) {
+ // find the right op
+ Op choose = null;
+ for (Op op : this.rootOps) {
+ if (op.outputVertexName.equals(callingVertex.name)) {
+ choose = op;
+ }
+ }
+ if (choose != null) {
+ choose.print(printer, indentFlag, false);
+ } else {
+ throw new Exception("Can not find the right reduce output operator for vertex " + this.name);
+ }
+ } else {
+ for (Op op : this.rootOps) {
+ // dummy vertex is treated as a branch of a join operator
+ if (this.dummy) {
+ op.print(printer, indentFlag, true);
+ } else {
+ op.print(printer, indentFlag, false);
+ }
+ }
+ }
+ if (vertexType == VertexType.UNION) {
+ // print dependent vertexs
+ indentFlag++;
+ for (int index = 0; index < this.parentConnections.size(); index++) {
+ Connection connection = this.parentConnections.get(index);
+ connection.from.print(printer, indentFlag, connection.type, this);
+ }
+ }
+ }
+
+ /**
+ * We check if a vertex has multiple reduce operators.
+ */
+ public void checkMultiReduceOperator() {
+ // check if it is a reduce vertex and its children is more than 1;
+ if (this.rootOps.size() < 2) {
+ return;
+ }
+ // check if all the child ops are reduce output operators
+ for (Op op : this.rootOps) {
+ if (op.type == OpType.RS) {
+ numReduceOp++;
+ }
+ }
+ }
+
+ public void setType(String type) {
+ this.edgeType = this.parser.mapEdgeType(type);
+ }
+
+ // The following code should be gone after HIVE-11075 using topological order
+ @Override
+ public int compareTo(Vertex o) {
+ // we print the vertex that has more rs before the vertex that has fewer rs.
+ if (numReduceOp != o.numReduceOp) {
+ return -(numReduceOp - o.numReduceOp);
+ } else {
+ return this.name.compareTo(o.name);
+ }
+ }
+
+ public Op getJoinRSOp(Vertex joinVertex) {
+ if (rootOps.size() == 0) {
+ return null;
+ } else if (rootOps.size() == 1) {
+ if (rootOps.get(0).type == OpType.RS) {
+ return rootOps.get(0);
+ } else {
+ return null;
+ }
+ } else {
+ for (Op op : rootOps) {
+ if (op.type == OpType.RS) {
+ if (op.outputVertexName.equals(joinVertex.name)) {
+ return op;
+ }
+ }
+ }
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java
new file mode 100644
index 0000000..9485aa4
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain.spark;
+
+import org.apache.hadoop.hive.common.jsonexplain.DagJsonParser;
+
+
+/**
+ * {@link DagJsonParser} for plans from the Spark execution engine. A stage's vertices and edges
+ * are looked up under the {@code "Spark"} key, and edge type names are kept exactly as they
+ * appear in the plan.
+ */
+public class SparkJsonParser extends DagJsonParser {
+
+  /** Key under which a stage's Spark work appears in the explain JSON. */
+  private static final String FRAMEWORK_NAME = "Spark";
+
+  @Override
+  public String getFrameworkName() {
+    return FRAMEWORK_NAME;
+  }
+
+  /** Spark edge names need no translation, so the input is returned unchanged. */
+  @Override
+  public String mapEdgeType(String edgeName) {
+    return edgeName;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
deleted file mode 100644
index 5cd0e4c..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-public final class Connection implements Comparable<Connection>{
- public final String type;
- public final Vertex from;
-
- public Connection(String type, Vertex from) {
- super();
- this.type = type;
- this.from = from;
- }
-
- @Override
- public int compareTo(Connection o) {
- return from.compareTo(o.from);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
deleted file mode 100644
index 96e75c0..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.hive.common.jsonexplain.tez.Vertex.VertexType;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public final class Op {
- public final String name;
- // tezJsonParser
- public final TezJsonParser parser;
- public final String operatorId;
- public Op parent;
- public final List<Op> children;
- public final Map<String, String> attrs;
- // the jsonObject for this operator
- public final JSONObject opObject;
- // the vertex that this operator belongs to
- public final Vertex vertex;
- // the vertex that this operator output to
- public final String outputVertexName;
- // the Operator type
- public final OpType type;
-
- public enum OpType {
- MAPJOIN, MERGEJOIN, RS, OTHERS
- };
-
- public Op(String name, String id, String outputVertexName, List<Op> children,
- Map<String, String> attrs, JSONObject opObject, Vertex vertex, TezJsonParser tezJsonParser)
- throws JSONException {
- super();
- this.name = name;
- this.operatorId = id;
- this.type = deriveOpType(operatorId);
- this.outputVertexName = outputVertexName;
- this.children = children;
- this.attrs = attrs;
- this.opObject = opObject;
- this.vertex = vertex;
- this.parser = tezJsonParser;
- }
-
- private OpType deriveOpType(String operatorId) {
- if (operatorId != null) {
- if (operatorId.startsWith(OpType.MAPJOIN.toString())) {
- return OpType.MAPJOIN;
- } else if (operatorId.startsWith(OpType.MERGEJOIN.toString())) {
- return OpType.MERGEJOIN;
- } else if (operatorId.startsWith(OpType.RS.toString())) {
- return OpType.RS;
- } else {
- return OpType.OTHERS;
- }
- } else {
- return OpType.OTHERS;
- }
- }
-
- private void inlineJoinOp() throws Exception {
- // inline map join operator
- if (this.type == OpType.MAPJOIN) {
- JSONObject joinObj = opObject.getJSONObject(this.name);
- // get the map for posToVertex
- JSONObject verticeObj = joinObj.getJSONObject("input vertices:");
- Map<String, Vertex> posToVertex = new LinkedHashMap<>();
- for (String pos : JSONObject.getNames(verticeObj)) {
- String vertexName = verticeObj.getString(pos);
- // update the connection
- Connection c = null;
- for (Connection connection : vertex.parentConnections) {
- if (connection.from.name.equals(vertexName)) {
- posToVertex.put(pos, connection.from);
- c = connection;
- break;
- }
- }
- if (c != null) {
- parser.addInline(this, c);
- }
- }
- // update the attrs
- this.attrs.remove("input vertices:");
- // update the keys to use operator name
- JSONObject keys = joinObj.getJSONObject("keys:");
- // find out the vertex for the big table
- Set<Vertex> parentVertexes = new HashSet<>();
- for (Connection connection : vertex.parentConnections) {
- parentVertexes.add(connection.from);
- }
- parentVertexes.removeAll(posToVertex.values());
- Map<String, String> posToOpId = new LinkedHashMap<>();
- if (keys.length() != 0) {
- for (String key : JSONObject.getNames(keys)) {
- // first search from the posToVertex
- if (posToVertex.containsKey(key)) {
- Vertex v = posToVertex.get(key);
- if (v.rootOps.size() == 1) {
- posToOpId.put(key, v.rootOps.get(0).operatorId);
- } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
- posToOpId.put(key, v.name);
- } else {
- Op joinRSOp = v.getJoinRSOp(vertex);
- if (joinRSOp != null) {
- posToOpId.put(key, joinRSOp.operatorId);
- } else {
- throw new Exception(
- "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
- + " when hive explain user is trying to identify the operator id.");
- }
- }
- }
- // then search from parent
- else if (parent != null) {
- posToOpId.put(key, parent.operatorId);
- }
- // then assume it is from its own vertex
- else if (parentVertexes.size() == 1) {
- Vertex v = parentVertexes.iterator().next();
- parentVertexes.clear();
- if (v.rootOps.size() == 1) {
- posToOpId.put(key, v.rootOps.get(0).operatorId);
- } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
- posToOpId.put(key, v.name);
- } else {
- Op joinRSOp = v.getJoinRSOp(vertex);
- if (joinRSOp != null) {
- posToOpId.put(key, joinRSOp.operatorId);
- } else {
- throw new Exception(
- "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
- + " when hive explain user is trying to identify the operator id.");
- }
- }
- }
- // finally throw an exception
- else {
- throw new Exception(
- "Can not find the source operator on one of the branches of map join.");
- }
- }
- }
- this.attrs.remove("keys:");
- StringBuffer sb = new StringBuffer();
- JSONArray conditionMap = joinObj.getJSONArray("condition map:");
- for (int index = 0; index < conditionMap.length(); index++) {
- JSONObject cond = conditionMap.getJSONObject(index);
- String k = (String) cond.keys().next();
- JSONObject condObject = new JSONObject((String)cond.get(k));
- String type = condObject.getString("type");
- String left = condObject.getString("left");
- String right = condObject.getString("right");
- if (keys.length() != 0) {
- sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
- + keys.get(right) + "(" + type + "),");
- } else {
- // probably a cross product
- sb.append("(" + type + "),");
- }
- }
- this.attrs.remove("condition map:");
- this.attrs.put("Conds:", sb.substring(0, sb.length() - 1));
- }
- // should be merge join
- else {
- Map<String, String> posToOpId = new LinkedHashMap<>();
- if (vertex.mergeJoinDummyVertexs.size() == 0) {
- if (vertex.tagToInput.size() != vertex.parentConnections.size()) {
- throw new Exception("tagToInput size " + vertex.tagToInput.size()
- + " is different from parentConnections size " + vertex.parentConnections.size());
- }
- for (Entry<String, String> entry : vertex.tagToInput.entrySet()) {
- Connection c = null;
- for (Connection connection : vertex.parentConnections) {
- if (connection.from.name.equals(entry.getValue())) {
- Vertex v = connection.from;
- if (v.rootOps.size() == 1) {
- posToOpId.put(entry.getKey(), v.rootOps.get(0).operatorId);
- } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
- posToOpId.put(entry.getKey(), v.name);
- } else {
- Op joinRSOp = v.getJoinRSOp(vertex);
- if (joinRSOp != null) {
- posToOpId.put(entry.getKey(), joinRSOp.operatorId);
- } else {
- throw new Exception(
- "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
- + " when hive explain user is trying to identify the operator id.");
- }
- }
- c = connection;
- break;
- }
- }
- if (c == null) {
- throw new Exception("Can not find " + entry.getValue()
- + " while parsing keys of merge join operator");
- }
- }
- } else {
- posToOpId.put(vertex.tag, this.parent.operatorId);
- for (Vertex v : vertex.mergeJoinDummyVertexs) {
- if (v.rootOps.size() != 1) {
- throw new Exception("Can not find a single root operators in a single vertex " + v.name
- + " when hive explain user is trying to identify the operator id.");
- }
- posToOpId.put(v.tag, v.rootOps.get(0).operatorId);
- }
- }
- JSONObject joinObj = opObject.getJSONObject(this.name);
- // update the keys to use operator name
- JSONObject keys = joinObj.getJSONObject("keys:");
- if (keys.length() != 0) {
- for (String key : JSONObject.getNames(keys)) {
- if (!posToOpId.containsKey(key)) {
- throw new Exception(
- "Can not find the source operator on one of the branches of merge join.");
- }
- }
- // inline merge join operator in a self-join
- if (this.vertex != null) {
- for (Vertex v : this.vertex.mergeJoinDummyVertexs) {
- parser.addInline(this, new Connection(null, v));
- }
- }
- }
- // update the attrs
- this.attrs.remove("keys:");
- StringBuffer sb = new StringBuffer();
- JSONArray conditionMap = joinObj.getJSONArray("condition map:");
- for (int index = 0; index < conditionMap.length(); index++) {
- JSONObject cond = conditionMap.getJSONObject(index);
- String k = (String) cond.keys().next();
- JSONObject condObject = new JSONObject((String)cond.get(k));
- String type = condObject.getString("type");
- String left = condObject.getString("left");
- String right = condObject.getString("right");
- if (keys.length() != 0) {
- sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
- + keys.get(right) + "(" + type + "),");
- } else {
- // probably a cross product
- sb.append("(" + type + "),");
- }
- }
- this.attrs.remove("condition map:");
- this.attrs.put("Conds:", sb.substring(0, sb.length() - 1));
- }
- }
-
- private String getNameWithOpIdStats() {
- StringBuffer sb = new StringBuffer();
- sb.append(TezJsonParserUtils.renameReduceOutputOperator(name, vertex));
- if (operatorId != null) {
- sb.append(" [" + operatorId + "]");
- }
- if (!TezJsonParserUtils.OperatorNoStats.contains(name) && attrs.containsKey("Statistics:")) {
- sb.append(" (" + attrs.get("Statistics:") + ")");
- }
- attrs.remove("Statistics:");
- return sb.toString();
- }
-
- /**
- * @param printer
- * @param indentFlag
- * @param branchOfJoinOp
- * This parameter is used to show if it is a branch of a Join
- * operator so that we can decide the corresponding indent.
- * @throws Exception
- */
- public void print(Printer printer, int indentFlag, boolean branchOfJoinOp) throws Exception {
- // print name
- if (parser.printSet.contains(this)) {
- printer.println(TezJsonParser.prefixString(indentFlag) + " Please refer to the previous "
- + this.getNameWithOpIdStats());
- return;
- }
- parser.printSet.add(this);
- if (!branchOfJoinOp) {
- printer.println(TezJsonParser.prefixString(indentFlag) + this.getNameWithOpIdStats());
- } else {
- printer.println(TezJsonParser.prefixString(indentFlag, "<-") + this.getNameWithOpIdStats());
- }
- branchOfJoinOp = false;
- // if this operator is a Map Join Operator or a Merge Join Operator
- if (this.type == OpType.MAPJOIN || this.type == OpType.MERGEJOIN) {
- inlineJoinOp();
- branchOfJoinOp = true;
- }
- // if this operator is the last operator, we summarize the non-inlined
- // vertex
- List<Connection> noninlined = new ArrayList<>();
- if (this.parent == null) {
- if (this.vertex != null) {
- for (Connection connection : this.vertex.parentConnections) {
- if (!parser.isInline(connection.from)) {
- noninlined.add(connection);
- }
- }
- }
- }
- // print attr
- indentFlag++;
- if (!attrs.isEmpty()) {
- printer.println(TezJsonParser.prefixString(indentFlag)
- + TezJsonParserUtils.attrsToString(attrs));
- }
- // print inline vertex
- if (parser.inlineMap.containsKey(this)) {
- List<Connection> connections = parser.inlineMap.get(this);
- Collections.sort(connections);
- for (Connection connection : connections) {
- connection.from.print(printer, indentFlag, connection.type, this.vertex);
- }
- }
- // print parent op, i.e., where data comes from
- if (this.parent != null) {
- this.parent.print(printer, indentFlag, branchOfJoinOp);
- }
- // print next vertex
- else {
- Collections.sort(noninlined);
- for (Connection connection : noninlined) {
- connection.from.print(printer, indentFlag, connection.type, this.vertex);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java
deleted file mode 100644
index d3c91d6..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Printer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-public final class Printer {
- public static final String lineSeparator = System.getProperty("line.separator");;
- private final StringBuilder builder = new StringBuilder();
-
- public void print(String string) {
- builder.append(string);
- }
-
- public void println(String string) {
- builder.append(string);
- builder.append(lineSeparator);
- }
-
- public void println() {
- builder.append(lineSeparator);
- }
-
- public String toString() {
- return builder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/00b64448/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java
deleted file mode 100644
index 63937f8..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Stage.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.jsonexplain.tez.Vertex.VertexType;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public final class Stage {
- //external name is used to show at the console
- String externalName;
- //internal name is used to track the stages
- public final String internalName;
- //tezJsonParser
- public final TezJsonParser parser;
- // upstream stages, e.g., root stage
- public final List<Stage> parentStages = new ArrayList<>();
- // downstream stages.
- public final List<Stage> childStages = new ArrayList<>();
- public final Map<String, Vertex> vertexs =new LinkedHashMap<>();
- public final Map<String, String> attrs = new TreeMap<>();
- Map<Vertex, List<Connection>> tezStageDependency;
- // some stage may contain only a single operator, e.g., create table operator,
- // fetch operator.
- Op op;
-
- public Stage(String name, TezJsonParser tezJsonParser) {
- super();
- internalName = name;
- externalName = name;
- parser = tezJsonParser;
- }
-
- public void addDependency(JSONObject object, Map<String, Stage> stages) throws JSONException {
- if (object.has("DEPENDENT STAGES")) {
- String names = object.getString("DEPENDENT STAGES");
- for (String name : names.split(",")) {
- Stage parent = stages.get(name.trim());
- this.parentStages.add(parent);
- parent.childStages.add(this);
- }
- }
- if (object.has("CONDITIONAL CHILD TASKS")) {
- String names = object.getString("CONDITIONAL CHILD TASKS");
- this.externalName = this.internalName + "(CONDITIONAL CHILD TASKS: " + names + ")";
- for (String name : names.split(",")) {
- Stage child = stages.get(name.trim());
- child.externalName = child.internalName + "(CONDITIONAL)";
- child.parentStages.add(this);
- this.childStages.add(child);
- }
- }
- }
-
- /**
- * @param object
- * @throws Exception
- * If the object of stage contains "Tez", we need to extract the
- * vertices and edges Else we need to directly extract operators
- * and/or attributes.
- */
- public void extractVertex(JSONObject object) throws Exception {
- if (object.has("Tez")) {
- this.tezStageDependency = new TreeMap<>();
- JSONObject tez = (JSONObject) object.get("Tez");
- JSONObject vertices = tez.getJSONObject("Vertices:");
- if (tez.has("Edges:")) {
- JSONObject edges = tez.getJSONObject("Edges:");
- // iterate for the first time to get all the vertices
- for (String to : JSONObject.getNames(edges)) {
- vertexs.put(to, new Vertex(to, vertices.getJSONObject(to), parser));
- }
- // iterate for the second time to get all the vertex dependency
- for (String to : JSONObject.getNames(edges)) {
- Object o = edges.get(to);
- Vertex v = vertexs.get(to);
- // 1 to 1 mapping
- if (o instanceof JSONObject) {
- JSONObject obj = (JSONObject) o;
- String parent = obj.getString("parent");
- Vertex parentVertex = vertexs.get(parent);
- if (parentVertex == null) {
- parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
- vertexs.put(parent, parentVertex);
- }
- String type = obj.getString("type");
- // for union vertex, we reverse the dependency relationship
- if (!"CONTAINS".equals(type)) {
- v.addDependency(new Connection(type, parentVertex));
- parentVertex.setType(type);
- parentVertex.children.add(v);
- } else {
- parentVertex.addDependency(new Connection(type, v));
- v.children.add(parentVertex);
- }
- this.tezStageDependency.put(v, Arrays.asList(new Connection(type, parentVertex)));
- } else {
- // 1 to many mapping
- JSONArray from = (JSONArray) o;
- List<Connection> list = new ArrayList<>();
- for (int index = 0; index < from.length(); index++) {
- JSONObject obj = from.getJSONObject(index);
- String parent = obj.getString("parent");
- Vertex parentVertex = vertexs.get(parent);
- if (parentVertex == null) {
- parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
- vertexs.put(parent, parentVertex);
- }
- String type = obj.getString("type");
- if (!"CONTAINS".equals(type)) {
- v.addDependency(new Connection(type, parentVertex));
- parentVertex.setType(type);
- parentVertex.children.add(v);
- } else {
- parentVertex.addDependency(new Connection(type, v));
- v.children.add(parentVertex);
- }
- list.add(new Connection(type, parentVertex));
- }
- this.tezStageDependency.put(v, list);
- }
- }
- } else {
- for (String vertexName : JSONObject.getNames(vertices)) {
- vertexs.put(vertexName, new Vertex(vertexName, vertices.getJSONObject(vertexName), parser));
- }
- }
- // The opTree in vertex is extracted
- for (Vertex v : vertexs.values()) {
- if (v.vertexType == VertexType.MAP || v.vertexType == VertexType.REDUCE) {
- v.extractOpTree();
- v.checkMultiReduceOperator();
- }
- }
- } else {
- String[] names = JSONObject.getNames(object);
- if (names != null) {
- for (String name : names) {
- if (name.contains("Operator")) {
- this.op = extractOp(name, object.getJSONObject(name));
- } else {
- if (!object.get(name).toString().isEmpty()) {
- attrs.put(name, object.get(name).toString());
- }
- }
- }
- }
- }
- }
-
- /**
- * @param opName
- * @param opObj
- * @return
- * @throws Exception
- * This method address the create table operator, fetch operator,
- * etc
- */
- Op extractOp(String opName, JSONObject opObj) throws Exception {
- Map<String, String> attrs = new TreeMap<>();
- Vertex v = null;
- if (opObj.length() > 0) {
- String[] names = JSONObject.getNames(opObj);
- for (String name : names) {
- Object o = opObj.get(name);
- if (isPrintable(o) && !o.toString().isEmpty()) {
- attrs.put(name, o.toString());
- } else if (o instanceof JSONObject) {
- JSONObject attrObj = (JSONObject) o;
- if (attrObj.length() > 0) {
- if (name.equals("Processor Tree:")) {
- JSONObject object = new JSONObject(new LinkedHashMap<>());
- object.put(name, attrObj);
- v = new Vertex(null, object, parser);
- v.extractOpTree();
- } else {
- for (String attrName : JSONObject.getNames(attrObj)) {
- if (!attrObj.get(attrName).toString().isEmpty()) {
- attrs.put(attrName, attrObj.get(attrName).toString());
- }
- }
- }
- }
- } else {
- throw new Exception("Unsupported object in " + this.internalName);
- }
- }
- }
- Op op = new Op(opName, null, null, null, attrs, null, v, parser);
- if (v != null) {
- parser.addInline(op, new Connection(null, v));
- }
- return op;
- }
-
- private boolean isPrintable(Object val) {
- if (val instanceof Boolean || val instanceof String || val instanceof Integer
- || val instanceof Long || val instanceof Byte || val instanceof Float
- || val instanceof Double || val instanceof Path) {
- return true;
- }
- if (val != null && val.getClass().isPrimitive()) {
- return true;
- }
- return false;
- }
-
- public void print(Printer printer, int indentFlag) throws Exception {
- // print stagename
- if (parser.printSet.contains(this)) {
- printer.println(TezJsonParser.prefixString(indentFlag) + " Please refer to the previous "
- + externalName);
- return;
- }
- parser.printSet.add(this);
- printer.println(TezJsonParser.prefixString(indentFlag) + externalName);
- // print vertexes
- indentFlag++;
- for (Vertex candidate : this.vertexs.values()) {
- if (!parser.isInline(candidate) && candidate.children.isEmpty()) {
- candidate.print(printer, indentFlag, null, null);
- }
- }
- if (!attrs.isEmpty()) {
- printer.println(TezJsonParser.prefixString(indentFlag)
- + TezJsonParserUtils.attrsToString(attrs));
- }
- if (op != null) {
- op.print(printer, indentFlag, false);
- }
- indentFlag++;
- // print dependent stages
- for (Stage stage : this.parentStages) {
- stage.print(printer, indentFlag);
- }
- }
-}