You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/11/08 23:41:16 UTC

svn commit: r1637608 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Author: daijy
Date: Sat Nov  8 22:41:15 2014
New Revision: 1637608

URL: http://svn.apache.org/r1637608
Log:
PIG-4224: Upload Tez payload history string to timeline server

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1637608&r1=1637607&r2=1637608&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Nov  8 22:41:15 2014
@@ -37,6 +37,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4224: Upload Tez payload history string to timeline server (daijy)
+
 PIG-3977: Get TezStats working for Oozie (rohini)
 
 PIG-3979: group all performance, garbage collection, and incremental aggregation (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1637608&r1=1637607&r2=1637608&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat Nov  8 22:41:15 2014
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -153,6 +155,8 @@ import org.apache.tez.runtime.library.ap
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 
 /**
  * A visitor to construct DAG out of Tez plan.
@@ -279,7 +283,8 @@ public class TezDagBuilder extends TezOp
         }
 
         return GroupInputEdge.create(from, to, edgeProperty,
-                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
+                InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
+                    .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
     }
 
     /**
@@ -370,8 +375,9 @@ public class TezDagBuilder extends TezOp
 
         MRToTezHelper.processMRSettings(conf, globalConf);
 
-        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        String historyString = convertToHistoryText("", conf);
+        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
+        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -597,7 +603,7 @@ public class TezDagBuilder extends TezOp
 
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
-        procDesc.setUserPayload(userPayload);
+        procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf));
 
         Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
                 tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
@@ -636,7 +642,8 @@ public class TezDagBuilder extends TezOp
                     DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
                           .setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                           .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
-                          .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())),
+                          .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
+                          .setHistoryText(convertToHistoryText("", payloadConf)),
                     InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
         }
 
@@ -654,7 +661,8 @@ public class TezDagBuilder extends TezOp
 
             OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                     MROutput.class.getName()).setUserPayload(TezUtils
-                    .createUserPayloadFromConf(outputPayLoad));
+                    .createUserPayloadFromConf(outputPayLoad))
+                    .setHistoryText(convertToHistoryText("", outputPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
                 OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
                 if (vertexGroupKey != null) {
@@ -743,7 +751,8 @@ public class TezDagBuilder extends TezOp
         if (vmPluginName != null) {
             VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
             if (vmPluginConf != null) {
-                vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
+                vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
+                    .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf));
             }
             vertex.setVertexManagerPlugin(vmPluginDescriptor);
         }
@@ -1038,4 +1047,27 @@ public class TezDagBuilder extends TezOp
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
+
+    // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2. Builds a JSON string holding the optional description ("desc") and the conf entries ("config").
+    public static String convertToHistoryText(String description, Configuration conf) throws IOException {
+        // Add a version if this serialization is changed
+        JSONObject jsonObject = new JSONObject();
+        try {
+            if (description != null && !description.isEmpty()) { // "desc" is left out for a null or empty description
+                jsonObject.put("desc", description);
+        } // closes the description check above, despite the misleading indentation
+        if (conf != null) { // "config" is left out entirely when no conf is supplied
+            JSONObject confJson = new JSONObject();
+            Iterator<Entry<String, String>> iter = conf.iterator(); // walk every key/value entry exposed by conf
+            while (iter.hasNext()) {
+                Entry<String, String> entry = iter.next();
+                confJson.put(entry.getKey(), entry.getValue());
+            }
+            jsonObject.put("config", confJson); // nest the copied entries under a single "config" key
+        }
+        } catch (JSONException e) { // rethrown as IOException with the original exception kept as the cause
+            throw new IOException("Error when trying to convert description/conf to JSON", e);
+        }
+        return jsonObject.toString();
+    }
 }