You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2015/10/14 19:21:08 UTC

svn commit: r1708654 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/tez/

Author: rohini
Date: Wed Oct 14 17:21:07 2015
New Revision: 1708654

URL: http://svn.apache.org/viewvc?rev=1708654&view=rev
Log:
PIG-4699: Print Job stats information in Tez like mapreduce (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 14 17:21:07 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4699: Print Job stats information in Tez like mapreduce (rohini)
+
 PIG-4554: Compress pig.script before encoding (sandyridgeracer via rohini)
 
 PIG-4670: Embedded Python scripts still parse line by line (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Oct 14 17:21:07 2015
@@ -297,7 +297,7 @@ public class TezDagBuilder extends TezOp
                 POStore store = tezOp.getVertexGroupInfo().getStore();
                 if (store != null) {
                     vertexGroup.addDataSink(store.getOperatorKey().toString(),
-                            new DataSinkDescriptor(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
+                            DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
                             OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials()));
                 }
             }
@@ -627,7 +627,10 @@ public class TezDagBuilder extends TezOp
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName());
-        String vertexInfo = dagScriptInfo.getAliasLocation(tezOp) + " (" + dagScriptInfo.getPigFeatures(tezOp) + ")" ;
+        String alias = dagScriptInfo.getAlias(tezOp);
+        String aliasLocation = dagScriptInfo.getAliasLocation(tezOp);
+        String features = dagScriptInfo.getPigFeatures(tezOp);
+        String vertexInfo = aliasLocation + " (" + features + ")" ;
         procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf));
 
         String vmPluginName = null;
@@ -744,6 +747,9 @@ public class TezDagBuilder extends TezOp
                 + ", memory=" + vertex.getTaskResource().getMemory()
                 + ", java opts=" + vertex.getTaskLaunchCmdOpts()
                 );
