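You are viewing a plain-text version of this content. The canonical version is the original post in the commits@pig.apache.org mailing-list archive (commit details: http://svn.apache.org/r1637447).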
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/07 22:08:27 UTC

svn commit: r1637447 [1/4] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/e...

Author: rohini
Date: Fri Nov  7 21:08:25 2014
New Revision: 1637447

URL: http://svn.apache.org/r1637447
Log:
PIG-3977: Get TezStats working for Oozie (rohini)

Added:
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
Removed:
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
    pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
    pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
    pig/trunk/test/org/apache/pig/tez/TestLoaderStorerShipCacheFilesTez.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
    pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Nov  7 21:08:25 2014
@@ -37,6 +37,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-3977: Get TezStats working for Oozie (rohini)
+
 PIG-3979: group all performance, garbage collection, and incremental aggregation (rohini)
 
 PIG-4253: Add a UniqueID UDF (daijy)

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Fri Nov  7 21:08:25 2014
@@ -178,6 +178,7 @@ public class Main {
         boolean deleteTempFiles = true;
         String logFileName = null;
         boolean printScriptRunTime = true;
+        PigContext pigContext = null;
 
         try {
             Configuration conf = new Configuration(false);
@@ -380,7 +381,7 @@ public class Main {
             }
 
             // create the context with the parameter
-            PigContext pigContext = new PigContext(properties);
+            pigContext = new PigContext(properties);
 
             // create the static script state object
             ScriptState scriptState = pigContext.getExecutionEngine().instantiateScriptState();
@@ -669,6 +670,9 @@ public class Main {
                 // clear temp files
                 FileLocalizer.deleteTempFiles();
             }
+            if (pigContext != null) {
+                pigContext.getExecutionEngine().destroy();
+            }
             PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
         }
 

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Nov  7 21:08:25 2014
@@ -231,13 +231,13 @@ public class PigServer {
         addJarsFromProperties();
         markPredeployedJarsFromProperties();
 
-        PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
-
         if (ScriptState.get() == null) {
             // If Pig was started via command line, ScriptState should have been
             // already initialized in Main. If so, we should not overwrite it.
             ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
         }
+        PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+
     }
 
     private void addJarsFromProperties() throws ExecException {

Modified: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Nov  7 21:08:25 2014
@@ -193,4 +193,9 @@ public interface ExecutionEngine {
      */
     public void killJob(String jobID) throws BackendException;
 
+    /**
+     * Perform any cleanup operation
+     */
+    public void destroy();
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Nov  7 21:08:25 2014
@@ -377,4 +377,11 @@ public abstract class HExecutionEngine i
         }
     }
 
+    @Override
+    public void destroy() {
+        if (launcher != null) {
+            launcher.destroy();
+        }
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Nov  7 21:08:25 2014
@@ -637,4 +637,7 @@ public abstract class Launcher {
         return new StackTraceElement(declaringClass, methodName, fileName,
                 lineNumber);
     }
+
+    public void destroy() {
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Nov  7 21:08:25 2014
@@ -119,6 +119,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -591,6 +592,9 @@ public class TezDagBuilder extends TezOp
         payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
         payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
 
+        TezScriptState ss = TezScriptState.get();
+        ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf);
+
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Fri Nov  7 21:08:25 2014
@@ -24,8 +24,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
 
 public class TezExecutionEngine extends HExecutionEngine {
 
@@ -43,6 +43,6 @@ public class TezExecutionEngine extends 
 
     @Override
     public PigStats instantiatePigStats() {
-        return new TezStats(pigContext);
+        return new TezPigScriptStats(pigContext);
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Nov  7 21:08:25 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -32,14 +31,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
@@ -64,11 +61,11 @@ public class TezJob implements Runnable 
     private TezClient tezClient;
     private boolean reuseSession;
     private TezCounters dagCounters;
-    // Vertex, CounterGroup, Counter, Value
-    private Map<String, Map<String, Map<String, Long>>> vertexCounters;
+
     // Timer for DAG status reporter
     private Timer timer;
     private TezJobConfig tezJobConf;
+    private TezPigScriptStats pigStats;
 
     public TezJob(TezConfiguration conf, DAG dag,
             Map<String, LocalResource> requestAMResources,
@@ -78,7 +75,6 @@ public class TezJob implements Runnable 
         this.requestAMResources = requestAMResources;
         this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        this.vertexCounters = Maps.newHashMap();
         tezJobConf = new TezJobConfig(estimatedTotalParallelism);
     }
 
@@ -105,7 +101,7 @@ public class TezJob implements Runnable 
     }
 
     public String getName() {
-        return dag == null ? "" : dag.getName();
+        return dag.getName();
     }
 
     public Configuration getConfiguration() {
@@ -124,14 +120,6 @@ public class TezJob implements Runnable 
         return dagCounters;
     }
 
-    public Map<String, Map<String, Long>> getVertexCounters(String group) {
-        return vertexCounters.get(group);
-    }
-
-    public Map<String, Long> getVertexCounters(String group, String name) {
-        return vertexCounters.get(group).get(name);
-    }
-
     public float getDAGProgress() {
         Progress p = dagStatus.getDAGProgress();
         return p == null ? 0 : (float)p.getSucceededTaskCount() / (float)p.getTotalTaskCount();
@@ -147,6 +135,22 @@ public class TezJob implements Runnable 
         return vertexProgress;
     }
 
+    public VertexStatus getVertexStatus(String vertexName) {
+        VertexStatus vs = null;
+        try {
+            vs = dagClient.getVertexStatus(vertexName, statusGetOpts);
+        } catch (Exception e) {
+            // Don't fail the job even if vertex status couldn't
+            // be retrieved.
+            log.warn("Cannot retrieve status for vertex " + vertexName, e);
+        }
+        return vs;
+    }
+
+    public void setPigStats(TezPigScriptStats pigStats) {
+        this.pigStats = pigStats;
+    }
+
     @Override
     public void run() {
         try {
@@ -180,9 +184,13 @@ public class TezJob implements Runnable 
             if (dagStatus.isCompleted()) {
                 log.info("DAG Status: " + dagStatus);
                 dagCounters = dagStatus.getDAGCounters();
-                collectVertexCounters();
                 TezSessionManager.freeSession(tezClient);
                 try {
+                    pigStats.accumulateStats(this);
+                } catch (IOException e) {
+                    log.warn("Exception while gathering stats", e);
+                }
+                try {
                     if (!reuseSession) {
                         TezSessionManager.stopSession(tezClient);
                     }
@@ -210,6 +218,7 @@ public class TezJob implements Runnable 
 
         @Override
         public void run() {
+            if (dagStatus == null) return;
             String msg = "status=" + dagStatus.getState()
               + ", progress=" + dagStatus.getDAGProgress()
               + ", diagnostics="
@@ -218,37 +227,6 @@ public class TezJob implements Runnable 
         }
     }
 
-    private void collectVertexCounters() {
-        for (Vertex v : dag.getVertices()) {
-            String name = v.getName();
-            try {
-                VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
-                if (s == null) {
-                    log.info("Cannot retrieve counters for vertex " + name);
-                    continue;
-                }
-                TezCounters counters = s.getVertexCounters();
-                Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
-                Iterator<CounterGroup> grpIt = counters.iterator();
-                while (grpIt.hasNext()) {
-                    CounterGroup grp = grpIt.next();
-                    Iterator<TezCounter> cntIt = grp.iterator();
-                    Map<String, Long> cntMap = Maps.newHashMap();
-                    while (cntIt.hasNext()) {
-                        TezCounter cnt = cntIt.next();
-                        cntMap.put(cnt.getName(), cnt.getValue());
-                    }
-                    grpCounters.put(grp.getName(), cntMap);
-                }
-                vertexCounters.put(name, grpCounters);
-            } catch (Exception e) {
-                // Don't fail the job even if vertex counters couldn't
-                // be retrieved.
-                log.info("Cannot retrieve counters for vertex " + name, e);
-            }
-        }
-    }
-
     public void killJob() throws IOException {
         try {
             if (dagClient != null) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Nov  7 21:08:25 2014
@@ -34,6 +34,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
 import org.apache.pig.impl.PigContext;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -45,7 +46,6 @@ import org.apache.tez.dag.api.TezConfigu
  */
 public class TezJobCompiler {
     private static final Log log = LogFactory.getLog(TezJobCompiler.class);
-    private static int dagIdentifier = 0;
 
     private PigContext pigContext;
     private TezConfiguration tezConf;
@@ -55,24 +55,22 @@ public class TezJobCompiler {
         this.tezConf = new TezConfiguration(conf);
     }
 
-    public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+    public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
             throws IOException, YarnException {
-        String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
-        DAG tezDag = DAG.create(jobName + "-" + dagIdentifier);
-        dagIdentifier++;
+        DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
         tezDag.setCredentials(new Credentials());
-        TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         return tezDag;
     }
 
-    public TezJob compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+    public TezJob compile(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         TezJob job = null;
         try {
             // A single Tez job always pack only 1 Tez plan. We will track
             // Tez job asynchronously to exploit parallel execution opportunities.
-            job = getJob(tezPlan, planContainer);
+            job = getJob(tezPlanNode, planContainer);
         } catch (JobCreationException jce) {
             throw jce;
         } catch(Exception e) {
@@ -84,11 +82,12 @@ public class TezJobCompiler {
         return job;
     }
 
-    private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
+    private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
             throws JobCreationException {
         try {
             Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
             localResources.putAll(planContainer.getLocalResources());
+            TezOperPlan tezPlan = tezPlanNode.getTezOperPlan();
             localResources.putAll(tezPlan.getExtraResources());
             String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
             if (shipFiles != null) {
@@ -106,7 +105,7 @@ public class TezJobCompiler {
             for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
                 log.info("Local resource: " + entry.getKey());
             }
-            DAG tezDag = buildDAG(tezPlan, localResources);
+            DAG tezDag = buildDAG(tezPlanNode, localResources);
             return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Nov  7 21:08:25 2014
@@ -23,10 +23,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,27 +65,37 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
-import org.apache.pig.tools.pigstats.tez.TezTaskStats;
-import org.apache.tez.common.TezUtils;
+import org.apache.pig.tools.pigstats.tez.TezVertexStats;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Main class that launches pig for Tez
  */
 public class TezLauncher extends Launcher {
     private static final Log log = LogFactory.getLog(TezLauncher.class);
-    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+    private static ThreadFactory namedThreadFactory;
+    private ExecutorService executor;
     private boolean aggregateWarning = false;
     private TezScriptState tezScriptState;
-    private TezStats tezStats;
+    private TezPigScriptStats tezStats;
     private TezJob runningJob;
 
+    public TezLauncher() {
+        if (namedThreadFactory == null) {
+            namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
+                    "PigTezLauncher-%d").build();
+        }
+        executor = Executors.newSingleThreadExecutor(namedThreadFactory);
+    }
+
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
         if (pc.getExecType().isLocal()) {
@@ -109,41 +119,38 @@ public class TezLauncher extends Launche
         List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
 
         tezScriptState = TezScriptState.get();
-        tezStats = new TezStats(pc);
+        tezStats = new TezPigScriptStats(pc);
         PigStats.start(tezStats);
 
         conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
         TezJobCompiler jc = new TezJobCompiler(pc, conf);
         TezPlanContainer tezPlanContainer = compile(php, pc);
 
-        int defaultTimeToSleep = pc.getExecType().isLocal() ? 100 : 1000;
-        int timeToSleep = conf.getInt("pig.jobcontrol.sleep", defaultTimeToSleep);
-        if (timeToSleep != defaultTimeToSleep) {
-            log.info("overriding default JobControl sleep (" +
-                    defaultTimeToSleep + ") to " + timeToSleep);
-        }
+        tezStats.initialize(tezPlanContainer);
+        tezScriptState.emitInitialPlanNotification(tezPlanContainer);
+        tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
 
+        TezPlanContainerNode tezPlanContainerNode;
         TezOperPlan tezPlan;
-        while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans)) != null) {
-            optimize(tezPlan, pc);
+        int processedDAGs = 0;
+        while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) {
+            tezPlan = tezPlanContainerNode.getTezOperPlan();
+            processLoadAndParallelism(tezPlan, pc);
             processedPlans.add(tezPlan);
-            ProgressReporter reporter = new ProgressReporter();
-            tezStats.initialize(tezPlan);
+            ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs);
             if (tezPlan.size()==1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) {
                 // Native Tez Plan
                 NativeTezOper nativeOper = (NativeTezOper)tezPlan.getRoots().get(0);
-                tezScriptState.emitInitialPlanNotification(tezPlan);
-                tezScriptState.emitLaunchStartedNotification(tezPlan.size());
                 tezScriptState.emitJobsSubmittedNotification(1);
-                nativeOper.runJob();
+                nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString());
             } else {
                 TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
                 pkgAnnotator.visit();
 
-                runningJob = jc.compile(tezPlan, grpName, tezPlanContainer);
-
-                tezScriptState.emitInitialPlanNotification(tezPlan);
-                tezScriptState.emitLaunchStartedNotification(tezPlan.size());
+                runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer);
+                //TODO: Exclude vertex groups from numVerticesToLaunch ??
+                tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size());
+                runningJob.setPigStats(tezStats);
 
                 // Set the thread UDFContext so registered classes are available.
                 final UDFContext udfContext = UDFContext.getUDFContext();
@@ -159,10 +166,8 @@ public class TezLauncher extends Launche
                 task.setUncaughtExceptionHandler(jctExceptionHandler);
                 task.setContextClassLoader(PigContext.getClassLoader());
 
-                tezStats.setTezJob(runningJob);
-
                 // Mark the times that the jobs were submitted so it's reflected in job
-                // history props
+                // history props. TODO: Fix this. unused now
                 long scriptSubmittedTimestamp = System.currentTimeMillis();
                 // Job.getConfiguration returns the shared configuration object
                 Configuration jobConf = runningJob.getConfiguration();
@@ -171,19 +176,31 @@ public class TezLauncher extends Launche
                 jobConf.set("pig.job.submitted.timestamp",
                         Long.toString(System.currentTimeMillis()));
 
-                Future<?> future = executor.schedule(task, timeToSleep, TimeUnit.MILLISECONDS);
-
+                Future<?> future = executor.submit(task);
                 tezScriptState.emitJobsSubmittedNotification(1);
-                reporter.notifyStarted();
+
+                boolean jobStarted = false;
 
                 while (!future.isDone()) {
+                    if (!jobStarted && runningJob.getApplicationId() != null) {
+                        jobStarted = true;
+                        String appId = runningJob.getApplicationId().toString();
+                        //For Oozie Pig action job id matching compatibility with MR mode
+                        log.info("HadoopJobId: "+ appId.replace("application", "job"));
+                        tezScriptState.emitJobStartedNotification(appId);
+                        tezScriptState.dagStartedNotification(runningJob.getName(), appId);
+                    }
                     reporter.notifyUpdate();
                     Thread.sleep(1000);
                 }
-
-                tezStats.accumulateStats(runningJob);
             }
-            tezScriptState.emitProgressUpdatedNotification(100);
+            processedDAGs++;
+            if (tezPlanContainer.size() == processedDAGs) {
+                tezScriptState.emitProgressUpdatedNotification(100);
+            } else {
+                tezScriptState.emitProgressUpdatedNotification(
+                    ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
+            }
             tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
         }
 
@@ -234,32 +251,31 @@ public class TezLauncher extends Launche
     }
 
     private class ProgressReporter {
+        private int totalDAGs;
+        private int processedDAGS;
         private int count = 0;
         private int prevProgress = 0;
 
-        public void notifyStarted() throws IOException {
-            for (Vertex v : runningJob.getDAG().getVertices()) {
-                TezTaskStats tts = tezStats.getVertexStats(v.getName());
-                UserPayload payload = v.getProcessorDescriptor().getUserPayload();
-                Configuration conf = TezUtils.createConfFromUserPayload(payload);
-                tts.setConf(conf);
-                tts.setId(v.getName());
-                tezScriptState.emitJobStartedNotification(v.getName());
-            }
+        public ProgressReporter(int totalDAGs, int processedDAGs) {
+            this.totalDAGs = totalDAGs;
+            this.processedDAGS = processedDAGs;
         }
 
         public void notifyUpdate() {
             DAGStatus dagStatus = runningJob.getDAGStatus();
             if (dagStatus != null && dagStatus.getState() == DAGStatus.State.RUNNING) {
                 // Emit notification when the job has progressed more than 1%,
-                // or every 10 second
+                // or every 100 RUNNING polls (~100 seconds, given the caller's 1s sleep)
                 int currProgress = Math.round(runningJob.getDAGProgress() * 100f);
                 if (currProgress - prevProgress >= 1 || count % 100 == 0) {
-                    tezScriptState.emitProgressUpdatedNotification(currProgress);
+                    tezScriptState.dagProgressNotification(runningJob.getName(), -1, currProgress);
+                    tezScriptState.emitProgressUpdatedNotification((currProgress + (100 * processedDAGS))/totalDAGs);
                     prevProgress = currProgress;
                 }
                 count++;
             }
+            // TODO: Add new vertex tracking methods to PigTezProgressNotificationListener
+            // and emit notifications for individual vertex start, progress and completion
         }
 
         public boolean notifyFinishedOrFailed() {
@@ -269,10 +285,13 @@ public class TezLauncher extends Launche
             }
             if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
                 Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
-                for (Vertex v : runningJob.getDAG().getVertices()) {
-                    TezTaskStats tts = tezStats.getVertexStats(v.getName());
-                    tezScriptState.emitjobFinishedNotification(tts);
-                    Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName());
+                DAG dag = runningJob.getDAG();
+                for (Vertex v : dag.getVertices()) {
+                    TezVertexStats tts = tezStats.getVertexStats(dag.getName(), v.getName());
+                    if (tts == null) {
+                        continue; //vertex groups
+                    }
+                    Map<String, Map<String, Long>> counterGroups = tts.getCounters();
                     if (counterGroups == null) {
                         log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates.");
                     } else {
@@ -283,11 +302,6 @@ public class TezLauncher extends Launche
                     CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log);
                 }
                 return true;
-            } else if (dagStatus.getState() == DAGStatus.State.FAILED) {
-                for (Vertex v : ((TezJob)runningJob).getDAG().getVertices()) {
-                    TezTaskStats tts = tezStats.getVertexStats(v.getName());
-                    tezScriptState.emitJobFailedNotification(tts);
-                }
             }
             return false;
         }
@@ -299,10 +313,6 @@ public class TezLauncher extends Launche
             VisitorException, IOException {
         log.debug("Entering TezLauncher.explain");
         TezPlanContainer tezPlanContainer = compile(php, pc);
-        for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) {
-            TezOperPlan tezPlan = entry.getValue().getNode();
-            optimize(tezPlan, pc);
-        }
 
         if (format.equals("text")) {
             TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -314,7 +324,20 @@ public class TezLauncher extends Launche
         }
     }
 
-    public static void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
+            throws PlanException, IOException, VisitorException {
+        TezCompiler comp = new TezCompiler(php, pc);
+        comp.compile();
+        TezPlanContainer planContainer = comp.getPlanContainer();
+        for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
+                .getKeys().entrySet()) {
+            TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+            optimize(tezPlan, pc);
+        }
+        return planContainer;
+    }
+
+    private void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
         boolean aggregateWarning = conf.getBoolean("aggregate.warning", false);
 
@@ -361,6 +384,9 @@ public class TezLauncher extends Launche
             uo.visit();
         }
 
+    }
+
+    public static void processLoadAndParallelism(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
         if (!pc.inExplain && !pc.inDumpSchema) {
             LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
             loaderStorer.visit();
@@ -371,13 +397,6 @@ public class TezLauncher extends Launche
         }
     }
 
-    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
-            throws PlanException, IOException, VisitorException {
-        TezCompiler comp = new TezCompiler(php, pc);
-        comp.compile();
-        return comp.getPlanContainer();
-    }
-
     @Override
     public void kill() throws BackendException {
         if (runningJob != null) {
@@ -387,9 +406,7 @@ public class TezLauncher extends Launche
                 throw new BackendException(e);
             }
         }
-        if (executor != null) {
-            executor.shutdownNow();
-        }
+        destroy();
     }
 
     @Override
@@ -404,4 +421,17 @@ public class TezLauncher extends Launche
             log.info("Cannot find job: " + jobID);
         }
     }
+
+    @Override
+    public void destroy() {
+        try {
+            if (executor != null && !executor.isShutdown()) {
+                log.info("Shutting down thread pool");
+                executor.shutdownNow();
+            }
+        } catch (Exception e) {
+            log.warn("Error shutting down threadpool");
+        }
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Nov  7 21:08:25 2014
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -86,6 +87,8 @@ public class TezSessionManager {
             TezJobConfig tezJobConf) throws TezException, IOException,
             InterruptedException {
         TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        TezScriptState ss = TezScriptState.get();
+        ss.addDAGSettingsToConf(amConf);
         adjustAMConfig(amConf, tezJobConf);
         String jobName = conf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
@@ -194,6 +197,7 @@ public class TezSessionManager {
         sessionPoolLock.writeLock().lock();
         try {
             if (shutdown == true) {
+                log.info("Shutting down Tez session " + newSession.session);
                 newSession.session.stop();
                 throw new IOException("TezSessionManager is shut down");
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Nov  7 21:08:25 2014
@@ -209,9 +209,7 @@ public class TezCompiler extends PhyPlan
     // Segment a single DAG into a DAG graph
     public TezPlanContainer getPlanContainer() throws PlanException {
         TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
-        TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
-        tezPlanContainer.add(node);
-        tezPlanContainer.split(node);
+        tezPlanContainer.addPlan(tezPlan);
         return tezPlanContainer;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Nov  7 21:08:25 2014
@@ -31,7 +31,6 @@ import java.util.Set;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -41,9 +40,14 @@ import org.apache.pig.impl.util.JarManag
 public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
     private static final long serialVersionUID = 1L;
     private PigContext pigContext;
+    private String jobName;
+    private long dagId = 0;
+    //Incrementing static counter if multiple pig scripts have same name
+    private static long scopeId = 0;
 
     public TezPlanContainer(PigContext pigContext) {
         this.pigContext = pigContext;
+        this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
     }
 
     // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
@@ -103,18 +107,18 @@ public class TezPlanContainer extends Op
         return TezResourceManager.getInstance().addTezResources(jarLists);
     }
 
-    public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
+    public TezPlanContainerNode getNextPlan(List<TezOperPlan> processedPlans) {
         synchronized(this) {
             while (getRoots()!=null && !getRoots().isEmpty()) {
                 TezPlanContainerNode currentPlan = null;
                 for (TezPlanContainerNode plan : getRoots()) {
-                    if (!processedPlans.contains(plan.getNode())) {
+                    if (!processedPlans.contains(plan.getTezOperPlan())) {
                         currentPlan = plan;
                         break;
                     }
                 }
                 if (currentPlan!=null) {
-                    return currentPlan.getNode();
+                    return currentPlan;
                 } else {
                     try {
                         wait();
@@ -126,10 +130,15 @@ public class TezPlanContainer extends Op
         return null;
     }
 
+    public void addPlan(TezOperPlan plan) throws PlanException {
+        TezPlanContainerNode node = new TezPlanContainerNode(generateNodeOperatorKey(), plan);
+        this.add(node);
+        this.split(node);
+    }
+
     public void updatePlan(TezOperPlan plan, boolean succ) {
-        String scope = getRoots().get(0).getOperatorKey().getScope();
-        TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(new OperatorKey(scope,
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)), plan);
+        TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(
+                generateNodeOperatorKey(), plan);
         synchronized(this) {
             if (succ) {
                 remove(tezPlanContainerNode);
@@ -143,7 +152,7 @@ public class TezPlanContainer extends Op
     }
 
     public void split(TezPlanContainerNode planNode) throws PlanException {
-        TezOperPlan tezOperPlan = planNode.getNode();
+        TezOperPlan tezOperPlan = planNode.getTezOperPlan();
         TezOperator operToSegment = null;
         List<TezOperator> succs = new ArrayList<TezOperator>();
         for (TezOperator tezOper : tezOperPlan) {
@@ -162,8 +171,7 @@ public class TezPlanContainer extends Op
                     containerSuccs.addAll(getSuccessors(planNode));
                 }
                 tezOperPlan.moveTree(succ, newOperPlan);
-                String scope = operToSegment.getOperatorKey().getScope();
-                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), newOperPlan);
+                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(generateNodeOperatorKey(), newOperPlan);
                 add(newPlanNode);
                 for (TezPlanContainerNode containerNodeSucc : containerSuccs) {
                     disconnect(planNode, containerNodeSucc);
@@ -176,6 +184,17 @@ public class TezPlanContainer extends Op
         }
     }
 
+    private synchronized OperatorKey generateNodeOperatorKey() {
+        OperatorKey opKey = new OperatorKey(jobName + "-" + dagId + "_scope", scopeId);
+        scopeId++;
+        dagId++;
+        return opKey;
+    }
+
+    public static void resetScope() {
+        scopeId = 0;
+    }
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java Fri Nov  7 21:08:25 2014
@@ -21,12 +21,13 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
-public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor>{
+public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor> {
     private static final long serialVersionUID = 1L;
-    TezOperPlan node;
-    public TezPlanContainerNode(OperatorKey k, TezOperPlan node) {
+    TezOperPlan tezPlan;
+
+    public TezPlanContainerNode(OperatorKey k, TezOperPlan tezPlan) {
         super(k);
-        this.node = node;
+        this.tezPlan = tezPlan;
     }
 
     @Override
@@ -49,26 +50,26 @@ public class TezPlanContainerNode extend
         return "DAG " + mKey;
     }
 
-    public TezOperPlan getNode() {
-        return node;
+    public TezOperPlan getTezOperPlan() {
+        return tezPlan;
     }
 
     @Override
     public boolean equals(Object o) {
         if (o != null && o instanceof TezPlanContainerNode) {
-            return ((TezPlanContainerNode)o).getNode().equals(getNode());
+            return ((TezPlanContainerNode)o).getTezOperPlan().equals(getTezOperPlan());
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return getNode().hashCode();
+        return getTezOperPlan().hashCode();
     }
 
     @Override
     public String toString() {
-        return getNode().toString();
+        return getTezOperPlan().toString();
     }
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java Fri Nov  7 21:08:25 2014
@@ -48,10 +48,10 @@ public class TezPlanContainerPrinter ext
         mStream.println("#--------------------------------------------------");
         mStream.println("# TEZ DAG plan: " + tezPlanContainerNode.getOperatorKey());
         mStream.println("#--------------------------------------------------");
-        TezGraphPrinter graphPrinter = new TezGraphPrinter(tezPlanContainerNode.getNode());
+        TezGraphPrinter graphPrinter = new TezGraphPrinter(tezPlanContainerNode.getTezOperPlan());
         graphPrinter.visit();
         mStream.print(graphPrinter.toString());
-        TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getNode());
+        TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getTezOperPlan());
         printer.setVerbose(isVerbose);
         printer.visit();
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java Fri Nov  7 21:08:25 2014
@@ -38,7 +38,7 @@ public class TezPlanContainerUDFCollecto
 
     @Override
     public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
-        Iterator<TezOperator> it = tezPlanContainerNode.getNode().iterator();
+        Iterator<TezOperator> it = tezPlanContainerNode.getTezOperPlan().iterator();
         while (it.hasNext()) {
             udfs.addAll(it.next().UDFs);
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java Fri Nov  7 21:08:25 2014
@@ -27,26 +27,29 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.tez.TezStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 
 public class NativeTezOper extends TezOperator {
     private static final long serialVersionUID = 1L;
     private static int countJobs = 0;
     private String nativeTezJar;
     private String[] params;
+    private String jobId;
+
     public NativeTezOper(OperatorKey k, String tezJar, String[] parameters) {
         super(k);
         nativeTezJar = tezJar;
         params = parameters;
+        jobId = nativeTezJar + "_" + getJobNumber();
     }
 
-    public static int getJobNumber() {
+    private static int getJobNumber() {
         countJobs++;
         return countJobs;
     }
 
     public String getJobId() {
-        return nativeTezJar + "_";
+        return jobId;
     }
 
     public String getCommandString() {
@@ -74,24 +77,24 @@ public class NativeTezOper extends TezOp
         v.visitTezOp(this);
     }
 
-    public void runJob() throws JobCreationException {
+    public void runJob(String jobStatsKey) throws JobCreationException {
         RunJarSecurityManager secMan = new RunJarSecurityManager();
         try {
             RunJar.main(getNativeTezParams());
-            ((TezStats)PigStats.get()).addTezJobStatsForNative(this, true);
+            ((TezPigScriptStats)PigStats.get()).addTezJobStatsForNative(jobStatsKey, this, true);
         } catch (SecurityException se) {
             if(secMan.getExitInvoked()) {
                 if(secMan.getExitCode() != 0) {
                     throw new JobCreationException("Native job returned with non-zero return code");
                 }
                 else {
-                    ((TezStats)PigStats.get()).addTezJobStatsForNative(this, true);
+                    ((TezPigScriptStats)PigStats.get()).addTezJobStatsForNative(jobStatsKey, this, true);
                 }
             }
         } catch (Throwable t) {
             JobCreationException e = new JobCreationException(
                     "Cannot run native tez job "+ t.getMessage(), t);
-            ((TezStats)PigStats.get()).addTezJobStatsForNative(this, false);
+            ((TezPigScriptStats)PigStats.get()).addTezJobStatsForNative(jobStatsKey, this, false);
             throw e;
         } finally {
             secMan.retire();

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Fri Nov  7 21:08:25 2014
@@ -18,12 +18,11 @@
 
 package org.apache.pig.tools.pigstats;
 
+import org.apache.pig.PigRunner;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.plan.OperatorPlan;
 
-import org.apache.pig.PigRunner;
-
 /**
  * Should be implemented by an object that wants to receive notifications
  * from {@link PigRunner}.
@@ -33,7 +32,7 @@ import org.apache.pig.PigRunner;
 public interface PigProgressNotificationListener extends java.util.EventListener {
 
     /**
-     * Invoked before any Hadoop jobs are run with the plan that is to be executed.
+     * Invoked before any Hadoop jobs (or a Tez DAG) are run with the plan that is to be executed.
      *
      * @param scriptId the unique id of the script
      * @param plan the OperatorPlan that is to be executed
@@ -41,36 +40,36 @@ public interface PigProgressNotification
     public void initialPlanNotification(String scriptId, OperatorPlan<?> plan);
 
     /**
-     * Invoked just before launching Hadoop jobs spawned by the script.
+     * Invoked just before launching Hadoop jobs (or Tez DAGs) spawned by the script.
      * @param scriptId the unique id of the script
-     * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script
+     * @param numJobsToLaunch the total number of Hadoop jobs (or Tez DAGs) spawned by the script
      */
     public void launchStartedNotification(String scriptId, int numJobsToLaunch);
 
     /**
-     * Invoked just before submitting a batch of Hadoop jobs.
+     * Invoked just before submitting a batch of Hadoop jobs (or Tez DAGs).
      * @param scriptId the unique id of the script
-     * @param numJobsSubmitted the number of Hadoop jobs in the batch
+     * @param numJobsSubmitted the number of Hadoop jobs (or Tez DAGs) in the batch
      */
     public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted);
 
     /**
-     * Invoked after a Hadoop job is started.
-     * @param scriptId the unique id of the script 
-     * @param assignedJobId the Hadoop job id
+     * Invoked after a Hadoop job (or Tez DAG) is started.
+     * @param scriptId the unique id of the script
+     * @param assignedJobId the Hadoop job id (or Tez DAG job id)
      */
     public void jobStartedNotification(String scriptId, String assignedJobId);
 
     /**
-     * Invoked just after a Hadoop job is completed successfully. 
-     * @param scriptId the unique id of the script 
-     * @param jobStats the {@link JobStats} object associated with the Hadoop job
+     * Invoked just after a Hadoop job (or Tez DAG) is completed successfully.
+     * @param scriptId the unique id of the script
+     * @param jobStats the {@link JobStats} object associated with the Hadoop job (or Tez DAG)
      */
     public void jobFinishedNotification(String scriptId, JobStats jobStats);
 
     /**
      * Invoked when a Hadoop job fails.
-     * @param scriptId the unique id of the script 
+     * @param scriptId the unique id of the script
      * @param jobStats the {@link JobStats} object associated with the Hadoop job
      */
     public void jobFailedNotification(String scriptId, JobStats jobStats);
@@ -83,16 +82,16 @@ public interface PigProgressNotification
     public void outputCompletedNotification(String scriptId, OutputStats outputStats);
 
     /**
-     * Invoked to update the execution progress. 
+     * Invoked to update the execution progress.
      * @param scriptId the unique id of the script
      * @param progress the percentage of the execution progress
      */
     public void progressUpdatedNotification(String scriptId, int progress);
 
     /**
-     * Invoked just after all Hadoop jobs spawned by the script are completed.
+     * Invoked just after all Hadoop jobs (or Tez DAGs) spawned by the script are completed.
      * @param scriptId the unique id of the script
-     * @param numJobsSucceeded the total number of Hadoop jobs succeeded
+     * @param numJobsSucceeded the total number of Hadoop jobs (or Tez DAGs) succeeded
      */
     public void launchCompletedNotification(String scriptId, int numJobsSucceeded);
 }

Added: pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java?rev=1637447&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java Fri Nov  7 21:08:25 2014
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.tez;
+
+import org.apache.pig.PigRunner;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+
+/**
+ * Should be implemented by an object that wants to receive notifications
+ * from {@link PigRunner}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class PigTezProgressNotificationListener implements PigProgressNotificationListener {
+
+    /**
+     * Invoked just before launching a Tez DAG spawned by the script.
+     *
+     * @param scriptId the unique id of the script
+     * @param dagId the unique name of the Tez DAG
+     * @param dagPlan the OperatorPlan that is to be executed
+     * @param numVerticesToLaunch the total number of vertices spawned by the Tez DAG
+     */
+    public abstract void dagLaunchNotification(String scriptId, String dagId,
+            OperatorPlan<?> dagPlan, int numVerticesToLaunch);
+
+    /**
+     * Invoked after a Tez DAG is started.
+     *
+     * @param scriptId the unique id of the script
+     * @param dagId the unique name of the Tez DAG
+     * @param assignedApplicationId
+     *            the YARN application id for the Tez DAG. More than one Tez DAG
+     *            can share same application ID if session reuse is turned on.
+     *            Session reuse is turned on by default
+     *
+     */
+    public abstract void dagStartedNotification(String scriptId, String dagId, String assignedApplicationId);
+
+    /**
+     * Invoked to update the execution progress.
+     *
+     * @param scriptId the unique id of the script
+     * @param dagId the unique name of the Tez DAG
+     * @param numVerticesCompleted the number of vertices completed so far
+     * @param progress the percentage of the execution progress based on total number of tasks of all vertices
+     */
+    public abstract void dagProgressNotification(String scriptId, String dagId, int numVerticesCompleted, int progress);
+
+    /**
+     * Invoked just after the Tez DAGs is completed (successful or failed).
+     * @param scriptId the unique id of the script
+     * @param dagId the unique name of the Tez DAG
+     * @param success true if the Tez DAG was successful, false otherwise
+     * @param tezDAGStats the stats information for the DAG
+     */
+    public abstract void dagCompletedNotification(String scriptId, String dagId, boolean success, TezDAGStats tezDAGStats);
+}

Added: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1637447&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Nov  7 21:08:25 2014
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+/**
+ * TezDAGStats encapsulates the statistics collected from a Tez DAG.
+ * It includes the execution status, the per-vertex statistics
+ * ({@link TezVertexStats}) and information about the inputs and outputs
+ * of the DAG.
+ */
+public class TezDAGStats extends JobStats {
+
+    private static final Log LOG = LogFactory.getLog(TezDAGStats.class);
+    public static final String DAG_COUNTER_GROUP = DAGCounter.class.getName();
+    public static final String FS_COUNTER_GROUP = FileSystemCounter.class.getName();
+    public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
+    public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName();
+
+    /** Vertex statistics keyed by vertex name (the TezOperator key string). */
+    private final Map<String, TezVertexStats> tezVertexStatsMap;
+
+    private String appId;
+
+    // -1 means "not available", e.g. when the DAG counters could not be fetched.
+    private int totalTasks = -1;
+    private long fileBytesRead = -1;
+    private long fileBytesWritten = -1;
+
+    // Hadoop view of the DAG counters; stays null if no counters were available.
+    private Counters counters = null;
+
+    // Totals summed over all vertices by accumulateStats().
+    private int numberMaps = 0;
+    private int numberReduces = 0;
+
+    private long mapInputRecords = 0;
+    private long mapOutputRecords = 0;
+    private long reduceInputRecords = 0;
+    private long reduceOutputRecords = 0;
+    private long spillCount = 0;
+    private long activeSpillCountObj = 0;
+    private long activeSpillCountRecs = 0;
+
+    private final Map<String, Long> multiStoreCounters = new HashMap<String, Long>();
+
+    /**
+     * Builds the {@link JobGraph} of a Tez DAG: one {@link TezVertexStats}
+     * node per {@link TezOperator}, connected like the operator plan.
+     * Vertex groups (unions) are removed once the walk is complete.
+     */
+    static class JobGraphBuilder extends TezOpPlanVisitor {
+
+        private final JobGraph jobPlan;
+        private final Map<String, TezVertexStats> tezVertexStatsMap;
+        private final List<TezVertexStats> vertexStatsToBeRemoved;
+        private final TezDAGScriptInfo dagScriptInfo;
+
+        public JobGraphBuilder(TezOperPlan plan, TezDAGScriptInfo dagScriptInfo) {
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            this.tezVertexStatsMap = new HashMap<String, TezVertexStats>();
+            this.vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
+            this.jobPlan = new JobGraph();
+            this.dagScriptInfo = dagScriptInfo;
+        }
+
+        /** @return vertex statistics keyed by operator key string */
+        public Map<String, TezVertexStats> getTezVertexStatsMap() {
+            return tezVertexStatsMap;
+        }
+
+        public JobGraph getJobPlan() {
+            return jobPlan;
+        }
+
+        @Override
+        public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            String vertexName = tezOp.getOperatorKey().toString();
+            TezVertexStats currStats =
+                    new TezVertexStats(vertexName, jobPlan, tezOp.isUseMRMapSettings());
+            jobPlan.add(currStats);
+            List<TezOperator> preds = getPlan().getPredecessors(tezOp);
+            if (preds != null) {
+                for (TezOperator pred : preds) {
+                    // The map is keyed by the operator key *string*; looking it up
+                    // with the TezOperator itself would always return null.
+                    // DependencyOrderWalker is expected to visit predecessors
+                    // first, so their stats should already be registered.
+                    TezVertexStats predStats =
+                            tezVertexStatsMap.get(pred.getOperatorKey().toString());
+                    if (predStats != null && !jobPlan.isConnected(predStats, currStats)) {
+                        jobPlan.connect(predStats, currStats);
+                    }
+                }
+            }
+
+            // Remove VertexGroups (union) from JobGraph since they're not
+            // materialized as real vertices by Tez.
+            if (tezOp.isVertexGroup()) {
+                vertexStatsToBeRemoved.add(currStats);
+            } else {
+                currStats.annotate(ALIAS, dagScriptInfo.getAlias(tezOp));
+                currStats.annotate(ALIAS_LOCATION, dagScriptInfo.getAliasLocation(tezOp));
+                currStats.annotate(FEATURE, dagScriptInfo.getPigFeatures(tezOp));
+            }
+
+            tezVertexStatsMap.put(vertexName, currStats);
+        }
+
+        @Override
+        public void visit() throws VisitorException {
+            super.visit();
+            try {
+                for (TezVertexStats vertexStats : vertexStatsToBeRemoved) {
+                    jobPlan.removeAndReconnect(vertexStats);
+                }
+            } catch (FrontendException e) {
+                LOG.warn("Unable to build Tez DAG", e);
+            }
+        }
+
+    }
+
+    protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap) {
+        super(name, plan);
+        this.tezVertexStatsMap = tezVertexStatsMap;
+    }
+
+    /**
+     * @param vertexName the vertex name (operator key string)
+     * @return the statistics of that vertex, or {@code null} if unknown
+     */
+    public TezVertexStats getVertexStats(String vertexName) {
+        return tezVertexStatsMap.get(vertexName);
+    }
+
+    /** Annotates this DAG with the script-level alias, location and features. */
+    void setAlias(TezDAGScriptInfo dagScriptInfo) {
+        annotate(ALIAS, dagScriptInfo.getAlias());
+        annotate(ALIAS_LOCATION, dagScriptInfo.getAliasLocation());
+        annotate(FEATURE, dagScriptInfo.getPigFeatures());
+    }
+
+    /**
+     * Updates the statistics after a DAG is finished.
+     * <p>
+     * DAG-level task and file system counters are read only when the DAG
+     * counters are available; otherwise those values stay at -1.
+     *
+     * @param tezJob the finished Tez job to collect statistics from
+     * @throws IOException if reading job or vertex information fails
+     */
+    public void accumulateStats(TezJob tezJob) throws IOException {
+        //For Oozie Pig action job id matching compatibility with MR mode
+        appId = tezJob.getApplicationId().toString().replace("application", "job");
+        setConf(tezJob.getConfiguration());
+        DAG dag = tezJob.getDAG();
+        hdfsBytesRead = -1;
+        hdfsBytesWritten = -1;
+
+        TezCounters tezCounters = tezJob.getDAGCounters();
+        if (tezCounters != null) {
+            readDagCounters(tezCounters);
+        }
+
+        for (Entry<String, TezVertexStats> entry : tezVertexStatsMap.entrySet()) {
+            // Skip entries with no matching DAG vertex (e.g. vertex groups,
+            // which Tez does not materialize as real vertices).
+            Vertex v = dag.getVertex(entry.getKey());
+            if (v == null) {
+                continue;
+            }
+            TezVertexStats vertexStats = entry.getValue();
+            UserPayload payload = v.getProcessorDescriptor().getUserPayload();
+            Configuration conf = TezUtils.createConfFromUserPayload(payload);
+            vertexStats.setConf(conf);
+
+            VertexStatus status = tezJob.getVertexStatus(v.getName());
+            vertexStats.accumulateStats(status, v.getParallelism());
+            addVertexTotals(vertexStats);
+        }
+    }
+
+    /** Reads the DAG-level task and file system counters from non-null counters. */
+    private void readDagCounters(TezCounters tezCounters) {
+        counters = convertToHadoopCounters(tezCounters);
+
+        CounterGroup dagGrp = tezCounters.getGroup(DAG_COUNTER_GROUP);
+        totalTasks = (int) dagGrp.findCounter("TOTAL_LAUNCHED_TASKS").getValue();
+
+        CounterGroup fsGrp = tezCounters.getGroup(FS_COUNTER_GROUP);
+        fileBytesRead = fsGrp.findCounter("FILE_BYTES_READ").getValue();
+        fileBytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN").getValue();
+        hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ").getValue();
+        hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN").getValue();
+    }
+
+    /** Adds one vertex's inputs, outputs, record and spill counts to the DAG totals. */
+    private void addVertexTotals(TezVertexStats vertexStats) {
+        if (vertexStats.getInputs() != null && !vertexStats.getInputs().isEmpty()) {
+            inputs.addAll(vertexStats.getInputs());
+        }
+        if (vertexStats.getOutputs() != null && !vertexStats.getOutputs().isEmpty()) {
+            outputs.addAll(vertexStats.getOutputs());
+        }
+        if (!vertexStats.getMultiStoreCounters().isEmpty()) {
+            multiStoreCounters.putAll(vertexStats.getMultiStoreCounters());
+        }
+        numberMaps += vertexStats.getNumberMaps();
+        numberReduces += vertexStats.getNumberReduces();
+        mapInputRecords += vertexStats.getMapInputRecords();
+        mapOutputRecords += vertexStats.getMapOutputRecords();
+        reduceInputRecords += vertexStats.getReduceInputRecords();
+        reduceOutputRecords += vertexStats.getReduceOutputRecords();
+        spillCount += vertexStats.getSMMSpillCount();
+        activeSpillCountObj += vertexStats.getProactiveSpillCountObjects();
+        activeSpillCountRecs += vertexStats.getProactiveSpillCountRecs();
+    }
+
+    /** Copies every Tez counter group and counter into a Hadoop {@link Counters}. */
+    private Counters convertToHadoopCounters(TezCounters tezCounters) {
+        Counters hadoopCounters = new Counters();
+        for (CounterGroup tezGrp : tezCounters) {
+            Group grp = hadoopCounters.addGroup(tezGrp.getName(), tezGrp.getDisplayName());
+            for (TezCounter counter : tezGrp) {
+                grp.addCounter(counter.getName(), counter.getDisplayName(), counter.getValue());
+            }
+        }
+        return hadoopCounters;
+    }
+
+    /**
+     * Returns the YARN application id with "application" replaced by "job"
+     * (set by {@link #accumulateStats(TezJob)}), or the value given to
+     * {@link #setJobId(String)}.
+     */
+    @Override
+    public String getJobId() {
+        return appId;
+    }
+
+    public void setJobId(String appId) {
+        this.appId = appId;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (v instanceof JobGraphPrinter) {
+            JobGraphPrinter jpp = (JobGraphPrinter)v;
+            jpp.visit(this);
+        }
+    }
+
+    @Override
+    public String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("DAG " + name + ":\n");
+        sb.append(String.format("%1$20s: %2$-100s%n", "ApplicationId",
+                appId));
+        sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
+                totalTasks));
+
+        sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",
+                fileBytesRead));
+        sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesWritten",
+                fileBytesWritten));
+        sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesRead",
+                hdfsBytesRead));
+        sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesWritten",
+                hdfsBytesWritten));
+
+        return sb.toString();
+    }
+
+    @Override
+    @Deprecated
+    public int getNumberMaps() {
+        return numberMaps;
+    }
+
+    @Override
+    @Deprecated
+    public int getNumberReduces() {
+        return numberReduces;
+    }
+
+    // Task timing is not tracked for Tez DAGs; the time getters return -1.
+    @Override
+    @Deprecated
+    public long getMaxMapTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMinMapTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getAvgMapTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMaxReduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMinReduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getAvgREduceTime() {
+        return -1;
+    }
+
+    @Override
+    @Deprecated
+    public long getMapInputRecords() {
+        return mapInputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getMapOutputRecords() {
+        return mapOutputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getReduceInputRecords() {
+        return reduceInputRecords;
+    }
+
+    @Override
+    @Deprecated
+    public long getReduceOutputRecords() {
+        return reduceOutputRecords;
+    }
+
+    @Override
+    public long getSMMSpillCount() {
+        return spillCount;
+    }
+
+    @Override
+    public long getProactiveSpillCountObjects() {
+        return activeSpillCountObj;
+    }
+
+    @Override
+    public long getProactiveSpillCountRecs() {
+        return activeSpillCountRecs;
+    }
+
+    /** @return the DAG counters in Hadoop form, or {@code null} if none were available */
+    @Override
+    @Deprecated
+    public Counters getHadoopCounters() {
+        return counters;
+    }
+
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiStoreCounters() {
+        return multiStoreCounters;
+    }
+
+    /** Not supported for Tez DAGs; always throws {@link UnsupportedOperationException}. */
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiInputCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+}