You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/23 02:24:04 UTC
svn commit: r1681252 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
src/org/apache/pig/tools/pigstats/ScriptState.java
Author: daijy
Date: Sat May 23 00:24:04 2015
New Revision: 1681252
URL: http://svn.apache.org/r1681252
Log:
PIG-4429: Add Pig alias information and Pig script to the DAG view in Tez UI
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat May 23 00:24:04 2015
@@ -32,6 +32,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4429: Add Pig alias information and Pig script to the DAG view in Tez UI (daijy)
+
PIG-3994: Implement getting backend exception for Tez (rohini)
PIG-4563: Upgrade to released Tez 0.7.0 (daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat May 23 00:24:04 2015
@@ -23,11 +23,9 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@@ -132,6 +130,7 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -171,8 +170,6 @@ import org.apache.tez.runtime.library.in
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
/**
* A visitor to construct DAG out of Tez plan.
@@ -337,8 +334,7 @@ public class TezDagBuilder extends TezOp
}
return GroupInputEdge.create(from, to, edgeProperty,
- InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
- .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
+ InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
}
/**
@@ -431,9 +427,8 @@ public class TezDagBuilder extends TezOp
MRToTezHelper.processMRSettings(conf, globalConf);
- String historyString = convertToHistoryText("", conf);
- in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
- out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
+ in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+ out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -666,7 +661,9 @@ public class TezDagBuilder extends TezOp
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
- procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf));
+ TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName());
+ String vertexInfo = dagScriptInfo.getAliasLocation(tezOp) + " (" + dagScriptInfo.getPigFeatures(tezOp) + ")" ;
+ procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf));
String vmPluginName = null;
Configuration vmPluginConf = null;
@@ -827,8 +824,7 @@ public class TezDagBuilder extends TezOp
vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
vertex.addDataSource(ld.getOperatorKey().toString(),
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
- .setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer()))
- .setHistoryText(convertToHistoryText("", payloadConf)),
+ .setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer())),
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
inputSplitInfo.getNumTasks(),
dag.getCredentials(),
@@ -852,8 +848,7 @@ public class TezDagBuilder extends TezOp
OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
MROutput.class.getName()).setUserPayload(TezUtils
- .createUserPayloadFromConf(outputPayLoad))
- .setHistoryText(convertToHistoryText("", outputPayLoad));
+ .createUserPayloadFromConf(outputPayLoad));
if (tezOp.getVertexGroupStores() != null) {
OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
if (vertexGroupKey != null) {
@@ -888,8 +883,7 @@ public class TezDagBuilder extends TezOp
if (vmPluginName != null) {
VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
if (vmPluginConf != null) {
- vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
- .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf));
+ vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
}
vertex.setVertexManagerPlugin(vmPluginDescriptor);
}
@@ -1184,27 +1178,4 @@ public class TezDagBuilder extends TezOp
job.setOutputFormatClass(PigOutputFormatTez.class);
}
}
-
- // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2
- public static String convertToHistoryText(String description, Configuration conf) throws IOException {
- // Add a version if this serialization is changed
- JSONObject jsonObject = new JSONObject();
- try {
- if (description != null && !description.isEmpty()) {
- jsonObject.put("desc", description);
- }
- if (conf != null) {
- JSONObject confJson = new JSONObject();
- Iterator<Entry<String, String>> iter = conf.iterator();
- while (iter.hasNext()) {
- Entry<String, String> entry = iter.next();
- confJson.put(entry.getKey(), entry.getValue());
- }
- jsonObject.put("config", confJson);
- }
- } catch (JSONException e) {
- throw new IOException("Error when trying to convert description/conf to JSON", e);
- }
- return jsonObject.toString();
- }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Sat May 23 00:24:04 2015
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,8 +36,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
/**
* This is compiler class that takes a TezOperPlan and converts it into a
@@ -106,6 +110,8 @@ public class TezJobCompiler {
log.info("Local resource: " + entry.getKey());
}
DAG tezDag = buildDAG(tezPlanNode, localResources);
+ String script = new String(Base64.decodeBase64(TezScriptState.get().getScript()));
+ tezDag.setDAGInfo(createDagInfo(script));
return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
} catch (Exception e) {
int errCode = 2017;
@@ -113,5 +119,19 @@ public class TezJobCompiler {
throw new JobCreationException(msg, errCode, PigException.BUG, e);
}
}
+
+ /**
+ * Builds the DAG info JSON attached to the Tez DAG so the Tez UI can show
+ * the Pig script: {"context": "Pig", "description": script}.
+ *
+ * @param script the decoded Pig script text
+ * @return the JSON-serialized DAG info
+ * @throws IOException if the JSON object cannot be constructed
+ */
+ private String createDagInfo(String script) throws IOException {
+ String dagInfo;
+ try {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("context", "Pig");
+ jsonObject.put("description", script);
+ dagInfo = jsonObject.toString();
+ } catch (JSONException e) {
+ throw new IOException("Error when trying to convert Pig script to JSON", e);
+ }
+ // dagInfo embeds the whole script; avoid building this message unless debug is on.
+ if (log.isDebugEnabled()) {
+ log.debug("DagInfo: " + dagInfo);
+ }
+ return dagInfo;
+ }
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Sat May 23 00:24:04 2015
@@ -372,7 +372,7 @@ public abstract class ScriptState {
return (commandLine == null) ? "" : commandLine;
}
- protected String getScript() {
+ public String getScript() {
return (script == null) ? "" : script;
}