+        log.info("Processing aliases: " + alias);
+        log.info("Detailed locations: " + aliasLocation);
+        log.info("Pig features in the vertex: " + features);
         // Right now there can only be one of each of these. Will need to be
         // more generic when there can be more.
         for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {
@@ -829,7 +835,7 @@ public class TezDagBuilder extends TezOp
             String outputKey = ((POStoreTez) store).getOutputKey();
             if (!uniqueStoreOutputs.contains(outputKey)) {
                 vertex.addDataSink(outputKey.toString(),
-                        new DataSinkDescriptor(storeOutDescriptor,
+                        DataSinkDescriptor.create(storeOutDescriptor,
                         OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
                         dag.getCredentials()));
                 uniqueStoreOutputs.add(outputKey);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Wed Oct 14 17:21:07 2015
@@ -223,10 +223,16 @@ public class TezJob implements Runnable
 
     private class DAGStatusReporter extends TimerTask {
 
+        private String prevDAGStatus;
+
         @Override
         public void run() {
             if (dagStatus == null) return;
-            log.info("DAG Status: " + dagStatus.toString());
+            String currDAGStatus = dagStatus.toString();
+            if (!currDAGStatus.equals(prevDAGStatus)) {
+                log.info("DAG Status: " + currDAGStatus);
+                prevDAGStatus = currDAGStatus;
+            }
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Wed Oct 14 17:21:07 2015
@@ -98,21 +98,25 @@ public class TezPrinter extends TezOpPla
      */
     public static class TezGraphPrinter extends TezOpPlanVisitor {
 
-        StringBuffer buf;
+        StringBuilder buf;
 
         public TezGraphPrinter(TezOperPlan plan) {
             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true));
-            buf = new StringBuffer();
+            buf = new StringBuilder();
         }
 
         @Override
         public void visitTezOp(TezOperator tezOper) throws VisitorException {
+            writePlan(mPlan, tezOper, buf);
+        }
+
+        public static void writePlan(TezOperPlan plan, TezOperator tezOper, StringBuilder buf) {
             if (tezOper.isVertexGroup()) {
                 buf.append("Tez vertex group " + tezOper.getOperatorKey().toString());
             } else {
                 buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
             }
-            List<TezOperator> succs = mPlan.getSuccessors(tezOper);
+            List<TezOperator> succs = plan.getSuccessors(tezOper);
             if (succs != null) {
                 Collections.sort(succs);
                 buf.append("\t->\t");

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Oct 14 17:21:07 2015
@@ -44,6 +44,11 @@ public class PigStatsUtil {
             = "HDFS_BYTES_WRITTEN";
     public static final String HDFS_BYTES_READ
             = "HDFS_BYTES_READ";
+    public static final String FILE_BYTES_WRITTEN
+            = "FILE_BYTES_WRITTEN";
+    public static final String FILE_BYTES_READ
+            = "FILE_BYTES_READ";
+
 
     public static final String MULTI_INPUTS_RECORD_COUNTER
             = "Input records from ";

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Oct 14 17:21:07 2015
@@ -33,6 +33,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -41,6 +43,7 @@ import org.apache.pig.tools.pigstats.Job
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.CounterGroup;
@@ -67,9 +70,20 @@ public class TezDAGStats extends JobStat
     public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
     public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName();
 
+    public static final String SUCCESS_HEADER = String.format("VertexId Parallelism TotalTasks"
+            + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+            + " Alias\tFeature\tOutputs",
+            "InputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+
+    public static final String FAILURE_HEADER = String.format("VertexId  State Parallelism TotalTasks"
+            + " %1$14s %2$14s %3$14s %4$16s %5$14s %6$16s"
+            + " Alias\tFeature\tOutputs",
+            "InputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+
     private Map<String, TezVertexStats> tezVertexStatsMap;
 
     private String appId;
+    private StringBuilder tezDAGPlan;
 
     private int totalTasks = -1;
     private long fileBytesRead = -1;
@@ -97,31 +111,35 @@ public class TezDAGStats extends JobStat
     /**
      * This class builds the graph of a Tez DAG vertices.
      */
-    static class JobGraphBuilder extends TezOpPlanVisitor {
+    static class TezDAGStatsBuilder extends TezOpPlanVisitor {
 
+        private TezPlanContainerNode tezPlanNode;
         private JobGraph jobPlan;
         private Map<String, TezVertexStats> tezVertexStatsMap;
         private List<TezVertexStats> vertexStatsToBeRemoved;
         private TezDAGScriptInfo dagScriptInfo;
+        private StringBuilder tezDAGPlan;
 
-        public JobGraphBuilder(TezOperPlan plan, TezDAGScriptInfo dagScriptInfo) {
-            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
-            tezVertexStatsMap = new HashMap<String, TezVertexStats>();
-            vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
-            jobPlan = new JobGraph();
+        public TezDAGStatsBuilder(TezPlanContainerNode tezPlanNode, TezDAGScriptInfo dagScriptInfo) {
+            super(tezPlanNode.getTezOperPlan(), new DependencyOrderWalker<TezOperator, TezOperPlan>(tezPlanNode.getTezOperPlan()));
+            this.tezPlanNode = tezPlanNode;
+            this.tezVertexStatsMap = new HashMap<String, TezVertexStats>();
+            this.vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
+            this.jobPlan = new JobGraph();
+            this.tezDAGPlan = new StringBuilder();
             this.dagScriptInfo = dagScriptInfo;
         }
 
-        public Map<String, TezVertexStats> getTezVertexStatsMap() {
-            return tezVertexStatsMap;
-        }
-
-        public JobGraph getJobPlan() {
-            return jobPlan;
+        public TezDAGStats build() throws VisitorException {
+            visit();
+            TezDAGStats dagStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobPlan, tezVertexStatsMap, tezDAGPlan);
+            dagStats.setAlias(dagScriptInfo);
+            return dagStats;
         }
 
         @Override
         public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            TezPrinter.TezGraphPrinter.writePlan(mPlan, tezOp, tezDAGPlan);
             TezVertexStats currStats =
                     new TezVertexStats(tezOp.getOperatorKey().toString(), jobPlan, tezOp.isUseMRMapSettings());
             jobPlan.add(currStats);
@@ -135,7 +153,7 @@ public class TezDAGStats extends JobStat
                 }
             }
 
-         // Remove VertexGroups (union) from JobGraph since they're not
+            // Remove VertexGroups (union) from JobGraph since they're not
             // materialized as real vertices by Tez.
             if (tezOp.isVertexGroup()) {
                 vertexStatsToBeRemoved.add(currStats);
@@ -162,9 +180,10 @@ public class TezDAGStats extends JobStat
 
     }
 
-    protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap) {
+    protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap, StringBuilder tezDAGPlan) {
         super(name, plan);
         this.tezVertexStatsMap = tezVertexStatsMap;
+        this.tezDAGPlan = tezDAGPlan;
     }
 
     public TezVertexStats getVertexStats(String vertexName) {
@@ -195,10 +214,10 @@ public class TezDAGStats extends JobStat
             totalTasks = (int) dagGrp.findCounter("TOTAL_LAUNCHED_TASKS").getValue();
 
             CounterGroup fsGrp = tezCounters.getGroup(FS_COUNTER_GROUP);
-            fileBytesRead = fsGrp.findCounter("FILE_BYTES_READ").getValue();
-            fileBytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN").getValue();
-            hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ").getValue();
-            hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN").getValue();
+            fileBytesRead = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_READ).getValue();
+            fileBytesWritten = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_WRITTEN).getValue();
+            hdfsBytesRead = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_READ).getValue();
+            hdfsBytesWritten = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
         } else {
             LOG.warn("Failed to get counters for DAG: " + dag.getName());
         }
@@ -300,21 +319,53 @@ public class TezDAGStats extends JobStat
     public String getDisplayString() {
             StringBuilder sb = new StringBuilder();
 
-            sb.append("DAG " + name + ":\n");
-            sb.append(String.format("%1$20s: %2$-100s%n", "ApplicationId",
+            sb.append(String.format("%1$40s: %2$-100s%n", "Name",
+                    name));
+            sb.append(String.format("%1$40s: %2$-100s%n", "ApplicationId",
                     appId));
-            sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
+            sb.append(String.format("%1$40s: %2$-100s%n", "TotalLaunchedTasks",
                     totalTasks));
 
-            sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",
+            sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesRead",
                     fileBytesRead));
-            sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesWritten",
+            sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesWritten",
                     fileBytesWritten));
-            sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesRead",
+            sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesRead",
                     hdfsBytesRead));
-            sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesWritten",
+            sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesWritten",
                     hdfsBytesWritten));
 
+            sb.append(String.format("%1$40s: %2$-100s%n", "SpillableMemoryManager spill count",
+                    spillCount));
+            sb.append(String.format("%1$40s: %2$-100s%n", "Bags proactively spilled",
+                    activeSpillCountObj));
+            sb.append(String.format("%1$40s: %2$-100s%n", "Records proactively spilled",
+                    activeSpillCountRecs));
+
+
+            sb.append("\nDAG Plan:\n");
+            sb.append(tezDAGPlan);
+
+            List<JobStats> success = ((JobGraph)getPlan()).getSuccessfulJobs();
+            List<JobStats> failed = ((JobGraph)getPlan()).getFailedJobs();
+
+            if (success != null && !success.isEmpty()) {
+                sb.append("\nVertex Stats:\n");
+                sb.append(SUCCESS_HEADER).append("\n");
+                for (JobStats js : success) {
+                    sb.append(js.getDisplayString());
+                }
+            }
+
+            if (failed != null && !failed.isEmpty()) {
+                sb.append("\nFailed vertices:\n");
+                sb.append(FAILURE_HEADER).append("\n");
+                for (JobStats js : failed) {
+                    sb.append(js.getDisplayString());
+                }
+                sb.append("\n");
+            }
+
             return sb.toString();
     }
 

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Wed Oct 14 17:21:07 2015
@@ -75,10 +75,8 @@ public class TezPigScriptStats extends P
         public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException {
             TezScriptState ss = TezScriptState.get();
             TezDAGScriptInfo dagScriptInfo = ss.setDAGScriptInfo(tezPlanNode);
-            TezDAGStats.JobGraphBuilder jobGraphBuilder = new TezDAGStats.JobGraphBuilder(tezPlanNode.getTezOperPlan(), dagScriptInfo);
-            jobGraphBuilder.visit();
-            TezDAGStats currStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobGraphBuilder.getJobPlan(), jobGraphBuilder.getTezVertexStatsMap());
-            currStats.setAlias(dagScriptInfo);
+            TezDAGStats.TezDAGStatsBuilder builder = new TezDAGStats.TezDAGStatsBuilder(tezPlanNode, dagScriptInfo);
+            TezDAGStats currStats = builder.build();
             jobPlan.add(currStats);
             List<TezPlanContainerNode> preds = getPlan().getPredecessors(tezPlanNode);
             if (preds != null) {
@@ -111,7 +109,11 @@ public class TezPigScriptStats extends P
 
     public void finish() {
         super.stop();
-        display();
+        try {
+            display();
+        } catch (Throwable e) {
+            LOG.warn("Exception while displaying stats:", e);
+        }
     }
 
     private void display() {
@@ -151,7 +153,10 @@ public class TezPigScriptStats extends P
             }
         }
 
