You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/07 22:08:27 UTC
svn commit: r1637447 [1/4] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/e...
Author: rohini
Date: Fri Nov 7 21:08:25 2014
New Revision: 1637447
URL: http://svn.apache.org/r1637447
Log:
PIG-3977: Get TezStats working for Oozie (rohini)
Added:
pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
Removed:
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
pig/trunk/test/org/apache/pig/tez/TestLoaderStorerShipCacheFilesTez.java
pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Nov 7 21:08:25 2014
@@ -37,6 +37,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3977: Get TezStats working for Oozie (rohini)
+
PIG-3979: group all performance, garbage collection, and incremental aggregation (rohini)
PIG-4253: Add a UniqueID UDF (daijy)
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Fri Nov 7 21:08:25 2014
@@ -178,6 +178,7 @@ public class Main {
boolean deleteTempFiles = true;
String logFileName = null;
boolean printScriptRunTime = true;
+ PigContext pigContext = null;
try {
Configuration conf = new Configuration(false);
@@ -380,7 +381,7 @@ public class Main {
}
// create the context with the parameter
- PigContext pigContext = new PigContext(properties);
+ pigContext = new PigContext(properties);
// create the static script state object
ScriptState scriptState = pigContext.getExecutionEngine().instantiateScriptState();
@@ -669,6 +670,9 @@ public class Main {
// clear temp files
FileLocalizer.deleteTempFiles();
}
+ if (pigContext != null) {
+ pigContext.getExecutionEngine().destroy();
+ }
PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
}
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Nov 7 21:08:25 2014
@@ -231,13 +231,13 @@ public class PigServer {
addJarsFromProperties();
markPredeployedJarsFromProperties();
- PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
-
if (ScriptState.get() == null) {
// If Pig was started via command line, ScriptState should have been
// already initialized in Main. If so, we should not overwrite it.
ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
}
+ PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+
}
private void addJarsFromProperties() throws ExecException {
Modified: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Nov 7 21:08:25 2014
@@ -193,4 +193,9 @@ public interface ExecutionEngine {
*/
public void killJob(String jobID) throws BackendException;
+ /**
+ * Perform any cleanup operation
+ */
+ public void destroy();
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Nov 7 21:08:25 2014
@@ -377,4 +377,11 @@ public abstract class HExecutionEngine i
}
}
+ @Override
+ public void destroy() {
+ if (launcher != null) {
+ launcher.destroy();
+ }
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Nov 7 21:08:25 2014
@@ -637,4 +637,7 @@ public abstract class Launcher {
return new StackTraceElement(declaringClass, methodName, fileName,
lineNumber);
}
+
+ public void destroy() {
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Nov 7 21:08:25 2014
@@ -119,6 +119,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -591,6 +592,9 @@ public class TezDagBuilder extends TezOp
payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
+ TezScriptState ss = TezScriptState.get();
+ ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf);
+
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Fri Nov 7 21:08:25 2014
@@ -24,8 +24,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
public class TezExecutionEngine extends HExecutionEngine {
@@ -43,6 +43,6 @@ public class TezExecutionEngine extends
@Override
public PigStats instantiatePigStats() {
- return new TezStats(pigContext);
+ return new TezPigScriptStats(pigContext);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Nov 7 21:08:25 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -32,14 +31,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
@@ -64,11 +61,11 @@ public class TezJob implements Runnable
private TezClient tezClient;
private boolean reuseSession;
private TezCounters dagCounters;
- // Vertex, CounterGroup, Counter, Value
- private Map<String, Map<String, Map<String, Long>>> vertexCounters;
+
// Timer for DAG status reporter
private Timer timer;
private TezJobConfig tezJobConf;
+ private TezPigScriptStats pigStats;
public TezJob(TezConfiguration conf, DAG dag,
Map<String, LocalResource> requestAMResources,
@@ -78,7 +75,6 @@ public class TezJob implements Runnable
this.requestAMResources = requestAMResources;
this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- this.vertexCounters = Maps.newHashMap();
tezJobConf = new TezJobConfig(estimatedTotalParallelism);
}
@@ -105,7 +101,7 @@ public class TezJob implements Runnable
}
public String getName() {
- return dag == null ? "" : dag.getName();
+ return dag.getName();
}
public Configuration getConfiguration() {
@@ -124,14 +120,6 @@ public class TezJob implements Runnable
return dagCounters;
}
- public Map<String, Map<String, Long>> getVertexCounters(String group) {
- return vertexCounters.get(group);
- }
-
- public Map<String, Long> getVertexCounters(String group, String name) {
- return vertexCounters.get(group).get(name);
- }
-
public float getDAGProgress() {
Progress p = dagStatus.getDAGProgress();
return p == null ? 0 : (float)p.getSucceededTaskCount() / (float)p.getTotalTaskCount();
@@ -147,6 +135,22 @@ public class TezJob implements Runnable
return vertexProgress;
}
+ public VertexStatus getVertexStatus(String vertexName) {
+ VertexStatus vs = null;
+ try {
+ vs = dagClient.getVertexStatus(vertexName, statusGetOpts);
+ } catch (Exception e) {
+ // Don't fail the job even if vertex status couldn't
+ // be retrieved.
+ log.warn("Cannot retrieve status for vertex " + vertexName, e);
+ }
+ return vs;
+ }
+
+ public void setPigStats(TezPigScriptStats pigStats) {
+ this.pigStats = pigStats;
+ }
+
@Override
public void run() {
try {
@@ -180,9 +184,13 @@ public class TezJob implements Runnable
if (dagStatus.isCompleted()) {
log.info("DAG Status: " + dagStatus);
dagCounters = dagStatus.getDAGCounters();
- collectVertexCounters();
TezSessionManager.freeSession(tezClient);
try {
+ pigStats.accumulateStats(this);
+ } catch (IOException e) {
+ log.warn("Exception while gathering stats", e);
+ }
+ try {
if (!reuseSession) {
TezSessionManager.stopSession(tezClient);
}
@@ -210,6 +218,7 @@ public class TezJob implements Runnable
@Override
public void run() {
+ if (dagStatus == null) return;
String msg = "status=" + dagStatus.getState()
+ ", progress=" + dagStatus.getDAGProgress()
+ ", diagnostics="
@@ -218,37 +227,6 @@ public class TezJob implements Runnable
}
}
- private void collectVertexCounters() {
- for (Vertex v : dag.getVertices()) {
- String name = v.getName();
- try {
- VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
- if (s == null) {
- log.info("Cannot retrieve counters for vertex " + name);
- continue;
- }
- TezCounters counters = s.getVertexCounters();
- Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
- Iterator<CounterGroup> grpIt = counters.iterator();
- while (grpIt.hasNext()) {
- CounterGroup grp = grpIt.next();
- Iterator<TezCounter> cntIt = grp.iterator();
- Map<String, Long> cntMap = Maps.newHashMap();
- while (cntIt.hasNext()) {
- TezCounter cnt = cntIt.next();
- cntMap.put(cnt.getName(), cnt.getValue());
- }
- grpCounters.put(grp.getName(), cntMap);
- }
- vertexCounters.put(name, grpCounters);
- } catch (Exception e) {
- // Don't fail the job even if vertex counters couldn't
- // be retrieved.
- log.info("Cannot retrieve counters for vertex " + name, e);
- }
- }
- }
-
public void killJob() throws IOException {
try {
if (dagClient != null) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Nov 7 21:08:25 2014
@@ -34,6 +34,7 @@ import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
import org.apache.pig.impl.PigContext;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
@@ -45,7 +46,6 @@ import org.apache.tez.dag.api.TezConfigu
*/
public class TezJobCompiler {
private static final Log log = LogFactory.getLog(TezJobCompiler.class);
- private static int dagIdentifier = 0;
private PigContext pigContext;
private TezConfiguration tezConf;
@@ -55,24 +55,22 @@ public class TezJobCompiler {
this.tezConf = new TezConfiguration(conf);
}
- public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+ public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources)
throws IOException, YarnException {
- String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
- DAG tezDag = DAG.create(jobName + "-" + dagIdentifier);
- dagIdentifier++;
+ DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
tezDag.setCredentials(new Credentials());
- TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
+ TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
return tezDag;
}
- public TezJob compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+ public TezJob compile(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
TezJob job = null;
try {
// A single Tez job always pack only 1 Tez plan. We will track
// Tez job asynchronously to exploit parallel execution opportunities.
- job = getJob(tezPlan, planContainer);
+ job = getJob(tezPlanNode, planContainer);
} catch (JobCreationException jce) {
throw jce;
} catch(Exception e) {
@@ -84,11 +82,12 @@ public class TezJobCompiler {
return job;
}
- private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
+ private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer)
throws JobCreationException {
try {
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
localResources.putAll(planContainer.getLocalResources());
+ TezOperPlan tezPlan = tezPlanNode.getTezOperPlan();
localResources.putAll(tezPlan.getExtraResources());
String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
if (shipFiles != null) {
@@ -106,7 +105,7 @@ public class TezJobCompiler {
for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
log.info("Local resource: " + entry.getKey());
}
- DAG tezDag = buildDAG(tezPlan, localResources);
+ DAG tezDag = buildDAG(tezPlanNode, localResources);
return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
} catch (Exception e) {
int errCode = 2017;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Nov 7 21:08:25 2014
@@ -23,10 +23,10 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,27 +65,37 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
-import org.apache.pig.tools.pigstats.tez.TezStats;
-import org.apache.pig.tools.pigstats.tez.TezTaskStats;
-import org.apache.tez.common.TezUtils;
+import org.apache.pig.tools.pigstats.tez.TezVertexStats;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Main class that launches pig for Tez
*/
public class TezLauncher extends Launcher {
private static final Log log = LogFactory.getLog(TezLauncher.class);
- private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+ private static ThreadFactory namedThreadFactory;
+ private ExecutorService executor;
private boolean aggregateWarning = false;
private TezScriptState tezScriptState;
- private TezStats tezStats;
+ private TezPigScriptStats tezStats;
private TezJob runningJob;
+ public TezLauncher() {
+ if (namedThreadFactory == null) {
+ namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
+ "PigTezLauncher-%d").build();
+ }
+ executor = Executors.newSingleThreadExecutor(namedThreadFactory);
+ }
+
@Override
public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
if (pc.getExecType().isLocal()) {
@@ -109,41 +119,38 @@ public class TezLauncher extends Launche
List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
tezScriptState = TezScriptState.get();
- tezStats = new TezStats(pc);
+ tezStats = new TezPigScriptStats(pc);
PigStats.start(tezStats);
conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
TezJobCompiler jc = new TezJobCompiler(pc, conf);
TezPlanContainer tezPlanContainer = compile(php, pc);
- int defaultTimeToSleep = pc.getExecType().isLocal() ? 100 : 1000;
- int timeToSleep = conf.getInt("pig.jobcontrol.sleep", defaultTimeToSleep);
- if (timeToSleep != defaultTimeToSleep) {
- log.info("overriding default JobControl sleep (" +
- defaultTimeToSleep + ") to " + timeToSleep);
- }
+ tezStats.initialize(tezPlanContainer);
+ tezScriptState.emitInitialPlanNotification(tezPlanContainer);
+ tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
+ TezPlanContainerNode tezPlanContainerNode;
TezOperPlan tezPlan;
- while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans)) != null) {
- optimize(tezPlan, pc);
+ int processedDAGs = 0;
+ while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) {
+ tezPlan = tezPlanContainerNode.getTezOperPlan();
+ processLoadAndParallelism(tezPlan, pc);
processedPlans.add(tezPlan);
- ProgressReporter reporter = new ProgressReporter();
- tezStats.initialize(tezPlan);
+ ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs);
if (tezPlan.size()==1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) {
// Native Tez Plan
NativeTezOper nativeOper = (NativeTezOper)tezPlan.getRoots().get(0);
- tezScriptState.emitInitialPlanNotification(tezPlan);
- tezScriptState.emitLaunchStartedNotification(tezPlan.size());
tezScriptState.emitJobsSubmittedNotification(1);
- nativeOper.runJob();
+ nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString());
} else {
TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
pkgAnnotator.visit();
- runningJob = jc.compile(tezPlan, grpName, tezPlanContainer);
-
- tezScriptState.emitInitialPlanNotification(tezPlan);
- tezScriptState.emitLaunchStartedNotification(tezPlan.size());
+ runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer);
+ //TODO: Exclude vertex groups from numVerticesToLaunch ??
+ tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size());
+ runningJob.setPigStats(tezStats);
// Set the thread UDFContext so registered classes are available.
final UDFContext udfContext = UDFContext.getUDFContext();
@@ -159,10 +166,8 @@ public class TezLauncher extends Launche
task.setUncaughtExceptionHandler(jctExceptionHandler);
task.setContextClassLoader(PigContext.getClassLoader());
- tezStats.setTezJob(runningJob);
-
// Mark the times that the jobs were submitted so it's reflected in job
- // history props
+ // history props. TODO: Fix this. unused now
long scriptSubmittedTimestamp = System.currentTimeMillis();
// Job.getConfiguration returns the shared configuration object
Configuration jobConf = runningJob.getConfiguration();
@@ -171,19 +176,31 @@ public class TezLauncher extends Launche
jobConf.set("pig.job.submitted.timestamp",
Long.toString(System.currentTimeMillis()));
- Future<?> future = executor.schedule(task, timeToSleep, TimeUnit.MILLISECONDS);
-
+ Future<?> future = executor.submit(task);
tezScriptState.emitJobsSubmittedNotification(1);
- reporter.notifyStarted();
+
+ boolean jobStarted = false;
while (!future.isDone()) {
+ if (!jobStarted && runningJob.getApplicationId() != null) {
+ jobStarted = true;
+ String appId = runningJob.getApplicationId().toString();
+ //For Oozie Pig action job id matching compatibility with MR mode
+ log.info("HadoopJobId: "+ appId.replace("application", "job"));
+ tezScriptState.emitJobStartedNotification(appId);
+ tezScriptState.dagStartedNotification(runningJob.getName(), appId);
+ }
reporter.notifyUpdate();
Thread.sleep(1000);
}
-
- tezStats.accumulateStats(runningJob);
}
- tezScriptState.emitProgressUpdatedNotification(100);
+ processedDAGs++;
+ if (tezPlanContainer.size() == processedDAGs) {
+ tezScriptState.emitProgressUpdatedNotification(100);
+ } else {
+ tezScriptState.emitProgressUpdatedNotification(
+ ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
+ }
tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
}
@@ -234,32 +251,31 @@ public class TezLauncher extends Launche
}
private class ProgressReporter {
+ private int totalDAGs;
+ private int processedDAGS;
private int count = 0;
private int prevProgress = 0;
- public void notifyStarted() throws IOException {
- for (Vertex v : runningJob.getDAG().getVertices()) {
- TezTaskStats tts = tezStats.getVertexStats(v.getName());
- UserPayload payload = v.getProcessorDescriptor().getUserPayload();
- Configuration conf = TezUtils.createConfFromUserPayload(payload);
- tts.setConf(conf);
- tts.setId(v.getName());
- tezScriptState.emitJobStartedNotification(v.getName());
- }
+ public ProgressReporter(int totalDAGs, int processedDAGs) {
+ this.totalDAGs = totalDAGs;
+ this.processedDAGS = processedDAGs;
}
public void notifyUpdate() {
DAGStatus dagStatus = runningJob.getDAGStatus();
if (dagStatus != null && dagStatus.getState() == DAGStatus.State.RUNNING) {
// Emit notification when the job has progressed more than 1%,
- // or every 10 second
+ // or every 20 seconds
int currProgress = Math.round(runningJob.getDAGProgress() * 100f);
if (currProgress - prevProgress >= 1 || count % 100 == 0) {
- tezScriptState.emitProgressUpdatedNotification(currProgress);
+ tezScriptState.dagProgressNotification(runningJob.getName(), -1, currProgress);
+ tezScriptState.emitProgressUpdatedNotification((currProgress + (100 * processedDAGS))/totalDAGs);
prevProgress = currProgress;
}
count++;
}
+ // TODO: Add new vertex tracking methods to PigTezProgressNotificationListener
+ // and emit notifications for individual vertex start, progress and completion
}
public boolean notifyFinishedOrFailed() {
@@ -269,10 +285,13 @@ public class TezLauncher extends Launche
}
if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
- for (Vertex v : runningJob.getDAG().getVertices()) {
- TezTaskStats tts = tezStats.getVertexStats(v.getName());
- tezScriptState.emitjobFinishedNotification(tts);
- Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName());
+ DAG dag = runningJob.getDAG();
+ for (Vertex v : dag.getVertices()) {
+ TezVertexStats tts = tezStats.getVertexStats(dag.getName(), v.getName());
+ if (tts == null) {
+ continue; //vertex groups
+ }
+ Map<String, Map<String, Long>> counterGroups = tts.getCounters();
if (counterGroups == null) {
log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates.");
} else {
@@ -283,11 +302,6 @@ public class TezLauncher extends Launche
CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log);
}
return true;
- } else if (dagStatus.getState() == DAGStatus.State.FAILED) {
- for (Vertex v : ((TezJob)runningJob).getDAG().getVertices()) {
- TezTaskStats tts = tezStats.getVertexStats(v.getName());
- tezScriptState.emitJobFailedNotification(tts);
- }
}
return false;
}
@@ -299,10 +313,6 @@ public class TezLauncher extends Launche
VisitorException, IOException {
log.debug("Entering TezLauncher.explain");
TezPlanContainer tezPlanContainer = compile(php, pc);
- for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) {
- TezOperPlan tezPlan = entry.getValue().getNode();
- optimize(tezPlan, pc);
- }
if (format.equals("text")) {
TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -314,7 +324,20 @@ public class TezLauncher extends Launche
}
}
- public static void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
+ /**
+ * Compiles the physical plan with a {@link TezCompiler}, obtains the
+ * resulting plan container and runs {@code optimize} on the Tez plan of
+ * every node in that container before returning it.
+ *
+ * @param php the physical plan passed to the {@link TezCompiler}
+ * @param pc the Pig context passed to the compiler and to {@code optimize}
+ * @return the plan container produced by the compiler
+ * @throws PlanException if building the plan container fails
+ * @throws IOException presumably raised by the compiler; the source is not visible here
+ * @throws VisitorException if optimizing one of the Tez plans fails
+ */
+ public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
+ throws PlanException, IOException, VisitorException {
+ TezCompiler comp = new TezCompiler(php, pc);
+ comp.compile();
+ TezPlanContainer planContainer = comp.getPlanContainer();
+ for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
+ .getKeys().entrySet()) {
+ TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+ optimize(tezPlan, pc);
+ }
+ return planContainer;
+ }
+
+ private void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
boolean aggregateWarning = conf.getBoolean("aggregate.warning", false);
@@ -361,6 +384,9 @@ public class TezLauncher extends Launche
uo.visit();
}
+ }
+
+ public static void processLoadAndParallelism(TezOperPlan tezPlan, PigContext pc) throws VisitorException {
if (!pc.inExplain && !pc.inDumpSchema) {
LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc);
loaderStorer.visit();
@@ -371,13 +397,6 @@ public class TezLauncher extends Launche
}
}
- public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
- throws PlanException, IOException, VisitorException {
- TezCompiler comp = new TezCompiler(php, pc);
- comp.compile();
- return comp.getPlanContainer();
- }
-
@Override
public void kill() throws BackendException {
if (runningJob != null) {
@@ -387,9 +406,7 @@ public class TezLauncher extends Launche
throw new BackendException(e);
}
}
- if (executor != null) {
- executor.shutdownNow();
- }
+ destroy();
}
@Override
@@ -404,4 +421,17 @@ public class TezLauncher extends Launche
log.info("Cannot find job: " + jobID);
}
}
+
+ /**
+ * Shuts down the launcher's thread pool if one exists and has not already
+ * been shut down. This is best effort: a failure during shutdown is logged
+ * and not rethrown.
+ */
+ @Override
+ public void destroy() {
+ try {
+ if (executor != null && !executor.isShutdown()) {
+ log.info("Shutting down thread pool");
+ executor.shutdownNow();
+ }
+ } catch (Exception e) {
+ // Hand the exception to the logger so the cause and stack trace are kept
+ log.warn("Error shutting down threadpool", e);
+ }
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Nov 7 21:08:25 2014
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
@@ -86,6 +87,8 @@ public class TezSessionManager {
TezJobConfig tezJobConf) throws TezException, IOException,
InterruptedException {
TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+ TezScriptState ss = TezScriptState.get();
+ ss.addDAGSettingsToConf(amConf);
adjustAMConfig(amConf, tezJobConf);
String jobName = conf.get(PigContext.JOB_NAME, "pig");
TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
@@ -194,6 +197,7 @@ public class TezSessionManager {
sessionPoolLock.writeLock().lock();
try {
if (shutdown == true) {
+ log.info("Shutting down Tez session " + newSession.session);
newSession.session.stop();
throw new IOException("TezSessionManager is shut down");
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Nov 7 21:08:25 2014
@@ -209,9 +209,7 @@ public class TezCompiler extends PhyPlan
// Segment a single DAG into a DAG graph
public TezPlanContainer getPlanContainer() throws PlanException {
TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
- TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
- tezPlanContainer.add(node);
- tezPlanContainer.split(node);
+ tezPlanContainer.addPlan(tezPlan);
return tezPlanContainer;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Nov 7 21:08:25 2014
@@ -31,7 +31,6 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -41,9 +40,14 @@ import org.apache.pig.impl.util.JarManag
public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
private static final long serialVersionUID = 1L;
private PigContext pigContext;
+ private String jobName;
+ private long dagId = 0;
+ //Incrementing static counter if multiple pig scripts have same name
+ private static long scopeId = 0;
public TezPlanContainer(PigContext pigContext) {
this.pigContext = pigContext;
+ this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
}
// Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
@@ -103,18 +107,18 @@ public class TezPlanContainer extends Op
return TezResourceManager.getInstance().addTezResources(jarLists);
}
- public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
+ public TezPlanContainerNode getNextPlan(List<TezOperPlan> processedPlans) {
synchronized(this) {
while (getRoots()!=null && !getRoots().isEmpty()) {
TezPlanContainerNode currentPlan = null;
for (TezPlanContainerNode plan : getRoots()) {
- if (!processedPlans.contains(plan.getNode())) {
+ if (!processedPlans.contains(plan.getTezOperPlan())) {
currentPlan = plan;
break;
}
}
if (currentPlan!=null) {
- return currentPlan.getNode();
+ return currentPlan;
} else {
try {
wait();
@@ -126,10 +130,15 @@ public class TezPlanContainer extends Op
return null;
}
+ /**
+ * Wraps the given Tez plan in a new container node with a freshly generated
+ * operator key, adds that node to this container and then calls
+ * {@link #split(TezPlanContainerNode)} on it.
+ *
+ * @param plan the Tez plan to add to this container
+ * @throws PlanException declared by the add and split steps
+ */
+ public void addPlan(TezOperPlan plan) throws PlanException {
+ TezPlanContainerNode node = new TezPlanContainerNode(generateNodeOperatorKey(), plan);
+ this.add(node);
+ this.split(node);
+ }
+
public void updatePlan(TezOperPlan plan, boolean succ) {
- String scope = getRoots().get(0).getOperatorKey().getScope();
- TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)), plan);
+ TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(
+ generateNodeOperatorKey(), plan);
synchronized(this) {
if (succ) {
remove(tezPlanContainerNode);
@@ -143,7 +152,7 @@ public class TezPlanContainer extends Op
}
public void split(TezPlanContainerNode planNode) throws PlanException {
- TezOperPlan tezOperPlan = planNode.getNode();
+ TezOperPlan tezOperPlan = planNode.getTezOperPlan();
TezOperator operToSegment = null;
List<TezOperator> succs = new ArrayList<TezOperator>();
for (TezOperator tezOper : tezOperPlan) {
@@ -162,8 +171,7 @@ public class TezPlanContainer extends Op
containerSuccs.addAll(getSuccessors(planNode));
}
tezOperPlan.moveTree(succ, newOperPlan);
- String scope = operToSegment.getOperatorKey().getScope();
- TezPlanContainerNode newPlanNode = new TezPlanContainerNode(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), newOperPlan);
+ TezPlanContainerNode newPlanNode = new TezPlanContainerNode(generateNodeOperatorKey(), newOperPlan);
add(newPlanNode);
for (TezPlanContainerNode containerNodeSucc : containerSuccs) {
disconnect(planNode, containerNodeSucc);
@@ -176,6 +184,17 @@ public class TezPlanContainer extends Op
}
}
+ /**
+ * Generates the operator key for a new container node. The scope string is
+ * built from the job name and this container's DAG counter; the id comes
+ * from the static counter shared by all containers. Both counters are
+ * incremented on every call.
+ */
+ private OperatorKey generateNodeOperatorKey() {
+ // scopeId is static, so it is guarded by the class lock: the instance
+ // lock would not exclude other TezPlanContainer instances or resetScope()
+ synchronized (TezPlanContainer.class) {
+ OperatorKey opKey = new OperatorKey(jobName + "-" + dagId + "_scope", scopeId);
+ scopeId++;
+ dagId++;
+ return opKey;
+ }
+ }
+
+ /**
+ * Resets the static scope counter to zero. Synchronized on the class so it
+ * uses the same lock as key generation.
+ */
+ public static synchronized void resetScope() {
+ scopeId = 0;
+ }
+
@Override
public String toString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerNode.java Fri Nov 7 21:08:25 2014
@@ -21,12 +21,13 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor>{
+public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor> {
private static final long serialVersionUID = 1L;
- TezOperPlan node;
- public TezPlanContainerNode(OperatorKey k, TezOperPlan node) {
+ TezOperPlan tezPlan;
+
+ public TezPlanContainerNode(OperatorKey k, TezOperPlan tezPlan) {
super(k);
- this.node = node;
+ this.tezPlan = tezPlan;
}
@Override
@@ -49,26 +50,26 @@ public class TezPlanContainerNode extend
return "DAG " + mKey;
}
- public TezOperPlan getNode() {
- return node;
+ public TezOperPlan getTezOperPlan() {
+ return tezPlan;
}
@Override
public boolean equals(Object o) {
if (o != null && o instanceof TezPlanContainerNode) {
- return ((TezPlanContainerNode)o).getNode().equals(getNode());
+ return ((TezPlanContainerNode)o).getTezOperPlan().equals(getTezOperPlan());
}
return false;
}
@Override
public int hashCode() {
- return getNode().hashCode();
+ return getTezOperPlan().hashCode();
}
@Override
public String toString() {
- return getNode().toString();
+ return getTezOperPlan().toString();
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java Fri Nov 7 21:08:25 2014
@@ -48,10 +48,10 @@ public class TezPlanContainerPrinter ext
mStream.println("#--------------------------------------------------");
mStream.println("# TEZ DAG plan: " + tezPlanContainerNode.getOperatorKey());
mStream.println("#--------------------------------------------------");
- TezGraphPrinter graphPrinter = new TezGraphPrinter(tezPlanContainerNode.getNode());
+ TezGraphPrinter graphPrinter = new TezGraphPrinter(tezPlanContainerNode.getTezOperPlan());
graphPrinter.visit();
mStream.print(graphPrinter.toString());
- TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getNode());
+ TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getTezOperPlan());
printer.setVerbose(isVerbose);
printer.visit();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerUDFCollector.java Fri Nov 7 21:08:25 2014
@@ -38,7 +38,7 @@ public class TezPlanContainerUDFCollecto
@Override
public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
- Iterator<TezOperator> it = tezPlanContainerNode.getNode().iterator();
+ Iterator<TezOperator> it = tezPlanContainerNode.getTezOperPlan().iterator();
while (it.hasNext()) {
udfs.addAll(it.next().UDFs);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/NativeTezOper.java Fri Nov 7 21:08:25 2014
@@ -27,26 +27,29 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.tez.TezStats;
+import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
public class NativeTezOper extends TezOperator {
private static final long serialVersionUID = 1L;
private static int countJobs = 0;
private String nativeTezJar;
private String[] params;
+ private String jobId;
+
public NativeTezOper(OperatorKey k, String tezJar, String[] parameters) {
super(k);
nativeTezJar = tezJar;
params = parameters;
+ jobId = nativeTezJar + "_" + getJobNumber();
}
- public static int getJobNumber() {
+ private static int getJobNumber() {
countJobs++;
return countJobs;
}
public String getJobId() {
- return nativeTezJar + "_";
+ return jobId;
}
public String getCommandString() {
@@ -74,24 +77,24 @@ public class NativeTezOper extends TezOp
v.visitTezOp(this);
}
- public void runJob() throws JobCreationException {
+ public void runJob(String jobStatsKey) throws JobCreationException {
RunJarSecurityManager secMan = new RunJarSecurityManager();
try {
RunJar.main(getNativeTezParams());
- ((TezStats)PigStats.get()).addTezJobStatsForNative(this, true);
+ ((TezPigScriptStats)PigStats.get()).addTezJobStatsForNative(jobStatsKey, this, true);
} catch (SecurityException se) {
if(secMan.getExitInvoked()) {
if(secMan.getExitCode() != 0) {
throw new JobCreationException("Native job returned with non-zero return code");
}
else {
- ((TezStats)PigStats.get()).addTezJobStatsForNative(this, true);
+ ((TezPigScriptStats)PigStats.get()).addTezJobStatsForNative(jobStatsKey, this, true);
}
}
} catch (Throwable t) {
JobCreationException e = new JobCreationException(
"Cannot run native tez job "+ t.getMessage(), t);
- ((TezStats)PigStats.get()).addTezJobStatsForNative(this, false);
+ ((TezPigScriptStats)PigStats.get()).addTezJobStatsForNative(jobStatsKey, this, false);
throw e;
} finally {
secMan.retire();
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1637447&r1=1637446&r2=1637447&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Fri Nov 7 21:08:25 2014
@@ -18,12 +18,11 @@
package org.apache.pig.tools.pigstats;
+import org.apache.pig.PigRunner;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.plan.OperatorPlan;
-import org.apache.pig.PigRunner;
-
/**
* Should be implemented by an object that wants to receive notifications
* from {@link PigRunner}.
@@ -33,7 +32,7 @@ import org.apache.pig.PigRunner;
public interface PigProgressNotificationListener extends java.util.EventListener {
/**
- * Invoked before any Hadoop jobs are run with the plan that is to be executed.
+ * Invoked before any Hadoop jobs (or a Tez DAG) are run with the plan that is to be executed.
*
* @param scriptId the unique id of the script
* @param plan the OperatorPlan that is to be executed
@@ -41,36 +40,36 @@ public interface PigProgressNotification
public void initialPlanNotification(String scriptId, OperatorPlan<?> plan);
/**
- * Invoked just before launching Hadoop jobs spawned by the script.
+ * Invoked just before launching Hadoop jobs (or Tez DAGs) spawned by the script.
* @param scriptId the unique id of the script
- * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script
+ * @param numJobsToLaunch the total number of Hadoop jobs (or Tez DAGs) spawned by the script
*/
public void launchStartedNotification(String scriptId, int numJobsToLaunch);
/**
- * Invoked just before submitting a batch of Hadoop jobs.
+ * Invoked just before submitting a batch of Hadoop jobs (or Tez DAGs).
* @param scriptId the unique id of the script
- * @param numJobsSubmitted the number of Hadoop jobs in the batch
+ * @param numJobsSubmitted the number of Hadoop jobs (or Tez DAGs) in the batch
*/
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted);
/**
- * Invoked after a Hadoop job is started.
- * @param scriptId the unique id of the script
- * @param assignedJobId the Hadoop job id
+ * Invoked after a Hadoop job (or Tez DAG) is started.
+ * @param scriptId the unique id of the script
+ * @param assignedJobId the Hadoop job id (or Tez DAG job id)
*/
public void jobStartedNotification(String scriptId, String assignedJobId);
/**
- * Invoked just after a Hadoop job is completed successfully.
- * @param scriptId the unique id of the script
- * @param jobStats the {@link JobStats} object associated with the Hadoop job
+ * Invoked just after a Hadoop job (or Tez DAG) is completed successfully.
+ * @param scriptId the unique id of the script
+ * @param jobStats the {@link JobStats} object associated with the Hadoop job (or Tez DAG)
*/
public void jobFinishedNotification(String scriptId, JobStats jobStats);
/**
* Invoked when a Hadoop job fails.
- * @param scriptId the unique id of the script
+ * @param scriptId the unique id of the script
* @param jobStats the {@link JobStats} object associated with the Hadoop job
*/
public void jobFailedNotification(String scriptId, JobStats jobStats);
@@ -83,16 +82,16 @@ public interface PigProgressNotification
public void outputCompletedNotification(String scriptId, OutputStats outputStats);
/**
- * Invoked to update the execution progress.
+ * Invoked to update the execution progress.
* @param scriptId the unique id of the script
* @param progress the percentage of the execution progress
*/
public void progressUpdatedNotification(String scriptId, int progress);
/**
- * Invoked just after all Hadoop jobs spawned by the script are completed.
+ * Invoked just after all Hadoop jobs (or Tez DAGs) spawned by the script are completed.
* @param scriptId the unique id of the script
- * @param numJobsSucceeded the total number of Hadoop jobs succeeded
+ * @param numJobsSucceeded the total number of Hadoop jobs (or Tez DAGs) that succeeded
*/
public void launchCompletedNotification(String scriptId, int numJobsSucceeded);
}
Added: pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java?rev=1637447&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/PigTezProgressNotificationListener.java Fri Nov 7 21:08:25 2014
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.tez;
+
+import org.apache.pig.PigRunner;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
+
+/**
+ * Should be extended by an object that wants to receive Tez DAG specific
+ * notifications from {@link PigRunner}, in addition to the notifications
+ * defined by {@link PigProgressNotificationListener}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class PigTezProgressNotificationListener implements PigProgressNotificationListener {
+
+ /**
+ * Invoked just before launching a Tez DAG spawned by the script.
+ *
+ * @param scriptId the unique id of the script
+ * @param dagId the unique name of the Tez DAG
+ * @param dagPlan the OperatorPlan that is to be executed
+ * @param numVerticesToLaunch the total number of vertices spawned by the Tez DAG
+ */
+ public abstract void dagLaunchNotification(String scriptId, String dagId,
+ OperatorPlan<?> dagPlan, int numVerticesToLaunch);
+
+ /**
+ * Invoked after a Tez DAG is started.
+ *
+ * @param scriptId the unique id of the script
+ * @param dagId the unique name of the Tez DAG
+ * @param assignedApplicationId
+ * the YARN application id for the Tez DAG. More than one Tez DAG
+ * can share the same application id if session reuse is turned on.
+ * Session reuse is turned on by default.
+ *
+ */
+ public abstract void dagStartedNotification(String scriptId, String dagId, String assignedApplicationId);
+
+ /**
+ * Invoked to update the execution progress of a Tez DAG.
+ *
+ * @param scriptId the unique id of the script
+ * @param dagId the unique name of the Tez DAG
+ * @param numVerticesCompleted the number of vertices completed so far
+ * @param progress the percentage of the execution progress based on total number of tasks of all vertices
+ */
+ public abstract void dagProgressNotification(String scriptId, String dagId, int numVerticesCompleted, int progress);
+
+ /**
+ * Invoked just after a Tez DAG has completed, whether it succeeded or failed.
+ *
+ * @param scriptId the unique id of the script
+ * @param dagId the unique name of the Tez DAG
+ * @param success true if the Tez DAG was successful, false otherwise
+ * @param tezDAGStats the stats information for the DAG
+ */
+ public abstract void dagCompletedNotification(String scriptId, String dagId, boolean success, TezDAGStats tezDAGStats);
+}
Added: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1637447&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Nov 7 21:08:25 2014
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.pigstats.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+/**
+ * TezDAGStats encapsulates the statistics collected from a Tez DAG.
+ * It includes status of the execution, the Tez Vertices, as well as
+ * information about outputs and inputs of the DAG.
+ */
+public class TezDAGStats extends JobStats {
+
+    private static final Log LOG = LogFactory.getLog(TezDAGStats.class);
+    public static final String DAG_COUNTER_GROUP = DAGCounter.class.getName();
+    public static final String FS_COUNTER_GROUP = FileSystemCounter.class.getName();
+    public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
+    public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName();
+
+    /** Per-vertex stats, keyed by the string form of the operator key. */
+    private final Map<String, TezVertexStats> tezVertexStatsMap;
+
+    private String appId;
+
+    // -1 means "not yet read from the DAG counters".
+    private int totalTasks = -1;
+    private long fileBytesRead = -1;
+    private long fileBytesWritten = -1;
+
+    private Counters counters = null;
+
+    // Totals summed over the vertices in accumulateStats().
+    private int numberMaps = 0;
+    private int numberReduces = 0;
+
+    private long mapInputRecords = 0;
+    private long mapOutputRecords = 0;
+    private long reduceInputRecords = 0;
+    private long reduceOutputRecords = 0;
+    private long spillCount = 0;
+    private long activeSpillCountObj = 0;
+    private long activeSpillCountRecs = 0;
+
+    private final Map<String, Long> multiStoreCounters
+            = new HashMap<String, Long>();
+
+    /**
+     * Builds the graph of the vertices of a Tez DAG by visiting a
+     * {@link TezOperPlan} in dependency order and creating one
+     * {@link TezVertexStats} per operator.
+     */
+    static class JobGraphBuilder extends TezOpPlanVisitor {
+
+        private final JobGraph jobPlan;
+        private final Map<String, TezVertexStats> tezVertexStatsMap;
+        private final List<TezVertexStats> vertexStatsToBeRemoved;
+        private final TezDAGScriptInfo dagScriptInfo;
+
+        /**
+         * @param plan the Tez operator plan to walk
+         * @param dagScriptInfo source of the alias, alias location and
+         *        feature annotations for each operator
+         */
+        public JobGraphBuilder(TezOperPlan plan, TezDAGScriptInfo dagScriptInfo) {
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            tezVertexStatsMap = new HashMap<String, TezVertexStats>();
+            vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
+            jobPlan = new JobGraph();
+            this.dagScriptInfo = dagScriptInfo;
+        }
+
+        /**
+         * @return the stats created so far, keyed by the string form of
+         *         each operator key
+         */
+        public Map<String, TezVertexStats> getTezVertexStatsMap() {
+            return tezVertexStatsMap;
+        }
+
+        /**
+         * @return the job graph populated by this visitor
+         */
+        public JobGraph getJobPlan() {
+            return jobPlan;
+        }
+
+        /**
+         * Creates the stats node for {@code tezOp}, connects it to the
+         * stats nodes of its predecessors and registers it in the map.
+         */
+        @Override
+        public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            String opKey = tezOp.getOperatorKey().toString();
+            TezVertexStats currStats =
+                    new TezVertexStats(opKey, jobPlan, tezOp.isUseMRMapSettings());
+            jobPlan.add(currStats);
+            List<TezOperator> preds = getPlan().getPredecessors(tezOp);
+            if (preds != null) {
+                for (TezOperator pred : preds) {
+                    // The map is keyed by the operator key string, so the
+                    // predecessor has to be looked up by that string; using
+                    // the TezOperator itself as the key never matches.
+                    TezVertexStats predStats =
+                            tezVertexStatsMap.get(pred.getOperatorKey().toString());
+                    if (predStats != null && !jobPlan.isConnected(predStats, currStats)) {
+                        jobPlan.connect(predStats, currStats);
+                    }
+                }
+            }
+
+            // Remove VertexGroups (union) from JobGraph since they're not
+            // materialized as real vertices by Tez.
+            if (tezOp.isVertexGroup()) {
+                vertexStatsToBeRemoved.add(currStats);
+            } else {
+                currStats.annotate(ALIAS, dagScriptInfo.getAlias(tezOp));
+                currStats.annotate(ALIAS_LOCATION, dagScriptInfo.getAliasLocation(tezOp));
+                currStats.annotate(FEATURE, dagScriptInfo.getPigFeatures(tezOp));
+            }
+
+            tezVertexStatsMap.put(opKey, currStats);
+        }
+
+        /**
+         * Walks the plan, then removes the vertex-group nodes collected
+         * during the walk, reconnecting their neighbours. A failure to
+         * remove a node is logged as a warning and not rethrown.
+         */
+        @Override
+        public void visit() throws VisitorException {
+            super.visit();
+            try {
+                for (TezVertexStats vertexStats : vertexStatsToBeRemoved) {
+                    jobPlan.removeAndReconnect(vertexStats);
+                }
+            } catch (FrontendException e) {
+                LOG.warn("Unable to build Tez DAG", e);
+            }
+        }
+
+    }
+
+    /**
+     * @param name the name passed on to {@link JobStats}
+     * @param plan the job graph passed on to {@link JobStats}
+     * @param tezVertexStatsMap per-vertex stats keyed by vertex name
+     */
+    protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap) {
+        super(name, plan);
+        this.tezVertexStatsMap = tezVertexStatsMap;
+    }
+
+    /**
+     * @param vertexName the key of the vertex in the stats map
+     * @return the stats registered under {@code vertexName}, or null if
+     *         there is none
+     */
+    public TezVertexStats getVertexStats(String vertexName) {
+        return tezVertexStatsMap.get(vertexName);
+    }
+
+    /**
+     * Annotates this DAG with the DAG-level alias, alias location and
+     * features taken from {@code dagScriptInfo}.
+     */
+    void setAlias(TezDAGScriptInfo dagScriptInfo) {
+        annotate(ALIAS, dagScriptInfo.getAlias());
+        annotate(ALIAS_LOCATION, dagScriptInfo.getAliasLocation());
+        annotate(FEATURE, dagScriptInfo.getPigFeatures());
+    }
+
+    /**
+     * Updates the statistics after a DAG is finished.
+     * <p>
+     * DAG-level counters are read only when the job reports them; when
+     * {@code tezJob.getDAGCounters()} returns null the counter-derived
+     * fields keep their previous values and the HDFS byte counts stay
+     * at -1. Per-vertex totals are added on top of the current values.
+     *
+     * @param tezJob the job to read the application id, configuration,
+     *        DAG, counters and vertex statuses from
+     * @throws IOException if reading the job or a vertex payload fails
+     */
+    public void accumulateStats(TezJob tezJob) throws IOException {
+        //For Oozie Pig action job id matching compatibility with MR mode
+        appId = tezJob.getApplicationId().toString().replace("application", "job");
+        setConf(tezJob.getConfiguration());
+        DAG dag = tezJob.getDAG();
+        hdfsBytesRead = -1;
+        hdfsBytesWritten = -1;
+        TezCounters tezCounters = tezJob.getDAGCounters();
+        if (tezCounters != null) {
+            counters = convertToHadoopCounters(tezCounters);
+            readDagCounters(tezCounters);
+        }
+
+        for (Entry<String, TezVertexStats> entry : tezVertexStatsMap.entrySet()) {
+            Vertex v = dag.getVertex(entry.getKey());
+            if (v != null) {
+                accumulateVertexStats(tezJob, v, entry.getValue());
+            }
+        }
+    }
+
+    /** Reads the task count and file system byte counts from the DAG counters. */
+    private void readDagCounters(TezCounters tezCounters) {
+        CounterGroup dagGrp = tezCounters.getGroup(DAG_COUNTER_GROUP);
+        totalTasks = (int) dagGrp.findCounter("TOTAL_LAUNCHED_TASKS").getValue();
+
+        CounterGroup fsGrp = tezCounters.getGroup(FS_COUNTER_GROUP);
+        fileBytesRead = fsGrp.findCounter("FILE_BYTES_READ").getValue();
+        fileBytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN").getValue();
+        hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ").getValue();
+        hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN").getValue();
+    }
+
+    /** Refreshes one vertex's stats and adds its totals to the DAG totals. */
+    private void accumulateVertexStats(TezJob tezJob, Vertex v, TezVertexStats vertexStats)
+            throws IOException {
+        UserPayload payload = v.getProcessorDescriptor().getUserPayload();
+        Configuration conf = TezUtils.createConfFromUserPayload(payload);
+        vertexStats.setConf(conf);
+
+        VertexStatus status = tezJob.getVertexStatus(v.getName());
+        vertexStats.accumulateStats(status, v.getParallelism());
+        if (vertexStats.getInputs() != null && !vertexStats.getInputs().isEmpty()) {
+            inputs.addAll(vertexStats.getInputs());
+        }
+        if (vertexStats.getOutputs() != null && !vertexStats.getOutputs().isEmpty()) {
+            outputs.addAll(vertexStats.getOutputs());
+        }
+        // HDFS byte totals are taken from the DAG-level file system
+        // counters, so the per-vertex values are not summed here.
+        if (!vertexStats.getMultiStoreCounters().isEmpty()) {
+            multiStoreCounters.putAll(vertexStats.getMultiStoreCounters());
+        }
+        numberMaps += vertexStats.getNumberMaps();
+        numberReduces += vertexStats.getNumberReduces();
+        mapInputRecords += vertexStats.getMapInputRecords();
+        mapOutputRecords += vertexStats.getMapOutputRecords();
+        reduceInputRecords += vertexStats.getReduceInputRecords();
+        reduceOutputRecords += vertexStats.getReduceOutputRecords();
+        spillCount += vertexStats.getSMMSpillCount();
+        activeSpillCountObj += vertexStats.getProactiveSpillCountObjects();
+        activeSpillCountRecs += vertexStats.getProactiveSpillCountRecs();
+    }
+
+    /** Copies every Tez counter group and counter into a Hadoop {@link Counters}. */
+    private Counters convertToHadoopCounters(TezCounters tezCounters) {
+        Counters hadoopCounters = new Counters();
+        for (CounterGroup tezGrp : tezCounters) {
+            Group grp = hadoopCounters.addGroup(tezGrp.getName(), tezGrp.getDisplayName());
+            for (TezCounter counter : tezGrp) {
+                grp.addCounter(counter.getName(), counter.getDisplayName(), counter.getValue());
+            }
+        }
+        return hadoopCounters;
+    }
+
+    /**
+     * @return the id set by {@link #setJobId(String)} or derived from the
+     *         application id in {@link #accumulateStats(TezJob)}
+     */
+    @Override
+    public String getJobId() {
+        return appId;
+    }
+
+    /**
+     * @param appId the id to report from {@link #getJobId()}
+     */
+    public void setJobId(String appId) {
+        this.appId = appId;
+    }
+
+    /**
+     * Dispatches to the visitor only when it is a {@link JobGraphPrinter};
+     * any other visitor is ignored.
+     */
+    @Override
+    public void accept(PlanVisitor v) throws FrontendException {
+        if (v instanceof JobGraphPrinter) {
+            ((JobGraphPrinter) v).visit(this);
+        }
+    }
+
+    /**
+     * @return a multi-line summary with the DAG name, application id,
+     *         launched task count and file/HDFS byte counts
+     */
+    @Override
+    public String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("DAG ").append(name).append(":\n");
+        appendStat(sb, "ApplicationId", appId);
+        appendStat(sb, "TotalLaunchedTasks", totalTasks);
+
+        appendStat(sb, "FileBytesRead", fileBytesRead);
+        appendStat(sb, "FileBytesWritten", fileBytesWritten);
+        appendStat(sb, "HdfsBytesRead", hdfsBytesRead);
+        appendStat(sb, "HdfsBytesWritten", hdfsBytesWritten);
+
+        return sb.toString();
+    }
+
+    /** Appends one right-aligned label / left-aligned value line to {@code sb}. */
+    private static void appendStat(StringBuilder sb, String label, Object value) {
+        sb.append(String.format("%1$20s: %2$-100s%n", label, value));
+    }
+
+    /** @return the sum of the per-vertex map counts accumulated so far */
+    @Override
+    @Deprecated
+    public int getNumberMaps() {
+        return numberMaps;
+    }
+
+    /** @return the sum of the per-vertex reduce counts accumulated so far */
+    @Override
+    @Deprecated
+    public int getNumberReduces() {
+        return numberReduces;
+    }
+
+    /** @return always -1; this class does not compute map times */
+    @Override
+    @Deprecated
+    public long getMaxMapTime() {
+        return -1;
+    }
+
+    /** @return always -1; this class does not compute map times */
+    @Override
+    @Deprecated
+    public long getMinMapTime() {
+        return -1;
+    }
+
+    /** @return always -1; this class does not compute map times */
+    @Override
+    @Deprecated
+    public long getAvgMapTime() {
+        return -1;
+    }
+
+    /** @return always -1; this class does not compute reduce times */
+    @Override
+    @Deprecated
+    public long getMaxReduceTime() {
+        return -1;
+    }
+
+    /** @return always -1; this class does not compute reduce times */
+    @Override
+    @Deprecated
+    public long getMinReduceTime() {
+        return -1;
+    }
+
+    /** @return always -1; this class does not compute reduce times */
+    @Override
+    @Deprecated
+    public long getAvgREduceTime() {
+        return -1;
+    }
+
+    /** @return the sum of the per-vertex map input records accumulated so far */
+    @Override
+    @Deprecated
+    public long getMapInputRecords() {
+        return mapInputRecords;
+    }
+
+    /** @return the sum of the per-vertex map output records accumulated so far */
+    @Override
+    @Deprecated
+    public long getMapOutputRecords() {
+        return mapOutputRecords;
+    }
+
+    /** @return the sum of the per-vertex reduce input records accumulated so far */
+    @Override
+    @Deprecated
+    public long getReduceInputRecords() {
+        return reduceInputRecords;
+    }
+
+    /** @return the sum of the per-vertex reduce output records accumulated so far */
+    @Override
+    @Deprecated
+    public long getReduceOutputRecords() {
+        return reduceOutputRecords;
+    }
+
+    /** @return the sum of the per-vertex SMM spill counts accumulated so far */
+    @Override
+    public long getSMMSpillCount() {
+        return spillCount;
+    }
+
+    /** @return the sum of the per-vertex proactive spill object counts accumulated so far */
+    @Override
+    public long getProactiveSpillCountObjects() {
+        return activeSpillCountObj;
+    }
+
+    /** @return the sum of the per-vertex proactive spill record counts accumulated so far */
+    @Override
+    public long getProactiveSpillCountRecs() {
+        return activeSpillCountRecs;
+    }
+
+    /**
+     * @return the DAG counters converted to Hadoop counters, or null if
+     *         no DAG counters have been read
+     */
+    @Override
+    @Deprecated
+    public Counters getHadoopCounters() {
+        return counters;
+    }
+
+    /** @return the multi-store counters merged from the vertices */
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiStoreCounters() {
+        return multiStoreCounters;
+    }
+
+    /**
+     * Not supported by this class.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    @Deprecated
+    public Map<String, Long> getMultiInputCounters() {
+        throw new UnsupportedOperationException();
+    }
+
+}