You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/11/08 23:41:16 UTC
svn commit: r1637608 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Author: daijy
Date: Sat Nov 8 22:41:15 2014
New Revision: 1637608
URL: http://svn.apache.org/r1637608
Log:
PIG-4224: Upload Tez payload history string to timeline server
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1637608&r1=1637607&r2=1637608&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Nov 8 22:41:15 2014
@@ -37,6 +37,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4224: Upload Tez payload history string to timeline server (daijy)
+
PIG-3977: Get TezStats working for Oozie (rohini)
PIG-3979: group all performance, garbage collection, and incremental aggregation (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1637608&r1=1637607&r2=1637608&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat Nov 8 22:41:15 2014
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -153,6 +155,8 @@ import org.apache.tez.runtime.library.ap
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
/**
* A visitor to construct DAG out of Tez plan.
@@ -279,7 +283,8 @@ public class TezDagBuilder extends TezOp
}
return GroupInputEdge.create(from, to, edgeProperty,
- InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
+ InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
+ .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
}
/**
@@ -370,8 +375,9 @@ public class TezDagBuilder extends TezOp
MRToTezHelper.processMRSettings(conf, globalConf);
- in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+ String historyString = convertToHistoryText("", conf);
+ in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
+ out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
@@ -597,7 +603,7 @@ public class TezDagBuilder extends TezOp
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
- procDesc.setUserPayload(userPayload);
+ procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf));
Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(),
tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf));
@@ -636,7 +642,8 @@ public class TezDagBuilder extends TezOp
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
.setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
- .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())),
+ .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
+ .setHistoryText(convertToHistoryText("", payloadConf)),
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials()));
}
@@ -654,7 +661,8 @@ public class TezDagBuilder extends TezOp
OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
MROutput.class.getName()).setUserPayload(TezUtils
- .createUserPayloadFromConf(outputPayLoad));
+ .createUserPayloadFromConf(outputPayLoad))
+ .setHistoryText(convertToHistoryText("", outputPayLoad));
if (tezOp.getVertexGroupStores() != null) {
OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
if (vertexGroupKey != null) {
@@ -743,7 +751,8 @@ public class TezDagBuilder extends TezOp
if (vmPluginName != null) {
VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
if (vmPluginConf != null) {
- vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
+ vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
+ .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf));
}
vertex.setVertexManagerPlugin(vmPluginDescriptor);
}
@@ -1038,4 +1047,27 @@ public class TezDagBuilder extends TezOp
job.setOutputFormatClass(PigOutputFormatTez.class);
}
}
+
+ // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2
+ public static String convertToHistoryText(String description, Configuration conf) throws IOException {
+ // Add a version if this serialization is changed
+ JSONObject jsonObject = new JSONObject();
+ try {
+ if (description != null && !description.isEmpty()) {
+ jsonObject.put("desc", description);
+ }
+ if (conf != null) {
+ JSONObject confJson = new JSONObject();
+ Iterator<Entry<String, String>> iter = conf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ confJson.put(entry.getKey(), entry.getValue());
+ }
+ jsonObject.put("config", confJson);
+ }
+ } catch (JSONException e) {
+ throw new IOException("Error when trying to convert description/conf to JSON", e);
+ }
+ return jsonObject.toString();
+ }
}