+        int count = 0;
         for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+            sb.append("\n");
+            sb.append("DAG " + count++ + ":\n");
             sb.append(dagStats.getDisplayString());
             sb.append("\n");
         }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1708654&r1=1708653&r2=1708654&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Oct 14 17:21:07 2015
@@ -49,6 +49,7 @@ import org.apache.tez.common.counters.Ta
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.VertexStatus.State;
 
 import com.google.common.collect.Maps;
 
@@ -62,24 +63,23 @@ public class TezVertexStats extends JobS
 
     private boolean isMapOpts;
     private int parallelism;
+    private State vertexState;
     // CounterGroup, Counter, Value
     private Map<String, Map<String, Long>> counters = null;
 
     private List<POStore> stores = null;
     private List<FileSpec> loads = null;
 
-    private int numberMaps = 0;
-    private int numberReduces = 0;
-
-    private long mapInputRecords = 0;
-    private long mapOutputRecords = 0;
-    private long reduceInputRecords = 0;
-    private long reduceOutputRecords = 0;
+    private int numTasks = 0;
+    private long numInputRecords = 0;
+    private long numOutputRecords = 0;
+    private long fileBytesRead = 0;
+    private long fileBytesWritten = 0;
     private long spillCount = 0;
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
 
-    private HashMap<String, Long> multiStoreCounters
+    private Map<String, Long> multiStoreCounters
             = new HashMap<String, Long>();
 
     public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
