You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2015/05/23 02:24:04 UTC

svn commit: r1681252 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java src/org/apache/pig/tools/pigstats/ScriptState.java

Author: daijy
Date: Sat May 23 00:24:04 2015
New Revision: 1681252

URL: http://svn.apache.org/r1681252
Log:
PIG-4429: Add Pig alias information and Pig script to the DAG view in Tez UI

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat May 23 00:24:04 2015
@@ -32,6 +32,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4429: Add Pig alias information and Pig script to the DAG view in Tez UI (daijy)
+
 PIG-3994: Implement getting backend exception for Tez (rohini)
 
 PIG-4563: Upgrade to released Tez 0.7.0 (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat May 23 00:24:04 2015
@@ -23,11 +23,9 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -132,6 +130,7 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -171,8 +170,6 @@ import org.apache.tez.runtime.library.in
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 /**
  * A visitor to construct DAG out of Tez plan.
@@ -337,8 +334,7 @@ public class TezDagBuilder extends TezOp
         }
 
         return GroupInputEdge.create(from, to, edgeProperty,
-                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
-                    .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
+                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
     }
 
     /**
@@ -431,9 +427,8 @@ public class TezDagBuilder extends TezOp
 
         MRToTezHelper.processMRSettings(conf, globalConf);
 
-        String historyString = convertToHistoryText("", conf);
-        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
-        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
+        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -666,7 +661,9 @@ public class TezDagBuilder extends TezOp
 
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
-        procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf));
+        TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName());
+        String vertexInfo = dagScriptInfo.getAliasLocation(tezOp) + " (" + dagScriptInfo.getPigFeatures(tezOp) + ")" ;
+        procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf));
 
         String vmPluginName = null;
         Configuration vmPluginConf = null;
@@ -827,8 +824,7 @@ public class TezDagBuilder extends TezOp
             vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
             vertex.addDataSource(ld.getOperatorKey().toString(),
                     DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
-                            .setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer()))
-                            .setHistoryText(convertToHistoryText("", payloadConf)),
+                            .setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer())),
                     InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
                     inputSplitInfo.getNumTasks(),
                     dag.getCredentials(),
@@ -852,8 +848,7 @@ public class TezDagBuilder extends TezOp
 
             OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                     MROutput.class.getName()).setUserPayload(TezUtils
-                    .createUserPayloadFromConf(outputPayLoad))
-                    .setHistoryText(convertToHistoryText("", outputPayLoad));
+                    .createUserPayloadFromConf(outputPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
                 OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
                 if (vertexGroupKey != null) {
@@ -888,8 +883,7 @@ public class TezDagBuilder extends TezOp
         if (vmPluginName != null) {
             VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
             if (vmPluginConf != null) {
-                vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
-                    .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf));
+                vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
             }
             vertex.setVertexManagerPlugin(vmPluginDescriptor);
         }
@@ -1184,27 +1178,4 @@ public class TezDagBuilder extends TezOp
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
-
-    // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2
-    public static String convertToHistoryText(String description, Configuration conf) throws IOException {
-        // Add a version if this serialization is changed
-        JSONObject jsonObject = new JSONObject();
-        try {
-            if (description != null && !description.isEmpty()) {
-                jsonObject.put("desc", description);
-        }
-        if (conf != null) {
-            JSONObject confJson = new JSONObject();
-            Iterator<Entry<String, String>> iter = conf.iterator();
-            while (iter.hasNext()) {
-                Entry<String, String> entry = iter.next();
-                confJson.put(entry.getKey(), entry.getValue());
-            }
-            jsonObject.put("config", confJson);
-        }
-        } catch (JSONException e) {
-            throw new IOException("Error when trying to convert description/conf to JSON", e);
-        }
-        return jsonObject.toString();
-    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Sat May 23 00:24:04 2015
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,8 +36,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 
 /**
  * This is compiler class that takes a TezOperPlan and converts it into a
@@ -106,6 +110,8 @@ public class TezJobCompiler {
                 log.info("Local resource: " + entry.getKey());
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
+            String script = new String(Base64.decodeBase64(TezScriptState.get().getScript()));
+            tezDag.setDAGInfo(createDagInfo(script));
             return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;
@@ -113,5 +119,19 @@ public class TezJobCompiler {
             throw new JobCreationException(msg, errCode, PigException.BUG, e);
         }
     }
+
+    private String createDagInfo(String script) throws IOException {
+        String dagInfo;
+        try {
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("context", "Pig");
+            jsonObject.put("description", script);
+            dagInfo = jsonObject.toString();
+        } catch (JSONException e) {
+            throw new IOException("Error when trying to convert Pig script to JSON", e);
+        }
+        log.debug("DagInfo: " + dagInfo);
+        return dagInfo;
+    }
 }
 

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1681252&r1=1681251&r2=1681252&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Sat May 23 00:24:04 2015
@@ -372,7 +372,7 @@ public abstract class ScriptState {
         return (commandLine == null) ? "" : commandLine;
     }
 
-    protected String getScript() {
+    public String getScript() {
         return (script == null) ? "" : script;
     }