You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/14 19:21:08 UTC
svn commit: r1708654 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/tez/
Author: rohini
Date: Wed Oct 14 17:21:07 2015
New Revision: 1708654
URL: http://svn.apache.org/viewvc?rev=1708654&view=rev
Log:
PIG-4699: Print Job stats information in Tez like mapreduce (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 14 17:21:07 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4699: Print Job stats information in Tez like mapreduce (rohini)
+
PIG-4554: Compress pig.script before encoding (sandyridgeracer via rohini)
PIG-4670: Embedded Python scripts still parse line by line (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Oct 14 17:21:07 2015
@@ -297,7 +297,7 @@ public class TezDagBuilder extends TezOp
POStore store = tezOp.getVertexGroupInfo().getStore();
if (store != null) {
vertexGroup.addDataSink(store.getOperatorKey().toString(),
- new DataSinkDescriptor(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
+ DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
}
}
@@ -627,7 +627,10 @@ public class TezDagBuilder extends TezOp
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName());
- String vertexInfo = dagScriptInfo.getAliasLocation(tezOp) + " (" + dagScriptInfo.getPigFeatures(tezOp) + ")" ;
+ String alias = dagScriptInfo.getAlias(tezOp);
+ String aliasLocation = dagScriptInfo.getAliasLocation(tezOp);
+ String features = dagScriptInfo.getPigFeatures(tezOp);
+ String vertexInfo = aliasLocation + " (" + features + ")" ;
procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf));
String vmPluginName = null;
@@ -744,6 +747,9 @@ public class TezDagBuilder extends TezOp
+ ", memory=" + vertex.getTaskResource().getMemory()
+ ", java opts=" + vertex.getTaskLaunchCmdOpts()
);
+ log.info("Processing aliases: " + alias);
+ log.info("Detailed locations: " + aliasLocation);
+ log.info("Pig features in the vertex: " + features);
// Right now there can only be one of each of these. Will need to be
// more generic when there can be more.
for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {
@@ -829,7 +835,7 @@ public class TezDagBuilder extends TezOp
String outputKey = ((POStoreTez) store).getOutputKey();
if (!uniqueStoreOutputs.contains(outputKey)) {
vertex.addDataSink(outputKey.toString(),
- new DataSinkDescriptor(storeOutDescriptor,
+ DataSinkDescriptor.create(storeOutDescriptor,
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
dag.getCredentials()));
uniqueStoreOutputs.add(outputKey);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Wed Oct 14 17:21:07 2015
@@ -223,10 +223,16 @@ public class TezJob implements Runnable
private class DAGStatusReporter extends TimerTask {
+ private String prevDAGStatus;
+
@Override
public void run() {
if (dagStatus == null) return;
- log.info("DAG Status: " + dagStatus.toString());
+ String currDAGStatus = dagStatus.toString();
+ if (!currDAGStatus.equals(prevDAGStatus)) {
+ log.info("DAG Status: " + currDAGStatus);
+ prevDAGStatus = currDAGStatus;
+ }
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Wed Oct 14 17:21:07 2015
@@ -98,21 +98,25 @@ public class TezPrinter extends TezOpPla
*/
public static class TezGraphPrinter extends TezOpPlanVisitor {
- StringBuffer buf;
+ StringBuilder buf;
public TezGraphPrinter(TezOperPlan plan) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true));
- buf = new StringBuffer();
+ buf = new StringBuilder();
}
@Override
public void visitTezOp(TezOperator tezOper) throws VisitorException {
+ writePlan(mPlan, tezOper, buf);
+ }
+
+ public static void writePlan(TezOperPlan plan, TezOperator tezOper, StringBuilder buf) {
if (tezOper.isVertexGroup()) {
buf.append("Tez vertex group " + tezOper.getOperatorKey().toString());
} else {
buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
}
- List<TezOperator> succs = mPlan.getSuccessors(tezOper);
+ List<TezOperator> succs = plan.getSuccessors(tezOper);
if (succs != null) {
Collections.sort(succs);
buf.append("\t->\t");
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Oct 14 17:21:07 2015
@@ -44,6 +44,11 @@ public class PigStatsUtil {
= "HDFS_BYTES_WRITTEN";
public static final String HDFS_BYTES_READ
= "HDFS_BYTES_READ";
+ public static final String FILE_BYTES_WRITTEN
+ = "FILE_BYTES_WRITTEN";
+ public static final String FILE_BYTES_READ
+ = "FILE_BYTES_READ";
+
public static final String MULTI_INPUTS_RECORD_COUNTER
= "Input records from ";
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Oct 14 17:21:07 2015
@@ -33,6 +33,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -41,6 +43,7 @@ import org.apache.pig.tools.pigstats.Job
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.CounterGroup;
@@ -67,9 +70,20 @@ public class TezDAGStats extends JobStat
public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName();
+ public static final String SUCCESS_HEADER = String.format("VertexId Parallelism TotalTasks"
+ + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+ + " Alias\tFeature\tOutputs",
+ "InputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+
+ public static final String FAILURE_HEADER = String.format("VertexId State Parallelism TotalTasks"
+ + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+ + " Alias\tFeature\tOutputs",
+ "InputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+
private Map<String, TezVertexStats> tezVertexStatsMap;
private String appId;
+ private StringBuilder tezDAGPlan;
private int totalTasks = -1;
private long fileBytesRead = -1;
@@ -97,31 +111,35 @@ public class TezDAGStats extends JobStat
/**
* This class builds the graph of a Tez DAG vertices.
*/
- static class JobGraphBuilder extends TezOpPlanVisitor {
+ static class TezDAGStatsBuilder extends TezOpPlanVisitor {
+ private TezPlanContainerNode tezPlanNode;
private JobGraph jobPlan;
private Map<String, TezVertexStats> tezVertexStatsMap;
private List<TezVertexStats> vertexStatsToBeRemoved;
private TezDAGScriptInfo dagScriptInfo;
+ private StringBuilder tezDAGPlan;
- public JobGraphBuilder(TezOperPlan plan, TezDAGScriptInfo dagScriptInfo) {
- super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
- tezVertexStatsMap = new HashMap<String, TezVertexStats>();
- vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
- jobPlan = new JobGraph();
+ public TezDAGStatsBuilder(TezPlanContainerNode tezPlanNode, TezDAGScriptInfo dagScriptInfo) {
+ super(tezPlanNode.getTezOperPlan(), new DependencyOrderWalker<TezOperator, TezOperPlan>(tezPlanNode.getTezOperPlan()));
+ this.tezPlanNode = tezPlanNode;
+ this.tezVertexStatsMap = new HashMap<String, TezVertexStats>();
+ this.vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
+ this.jobPlan = new JobGraph();
+ this.tezDAGPlan = new StringBuilder();
this.dagScriptInfo = dagScriptInfo;
}
- public Map<String, TezVertexStats> getTezVertexStatsMap() {
- return tezVertexStatsMap;
- }
-
- public JobGraph getJobPlan() {
- return jobPlan;
+ public TezDAGStats build() throws VisitorException {
+ visit();
+ TezDAGStats dagStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobPlan, tezVertexStatsMap, tezDAGPlan);
+ dagStats.setAlias(dagScriptInfo);
+ return dagStats;
}
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ TezPrinter.TezGraphPrinter.writePlan(mPlan, tezOp, tezDAGPlan);
TezVertexStats currStats =
new TezVertexStats(tezOp.getOperatorKey().toString(), jobPlan, tezOp.isUseMRMapSettings());
jobPlan.add(currStats);
@@ -135,7 +153,7 @@ public class TezDAGStats extends JobStat
}
}
- // Remove VertexGroups (union) from JobGraph since they're not
+ // Remove VertexGroups (union) from JobGraph since they're not
// materialized as real vertices by Tez.
if (tezOp.isVertexGroup()) {
vertexStatsToBeRemoved.add(currStats);
@@ -162,9 +180,10 @@ public class TezDAGStats extends JobStat
}
- protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap) {
+ protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap, StringBuilder tezDAGPlan) {
super(name, plan);
this.tezVertexStatsMap = tezVertexStatsMap;
+ this.tezDAGPlan = tezDAGPlan;
}
public TezVertexStats getVertexStats(String vertexName) {
@@ -195,10 +214,10 @@ public class TezDAGStats extends JobStat
totalTasks = (int) dagGrp.findCounter("TOTAL_LAUNCHED_TASKS").getValue();
CounterGroup fsGrp = tezCounters.getGroup(FS_COUNTER_GROUP);
- fileBytesRead = fsGrp.findCounter("FILE_BYTES_READ").getValue();
- fileBytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN").getValue();
- hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ").getValue();
- hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN").getValue();
+ fileBytesRead = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_READ).getValue();
+ fileBytesWritten = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_WRITTEN).getValue();
+ hdfsBytesRead = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_READ).getValue();
+ hdfsBytesWritten = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
} else {
LOG.warn("Failed to get counters for DAG: " + dag.getName());
}
@@ -300,21 +319,53 @@ public class TezDAGStats extends JobStat
public String getDisplayString() {
StringBuilder sb = new StringBuilder();
- sb.append("DAG " + name + ":\n");
- sb.append(String.format("%1$20s: %2$-100s%n", "ApplicationId",
+ sb.append(String.format("%1$40s: %2$-100s%n", "Name",
+ name));
+ sb.append(String.format("%1$40s: %2$-100s%n", "ApplicationId",
appId));
- sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
+ sb.append(String.format("%1$40s: %2$-100s%n", "TotalLaunchedTasks",
totalTasks));
- sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",
+ sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesRead",
fileBytesRead));
- sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesWritten",
+ sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesWritten",
fileBytesWritten));
- sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesRead",
+ sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesRead",
hdfsBytesRead));
- sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesWritten",
+ sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesWritten",
hdfsBytesWritten));
+ sb.append(String.format("%1$40s: %2$-100s%n", "SpillableMemoryManager spill count",
+ spillCount));
+ sb.append(String.format("%1$40s: %2$-100s%n", "Bags proactively spilled",
+ activeSpillCountObj));
+ sb.append(String.format("%1$40s: %2$-100s%n", "Records proactively spilled",
+ activeSpillCountRecs));
+
+
+ sb.append("\nDAG Plan:\n");
+ sb.append(tezDAGPlan);
+
+ List<JobStats> success = ((JobGraph)getPlan()).getSuccessfulJobs();
+ List<JobStats> failed = ((JobGraph)getPlan()).getFailedJobs();
+
+ if (success != null && !success.isEmpty()) {
+ sb.append("\nVertex Stats:\n");
+ sb.append(SUCCESS_HEADER).append("\n");
+ for (JobStats js : success) {
+ sb.append(js.getDisplayString());
+ }
+ }
+
+ if (failed != null && !failed.isEmpty()) {
+ sb.append("\nFailed vertices:\n");
+ sb.append(FAILURE_HEADER).append("\n");
+ for (JobStats js : failed) {
+ sb.append(js.getDisplayString());
+ }
+ sb.append("\n");
+ }
+
return sb.toString();
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Wed Oct 14 17:21:07 2015
@@ -75,10 +75,8 @@ public class TezPigScriptStats extends P
public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException {
TezScriptState ss = TezScriptState.get();
TezDAGScriptInfo dagScriptInfo = ss.setDAGScriptInfo(tezPlanNode);
- TezDAGStats.JobGraphBuilder jobGraphBuilder = new TezDAGStats.JobGraphBuilder(tezPlanNode.getTezOperPlan(), dagScriptInfo);
- jobGraphBuilder.visit();
- TezDAGStats currStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobGraphBuilder.getJobPlan(), jobGraphBuilder.getTezVertexStatsMap());
- currStats.setAlias(dagScriptInfo);
+ TezDAGStats.TezDAGStatsBuilder builder = new TezDAGStats.TezDAGStatsBuilder(tezPlanNode, dagScriptInfo);
+ TezDAGStats currStats = builder.build();
jobPlan.add(currStats);
List<TezPlanContainerNode> preds = getPlan().getPredecessors(tezPlanNode);
if (preds != null) {
@@ -111,7 +109,11 @@ public class TezPigScriptStats extends P
public void finish() {
super.stop();
- display();
+ try {
+ display();
+ } catch (Throwable e) {
+ LOG.warn("Exception while displaying stats:", e);
+ }
}
private void display() {
@@ -151,7 +153,10 @@ public class TezPigScriptStats extends P
}
}
+ int count = 0;
for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+ sb.append("\n");
+ sb.append("DAG " + count++ + ":\n");
sb.append(dagStats.getDisplayString());
sb.append("\n");
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Oct 14 17:21:07 2015
@@ -49,6 +49,7 @@ import org.apache.tez.common.counters.Ta
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.VertexStatus.State;
import com.google.common.collect.Maps;
@@ -62,24 +63,23 @@ public class TezVertexStats extends JobS
private boolean isMapOpts;
private int parallelism;
+ private State vertexState;
// CounterGroup, Counter, Value
private Map<String, Map<String, Long>> counters = null;
private List<POStore> stores = null;
private List<FileSpec> loads = null;
- private int numberMaps = 0;
- private int numberReduces = 0;
-
- private long mapInputRecords = 0;
- private long mapOutputRecords = 0;
- private long reduceInputRecords = 0;
- private long reduceOutputRecords = 0;
+ private int numTasks = 0;
+ private long numInputRecords = 0;
+ private long numOutputRecords = 0;
+ private long fileBytesRead = 0;
+ private long fileBytesWritten = 0;
private long spillCount = 0;
private long activeSpillCountObj = 0;
private long activeSpillCountRecs = 0;
- private HashMap<String, Long> multiStoreCounters
+ private Map<String, Long> multiStoreCounters
= new HashMap<String, Long>();
public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
@@ -103,13 +103,24 @@ public class TezVertexStats extends JobS
@Override
public String getDisplayString() {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("%1$20s: %2$-100s%n", "VertexName", name));
- if (getAlias() != null && !getAlias().isEmpty()) {
- sb.append(String.format("%1$20s: %2$-100s%n", "Alias", getAlias()));
+ sb.append(String.format("%-10s ", name));
+ if (state == JobState.FAILED) {
+ sb.append(vertexState.name());
}
- if (getFeature() != null && !getFeature().isEmpty()) {
- sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeature()));
+ sb.append(String.format("%9s ", parallelism));
+ sb.append(String.format("%10s ", numTasks));
+ sb.append(String.format("%14s ", numInputRecords));
+ sb.append(String.format("%14s ", numOutputRecords));
+ sb.append(String.format("%14s ", fileBytesRead));
+ sb.append(String.format("%16s ", fileBytesWritten));
+ sb.append(String.format("%14s ", hdfsBytesRead));
+ sb.append(String.format("%16s ", hdfsBytesWritten));
+ sb.append(getAlias()).append("\t");
+ sb.append(getFeature()).append("\t");
+ for (OutputStats os : outputs) {
+ sb.append(os.getLocation()).append(",");
}
+ sb.append("\n");
return sb.toString();
}
@@ -138,17 +149,12 @@ public class TezVertexStats extends JobS
}
public void accumulateStats(VertexStatus status, int parallelism) {
- hdfsBytesRead = -1;
- hdfsBytesWritten = -1;
if (status != null) {
setSuccessful(status.getState().equals(VertexStatus.State.SUCCEEDED));
- this.parallelism = parallelism;
- if (this.isMapOpts) {
- numberMaps += parallelism;
- } else {
- numberReduces += parallelism;
- }
+ this.vertexState = status.getState();
+ this.parallelism = parallelism; //compile time parallelism
+ this.numTasks = status.getProgress().getTotalTaskCount(); //run time parallelism
TezCounters tezCounters = status.getVertexCounters();
counters = Maps.newHashMap();
Iterator<CounterGroup> grpIt = tezCounters.iterator();
@@ -163,14 +169,22 @@ public class TezVertexStats extends JobS
counters.put(grp.getName(), cntMap);
}
- if (counters.get(FS_COUNTER_GROUP) != null &&
- counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
- hdfsBytesRead = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
- }
- if (counters.get(FS_COUNTER_GROUP) != null &&
- counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
- hdfsBytesWritten = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+ Map<String, Long> fsCounters = counters.get(FS_COUNTER_GROUP);
+ if (fsCounters != null) {
+ if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_READ)) {
+ this.hdfsBytesRead = fsCounters.get(PigStatsUtil.HDFS_BYTES_READ);
+ }
+ if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_WRITTEN)) {
+ this.hdfsBytesWritten = fsCounters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+ }
+ if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_READ)) {
+ this.fileBytesRead = fsCounters.get(PigStatsUtil.FILE_BYTES_READ);
+ }
+ if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_WRITTEN)) {
+ this.fileBytesWritten = fsCounters.get(PigStatsUtil.FILE_BYTES_WRITTEN);
+ }
}
+
Map<String, Long> pigCounters = counters.get(PIG_COUNTER_GROUP);
if (pigCounters != null) {
if (pigCounters.containsKey(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)) {
@@ -202,6 +216,7 @@ public class TezVertexStats extends JobS
return;
}
+ // There is always only one load in a Tez vertex
for (FileSpec fs : loads) {
long records = -1;
long hdfsBytesRead = -1;
@@ -211,11 +226,7 @@ public class TezVertexStats extends JobS
if (taskCounter != null
&& taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
- if (this.isMapOpts) {
- mapInputRecords += records;
- } else {
- reduceInputRecords += records;
- }
+ numInputRecords = records;
}
if (counters.get(FS_COUNTER_GROUP) != null &&
counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
@@ -254,11 +265,7 @@ public class TezVertexStats extends JobS
records = counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
}
if (records != -1) {
- if (this.isMapOpts) {
- mapOutputRecords += records;
- } else {
- reduceOutputRecords += records;
- }
+ numOutputRecords += records;
}
}
/* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
@@ -284,13 +291,13 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public int getNumberMaps() {
- return numberMaps;
+ return this.isMapOpts ? numTasks : -1;
}
@Override
@Deprecated
public int getNumberReduces() {
- return numberReduces;
+ return this.isMapOpts ? -1 : numTasks;
}
@Override
@@ -332,25 +339,25 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public long getMapInputRecords() {
- return mapInputRecords;
+ return this.isMapOpts ? numInputRecords : -1;
}
@Override
@Deprecated
public long getMapOutputRecords() {
- return mapOutputRecords;
+ return this.isMapOpts ? numOutputRecords : -1;
}
@Override
@Deprecated
public long getReduceInputRecords() {
- return reduceInputRecords;
+ return this.isMapOpts ? -1 : numInputRecords;
}
@Override
@Deprecated
public long getReduceOutputRecords() {
- return reduceOutputRecords;
+ return this.isMapOpts ? -1 : numOutputRecords;
}
@Override