@@ -103,13 +103,24 @@ public class TezVertexStats extends JobS
     @Override
     public String getDisplayString() {
         StringBuilder sb = new StringBuilder();
-        sb.append(String.format("%1$20s: %2$-100s%n", "VertexName", name));
-        if (getAlias() != null && !getAlias().isEmpty()) {
-            sb.append(String.format("%1$20s: %2$-100s%n", "Alias", getAlias()));
+        sb.append(String.format("%-10s ", name));
+        if (state == JobState.FAILED) {
+            sb.append(vertexState.name());
         }
-        if (getFeature() != null && !getFeature().isEmpty()) {
-            sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeature()));
+        sb.append(String.format("%9s ", parallelism));
+        sb.append(String.format("%10s ", numTasks));
+        sb.append(String.format("%14s ", numInputRecords));
+        sb.append(String.format("%14s ", numOutputRecords));
+        sb.append(String.format("%14s ", fileBytesRead));
+        sb.append(String.format("%16s ", fileBytesWritten));
+        sb.append(String.format("%14s ", hdfsBytesRead));
+        sb.append(String.format("%16s ", hdfsBytesWritten));
+        sb.append(getAlias()).append("\t");
+        sb.append(getFeature()).append("\t");
+        for (OutputStats os : outputs) {
+            sb.append(os.getLocation()).append(",");
         }
+        sb.append("\n");
         return sb.toString();
     }
 
@@ -138,17 +149,12 @@ public class TezVertexStats extends JobS
     }
 
     public void accumulateStats(VertexStatus status, int parallelism) {
-        hdfsBytesRead = -1;
-        hdfsBytesWritten = -1;
 
         if (status != null) {
             setSuccessful(status.getState().equals(VertexStatus.State.SUCCEEDED));
-            this.parallelism = parallelism;
-            if (this.isMapOpts) {
-                numberMaps += parallelism;
-            } else {
-                numberReduces += parallelism;
-            }
+            this.vertexState = status.getState();
+            this.parallelism = parallelism; //compile time parallelism
+            this.numTasks = status.getProgress().getTotalTaskCount(); //run time parallelism
             TezCounters tezCounters = status.getVertexCounters();
             counters = Maps.newHashMap();
             Iterator<CounterGroup> grpIt = tezCounters.iterator();
@@ -163,14 +169,22 @@ public class TezVertexStats extends JobS
                 counters.put(grp.getName(), cntMap);
             }
 
-            if (counters.get(FS_COUNTER_GROUP) != null &&
-                    counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
-                hdfsBytesRead = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
-            }
-            if (counters.get(FS_COUNTER_GROUP) != null &&
-                    counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
-                hdfsBytesWritten = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+            Map<String, Long> fsCounters = counters.get(FS_COUNTER_GROUP);
+            if (fsCounters != null) {
+                if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_READ)) {
+                    this.hdfsBytesRead = fsCounters.get(PigStatsUtil.HDFS_BYTES_READ);
+                }
+                if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_WRITTEN)) {
+                    this.hdfsBytesWritten = fsCounters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+                }
+                if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_READ)) {
+                    this.fileBytesRead = fsCounters.get(PigStatsUtil.FILE_BYTES_READ);
+                }
+                if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_WRITTEN)) {
+                    this.fileBytesWritten = fsCounters.get(PigStatsUtil.FILE_BYTES_WRITTEN);
+                }
             }
+
             Map<String, Long> pigCounters = counters.get(PIG_COUNTER_GROUP);
             if (pigCounters != null) {
                 if (pigCounters.containsKey(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)) {
@@ -202,6 +216,7 @@ public class TezVertexStats extends JobS
             return;
         }
 
+        // There is always only one load in a Tez vertex
         for (FileSpec fs : loads) {
             long records = -1;
             long hdfsBytesRead = -1;
@@ -211,11 +226,7 @@ public class TezVertexStats extends JobS
                 if (taskCounter != null
                         && taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
                     records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
-                    if (this.isMapOpts) {
-                        mapInputRecords += records;
-                    } else {
-                        reduceInputRecords += records;
-                    }
+                    numInputRecords = records;
                 }
                 if (counters.get(FS_COUNTER_GROUP) != null &&
                         counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
@@ -254,11 +265,7 @@ public class TezVertexStats extends JobS
                     records = counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
                 }
                 if (records != -1) {
-                    if (this.isMapOpts) {
-                        mapOutputRecords += records;
-                    } else {
-                        reduceOutputRecords += records;
-                    }
+                    numOutputRecords += records;
                 }
             }
             /* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
@@ -284,13 +291,13 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public int getNumberMaps() {
-        return numberMaps;
+        return this.isMapOpts ? numTasks : -1;
     }
 
     @Override
     @Deprecated
     public int getNumberReduces() {
-        return numberReduces;
+        return this.isMapOpts ? -1 : numTasks;
     }
 
     @Override
@@ -332,25 +339,25 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public long getMapInputRecords() {
-        return mapInputRecords;
+        return this.isMapOpts ? numInputRecords : -1;
     }
 
     @Override
     @Deprecated
     public long getMapOutputRecords() {
-        return mapOutputRecords;
+        return this.isMapOpts ? numOutputRecords : -1;
     }
 
     @Override
     @Deprecated
     public long getReduceInputRecords() {
-        return reduceInputRecords;
+        return this.isMapOpts ? -1 : numInputRecords;
     }
 
     @Override
     @Deprecated
     public long getReduceOutputRecords() {
-        return reduceOutputRecords;
+        return this.isMapOpts ? -1 : numOutputRecords;
     }
 
     @Override